Including the venv

This commit is contained in:
2020-07-18 20:14:39 +02:00
parent 822398bc37
commit 7dbbde5028
849 changed files with 146952 additions and 0 deletions

View File

@@ -0,0 +1,268 @@
"""
This code was taken from https://github.com/ActiveState/appdirs and modified
to suit our purposes.
"""
from __future__ import absolute_import
import os
import sys
from pip._vendor.six import PY2, text_type
from pip._internal.utils.compat import WINDOWS, expanduser
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import List
def user_cache_dir(appname):
# type: (str) -> str
r"""
Return full path to the user-specific cache dir for this application.
"appname" is the name of application.
Typical user cache directories are:
macOS: ~/Library/Caches/<AppName>
Unix: ~/.cache/<AppName> (XDG default)
Windows: C:\Users\<username>\AppData\Local\<AppName>\Cache
On Windows the only suggestion in the MSDN docs is that local settings go
in the `CSIDL_LOCAL_APPDATA` directory. This is identical to the
non-roaming app data dir (the default returned by `user_data_dir`). Apps
typically put cache data somewhere *under* the given dir here. Some
examples:
...\Mozilla\Firefox\Profiles\<ProfileName>\Cache
...\Acme\SuperApp\Cache\1.0
OPINION: This function appends "Cache" to the `CSIDL_LOCAL_APPDATA` value.
"""
if WINDOWS:
# Get the base path
path = os.path.normpath(_get_win_folder("CSIDL_LOCAL_APPDATA"))
# When using Python 2, return paths as bytes on Windows like we do on
# other operating systems. See helper function docs for more details.
if PY2 and isinstance(path, text_type):
path = _win_path_to_bytes(path)
# Add our app name and Cache directory to it
path = os.path.join(path, appname, "Cache")
elif sys.platform == "darwin":
# Get the base path
path = expanduser("~/Library/Caches")
# Add our app name to it
path = os.path.join(path, appname)
else:
# Get the base path
path = os.getenv("XDG_CACHE_HOME", expanduser("~/.cache"))
# Add our app name to it
path = os.path.join(path, appname)
return path
def user_data_dir(appname, roaming=False):
# type: (str, bool) -> str
r"""
Return full path to the user-specific data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
macOS: ~/Library/Application Support/<AppName>
if it exists, else ~/.config/<AppName>
Unix: ~/.local/share/<AppName> # or in
$XDG_DATA_HOME, if defined
Win XP (not roaming): C:\Documents and Settings\<username>\ ...
...Application Data\<AppName>
Win XP (roaming): C:\Documents and Settings\<username>\Local ...
...Settings\Application Data\<AppName>
Win 7 (not roaming): C:\\Users\<username>\AppData\Local\<AppName>
Win 7 (roaming): C:\\Users\<username>\AppData\Roaming\<AppName>
For Unix, we follow the XDG spec and support $XDG_DATA_HOME.
That means, by default "~/.local/share/<AppName>".
"""
if WINDOWS:
const = roaming and "CSIDL_APPDATA" or "CSIDL_LOCAL_APPDATA"
path = os.path.join(os.path.normpath(_get_win_folder(const)), appname)
elif sys.platform == "darwin":
path = os.path.join(
expanduser('~/Library/Application Support/'),
appname,
) if os.path.isdir(os.path.join(
expanduser('~/Library/Application Support/'),
appname,
)
) else os.path.join(
expanduser('~/.config/'),
appname,
)
else:
path = os.path.join(
os.getenv('XDG_DATA_HOME', expanduser("~/.local/share")),
appname,
)
return path
def user_config_dir(appname, roaming=True):
# type: (str, bool) -> str
"""Return full path to the user-specific config dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"roaming" (boolean, default True) can be set False to not use the
Windows roaming appdata directory. That means that for users on a
Windows network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
macOS: same as user_data_dir
Unix: ~/.config/<AppName>
Win *: same as user_data_dir
For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
That means, by default "~/.config/<AppName>".
"""
if WINDOWS:
path = user_data_dir(appname, roaming=roaming)
elif sys.platform == "darwin":
path = user_data_dir(appname)
else:
path = os.getenv('XDG_CONFIG_HOME', expanduser("~/.config"))
path = os.path.join(path, appname)
return path
# for the discussion regarding site_config_dirs locations
# see <https://github.com/pypa/pip/issues/1733>
def site_config_dirs(appname):
# type: (str) -> List[str]
r"""Return a list of potential user-shared config dirs for this application.
"appname" is the name of application.
Typical user config directories are:
macOS: /Library/Application Support/<AppName>/
Unix: /etc or $XDG_CONFIG_DIRS[i]/<AppName>/ for each value in
$XDG_CONFIG_DIRS
Win XP: C:\Documents and Settings\All Users\Application ...
...Data\<AppName>\
Vista: (Fail! "C:\ProgramData" is a hidden *system* directory
on Vista.)
Win 7: Hidden, but writeable on Win 7:
C:\ProgramData\<AppName>\
"""
if WINDOWS:
path = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
pathlist = [os.path.join(path, appname)]
elif sys.platform == 'darwin':
pathlist = [os.path.join('/Library/Application Support', appname)]
else:
# try looking in $XDG_CONFIG_DIRS
xdg_config_dirs = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
if xdg_config_dirs:
pathlist = [
os.path.join(expanduser(x), appname)
for x in xdg_config_dirs.split(os.pathsep)
]
else:
pathlist = []
# always look in /etc directly as well
pathlist.append('/etc')
return pathlist
# -- Windows support functions --
def _get_win_folder_from_registry(csidl_name):
# type: (str) -> str
"""
This is a fallback technique at best. I'm not sure if using the
registry for this guarantees us the correct answer for all CSIDL_*
names.
"""
import _winreg
shell_folder_name = {
"CSIDL_APPDATA": "AppData",
"CSIDL_COMMON_APPDATA": "Common AppData",
"CSIDL_LOCAL_APPDATA": "Local AppData",
}[csidl_name]
key = _winreg.OpenKey(
_winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders"
)
directory, _type = _winreg.QueryValueEx(key, shell_folder_name)
return directory
def _get_win_folder_with_ctypes(csidl_name):
# type: (str) -> str
csidl_const = {
"CSIDL_APPDATA": 26,
"CSIDL_COMMON_APPDATA": 35,
"CSIDL_LOCAL_APPDATA": 28,
}[csidl_name]
buf = ctypes.create_unicode_buffer(1024)
ctypes.windll.shell32.SHGetFolderPathW(None, csidl_const, None, 0, buf)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in buf:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
buf2 = ctypes.create_unicode_buffer(1024)
if ctypes.windll.kernel32.GetShortPathNameW(buf.value, buf2, 1024):
buf = buf2
return buf.value
if WINDOWS:
try:
import ctypes
_get_win_folder = _get_win_folder_with_ctypes
except ImportError:
_get_win_folder = _get_win_folder_from_registry
def _win_path_to_bytes(path):
"""Encode Windows paths to bytes. Only used on Python 2.
Motivation is to be consistent with other operating systems where paths
are also returned as bytes. This avoids problems mixing bytes and Unicode
elsewhere in the codebase. For more details and discussion see
<https://github.com/pypa/pip/issues/3463>.
If encoding using ASCII and MBCS fails, return the original Unicode path.
"""
for encoding in ('ASCII', 'MBCS'):
try:
return path.encode(encoding)
except (UnicodeEncodeError, LookupError):
pass
return path

View File

@@ -0,0 +1,293 @@
"""Stuff that differs in different Python versions and platform
distributions."""
from __future__ import absolute_import, division
import codecs
import locale
import logging
import os
import shutil
import sys
from pip._vendor.six import text_type
from pip._vendor.urllib3.util import IS_PYOPENSSL
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, Text, Tuple, Union
try:
import _ssl # noqa
except ImportError:
ssl = None
else:
# This additional assignment was needed to prevent a mypy error.
ssl = _ssl
try:
import ipaddress
except ImportError:
try:
from pip._vendor import ipaddress # type: ignore
except ImportError:
import ipaddr as ipaddress # type: ignore
ipaddress.ip_address = ipaddress.IPAddress # type: ignore
ipaddress.ip_network = ipaddress.IPNetwork # type: ignore
__all__ = [
"ipaddress", "uses_pycache", "console_to_str", "native_str",
"get_path_uid", "stdlib_pkgs", "WINDOWS", "samefile", "get_terminal_size",
"get_extension_suffixes",
]
logger = logging.getLogger(__name__)
HAS_TLS = (ssl is not None) or IS_PYOPENSSL
if sys.version_info >= (3, 4):
uses_pycache = True
from importlib.util import cache_from_source
else:
import imp
try:
cache_from_source = imp.cache_from_source # type: ignore
except AttributeError:
# does not use __pycache__
cache_from_source = None
uses_pycache = cache_from_source is not None
if sys.version_info >= (3, 5):
backslashreplace_decode = "backslashreplace"
else:
# In version 3.4 and older, backslashreplace exists
# but does not support use for decoding.
# We implement our own replace handler for this
# situation, so that we can consistently use
# backslash replacement for all versions.
def backslashreplace_decode_fn(err):
raw_bytes = (err.object[i] for i in range(err.start, err.end))
if sys.version_info[0] == 2:
# Python 2 gave us characters - convert to numeric bytes
raw_bytes = (ord(b) for b in raw_bytes)
return u"".join(u"\\x%x" % c for c in raw_bytes), err.end
codecs.register_error(
"backslashreplace_decode",
backslashreplace_decode_fn,
)
backslashreplace_decode = "backslashreplace_decode"
def str_to_display(data, desc=None):
# type: (Union[bytes, Text], Optional[str]) -> Text
"""
For display or logging purposes, convert a bytes object (or text) to
text (e.g. unicode in Python 2) safe for output.
:param desc: An optional phrase describing the input data, for use in
the log message if a warning is logged. Defaults to "Bytes object".
This function should never error out and so can take a best effort
approach. It is okay to be lossy if needed since the return value is
just for display.
We assume the data is in the locale preferred encoding. If it won't
decode properly, we warn the user but decode as best we can.
We also ensure that the output can be safely written to standard output
without encoding errors.
"""
if isinstance(data, text_type):
return data
# Otherwise, data is a bytes object (str in Python 2).
# First, get the encoding we assume. This is the preferred
# encoding for the locale, unless that is not found, or
# it is ASCII, in which case assume UTF-8
encoding = locale.getpreferredencoding()
if (not encoding) or codecs.lookup(encoding).name == "ascii":
encoding = "utf-8"
# Now try to decode the data - if we fail, warn the user and
# decode with replacement.
try:
decoded_data = data.decode(encoding)
except UnicodeDecodeError:
if desc is None:
desc = 'Bytes object'
msg_format = '{} does not appear to be encoded as %s'.format(desc)
logger.warning(msg_format, encoding)
decoded_data = data.decode(encoding, errors=backslashreplace_decode)
# Make sure we can print the output, by encoding it to the output
# encoding with replacement of unencodable characters, and then
# decoding again.
# We use stderr's encoding because it's less likely to be
# redirected and if we don't find an encoding we skip this
# step (on the assumption that output is wrapped by something
# that won't fail).
# The double getattr is to deal with the possibility that we're
# being called in a situation where sys.__stderr__ doesn't exist,
# or doesn't have an encoding attribute. Neither of these cases
# should occur in normal pip use, but there's no harm in checking
# in case people use pip in (unsupported) unusual situations.
output_encoding = getattr(getattr(sys, "__stderr__", None),
"encoding", None)
if output_encoding:
output_encoded = decoded_data.encode(
output_encoding,
errors="backslashreplace"
)
decoded_data = output_encoded.decode(output_encoding)
return decoded_data
def console_to_str(data):
# type: (bytes) -> Text
"""Return a string, safe for output, of subprocess output.
"""
return str_to_display(data, desc='Subprocess output')
if sys.version_info >= (3,):
def native_str(s, replace=False):
# type: (str, bool) -> str
if isinstance(s, bytes):
return s.decode('utf-8', 'replace' if replace else 'strict')
return s
else:
def native_str(s, replace=False):
# type: (str, bool) -> str
# Replace is ignored -- unicode to UTF-8 can't fail
if isinstance(s, text_type):
return s.encode('utf-8')
return s
def get_path_uid(path):
# type: (str) -> int
"""
Return path's uid.
Does not follow symlinks:
https://github.com/pypa/pip/pull/935#discussion_r5307003
Placed this function in compat due to differences on AIX and
Jython, that should eventually go away.
:raises OSError: When path is a symlink or can't be read.
"""
if hasattr(os, 'O_NOFOLLOW'):
fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
file_uid = os.fstat(fd).st_uid
os.close(fd)
else: # AIX and Jython
# WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
if not os.path.islink(path):
# older versions of Jython don't have `os.fstat`
file_uid = os.stat(path).st_uid
else:
# raise OSError for parity with os.O_NOFOLLOW above
raise OSError(
"%s is a symlink; Will not return uid for symlinks" % path
)
return file_uid
if sys.version_info >= (3, 4):
from importlib.machinery import EXTENSION_SUFFIXES
def get_extension_suffixes():
return EXTENSION_SUFFIXES
else:
from imp import get_suffixes
def get_extension_suffixes():
return [suffix[0] for suffix in get_suffixes()]
def expanduser(path):
# type: (str) -> str
"""
Expand ~ and ~user constructions.
Includes a workaround for https://bugs.python.org/issue14768
"""
expanded = os.path.expanduser(path)
if path.startswith('~/') and expanded.startswith('//'):
expanded = expanded[1:]
return expanded
# packages in the stdlib that may have installation metadata, but should not be
# considered 'installed'. this theoretically could be determined based on
# dist.location (py27:`sysconfig.get_paths()['stdlib']`,
# py26:sysconfig.get_config_vars('LIBDEST')), but fear platform variation may
# make this ineffective, so hard-coding
stdlib_pkgs = {"python", "wsgiref", "argparse"}
# windows detection, covers cpython and ironpython
WINDOWS = (sys.platform.startswith("win") or
(sys.platform == 'cli' and os.name == 'nt'))
def samefile(file1, file2):
# type: (str, str) -> bool
"""Provide an alternative for os.path.samefile on Windows/Python2"""
if hasattr(os.path, 'samefile'):
return os.path.samefile(file1, file2)
else:
path1 = os.path.normcase(os.path.abspath(file1))
path2 = os.path.normcase(os.path.abspath(file2))
return path1 == path2
if hasattr(shutil, 'get_terminal_size'):
def get_terminal_size():
# type: () -> Tuple[int, int]
"""
Returns a tuple (x, y) representing the width(x) and the height(y)
in characters of the terminal window.
"""
return tuple(shutil.get_terminal_size()) # type: ignore
else:
def get_terminal_size():
# type: () -> Tuple[int, int]
"""
Returns a tuple (x, y) representing the width(x) and the height(y)
in characters of the terminal window.
"""
def ioctl_GWINSZ(fd):
try:
import fcntl
import termios
import struct
cr = struct.unpack_from(
'hh',
fcntl.ioctl(fd, termios.TIOCGWINSZ, '12345678')
)
except Exception:
return None
if cr == (0, 0):
return None
return cr
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except Exception:
pass
if not cr:
cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
return int(cr[1]), int(cr[0])

View File

@@ -0,0 +1,100 @@
"""
A module that implements tooling to enable easy warnings about deprecations.
"""
from __future__ import absolute_import
import logging
import warnings
from pip._vendor.packaging.version import parse
from pip import __version__ as current_version
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Any, Optional
DEPRECATION_MSG_PREFIX = "DEPRECATION: "
class PipDeprecationWarning(Warning):
pass
_original_showwarning = None # type: Any
# Warnings <-> Logging Integration
def _showwarning(message, category, filename, lineno, file=None, line=None):
if file is not None:
if _original_showwarning is not None:
_original_showwarning(
message, category, filename, lineno, file, line,
)
elif issubclass(category, PipDeprecationWarning):
# We use a specially named logger which will handle all of the
# deprecation messages for pip.
logger = logging.getLogger("pip._internal.deprecations")
logger.warning(message)
else:
_original_showwarning(
message, category, filename, lineno, file, line,
)
def install_warning_logger():
# type: () -> None
# Enable our Deprecation Warnings
warnings.simplefilter("default", PipDeprecationWarning, append=True)
global _original_showwarning
if _original_showwarning is None:
_original_showwarning = warnings.showwarning
warnings.showwarning = _showwarning
def deprecated(reason, replacement, gone_in, issue=None):
# type: (str, Optional[str], Optional[str], Optional[int]) -> None
"""Helper to deprecate existing functionality.
reason:
Textual reason shown to the user about why this functionality has
been deprecated.
replacement:
Textual suggestion shown to the user about what alternative
functionality they can use.
gone_in:
The version of pip does this functionality should get removed in.
Raises errors if pip's current version is greater than or equal to
this.
issue:
Issue number on the tracker that would serve as a useful place for
users to find related discussion and provide feedback.
Always pass replacement, gone_in and issue as keyword arguments for clarity
at the call site.
"""
# Construct a nice message.
# This is eagerly formatted as we want it to get logged as if someone
# typed this entire message out.
sentences = [
(reason, DEPRECATION_MSG_PREFIX + "{}"),
(gone_in, "pip {} will remove support for this functionality."),
(replacement, "A possible replacement is {}."),
(issue, (
"You can find discussion regarding this at "
"https://github.com/pypa/pip/issues/{}."
)),
]
message = " ".join(
template.format(val) for val, template in sentences if val is not None
)
# Raise as an error if it has to be removed.
if gone_in is not None and parse(current_version) >= parse(gone_in):
raise PipDeprecationWarning(message)
warnings.warn(message, category=PipDeprecationWarning, stacklevel=2)

View File

@@ -0,0 +1,39 @@
import codecs
import locale
import re
import sys
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import List, Tuple, Text
BOMS = [
(codecs.BOM_UTF8, 'utf-8'),
(codecs.BOM_UTF16, 'utf-16'),
(codecs.BOM_UTF16_BE, 'utf-16-be'),
(codecs.BOM_UTF16_LE, 'utf-16-le'),
(codecs.BOM_UTF32, 'utf-32'),
(codecs.BOM_UTF32_BE, 'utf-32-be'),
(codecs.BOM_UTF32_LE, 'utf-32-le'),
] # type: List[Tuple[bytes, Text]]
ENCODING_RE = re.compile(br'coding[:=]\s*([-\w.]+)')
def auto_decode(data):
# type: (bytes) -> Text
"""Check a bytes string for a BOM to correctly detect the encoding
Fallback to locale.getpreferredencoding(False) like open() on Python3"""
for bom, encoding in BOMS:
if data.startswith(bom):
return data[len(bom):].decode(encoding)
# Lets check the first two lines as in PEP263
for line in data.split(b'\n')[:2]:
if line[0:1] == b'#' and ENCODING_RE.search(line):
encoding = ENCODING_RE.search(line).groups()[0].decode('ascii')
return data.decode(encoding)
return data.decode(
locale.getpreferredencoding(False) or sys.getdefaultencoding(),
)

View File

@@ -0,0 +1,30 @@
import os
import os.path
from pip._internal.utils.compat import get_path_uid
def check_path_owner(path):
# type: (str) -> bool
# If we don't have a way to check the effective uid of this process, then
# we'll just assume that we own the directory.
if not hasattr(os, "geteuid"):
return True
previous = None
while path != previous:
if os.path.lexists(path):
# Check if path is writable by current user.
if os.geteuid() == 0:
# Special handling for root user in order to handle properly
# cases where users use sudo without -H flag.
try:
path_uid = get_path_uid(path)
except OSError:
return False
return path_uid == 0
else:
return os.access(path, os.W_OK)
else:
previous, path = path, os.path.dirname(path)
return False # assume we don't own the path

View File

@@ -0,0 +1,120 @@
from __future__ import absolute_import
import os
import re
import warnings
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, Tuple
def glibc_version_string():
# type: () -> Optional[str]
"Returns glibc version string, or None if not using glibc."
return glibc_version_string_confstr() or glibc_version_string_ctypes()
def glibc_version_string_confstr():
# type: () -> Optional[str]
"Primary implementation of glibc_version_string using os.confstr."
# os.confstr is quite a bit faster than ctypes.DLL. It's also less likely
# to be broken or missing. This strategy is used in the standard library
# platform module:
# https://github.com/python/cpython/blob/fcf1d003bf4f0100c9d0921ff3d70e1127ca1b71/Lib/platform.py#L175-L183
try:
# os.confstr("CS_GNU_LIBC_VERSION") returns a string like "glibc 2.17":
_, version = os.confstr("CS_GNU_LIBC_VERSION").split()
except (AttributeError, OSError, ValueError):
# os.confstr() or CS_GNU_LIBC_VERSION not available (or a bad value)...
return None
return version
def glibc_version_string_ctypes():
# type: () -> Optional[str]
"Fallback implementation of glibc_version_string using ctypes."
try:
import ctypes
except ImportError:
return None
# ctypes.CDLL(None) internally calls dlopen(NULL), and as the dlopen
# manpage says, "If filename is NULL, then the returned handle is for the
# main program". This way we can let the linker do the work to figure out
# which libc our process is actually using.
process_namespace = ctypes.CDLL(None)
try:
gnu_get_libc_version = process_namespace.gnu_get_libc_version
except AttributeError:
# Symbol doesn't exist -> therefore, we are not linked to
# glibc.
return None
# Call gnu_get_libc_version, which returns a string like "2.5"
gnu_get_libc_version.restype = ctypes.c_char_p
version_str = gnu_get_libc_version()
# py2 / py3 compatibility:
if not isinstance(version_str, str):
version_str = version_str.decode("ascii")
return version_str
# Separated out from have_compatible_glibc for easier unit testing
def check_glibc_version(version_str, required_major, minimum_minor):
# type: (str, int, int) -> bool
# Parse string and check against requested version.
#
# We use a regexp instead of str.split because we want to discard any
# random junk that might come after the minor version -- this might happen
# in patched/forked versions of glibc (e.g. Linaro's version of glibc
# uses version strings like "2.20-2014.11"). See gh-3588.
m = re.match(r"(?P<major>[0-9]+)\.(?P<minor>[0-9]+)", version_str)
if not m:
warnings.warn("Expected glibc version with 2 components major.minor,"
" got: %s" % version_str, RuntimeWarning)
return False
return (int(m.group("major")) == required_major and
int(m.group("minor")) >= minimum_minor)
def have_compatible_glibc(required_major, minimum_minor):
# type: (int, int) -> bool
version_str = glibc_version_string()
if version_str is None:
return False
return check_glibc_version(version_str, required_major, minimum_minor)
# platform.libc_ver regularly returns completely nonsensical glibc
# versions. E.g. on my computer, platform says:
#
# ~$ python2.7 -c 'import platform; print(platform.libc_ver())'
# ('glibc', '2.7')
# ~$ python3.5 -c 'import platform; print(platform.libc_ver())'
# ('glibc', '2.9')
#
# But the truth is:
#
# ~$ ldd --version
# ldd (Debian GLIBC 2.22-11) 2.22
#
# This is unfortunate, because it means that the linehaul data on libc
# versions that was generated by pip 8.1.2 and earlier is useless and
# misleading. Solution: instead of using platform, use our code that actually
# works.
def libc_ver():
# type: () -> Tuple[str, str]
"""Try to determine the glibc version
Returns a tuple of strings (lib, version) which default to empty strings
in case the lookup fails.
"""
glibc_version = glibc_version_string()
if glibc_version is None:
return ("", "")
else:
return ("glibc", glibc_version)

View File

@@ -0,0 +1,128 @@
from __future__ import absolute_import
import hashlib
from pip._vendor.six import iteritems, iterkeys, itervalues
from pip._internal.exceptions import (
HashMismatch, HashMissing, InstallationError,
)
from pip._internal.utils.misc import read_chunks
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import (
Dict, List, BinaryIO, NoReturn, Iterator
)
from pip._vendor.six import PY3
if PY3:
from hashlib import _Hash
else:
from hashlib import _hash as _Hash
# The recommended hash algo of the moment. Change this whenever the state of
# the art changes; it won't hurt backward compatibility.
FAVORITE_HASH = 'sha256'
# Names of hashlib algorithms allowed by the --hash option and ``pip hash``
# Currently, those are the ones at least as collision-resistant as sha256.
STRONG_HASHES = ['sha256', 'sha384', 'sha512']
class Hashes(object):
"""A wrapper that builds multiple hashes at once and checks them against
known-good values
"""
def __init__(self, hashes=None):
# type: (Dict[str, List[str]]) -> None
"""
:param hashes: A dict of algorithm names pointing to lists of allowed
hex digests
"""
self._allowed = {} if hashes is None else hashes
@property
def digest_count(self):
# type: () -> int
return sum(len(digests) for digests in self._allowed.values())
def is_hash_allowed(
self,
hash_name, # type: str
hex_digest, # type: str
):
"""Return whether the given hex digest is allowed."""
return hex_digest in self._allowed.get(hash_name, [])
def check_against_chunks(self, chunks):
# type: (Iterator[bytes]) -> None
"""Check good hashes against ones built from iterable of chunks of
data.
Raise HashMismatch if none match.
"""
gots = {}
for hash_name in iterkeys(self._allowed):
try:
gots[hash_name] = hashlib.new(hash_name)
except (ValueError, TypeError):
raise InstallationError('Unknown hash name: %s' % hash_name)
for chunk in chunks:
for hash in itervalues(gots):
hash.update(chunk)
for hash_name, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def _raise(self, gots):
# type: (Dict[str, _Hash]) -> NoReturn
raise HashMismatch(self._allowed, gots)
def check_against_file(self, file):
# type: (BinaryIO) -> None
"""Check good hashes against a file-like object
Raise HashMismatch if none match.
"""
return self.check_against_chunks(read_chunks(file))
def check_against_path(self, path):
# type: (str) -> None
with open(path, 'rb') as file:
return self.check_against_file(file)
def __nonzero__(self):
# type: () -> bool
"""Return whether I know any known-good hashes."""
return bool(self._allowed)
def __bool__(self):
# type: () -> bool
return self.__nonzero__()
class MissingHashes(Hashes):
"""A workalike for Hashes used when we're missing a hash for a requirement
It computes the actual hash of the requirement and raises a HashMissing
exception showing it to the user.
"""
def __init__(self):
# type: () -> None
"""Don't offer the ``hashes`` kwarg."""
# Pass our favorite hash in to generate a "gotten hash". With the
# empty list, it will never match, so an error will always raise.
super(MissingHashes, self).__init__(hashes={FAVORITE_HASH: []})
def _raise(self, gots):
# type: (Dict[str, _Hash]) -> NoReturn
raise HashMissing(gots[FAVORITE_HASH].hexdigest())

View File

@@ -0,0 +1,394 @@
from __future__ import absolute_import
import contextlib
import errno
import logging
import logging.handlers
import os
import sys
from logging import Filter
from pip._vendor.six import PY2
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
from pip._internal.utils.misc import ensure_dir, subprocess_logger
try:
import threading
except ImportError:
import dummy_threading as threading # type: ignore
try:
# Use "import as" and set colorama in the else clause to avoid mypy
# errors and get the following correct revealed type for colorama:
# `Union[_importlib_modulespec.ModuleType, None]`
# Otherwise, we get an error like the following in the except block:
# > Incompatible types in assignment (expression has type "None",
# variable has type Module)
# TODO: eliminate the need to use "import as" once mypy addresses some
# of its issues with conditional imports. Here is an umbrella issue:
# https://github.com/python/mypy/issues/1297
from pip._vendor import colorama as _colorama
# Lots of different errors can come from this, including SystemError and
# ImportError.
except Exception:
colorama = None
else:
# Import Fore explicitly rather than accessing below as colorama.Fore
# to avoid the following error running mypy:
# > Module has no attribute "Fore"
# TODO: eliminate the need to import Fore once mypy addresses some of its
# issues with conditional imports. This particular case could be an
# instance of the following issue (but also see the umbrella issue above):
# https://github.com/python/mypy/issues/3500
from pip._vendor.colorama import Fore
colorama = _colorama
_log_state = threading.local()
_log_state.indentation = 0
class BrokenStdoutLoggingError(Exception):
"""
Raised if BrokenPipeError occurs for the stdout stream while logging.
"""
pass
# BrokenPipeError does not exist in Python 2 and, in addition, manifests
# differently in Windows and non-Windows.
if WINDOWS:
# In Windows, a broken pipe can show up as EINVAL rather than EPIPE:
# https://bugs.python.org/issue19612
# https://bugs.python.org/issue30418
if PY2:
def _is_broken_pipe_error(exc_class, exc):
"""See the docstring for non-Windows Python 3 below."""
return (exc_class is IOError and
exc.errno in (errno.EINVAL, errno.EPIPE))
else:
# In Windows, a broken pipe IOError became OSError in Python 3.
def _is_broken_pipe_error(exc_class, exc):
"""See the docstring for non-Windows Python 3 below."""
return ((exc_class is BrokenPipeError) or # noqa: F821
(exc_class is OSError and
exc.errno in (errno.EINVAL, errno.EPIPE)))
elif PY2:
def _is_broken_pipe_error(exc_class, exc):
"""See the docstring for non-Windows Python 3 below."""
return (exc_class is IOError and exc.errno == errno.EPIPE)
else:
# Then we are in the non-Windows Python 3 case.
def _is_broken_pipe_error(exc_class, exc):
"""
Return whether an exception is a broken pipe error.
Args:
exc_class: an exception class.
exc: an exception instance.
"""
return (exc_class is BrokenPipeError) # noqa: F821
@contextlib.contextmanager
def indent_log(num=2):
"""
A context manager which will cause the log output to be indented for any
log messages emitted inside it.
"""
_log_state.indentation += num
try:
yield
finally:
_log_state.indentation -= num
def get_indentation():
return getattr(_log_state, 'indentation', 0)
class IndentingFormatter(logging.Formatter):
def __init__(self, *args, **kwargs):
"""
A logging.Formatter that obeys the indent_log() context manager.
:param add_timestamp: A bool indicating output lines should be prefixed
with their record's timestamp.
"""
self.add_timestamp = kwargs.pop("add_timestamp", False)
super(IndentingFormatter, self).__init__(*args, **kwargs)
def get_message_start(self, formatted, levelno):
"""
Return the start of the formatted log message (not counting the
prefix to add to each line).
"""
if levelno < logging.WARNING:
return ''
if formatted.startswith(DEPRECATION_MSG_PREFIX):
# Then the message already has a prefix. We don't want it to
# look like "WARNING: DEPRECATION: ...."
return ''
if levelno < logging.ERROR:
return 'WARNING: '
return 'ERROR: '
def format(self, record):
"""
Calls the standard formatter, but will indent all of the log message
lines by our current indentation level.
"""
formatted = super(IndentingFormatter, self).format(record)
message_start = self.get_message_start(formatted, record.levelno)
formatted = message_start + formatted
prefix = ''
if self.add_timestamp:
# TODO: Use Formatter.default_time_format after dropping PY2.
t = self.formatTime(record, "%Y-%m-%dT%H:%M:%S")
prefix = '%s,%03d ' % (t, record.msecs)
prefix += " " * get_indentation()
formatted = "".join([
prefix + line
for line in formatted.splitlines(True)
])
return formatted
def _color_wrap(*colors):
def wrapped(inp):
return "".join(list(colors) + [inp, colorama.Style.RESET_ALL])
return wrapped
class ColorizedStreamHandler(logging.StreamHandler):
# Don't build up a list of colors if we don't have colorama
if colorama:
COLORS = [
# This needs to be in order from highest logging level to lowest.
(logging.ERROR, _color_wrap(Fore.RED)),
(logging.WARNING, _color_wrap(Fore.YELLOW)),
]
else:
COLORS = []
def __init__(self, stream=None, no_color=None):
logging.StreamHandler.__init__(self, stream)
self._no_color = no_color
if WINDOWS and colorama:
self.stream = colorama.AnsiToWin32(self.stream)
def _using_stdout(self):
"""
Return whether the handler is using sys.stdout.
"""
if WINDOWS and colorama:
# Then self.stream is an AnsiToWin32 object.
return self.stream.wrapped is sys.stdout
return self.stream is sys.stdout
def should_color(self):
# Don't colorize things if we do not have colorama or if told not to
if not colorama or self._no_color:
return False
real_stream = (
self.stream if not isinstance(self.stream, colorama.AnsiToWin32)
else self.stream.wrapped
)
# If the stream is a tty we should color it
if hasattr(real_stream, "isatty") and real_stream.isatty():
return True
# If we have an ANSI term we should color it
if os.environ.get("TERM") == "ANSI":
return True
# If anything else we should not color it
return False
def format(self, record):
msg = logging.StreamHandler.format(self, record)
if self.should_color():
for level, color in self.COLORS:
if record.levelno >= level:
msg = color(msg)
break
return msg
# The logging module says handleError() can be customized.
def handleError(self, record):
exc_class, exc = sys.exc_info()[:2]
# If a broken pipe occurred while calling write() or flush() on the
# stdout stream in logging's Handler.emit(), then raise our special
# exception so we can handle it in main() instead of logging the
# broken pipe error and continuing.
if (exc_class and self._using_stdout() and
_is_broken_pipe_error(exc_class, exc)):
raise BrokenStdoutLoggingError()
return super(ColorizedStreamHandler, self).handleError(record)
class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
def _open(self):
ensure_dir(os.path.dirname(self.baseFilename))
return logging.handlers.RotatingFileHandler._open(self)
class MaxLevelFilter(Filter):
def __init__(self, level):
self.level = level
def filter(self, record):
return record.levelno < self.level
class ExcludeLoggerFilter(Filter):
"""
A logging Filter that excludes records from a logger (or its children).
"""
def filter(self, record):
# The base Filter class allows only records from a logger (or its
# children).
return not super(ExcludeLoggerFilter, self).filter(record)
def setup_logging(verbosity, no_color, user_log_file):
"""Configures and sets up all of the logging
Returns the requested logging level, as its integer value.
"""
# Determine the level to be logging at.
if verbosity >= 1:
level = "DEBUG"
elif verbosity == -1:
level = "WARNING"
elif verbosity == -2:
level = "ERROR"
elif verbosity <= -3:
level = "CRITICAL"
else:
level = "INFO"
level_number = getattr(logging, level)
# The "root" logger should match the "console" level *unless* we also need
# to log to a user log file.
include_user_log = user_log_file is not None
if include_user_log:
additional_log_file = user_log_file
root_level = "DEBUG"
else:
additional_log_file = "/dev/null"
root_level = level
# Disable any logging besides WARNING unless we have DEBUG level logging
# enabled for vendored libraries.
vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"
# Shorthands for clarity
log_streams = {
"stdout": "ext://sys.stdout",
"stderr": "ext://sys.stderr",
}
handler_classes = {
"stream": "pip._internal.utils.logging.ColorizedStreamHandler",
"file": "pip._internal.utils.logging.BetterRotatingFileHandler",
}
handlers = ["console", "console_errors", "console_subprocess"] + (
["user_log"] if include_user_log else []
)
logging.config.dictConfig({
"version": 1,
"disable_existing_loggers": False,
"filters": {
"exclude_warnings": {
"()": "pip._internal.utils.logging.MaxLevelFilter",
"level": logging.WARNING,
},
"restrict_to_subprocess": {
"()": "logging.Filter",
"name": subprocess_logger.name,
},
"exclude_subprocess": {
"()": "pip._internal.utils.logging.ExcludeLoggerFilter",
"name": subprocess_logger.name,
},
},
"formatters": {
"indent": {
"()": IndentingFormatter,
"format": "%(message)s",
},
"indent_with_timestamp": {
"()": IndentingFormatter,
"format": "%(message)s",
"add_timestamp": True,
},
},
"handlers": {
"console": {
"level": level,
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stdout"],
"filters": ["exclude_subprocess", "exclude_warnings"],
"formatter": "indent",
},
"console_errors": {
"level": "WARNING",
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["exclude_subprocess"],
"formatter": "indent",
},
# A handler responsible for logging to the console messages
# from the "subprocessor" logger.
"console_subprocess": {
"level": level,
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["restrict_to_subprocess"],
"formatter": "indent",
},
"user_log": {
"level": "DEBUG",
"class": handler_classes["file"],
"filename": additional_log_file,
"delay": True,
"formatter": "indent_with_timestamp",
},
},
"root": {
"level": root_level,
"handlers": handlers,
},
"loggers": {
"pip._vendor": {
"level": vendored_log_level
}
},
})
return level_number

View File

@@ -0,0 +1,20 @@
import os.path
DELETE_MARKER_MESSAGE = '''\
This file is placed here by pip to indicate the source was put
here by pip.
Once this package is successfully installed this source code will be
deleted (unless you remove this file).
'''
PIP_DELETE_MARKER_FILENAME = 'pip-delete-this-directory.txt'
def write_delete_marker_file(directory):
# type: (str) -> None
"""
Write the pip delete marker file into this directory.
"""
filepath = os.path.join(directory, PIP_DELETE_MARKER_FILENAME)
with open(filepath, 'w') as marker_fp:
marker_fp.write(DELETE_MARKER_MESSAGE)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,40 @@
"""Utilities for defining models
"""
import operator
class KeyBasedCompareMixin(object):
"""Provides comparison capabilities that is based on a key
"""
def __init__(self, key, defining_class):
self._compare_key = key
self._defining_class = defining_class
def __hash__(self):
return hash(self._compare_key)
def __lt__(self, other):
return self._compare(other, operator.__lt__)
def __le__(self, other):
return self._compare(other, operator.__le__)
def __gt__(self, other):
return self._compare(other, operator.__gt__)
def __ge__(self, other):
return self._compare(other, operator.__ge__)
def __eq__(self, other):
return self._compare(other, operator.__eq__)
def __ne__(self, other):
return self._compare(other, operator.__ne__)
def _compare(self, other, method):
if not isinstance(other, self._defining_class):
return NotImplemented
return method(self._compare_key, other._compare_key)

View File

@@ -0,0 +1,178 @@
from __future__ import absolute_import
import datetime
import json
import logging
import os.path
import sys
from pip._vendor import lockfile, pkg_resources
from pip._vendor.packaging import version as packaging_version
from pip._internal.cli.cmdoptions import make_search_scope
from pip._internal.index import PackageFinder
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.filesystem import check_path_owner
from pip._internal.utils.misc import ensure_dir, get_installed_version
from pip._internal.utils.packaging import get_installer
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
import optparse
from typing import Any, Dict
from pip._internal.download import PipSession
SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
logger = logging.getLogger(__name__)
class SelfCheckState(object):
def __init__(self, cache_dir):
# type: (str) -> None
self.state = {} # type: Dict[str, Any]
self.statefile_path = None
# Try to load the existing state
if cache_dir:
self.statefile_path = os.path.join(cache_dir, "selfcheck.json")
try:
with open(self.statefile_path) as statefile:
self.state = json.load(statefile)[sys.prefix]
except (IOError, ValueError, KeyError):
# Explicitly suppressing exceptions, since we don't want to
# error out if the cache file is invalid.
pass
def save(self, pypi_version, current_time):
# type: (str, datetime.datetime) -> None
# If we do not have a path to cache in, don't bother saving.
if not self.statefile_path:
return
# Check to make sure that we own the directory
if not check_path_owner(os.path.dirname(self.statefile_path)):
return
# Now that we've ensured the directory is owned by this user, we'll go
# ahead and make sure that all our directories are created.
ensure_dir(os.path.dirname(self.statefile_path))
# Attempt to write out our version check file
with lockfile.LockFile(self.statefile_path):
if os.path.exists(self.statefile_path):
with open(self.statefile_path) as statefile:
state = json.load(statefile)
else:
state = {}
state[sys.prefix] = {
"last_check": current_time.strftime(SELFCHECK_DATE_FMT),
"pypi_version": pypi_version,
}
with open(self.statefile_path, "w") as statefile:
json.dump(state, statefile, sort_keys=True,
separators=(",", ":"))
def was_installed_by_pip(pkg):
# type: (str) -> bool
"""Checks whether pkg was installed by pip
This is used not to display the upgrade message when pip is in fact
installed by system package manager, such as dnf on Fedora.
"""
try:
dist = pkg_resources.get_distribution(pkg)
return "pip" == get_installer(dist)
except pkg_resources.DistributionNotFound:
return False
def pip_version_check(session, options):
# type: (PipSession, optparse.Values) -> None
"""Check for an update for pip.
Limit the frequency of checks to once per week. State is stored either in
the active virtualenv or in the user's USER_CACHE_DIR keyed off the prefix
of the pip script path.
"""
installed_version = get_installed_version("pip")
if not installed_version:
return
pip_version = packaging_version.parse(installed_version)
pypi_version = None
try:
state = SelfCheckState(cache_dir=options.cache_dir)
current_time = datetime.datetime.utcnow()
# Determine if we need to refresh the state
if "last_check" in state.state and "pypi_version" in state.state:
last_check = datetime.datetime.strptime(
state.state["last_check"],
SELFCHECK_DATE_FMT
)
if (current_time - last_check).total_seconds() < 7 * 24 * 60 * 60:
pypi_version = state.state["pypi_version"]
# Refresh the version if we need to or just see if we need to warn
if pypi_version is None:
# Lets use PackageFinder to see what the latest pip version is
search_scope = make_search_scope(options, suppress_no_index=True)
# Pass allow_yanked=False so we don't suggest upgrading to a
# yanked version.
selection_prefs = SelectionPreferences(
allow_yanked=False,
allow_all_prereleases=False, # Explicitly set to False
)
finder = PackageFinder.create(
search_scope=search_scope,
selection_prefs=selection_prefs,
trusted_hosts=options.trusted_hosts,
session=session,
)
candidate = finder.find_candidates("pip").get_best()
if candidate is None:
return
pypi_version = str(candidate.version)
# save that we've performed a check
state.save(pypi_version, current_time)
remote_version = packaging_version.parse(pypi_version)
local_version_is_older = (
pip_version < remote_version and
pip_version.base_version != remote_version.base_version and
was_installed_by_pip('pip')
)
# Determine if our pypi_version is older
if not local_version_is_older:
return
# Advise "python -m pip" on Windows to avoid issues
# with overwriting pip.exe.
if WINDOWS:
pip_cmd = "python -m pip"
else:
pip_cmd = "pip"
logger.warning(
"You are using pip version %s, however version %s is "
"available.\nYou should consider upgrading via the "
"'%s install --upgrade pip' command.",
pip_version, pypi_version, pip_cmd
)
except Exception:
logger.debug(
"There was an error checking the latest version of pip",
exc_info=True,
)

View File

@@ -0,0 +1,94 @@
from __future__ import absolute_import
import logging
from email.parser import FeedParser
from pip._vendor import pkg_resources
from pip._vendor.packaging import specifiers, version
from pip._internal.exceptions import NoneMetadataError
from pip._internal.utils.misc import display_path
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, Tuple
from email.message import Message
from pip._vendor.pkg_resources import Distribution
logger = logging.getLogger(__name__)
def check_requires_python(requires_python, version_info):
# type: (Optional[str], Tuple[int, ...]) -> bool
"""
Check if the given Python version matches a "Requires-Python" specifier.
:param version_info: A 3-tuple of ints representing a Python
major-minor-micro version to check (e.g. `sys.version_info[:3]`).
:return: `True` if the given Python version satisfies the requirement.
Otherwise, return `False`.
:raises InvalidSpecifier: If `requires_python` has an invalid format.
"""
if requires_python is None:
# The package provides no information
return True
requires_python_specifier = specifiers.SpecifierSet(requires_python)
python_version = version.parse('.'.join(map(str, version_info)))
return python_version in requires_python_specifier
def get_metadata(dist):
# type: (Distribution) -> Message
"""
:raises NoneMetadataError: if the distribution reports `has_metadata()`
True but `get_metadata()` returns None.
"""
metadata_name = 'METADATA'
if (isinstance(dist, pkg_resources.DistInfoDistribution) and
dist.has_metadata(metadata_name)):
metadata = dist.get_metadata(metadata_name)
elif dist.has_metadata('PKG-INFO'):
metadata_name = 'PKG-INFO'
metadata = dist.get_metadata(metadata_name)
else:
logger.warning("No metadata found in %s", display_path(dist.location))
metadata = ''
if metadata is None:
raise NoneMetadataError(dist, metadata_name)
feed_parser = FeedParser()
# The following line errors out if with a "NoneType" TypeError if
# passed metadata=None.
feed_parser.feed(metadata)
return feed_parser.close()
def get_requires_python(dist):
# type: (pkg_resources.Distribution) -> Optional[str]
"""
Return the "Requires-Python" metadata for a distribution, or None
if not present.
"""
pkg_info_dict = get_metadata(dist)
requires_python = pkg_info_dict.get('Requires-Python')
if requires_python is not None:
# Convert to a str to satisfy the type checker, since requires_python
# can be a Header object.
requires_python = str(requires_python)
return requires_python
def get_installer(dist):
# type: (Distribution) -> str
if dist.has_metadata('INSTALLER'):
for line in dist.get_metadata_lines('INSTALLER'):
if line.strip():
return line.strip()
return ''

View File

@@ -0,0 +1,36 @@
import sys
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import List
# Shim to wrap setup.py invocation with setuptools
#
# We set sys.argv[0] to the path to the underlying setup.py file so
# setuptools / distutils don't take the path to the setup.py to be "-c" when
# invoking via the shim. This avoids e.g. the following manifest_maker
# warning: "warning: manifest_maker: standard file '-c' not found".
_SETUPTOOLS_SHIM = (
"import sys, setuptools, tokenize; sys.argv[0] = {0!r}; __file__={0!r};"
"f=getattr(tokenize, 'open', open)(__file__);"
"code=f.read().replace('\\r\\n', '\\n');"
"f.close();"
"exec(compile(code, __file__, 'exec'))"
)
def make_setuptools_shim_args(setup_py_path, unbuffered_output=False):
# type: (str, bool) -> List[str]
"""
Get setuptools command arguments with shim wrapped setup file invocation.
:param setup_py_path: The path to setup.py to be wrapped.
:param unbuffered_output: If True, adds the unbuffered switch to the
argument list.
"""
args = [sys.executable]
if unbuffered_output:
args.append('-u')
args.extend(['-c', _SETUPTOOLS_SHIM.format(setup_py_path)])
return args

View File

@@ -0,0 +1,155 @@
from __future__ import absolute_import
import errno
import itertools
import logging
import os.path
import tempfile
from pip._internal.utils.misc import rmtree
logger = logging.getLogger(__name__)
class TempDirectory(object):
"""Helper class that owns and cleans up a temporary directory.
This class can be used as a context manager or as an OO representation of a
temporary directory.
Attributes:
path
Location to the created temporary directory or None
delete
Whether the directory should be deleted when exiting
(when used as a contextmanager)
Methods:
create()
Creates a temporary directory and stores its path in the path
attribute.
cleanup()
Deletes the temporary directory and sets path attribute to None
When used as a context manager, a temporary directory is created on
entering the context and, if the delete attribute is True, on exiting the
context the created directory is deleted.
"""
def __init__(self, path=None, delete=None, kind="temp"):
super(TempDirectory, self).__init__()
if path is None and delete is None:
# If we were not given an explicit directory, and we were not given
# an explicit delete option, then we'll default to deleting.
delete = True
self.path = path
self.delete = delete
self.kind = kind
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.path)
def __enter__(self):
self.create()
return self
def __exit__(self, exc, value, tb):
if self.delete:
self.cleanup()
def create(self):
"""Create a temporary directory and store its path in self.path
"""
if self.path is not None:
logger.debug(
"Skipped creation of temporary directory: {}".format(self.path)
)
return
# We realpath here because some systems have their default tmpdir
# symlinked to another directory. This tends to confuse build
# scripts, so we canonicalize the path by traversing potential
# symlinks here.
self.path = os.path.realpath(
tempfile.mkdtemp(prefix="pip-{}-".format(self.kind))
)
logger.debug("Created temporary directory: {}".format(self.path))
def cleanup(self):
"""Remove the temporary directory created and reset state
"""
if self.path is not None and os.path.exists(self.path):
rmtree(self.path)
self.path = None
class AdjacentTempDirectory(TempDirectory):
"""Helper class that creates a temporary directory adjacent to a real one.
Attributes:
original
The original directory to create a temp directory for.
path
After calling create() or entering, contains the full
path to the temporary directory.
delete
Whether the directory should be deleted when exiting
(when used as a contextmanager)
"""
# The characters that may be used to name the temp directory
# We always prepend a ~ and then rotate through these until
# a usable name is found.
# pkg_resources raises a different error for .dist-info folder
# with leading '-' and invalid metadata
LEADING_CHARS = "-~.=%0123456789"
def __init__(self, original, delete=None):
super(AdjacentTempDirectory, self).__init__(delete=delete)
self.original = original.rstrip('/\\')
@classmethod
def _generate_names(cls, name):
"""Generates a series of temporary names.
The algorithm replaces the leading characters in the name
with ones that are valid filesystem characters, but are not
valid package names (for both Python and pip definitions of
package).
"""
for i in range(1, len(name)):
for candidate in itertools.combinations_with_replacement(
cls.LEADING_CHARS, i - 1):
new_name = '~' + ''.join(candidate) + name[i:]
if new_name != name:
yield new_name
# If we make it this far, we will have to make a longer name
for i in range(len(cls.LEADING_CHARS)):
for candidate in itertools.combinations_with_replacement(
cls.LEADING_CHARS, i):
new_name = '~' + ''.join(candidate) + name
if new_name != name:
yield new_name
def create(self):
root, name = os.path.split(self.original)
for candidate in self._generate_names(name):
path = os.path.join(root, candidate)
try:
os.mkdir(path)
except OSError as ex:
# Continue if the name exists already
if ex.errno != errno.EEXIST:
raise
else:
self.path = os.path.realpath(path)
break
if not self.path:
# Final fallback on the default behavior.
self.path = os.path.realpath(
tempfile.mkdtemp(prefix="pip-{}-".format(self.kind))
)
logger.debug("Created temporary directory: {}".format(self.path))

View File

@@ -0,0 +1,29 @@
"""For neatly implementing static typing in pip.
`mypy` - the static type analysis tool we use - uses the `typing` module, which
provides core functionality fundamental to mypy's functioning.
Generally, `typing` would be imported at runtime and used in that fashion -
it acts as a no-op at runtime and does not have any run-time overhead by
design.
As it turns out, `typing` is not vendorable - it uses separate sources for
Python 2/Python 3. Thus, this codebase can not expect it to be present.
To work around this, mypy allows the typing import to be behind a False-y
optional to prevent it from running at runtime and type-comments can be used
to remove the need for the types to be accessible directly during runtime.
This module provides the False-y guard in a nicely named fashion so that a
curious maintainer can reach here to read this.
In pip, all static-typing related imports should be guarded as follows:
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import ...
Ref: https://github.com/python/mypy/issues/3216
"""
MYPY_CHECK_RUNNING = False

View File

@@ -0,0 +1,424 @@
from __future__ import absolute_import, division
import contextlib
import itertools
import logging
import sys
import time
from signal import SIGINT, default_int_handler, signal
from pip._vendor import six
from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
from pip._vendor.progress.spinner import Spinner
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.logging import get_indentation
from pip._internal.utils.misc import format_size
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Any, Iterator, IO
try:
from pip._vendor import colorama
# Lots of different errors can come from this, including SystemError and
# ImportError.
except Exception:
colorama = None
logger = logging.getLogger(__name__)
def _select_progress_class(preferred, fallback):
encoding = getattr(preferred.file, "encoding", None)
# If we don't know what encoding this file is in, then we'll just assume
# that it doesn't support unicode and use the ASCII bar.
if not encoding:
return fallback
# Collect all of the possible characters we want to use with the preferred
# bar.
characters = [
getattr(preferred, "empty_fill", six.text_type()),
getattr(preferred, "fill", six.text_type()),
]
characters += list(getattr(preferred, "phases", []))
# Try to decode the characters we're using for the bar using the encoding
# of the given file, if this works then we'll assume that we can use the
# fancier bar and if not we'll fall back to the plaintext bar.
try:
six.text_type().join(characters).encode(encoding)
except UnicodeEncodeError:
return fallback
else:
return preferred
_BaseBar = _select_progress_class(IncrementalBar, Bar) # type: Any
class InterruptibleMixin(object):
"""
Helper to ensure that self.finish() gets called on keyboard interrupt.
This allows downloads to be interrupted without leaving temporary state
(like hidden cursors) behind.
This class is similar to the progress library's existing SigIntMixin
helper, but as of version 1.2, that helper has the following problems:
1. It calls sys.exit().
2. It discards the existing SIGINT handler completely.
3. It leaves its own handler in place even after an uninterrupted finish,
which will have unexpected delayed effects if the user triggers an
unrelated keyboard interrupt some time after a progress-displaying
download has already completed, for example.
"""
def __init__(self, *args, **kwargs):
"""
Save the original SIGINT handler for later.
"""
super(InterruptibleMixin, self).__init__(*args, **kwargs)
self.original_handler = signal(SIGINT, self.handle_sigint)
# If signal() returns None, the previous handler was not installed from
# Python, and we cannot restore it. This probably should not happen,
# but if it does, we must restore something sensible instead, at least.
# The least bad option should be Python's default SIGINT handler, which
# just raises KeyboardInterrupt.
if self.original_handler is None:
self.original_handler = default_int_handler
def finish(self):
"""
Restore the original SIGINT handler after finishing.
This should happen regardless of whether the progress display finishes
normally, or gets interrupted.
"""
super(InterruptibleMixin, self).finish()
signal(SIGINT, self.original_handler)
def handle_sigint(self, signum, frame):
"""
Call self.finish() before delegating to the original SIGINT handler.
This handler should only be in place while the progress display is
active.
"""
self.finish()
self.original_handler(signum, frame)
class SilentBar(Bar):
def update(self):
pass
class BlueEmojiBar(IncrementalBar):
suffix = "%(percent)d%%"
bar_prefix = " "
bar_suffix = " "
phases = (u"\U0001F539", u"\U0001F537", u"\U0001F535") # type: Any
class DownloadProgressMixin(object):
def __init__(self, *args, **kwargs):
super(DownloadProgressMixin, self).__init__(*args, **kwargs)
self.message = (" " * (get_indentation() + 2)) + self.message
@property
def downloaded(self):
return format_size(self.index)
@property
def download_speed(self):
# Avoid zero division errors...
if self.avg == 0.0:
return "..."
return format_size(1 / self.avg) + "/s"
@property
def pretty_eta(self):
if self.eta:
return "eta %s" % self.eta_td
return ""
def iter(self, it, n=1):
for x in it:
yield x
self.next(n)
self.finish()
class WindowsMixin(object):
def __init__(self, *args, **kwargs):
# The Windows terminal does not support the hide/show cursor ANSI codes
# even with colorama. So we'll ensure that hide_cursor is False on
# Windows.
# This call needs to go before the super() call, so that hide_cursor
# is set in time. The base progress bar class writes the "hide cursor"
# code to the terminal in its init, so if we don't set this soon
# enough, we get a "hide" with no corresponding "show"...
if WINDOWS and self.hide_cursor:
self.hide_cursor = False
super(WindowsMixin, self).__init__(*args, **kwargs)
# Check if we are running on Windows and we have the colorama module,
# if we do then wrap our file with it.
if WINDOWS and colorama:
self.file = colorama.AnsiToWin32(self.file)
# The progress code expects to be able to call self.file.isatty()
# but the colorama.AnsiToWin32() object doesn't have that, so we'll
# add it.
self.file.isatty = lambda: self.file.wrapped.isatty()
# The progress code expects to be able to call self.file.flush()
# but the colorama.AnsiToWin32() object doesn't have that, so we'll
# add it.
self.file.flush = lambda: self.file.wrapped.flush()
class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin,
DownloadProgressMixin):
file = sys.stdout
message = "%(percent)d%%"
suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
# NOTE: The "type: ignore" comments on the following classes are there to
# work around https://github.com/python/typing/issues/241
class DefaultDownloadProgressBar(BaseDownloadProgressBar,
_BaseBar):
pass
class DownloadSilentBar(BaseDownloadProgressBar, SilentBar): # type: ignore
pass
class DownloadBar(BaseDownloadProgressBar, # type: ignore
Bar):
pass
class DownloadFillingCirclesBar(BaseDownloadProgressBar, # type: ignore
FillingCirclesBar):
pass
class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar, # type: ignore
BlueEmojiBar):
pass
class DownloadProgressSpinner(WindowsMixin, InterruptibleMixin,
DownloadProgressMixin, Spinner):
file = sys.stdout
suffix = "%(downloaded)s %(download_speed)s"
def next_phase(self):
if not hasattr(self, "_phaser"):
self._phaser = itertools.cycle(self.phases)
return next(self._phaser)
def update(self):
message = self.message % self
phase = self.next_phase()
suffix = self.suffix % self
line = ''.join([
message,
" " if message else "",
phase,
" " if suffix else "",
suffix,
])
self.writeln(line)
BAR_TYPES = {
"off": (DownloadSilentBar, DownloadSilentBar),
"on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
"ascii": (DownloadBar, DownloadProgressSpinner),
"pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
"emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner)
}
def DownloadProgressProvider(progress_bar, max=None):
if max is None or max == 0:
return BAR_TYPES[progress_bar][1]().iter
else:
return BAR_TYPES[progress_bar][0](max=max).iter
################################################################
# Generic "something is happening" spinners
#
# We don't even try using progress.spinner.Spinner here because it's actually
# simpler to reimplement from scratch than to coerce their code into doing
# what we need.
################################################################
@contextlib.contextmanager
def hidden_cursor(file):
# type: (IO) -> Iterator[None]
# The Windows terminal does not support the hide/show cursor ANSI codes,
# even via colorama. So don't even try.
if WINDOWS:
yield
# We don't want to clutter the output with control characters if we're
# writing to a file, or if the user is running with --quiet.
# See https://github.com/pypa/pip/issues/3418
elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
yield
else:
file.write(HIDE_CURSOR)
try:
yield
finally:
file.write(SHOW_CURSOR)
class RateLimiter(object):
def __init__(self, min_update_interval_seconds):
# type: (float) -> None
self._min_update_interval_seconds = min_update_interval_seconds
self._last_update = 0 # type: float
def ready(self):
# type: () -> bool
now = time.time()
delta = now - self._last_update
return delta >= self._min_update_interval_seconds
def reset(self):
# type: () -> None
self._last_update = time.time()
class SpinnerInterface(object):
def spin(self):
# type: () -> None
raise NotImplementedError()
def finish(self, final_status):
# type: (str) -> None
raise NotImplementedError()
class InteractiveSpinner(SpinnerInterface):
def __init__(self, message, file=None, spin_chars="-\\|/",
# Empirically, 8 updates/second looks nice
min_update_interval_seconds=0.125):
self._message = message
if file is None:
file = sys.stdout
self._file = file
self._rate_limiter = RateLimiter(min_update_interval_seconds)
self._finished = False
self._spin_cycle = itertools.cycle(spin_chars)
self._file.write(" " * get_indentation() + self._message + " ... ")
self._width = 0
def _write(self, status):
assert not self._finished
# Erase what we wrote before by backspacing to the beginning, writing
# spaces to overwrite the old text, and then backspacing again
backup = "\b" * self._width
self._file.write(backup + " " * self._width + backup)
# Now we have a blank slate to add our status
self._file.write(status)
self._width = len(status)
self._file.flush()
self._rate_limiter.reset()
def spin(self):
# type: () -> None
if self._finished:
return
if not self._rate_limiter.ready():
return
self._write(next(self._spin_cycle))
def finish(self, final_status):
# type: (str) -> None
if self._finished:
return
self._write(final_status)
self._file.write("\n")
self._file.flush()
self._finished = True
# Used for dumb terminals, non-interactive installs (no tty), etc.
# We still print updates occasionally (once every 60 seconds by default) to
# act as a keep-alive for systems like Travis-CI that take lack-of-output as
# an indication that a task has frozen.
class NonInteractiveSpinner(SpinnerInterface):
def __init__(self, message, min_update_interval_seconds=60):
# type: (str, float) -> None
self._message = message
self._finished = False
self._rate_limiter = RateLimiter(min_update_interval_seconds)
self._update("started")
def _update(self, status):
assert not self._finished
self._rate_limiter.reset()
logger.info("%s: %s", self._message, status)
def spin(self):
# type: () -> None
if self._finished:
return
if not self._rate_limiter.ready():
return
self._update("still running...")
def finish(self, final_status):
# type: (str) -> None
if self._finished:
return
self._update("finished with status '%s'" % (final_status,))
self._finished = True
@contextlib.contextmanager
def open_spinner(message):
# type: (str) -> Iterator[SpinnerInterface]
# Interactive spinner goes directly to sys.stdout rather than being routed
# through the logging system, but it acts like it has level INFO,
# i.e. it's only displayed if we're at level INFO or better.
# Non-interactive spinner goes through the logging system, so it is always
# in sync with logging configuration.
if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
spinner = InteractiveSpinner(message) # type: SpinnerInterface
else:
spinner = NonInteractiveSpinner(message)
try:
with hidden_cursor(sys.stdout):
yield spinner
except KeyboardInterrupt:
spinner.finish("canceled")
raise
except Exception:
spinner.finish("error")
raise
else:
spinner.finish("done")

View File

@@ -0,0 +1,34 @@
import os.path
import site
import sys
def running_under_virtualenv():
# type: () -> bool
"""
Return True if we're running inside a virtualenv, False otherwise.
"""
if hasattr(sys, 'real_prefix'):
# pypa/virtualenv case
return True
elif sys.prefix != getattr(sys, "base_prefix", sys.prefix):
# PEP 405 venv
return True
return False
def virtualenv_no_global():
# type: () -> bool
"""
Return True if in a venv and no system site packages.
"""
# this mirrors the logic in virtualenv.py for locating the
# no-global-site-packages.txt file
site_mod_dir = os.path.dirname(os.path.abspath(site.__file__))
no_global_file = os.path.join(site_mod_dir, 'no-global-site-packages.txt')
if running_under_virtualenv() and os.path.isfile(no_global_file):
return True
else:
return False