# ===== usr/lib/python3.11/distutils/util.py =====

"""distutils.util

Miscellaneous utility functions -- anything that doesn't fit into
one of the other *util.py modules.
"""

import os
import re
import importlib.util
import string
import sys
import distutils
from distutils.errors import DistutilsPlatformError
from distutils.dep_util import newer
from distutils.spawn import spawn
from distutils import log
from distutils.errors import DistutilsByteCompileError


def get_host_platform():
    """Return a string that identifies the current platform.  This is used
    mainly to distinguish platform-specific build directories and
    platform-specific built distributions.  Typically includes the OS name
    and version and the architecture (as supplied by 'os.uname()'), although
    the exact information included depends on the OS; e.g., on Linux, the
    kernel version isn't particularly important.

    Examples of returned values:
       linux-i586
       linux-alpha (?)
       solaris-2.6-sun4u

    Windows will return one of:
       win-amd64 (64bit Windows on AMD64 (aka x86_64, Intel64, EM64T, etc.))
       win32 (all others - specifically, sys.platform is returned)

    For other non-POSIX platforms, currently just returns 'sys.platform'.
    """
    if os.name == 'nt':
        if 'amd64' in sys.version.lower():
            return 'win-amd64'
        if '(arm)' in sys.version.lower():
            return 'win-arm32'
        if '(arm64)' in sys.version.lower():
            return 'win-arm64'
        return sys.platform

    # Set for cross builds explicitly
    if "_PYTHON_HOST_PLATFORM" in os.environ:
        return os.environ["_PYTHON_HOST_PLATFORM"]

    if os.name != "posix" or not hasattr(os, 'uname'):
        # XXX what about the architecture? NT is Intel or Alpha,
        # Mac OS is M68k or PPC, etc.
        return sys.platform

    # Try to distinguish various flavours of Unix
    (osname, host, release, version, machine) = os.uname()

    # Convert the OS name to lowercase, remove '/' characters, and translate
    # spaces (for "Power Macintosh")
    osname = osname.lower().replace('/', '')
    machine = machine.replace(' ', '_')
    machine = machine.replace('/', '-')

    if osname[:5] == "linux":
        # At least on Linux/Intel, 'machine' is the processor --
        # i386, etc.
        # XXX what about Alpha, SPARC, etc?
        return "%s-%s" % (osname, machine)
    elif osname[:5] == "sunos":
        if release[0] >= "5":           # SunOS 5 == Solaris 2
            osname = "solaris"
            release = "%d.%s" % (int(release[0]) - 3, release[2:])
            # We can't use "platform.architecture()[0]" because of a
            # bootstrap problem.  We use a dict to get an error
            # if something suspicious happens.
            bitness = {2147483647: "32bit", 9223372036854775807: "64bit"}
            machine += ".%s" % bitness[sys.maxsize]
        # fall through to standard osname-release-machine representation
    elif osname[:3] == "aix":
        from _aix_support import aix_platform
        return aix_platform()
    elif osname[:6] == "cygwin":
        osname = "cygwin"
        rel_re = re.compile(r'[\d.]+', re.ASCII)
        m = rel_re.match(release)
        if m:
            release = m.group()
    elif osname[:6] == "darwin":
        import _osx_support, distutils.sysconfig
        osname, release, machine = _osx_support.get_platform_osx(
                                        distutils.sysconfig.get_config_vars(),
                                        osname, release, machine)

    return "%s-%s-%s" % (osname, release, machine)


def get_platform():
    if os.name == 'nt':
        TARGET_TO_PLAT = {
            'x86': 'win32',
            'x64': 'win-amd64',
            'arm': 'win-arm32',
        }
        return TARGET_TO_PLAT.get(os.environ.get('VSCMD_ARG_TGT_ARCH')) or get_host_platform()
    else:
        return get_host_platform()
def convert_path(pathname):
    """Return 'pathname' as a name that will work on the native filesystem,
    i.e. split it on '/' and put it back together again using the current
    directory separator.  Needed because filenames in the setup script are
    always supplied in Unix style, and have to be converted to the local
    convention before we can actually use them in the filesystem.  Raises
    ValueError on non-Unix-ish systems if 'pathname' either starts or ends
    with a slash.
    """
    if os.sep == '/':
        return pathname
    if not pathname:
        return pathname
    if pathname[0] == '/':
        raise ValueError("path '%s' cannot be absolute" % pathname)
    if pathname[-1] == '/':
        raise ValueError("path '%s' cannot end with '/'" % pathname)

    paths = pathname.split('/')
    while '.' in paths:
        paths.remove('.')
    if not paths:
        return os.curdir
    return os.path.join(*paths)

# convert_path ()


def change_root(new_root, pathname):
    """Return 'pathname' with 'new_root' prepended.  If 'pathname' is
    relative, this is equivalent to "os.path.join(new_root,pathname)".
    Otherwise, it requires making 'pathname' relative and then joining the
    two, which is tricky on DOS/Windows and Mac OS.
    """
    if os.name == 'posix':
        if not os.path.isabs(pathname):
            return os.path.join(new_root, pathname)
        else:
            return os.path.join(new_root, pathname[1:])

    elif os.name == 'nt':
        (drive, path) = os.path.splitdrive(pathname)
        if path[0] == '\\':
            path = path[1:]
        return os.path.join(new_root, path)

    else:
        raise DistutilsPlatformError("nothing known about platform '%s'" % os.name)


_environ_checked = 0

def check_environ():
    """Ensure that 'os.environ' has all the environment variables we
    guarantee that users can use in config files, command-line options,
    etc.  Currently this includes:
      HOME - user's home directory (Unix only)
      PLAT - description of the current platform, including hardware
             and OS (see 'get_platform()')
    """
    global _environ_checked
    if _environ_checked:
        return

    if os.name == 'posix' and 'HOME' not in os.environ:
        try:
            import pwd
            os.environ['HOME'] = pwd.getpwuid(os.getuid())[5]
        except (ImportError, KeyError):
            # bpo-10496: if the current user identifier doesn't exist in the
            # password database, do nothing
            pass

    if 'PLAT' not in os.environ:
        os.environ['PLAT'] = get_platform()

    _environ_checked = 1


def subst_vars(s, local_vars):
    """Perform shell/Perl-style variable substitution on 's'.  Every
    occurrence of '$' followed by a name is considered a variable, and the
    variable is substituted by the value found in the 'local_vars'
    dictionary, or in 'os.environ' if it's not in 'local_vars'.
    'os.environ' is first checked/augmented to guarantee that it contains
    certain values: see 'check_environ()'.  Raise ValueError for any
    variables not found in either 'local_vars' or 'os.environ'.
    """
    check_environ()
    def _subst(match, local_vars=local_vars):
        var_name = match.group(1)
        if var_name in local_vars:
            return str(local_vars[var_name])
        else:
            return os.environ[var_name]

    try:
        return re.sub(r'\$([a-zA-Z_][a-zA-Z_0-9]*)', _subst, s)
    except KeyError as var:
        raise ValueError("invalid variable '$%s'" % var)

# subst_vars ()


def grok_environment_error(exc, prefix="error: "):
    # Function kept for backward compatibility.
    # Used to try clever things with EnvironmentErrors,
    # but nowadays str(exception) produces good messages.
    return prefix + str(exc)


# Needed by 'split_quoted()'
_wordchars_re = _squote_re = _dquote_re = None

def _init_regex():
    global _wordchars_re, _squote_re, _dquote_re
    _wordchars_re = re.compile(r'[^\\\'\"%s ]*' % string.whitespace)
    _squote_re = re.compile(r"'(?:[^'\\]|\\.)*'")
    _dquote_re = re.compile(r'"(?:[^"\\]|\\.)*"')
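# --- Editor's example (added; not part of the original module) ---
# A minimal sketch of subst_vars() and convert_path(); the '$mydir' variable
# name and the sample paths are hypothetical.
def _example_subst_and_convert():
    s = subst_vars('build/$mydir/output', {'mydir': 'lib'})
    # s == 'build/lib/output'
    p = convert_path('docs/api/index.html')
    # p is the same path joined with os.sep on this platform
    return s, p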
def split_quoted(s):
    """Split a string up according to Unix shell-like rules for quotes and
    backslashes.  In short: words are delimited by spaces, as long as those
    spaces are not escaped by a backslash, or inside a quoted string.
    Single and double quotes are equivalent, and the quote characters can
    be backslash-escaped.  The backslash is stripped from any two-character
    escape sequence, leaving only the escaped character.  The quote
    characters are stripped from any quoted string.  Returns a list of
    words.
    """
    # This is a nice algorithm for splitting up a single string, since it
    # doesn't require character-by-character examination.  It was a little
    # bit of a brain-bender to get it working right, though...
    if _wordchars_re is None:
        _init_regex()

    s = s.strip()
    words = []
    pos = 0

    while s:
        m = _wordchars_re.match(s, pos)
        end = m.end()
        if end == len(s):
            words.append(s[:end])
            break

        if s[end] in string.whitespace:
            # unescaped, unquoted whitespace: now we definitely have a
            # word delimiter
            words.append(s[:end])
            s = s[end:].lstrip()
            pos = 0

        elif s[end] == '\\':
            # preserve whatever is being escaped;
            # will become part of the current word
            s = s[:end] + s[end+1:]
            pos = end+1

        else:
            if s[end] == "'":           # slurp singly-quoted string
                m = _squote_re.match(s, end)
            elif s[end] == '"':         # slurp doubly-quoted string
                m = _dquote_re.match(s, end)
            else:
                raise RuntimeError("this can't happen (bad char '%c')" % s[end])

            if m is None:
                raise ValueError("bad string (mismatched %s quotes?)" % s[end])

            (beg, end) = m.span()
            s = s[:beg] + s[beg+1:end-1] + s[end:]
            pos = m.end() - 2

        if pos >= len(s):
            words.append(s)
            break

    return words

# split_quoted ()


def execute(func, args, msg=None, verbose=0, dry_run=0):
    """Perform some action that affects the outside world (e.g. by
    writing to the filesystem).  Such actions are special because they
    are disabled by the 'dry_run' flag.  This method takes care of all
    that bureaucracy for you; all you have to do is supply the
    function to call and an argument tuple for it (to embody the
    "external action" being performed), and an optional message to
    print.
    """
    if msg is None:
        msg = "%s%r" % (func.__name__, args)
        if msg[-2:] == ',)':        # correct for singleton tuple
            msg = msg[0:-2] + ')'

    log.info(msg)
    if not dry_run:
        func(*args)


def strtobool(val):
    """Convert a string representation of truth to true (1) or false (0).

    True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
    are 'n', 'no', 'f', 'false', 'off', and '0'.  Raises ValueError if
    'val' is anything else.
    """
    val = val.lower()
    if val in ('y', 'yes', 't', 'true', 'on', '1'):
        return 1
    elif val in ('n', 'no', 'f', 'false', 'off', '0'):
        return 0
    else:
        raise ValueError("invalid truth value %r" % (val,))
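# --- Editor's example (added; not part of the original module) ---
# A short sketch of split_quoted() and strtobool(); the command line shown
# is an arbitrary illustration.
def _example_split_and_strtobool():
    words = split_quoted('gcc -DNAME="my value" -O2')
    # words == ['gcc', '-DNAME=my value', '-O2']
    return words, strtobool('Yes')  # strtobool('Yes') == 1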
def byte_compile(py_files,
                 optimize=0, force=0,
                 prefix=None, base_dir=None,
                 verbose=1, dry_run=0,
                 direct=None):
    """Byte-compile a collection of Python source files to .pyc files in a
    __pycache__ subdirectory.  'py_files' is a list of files to compile;
    any files that don't end in ".py" are silently skipped.  'optimize'
    must be one of the following:
      0 - don't optimize
      1 - normal optimization (like "python -O")
      2 - extra optimization (like "python -OO")
    If 'force' is true, all files are recompiled regardless of timestamps.

    The source filename encoded in each bytecode file defaults to the
    filenames listed in 'py_files'; you can modify these with 'prefix' and
    'base_dir'.  'prefix' is a string that will be stripped off of each
    source filename, and 'base_dir' is a directory name that will be
    prepended (after 'prefix' is stripped).  You can supply either or both
    (or neither) of 'prefix' and 'base_dir', as you wish.

    If 'dry_run' is true, doesn't actually do anything that would
    affect the filesystem.

    Byte-compilation is either done directly in this interpreter process
    with the standard py_compile module, or indirectly by writing a
    temporary script and executing it.  Normally, you should let
    'byte_compile()' figure out whether to use direct compilation or not
    (see the source for details).  The 'direct' flag is used by the
    script generated in indirect mode; unless you know what you're
    doing, leave it set to None.
    """

    # Late import to fix a bootstrap issue: _posixsubprocess is built by
    # setup.py, but setup.py uses distutils.
    import subprocess

    # nothing is done if sys.dont_write_bytecode is True
    if sys.dont_write_bytecode:
        raise DistutilsByteCompileError('byte-compiling is disabled.')

    # First, if the caller didn't force us into direct or indirect mode,
    # figure out which mode we should be in.  We take a conservative
    # approach: choose direct mode *only* if the current interpreter is
    # in debug mode and optimize is 0.  If we're not in debug mode (-O
    # or -OO), we don't know which level of optimization this
    # interpreter is running with, so we can't do direct
    # byte-compilation and be certain that it's the right thing.  Thus,
    # always compile indirectly if the current interpreter is in either
    # optimize mode, or if either optimization level was requested by
    # the caller.
    if direct is None:
        direct = (__debug__ and optimize == 0)

    # "Indirect" byte-compilation: write a temporary script and then
    # run it with the appropriate flags.
    if not direct:
        try:
            from tempfile import mkstemp
            (script_fd, script_name) = mkstemp(".py")
        except ImportError:
            from tempfile import mktemp
            (script_fd, script_name) = None, mktemp(".py")
        log.info("writing byte-compilation script '%s'", script_name)
        if not dry_run:
            if script_fd is not None:
                script = os.fdopen(script_fd, "w")
            else:
                script = open(script_name, "w")

            with script:
                script.write("""\
from distutils.util import byte_compile
files = [
""")

                # XXX would be nice to write absolute filenames, just for
                # safety's sake (script should be more robust in the face of
                # chdir'ing before running it).  But this requires abspath'ing
                # 'prefix' as well, and that breaks the hack in build_lib's
                # 'byte_compile()' method that carefully tacks on a trailing
                # slash (os.sep really) to make sure the prefix here is "just
                # right".  This whole prefix business is rather delicate -- the
                # problem is that it's really a directory, but I'm treating it
                # as a dumb string, so trailing slashes and so forth matter.

                #py_files = map(os.path.abspath, py_files)
                #if prefix:
                #    prefix = os.path.abspath(prefix)

                script.write(",\n".join(map(repr, py_files)) + "]\n")
                script.write("""
byte_compile(files, optimize=%r, force=%r,
             prefix=%r, base_dir=%r,
             verbose=%r, dry_run=0,
             direct=1)
""" % (optimize, force, prefix, base_dir, verbose))

        msg = distutils._DEPRECATION_MESSAGE
        cmd = [sys.executable]
        cmd.extend(subprocess._optim_args_from_interpreter_flags())
        cmd.append(f'-Wignore:{msg}:DeprecationWarning')
        cmd.append(script_name)
        spawn(cmd, dry_run=dry_run)
        execute(os.remove, (script_name,), "removing %s" % script_name,
                dry_run=dry_run)

    # "Direct" byte-compilation: use the py_compile module to compile
    # right here, right now.  Note that the script generated in indirect
    # mode simply calls 'byte_compile()' in direct mode, a weird sort of
    # cross-process recursion.  Hey, it works!
    else:
        from py_compile import compile

        for file in py_files:
            if file[-3:] != ".py":
                # This lets us be lazy and not filter filenames in
                # the "install_lib" command.
                continue

            # Terminology from the py_compile module:
            #   cfile - byte-compiled file
            #   dfile - purported source filename (same as 'file' by default)
            if optimize >= 0:
                opt = '' if optimize == 0 else optimize
                cfile = importlib.util.cache_from_source(file, optimization=opt)
            else:
                cfile = importlib.util.cache_from_source(file)
            dfile = file
            if prefix:
                if file[:len(prefix)] != prefix:
                    raise ValueError("invalid prefix: filename %r doesn't start with %r"
                                     % (file, prefix))
                dfile = dfile[len(prefix):]
            if base_dir:
                dfile = os.path.join(base_dir, dfile)

            cfile_base = os.path.basename(cfile)
            if direct:
                if force or newer(file, cfile):
                    log.info("byte-compiling %s to %s", file, cfile_base)
                    if not dry_run:
                        compile(file, cfile, dfile)
                else:
                    log.debug("skipping byte-compilation of %s to %s",
                              file, cfile_base)

# byte_compile ()


def rfc822_escape(header):
    """Return a version of the string escaped for inclusion in an
    RFC-822 header, by ensuring there are 8 spaces after each newline.
    """
    lines = header.split('\n')
    sep = '\n' + 8 * ' '
    return sep.join(lines)

# 2to3 support

def run_2to3(files, fixer_names=None, options=None, explicit=None):
    """Invoke 2to3 on a list of Python files.  The files should all come
    from the build area, as the modification is done in-place.  To reduce
    the build time, only files modified since the last invocation of this
    function should be passed in the files argument."""

    if not files:
        return

    # Make this class local, to delay import of 2to3
    from lib2to3.refactor import RefactoringTool, get_fixers_from_package
    class DistutilsRefactoringTool(RefactoringTool):
        def log_error(self, msg, *args, **kw):
            log.error(msg, *args)

        def log_message(self, msg, *args):
            log.info(msg, *args)

        def log_debug(self, msg, *args):
            log.debug(msg, *args)

    if fixer_names is None:
        fixer_names = get_fixers_from_package('lib2to3.fixes')
    r = DistutilsRefactoringTool(fixer_names, options=options)
    r.refactor(files, write=True)

def copydir_run_2to3(src, dest, template=None, fixer_names=None,
                     options=None, explicit=None):
    """Recursively copy a directory, only copying new and changed files,
    running run_2to3 over all newly copied Python modules afterward.

    If you give a template string, it's parsed like a MANIFEST.in.
    """
    from distutils.dir_util import mkpath
    from distutils.file_util import copy_file
    from distutils.filelist import FileList
    filelist = FileList()
    curdir = os.getcwd()
    os.chdir(src)
    try:
        filelist.findall()
    finally:
        os.chdir(curdir)
    filelist.files[:] = filelist.allfiles
    if template:
        for line in template.splitlines():
            line = line.strip()
            if not line:
                continue
            filelist.process_template_line(line)
    copied = []
    for filename in filelist.files:
        outname = os.path.join(dest, filename)
        mkpath(os.path.dirname(outname))
        res = copy_file(os.path.join(src, filename), outname, update=1)
        if res[1]:
            copied.append(outname)
    run_2to3([fn for fn in copied if fn.lower().endswith('.py')],
             fixer_names=fixer_names, options=options, explicit=explicit)
    return copied
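# --- Editor's example (added; not part of the original module) ---
# rfc822_escape() simply re-indents continuation lines so a multi-line value
# stays inside one RFC-822 header; the sample text is hypothetical.
def _example_rfc822_escape():
    value = rfc822_escape('A long description\nspanning two lines')
    # value == 'A long description\n        spanning two lines'
    return value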
class Mixin2to3:
    '''Mixin class for commands that run 2to3.
    To configure 2to3, setup scripts may either change
    the class variables, or inherit from individual commands
    to override how 2to3 is invoked.'''

    # provide list of fixers to run;
    # defaults to all from lib2to3.fixers
    fixer_names = None

    # options dictionary
    options = None

    # list of fixers to invoke even though they are marked as explicit
    explicit = None

    def run_2to3(self, files):
        return run_2to3(files, self.fixer_names, self.options, self.explicit)


# ===== usr/lib/python3.10/wsgiref/util.py =====

"""Miscellaneous WSGI-related Utilities"""

import posixpath

__all__ = [
    'FileWrapper', 'guess_scheme', 'application_uri', 'request_uri',
    'shift_path_info', 'setup_testing_defaults',
]


class FileWrapper:
    """Wrapper to convert file-like objects to iterables"""

    def __init__(self, filelike, blksize=8192):
        self.filelike = filelike
        self.blksize = blksize
        if hasattr(filelike, 'close'):
            self.close = filelike.close

    def __getitem__(self, key):
        import warnings
        warnings.warn(
            "FileWrapper's __getitem__ method ignores 'key' parameter. "
            "Use iterator protocol instead.",
            DeprecationWarning,
            stacklevel=2
        )
        data = self.filelike.read(self.blksize)
        if data:
            return data
        raise IndexError

    def __iter__(self):
        return self

    def __next__(self):
        data = self.filelike.read(self.blksize)
        if data:
            return data
        raise StopIteration


def guess_scheme(environ):
    """Return a guess for whether 'wsgi.url_scheme' should be 'http' or 'https'
    """
    if environ.get("HTTPS") in ('yes', 'on', '1'):
        return 'https'
    else:
        return 'http'


def application_uri(environ):
    """Return the application's base URI (no PATH_INFO or QUERY_STRING)"""
    url = environ['wsgi.url_scheme'] + '://'
    from urllib.parse import quote

    if environ.get('HTTP_HOST'):
        url += environ['HTTP_HOST']
    else:
        url += environ['SERVER_NAME']

        if environ['wsgi.url_scheme'] == 'https':
            if environ['SERVER_PORT'] != '443':
                url += ':' + environ['SERVER_PORT']
        else:
            if environ['SERVER_PORT'] != '80':
                url += ':' + environ['SERVER_PORT']

    url += quote(environ.get('SCRIPT_NAME') or '/', encoding='latin1')
    return url


def request_uri(environ, include_query=True):
    """Return the full request URI, optionally including the query string"""
    url = application_uri(environ)
    from urllib.parse import quote
    path_info = quote(environ.get('PATH_INFO', ''), safe='/;=,', encoding='latin1')
    if not environ.get('SCRIPT_NAME'):
        url += path_info[1:]
    else:
        url += path_info
    if include_query and environ.get('QUERY_STRING'):
        url += '?' + environ['QUERY_STRING']
    return url
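# --- Editor's example (added; not part of the original module) ---
# A minimal WSGI environ, just enough for application_uri()/request_uri();
# every value below is illustrative.
def _example_request_uri():
    environ = {
        'wsgi.url_scheme': 'http',
        'HTTP_HOST': 'example.com',
        'SERVER_NAME': 'example.com',
        'SERVER_PORT': '80',
        'SCRIPT_NAME': '/app',
        'PATH_INFO': '/item/1',
        'QUERY_STRING': 'q=2',
    }
    return request_uri(environ)  # 'http://example.com/app/item/1?q=2'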
""" path_info = environ.get('PATH_INFO','') if not path_info: return None path_parts = path_info.split('/') path_parts[1:-1] = [p for p in path_parts[1:-1] if p and p != '.'] name = path_parts[1] del path_parts[1] script_name = environ.get('SCRIPT_NAME','') script_name = posixpath.normpath(script_name+'/'+name) if script_name.endswith('/'): script_name = script_name[:-1] if not name and not script_name.endswith('/'): script_name += '/' environ['SCRIPT_NAME'] = script_name environ['PATH_INFO'] = '/'.join(path_parts) # Special case: '/.' on PATH_INFO doesn't get stripped, # because we don't strip the last element of PATH_INFO # if there's only one path part left. Instead of fixing this # above, we fix it here so that PATH_INFO gets normalized to # an empty string in the environ. if name=='.': name = None return name def setup_testing_defaults(environ): """Update 'environ' with trivial defaults for testing purposes This adds various parameters required for WSGI, including HTTP_HOST, SERVER_NAME, SERVER_PORT, REQUEST_METHOD, SCRIPT_NAME, PATH_INFO, and all of the wsgi.* variables. It only supplies default values, and does not replace any existing settings for these variables. This routine is intended to make it easier for unit tests of WSGI servers and applications to set up dummy environments. It should *not* be used by actual WSGI servers or applications, since the data is fake! """ environ.setdefault('SERVER_NAME','127.0.0.1') environ.setdefault('SERVER_PROTOCOL','HTTP/1.0') environ.setdefault('HTTP_HOST',environ['SERVER_NAME']) environ.setdefault('REQUEST_METHOD','GET') if 'SCRIPT_NAME' not in environ and 'PATH_INFO' not in environ: environ.setdefault('SCRIPT_NAME','') environ.setdefault('PATH_INFO','/') environ.setdefault('wsgi.version', (1,0)) environ.setdefault('wsgi.run_once', 0) environ.setdefault('wsgi.multithread', 0) environ.setdefault('wsgi.multiprocess', 0) from io import StringIO, BytesIO environ.setdefault('wsgi.input', BytesIO()) environ.setdefault('wsgi.errors', StringIO()) environ.setdefault('wsgi.url_scheme',guess_scheme(environ)) if environ['wsgi.url_scheme']=='http': environ.setdefault('SERVER_PORT', '80') elif environ['wsgi.url_scheme']=='https': environ.setdefault('SERVER_PORT', '443') _hoppish = { 'connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', 'upgrade' }.__contains__ def is_hop_by_hop(header_name): """Return true if 'header_name' is an HTTP/1.1 "Hop-by-Hop" header""" return _hoppish(header_name.lower()) usr/lib/python3.10/unittest/util.py 0000644 00000012137 15027504355 0013166 0 ustar 00 """Various utility functions.""" from collections import namedtuple, Counter from os.path import commonprefix __unittest = True _MAX_LENGTH = 80 _PLACEHOLDER_LEN = 12 _MIN_BEGIN_LEN = 5 _MIN_END_LEN = 5 _MIN_COMMON_LEN = 5 _MIN_DIFF_LEN = _MAX_LENGTH - \ (_MIN_BEGIN_LEN + _PLACEHOLDER_LEN + _MIN_COMMON_LEN + _PLACEHOLDER_LEN + _MIN_END_LEN) assert _MIN_DIFF_LEN >= 0 def _shorten(s, prefixlen, suffixlen): skip = len(s) - prefixlen - suffixlen if skip > _PLACEHOLDER_LEN: s = '%s[%d chars]%s' % (s[:prefixlen], skip, s[len(s) - suffixlen:]) return s def _common_shorten_repr(*args): args = tuple(map(safe_repr, args)) maxlen = max(map(len, args)) if maxlen <= _MAX_LENGTH: return args prefix = commonprefix(args) prefixlen = len(prefix) common_len = _MAX_LENGTH - \ (maxlen - prefixlen + _MIN_BEGIN_LEN + _PLACEHOLDER_LEN) if common_len > _MIN_COMMON_LEN: assert _MIN_BEGIN_LEN + _PLACEHOLDER_LEN + _MIN_COMMON_LEN + \ (maxlen - 
# ===== usr/lib/python3.10/unittest/util.py =====

"""Various utility functions."""

from collections import namedtuple, Counter
from os.path import commonprefix

__unittest = True

_MAX_LENGTH = 80
_PLACEHOLDER_LEN = 12
_MIN_BEGIN_LEN = 5
_MIN_END_LEN = 5
_MIN_COMMON_LEN = 5
_MIN_DIFF_LEN = _MAX_LENGTH - \
               (_MIN_BEGIN_LEN + _PLACEHOLDER_LEN + _MIN_COMMON_LEN +
                _PLACEHOLDER_LEN + _MIN_END_LEN)
assert _MIN_DIFF_LEN >= 0

def _shorten(s, prefixlen, suffixlen):
    skip = len(s) - prefixlen - suffixlen
    if skip > _PLACEHOLDER_LEN:
        s = '%s[%d chars]%s' % (s[:prefixlen], skip, s[len(s) - suffixlen:])
    return s

def _common_shorten_repr(*args):
    args = tuple(map(safe_repr, args))
    maxlen = max(map(len, args))
    if maxlen <= _MAX_LENGTH:
        return args

    prefix = commonprefix(args)
    prefixlen = len(prefix)

    common_len = _MAX_LENGTH - \
                 (maxlen - prefixlen + _MIN_BEGIN_LEN + _PLACEHOLDER_LEN)
    if common_len > _MIN_COMMON_LEN:
        assert _MIN_BEGIN_LEN + _PLACEHOLDER_LEN + _MIN_COMMON_LEN + \
               (maxlen - prefixlen) < _MAX_LENGTH
        prefix = _shorten(prefix, _MIN_BEGIN_LEN, common_len)
        return tuple(prefix + s[prefixlen:] for s in args)

    prefix = _shorten(prefix, _MIN_BEGIN_LEN, _MIN_COMMON_LEN)
    return tuple(prefix + _shorten(s[prefixlen:], _MIN_DIFF_LEN, _MIN_END_LEN)
                 for s in args)

def safe_repr(obj, short=False):
    try:
        result = repr(obj)
    except Exception:
        result = object.__repr__(obj)
    if not short or len(result) < _MAX_LENGTH:
        return result
    return result[:_MAX_LENGTH] + ' [truncated]...'

def strclass(cls):
    return "%s.%s" % (cls.__module__, cls.__qualname__)

def sorted_list_difference(expected, actual):
    """Finds elements in only one or the other of two sorted input lists.

    Returns a two-element tuple of lists.  The first list contains those
    elements in the "expected" list but not in the "actual" list, and the
    second contains those elements in the "actual" list but not in the
    "expected" list.  Duplicate elements in either input list are ignored.
    """
    i = j = 0
    missing = []
    unexpected = []
    while True:
        try:
            e = expected[i]
            a = actual[j]
            if e < a:
                missing.append(e)
                i += 1
                while expected[i] == e:
                    i += 1
            elif e > a:
                unexpected.append(a)
                j += 1
                while actual[j] == a:
                    j += 1
            else:
                i += 1
                try:
                    while expected[i] == e:
                        i += 1
                finally:
                    j += 1
                    while actual[j] == a:
                        j += 1
        except IndexError:
            missing.extend(expected[i:])
            unexpected.extend(actual[j:])
            break
    return missing, unexpected


def unorderable_list_difference(expected, actual):
    """Same behavior as sorted_list_difference but
    for lists of unorderable items (like dicts).

    As it does a linear search per item (remove) it
    has O(n*n) performance."""
    missing = []
    while expected:
        item = expected.pop()
        try:
            actual.remove(item)
        except ValueError:
            missing.append(item)

    # anything left in actual is unexpected
    return missing, actual

def three_way_cmp(x, y):
    """Return -1 if x < y, 0 if x == y and 1 if x > y"""
    return (x > y) - (x < y)

_Mismatch = namedtuple('Mismatch', 'actual expected value')

def _count_diff_all_purpose(actual, expected):
    'Returns list of (cnt_act, cnt_exp, elem) triples where the counts differ'
    # elements need not be hashable
    s, t = list(actual), list(expected)
    m, n = len(s), len(t)
    NULL = object()
    result = []
    for i, elem in enumerate(s):
        if elem is NULL:
            continue
        cnt_s = cnt_t = 0
        for j in range(i, m):
            if s[j] == elem:
                cnt_s += 1
                s[j] = NULL
        for j, other_elem in enumerate(t):
            if other_elem == elem:
                cnt_t += 1
                t[j] = NULL
        if cnt_s != cnt_t:
            diff = _Mismatch(cnt_s, cnt_t, elem)
            result.append(diff)

    for i, elem in enumerate(t):
        if elem is NULL:
            continue
        cnt_t = 0
        for j in range(i, n):
            if t[j] == elem:
                cnt_t += 1
                t[j] = NULL
        diff = _Mismatch(0, cnt_t, elem)
        result.append(diff)
    return result

def _count_diff_hashable(actual, expected):
    'Returns list of (cnt_act, cnt_exp, elem) triples where the counts differ'
    # elements must be hashable
    s, t = Counter(actual), Counter(expected)
    result = []
    for elem, cnt_s in s.items():
        cnt_t = t.get(elem, 0)
        if cnt_s != cnt_t:
            diff = _Mismatch(cnt_s, cnt_t, elem)
            result.append(diff)
    for elem, cnt_t in t.items():
        if elem not in s:
            diff = _Mismatch(0, cnt_t, elem)
            result.append(diff)
    return result
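# --- Editor's example (added; not part of the original module) ---
# sorted_list_difference() expects both inputs pre-sorted; the lists below
# are illustrative.
def _example_sorted_list_difference():
    missing, unexpected = sorted_list_difference([1, 2, 4], [2, 3, 4])
    # missing == [1]; unexpected == [3]
    return missing, unexpected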
# ===== usr/lib/python3.10/importlib/util.py =====

"""Utility code for constructing importers, etc."""
from ._abc import Loader
from ._bootstrap import module_from_spec
from ._bootstrap import _resolve_name
from ._bootstrap import spec_from_loader
from ._bootstrap import _find_spec
from ._bootstrap_external import MAGIC_NUMBER
from ._bootstrap_external import _RAW_MAGIC_NUMBER
from ._bootstrap_external import cache_from_source
from ._bootstrap_external import decode_source
from ._bootstrap_external import source_from_cache
from ._bootstrap_external import spec_from_file_location

from contextlib import contextmanager
import _imp
import functools
import sys
import types
import warnings


def source_hash(source_bytes):
    "Return the hash of *source_bytes* as used in hash-based pyc files."
    return _imp.source_hash(_RAW_MAGIC_NUMBER, source_bytes)


def resolve_name(name, package):
    """Resolve a relative module name to an absolute one."""
    if not name.startswith('.'):
        return name
    elif not package:
        raise ImportError(f'no package specified for {repr(name)} '
                          '(required for relative module names)')
    level = 0
    for character in name:
        if character != '.':
            break
        level += 1
    return _resolve_name(name[level:], package, level)


def _find_spec_from_path(name, path=None):
    """Return the spec for the specified module.

    First, sys.modules is checked to see if the module was already imported.
    If so, then sys.modules[name].__spec__ is returned.  If that happens to be
    set to None, then ValueError is raised.  If the module is not in
    sys.modules, then sys.meta_path is searched for a suitable spec with the
    value of 'path' given to the finders.  None is returned if no spec could
    be found.

    Dotted names do not have their parent packages implicitly imported.  You
    will most likely need to explicitly import all parent packages in the
    proper order for a submodule to get the correct spec.

    """
    if name not in sys.modules:
        return _find_spec(name, path)
    else:
        module = sys.modules[name]
        if module is None:
            return None
        try:
            spec = module.__spec__
        except AttributeError:
            raise ValueError('{}.__spec__ is not set'.format(name)) from None
        else:
            if spec is None:
                raise ValueError('{}.__spec__ is None'.format(name))
            return spec


def find_spec(name, package=None):
    """Return the spec for the specified module.

    First, sys.modules is checked to see if the module was already imported.
    If so, then sys.modules[name].__spec__ is returned.  If that happens to be
    set to None, then ValueError is raised.  If the module is not in
    sys.modules, then sys.meta_path is searched for a suitable spec with the
    value of 'path' given to the finders.  None is returned if no spec could
    be found.

    If the name is for a submodule (contains a dot), the parent module is
    automatically imported.

    The name and package arguments work the same as importlib.import_module().
    In other words, relative module names (with leading dots) work.

    """
    fullname = resolve_name(name, package) if name.startswith('.') else name
    if fullname not in sys.modules:
        parent_name = fullname.rpartition('.')[0]
        if parent_name:
            parent = __import__(parent_name, fromlist=['__path__'])
            try:
                parent_path = parent.__path__
            except AttributeError as e:
                raise ModuleNotFoundError(
                    f"__path__ attribute not found on {parent_name!r} "
                    f"while trying to find {fullname!r}", name=fullname) from e
        else:
            parent_path = None
        return _find_spec(fullname, parent_path)
    else:
        module = sys.modules[fullname]
        if module is None:
            return None
        try:
            spec = module.__spec__
        except AttributeError:
            raise ValueError('{}.__spec__ is not set'.format(name)) from None
        else:
            if spec is None:
                raise ValueError('{}.__spec__ is None'.format(name))
            return spec
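# --- Editor's example (added; not part of the original module) ---
# resolve_name() turns a relative module name into an absolute one; the
# package names below are hypothetical.
def _example_resolve_name():
    return resolve_name('..mod', 'pkg.sub.inner')  # 'pkg.sub.mod'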
@contextmanager
def _module_to_load(name):
    is_reload = name in sys.modules

    module = sys.modules.get(name)
    if not is_reload:
        # This must be done before open() is called as the 'io' module
        # implicitly imports 'locale' and would otherwise trigger an
        # infinite loop.
        module = type(sys)(name)
        # This must be done before putting the module in sys.modules
        # (otherwise an optimization shortcut in import.c becomes wrong)
        module.__initializing__ = True
        sys.modules[name] = module
    try:
        yield module
    except Exception:
        if not is_reload:
            try:
                del sys.modules[name]
            except KeyError:
                pass
    finally:
        module.__initializing__ = False


def set_package(fxn):
    """Set __package__ on the returned module.

    This function is deprecated.

    """
    @functools.wraps(fxn)
    def set_package_wrapper(*args, **kwargs):
        warnings.warn('The import system now takes care of this automatically; '
                      'this decorator is slated for removal in Python 3.12',
                      DeprecationWarning, stacklevel=2)
        module = fxn(*args, **kwargs)
        if getattr(module, '__package__', None) is None:
            module.__package__ = module.__name__
            if not hasattr(module, '__path__'):
                module.__package__ = module.__package__.rpartition('.')[0]
        return module
    return set_package_wrapper


def set_loader(fxn):
    """Set __loader__ on the returned module.

    This function is deprecated.

    """
    @functools.wraps(fxn)
    def set_loader_wrapper(self, *args, **kwargs):
        warnings.warn('The import system now takes care of this automatically; '
                      'this decorator is slated for removal in Python 3.12',
                      DeprecationWarning, stacklevel=2)
        module = fxn(self, *args, **kwargs)
        if getattr(module, '__loader__', None) is None:
            module.__loader__ = self
        return module
    return set_loader_wrapper


def module_for_loader(fxn):
    """Decorator to handle selecting the proper module for loaders.

    The decorated function is passed the module to use instead of the module
    name.  The module passed in to the function is either from sys.modules if
    it already exists or is a new module.  If the module is new, then __name__
    is set to the first argument to the method, __loader__ is set to self, and
    __package__ is set accordingly (if self.is_package() is defined) before it
    is passed to the decorated function (if self.is_package() does not work
    for the module it will be set post-load).

    If an exception is raised and the decorator created the module it is
    subsequently removed from sys.modules.

    The decorator assumes that the decorated function takes the module name as
    the second argument.

    """
    warnings.warn('The import system now takes care of this automatically; '
                  'this decorator is slated for removal in Python 3.12',
                  DeprecationWarning, stacklevel=2)
    @functools.wraps(fxn)
    def module_for_loader_wrapper(self, fullname, *args, **kwargs):
        with _module_to_load(fullname) as module:
            module.__loader__ = self
            try:
                is_package = self.is_package(fullname)
            except (ImportError, AttributeError):
                pass
            else:
                if is_package:
                    module.__package__ = fullname
                else:
                    module.__package__ = fullname.rpartition('.')[0]
            # If __package__ was not set above, __import__() will do it later.
            return fxn(self, module, *args, **kwargs)

    return module_for_loader_wrapper
class _LazyModule(types.ModuleType):

    """A subclass of the module type which triggers loading upon attribute access."""

    def __getattribute__(self, attr):
        """Trigger the load of the module and return the attribute."""
        # All module metadata must be garnered from __spec__ in order to avoid
        # using mutated values.
        # Stop triggering this method.
        self.__class__ = types.ModuleType
        # Get the original name to make sure no object substitution occurred
        # in sys.modules.
        original_name = self.__spec__.name
        # Figure out exactly what attributes were mutated between the creation
        # of the module and now.
        attrs_then = self.__spec__.loader_state['__dict__']
        attrs_now = self.__dict__
        attrs_updated = {}
        for key, value in attrs_now.items():
            # Code that set the attribute may have kept a reference to the
            # assigned object, making identity more important than equality.
            if key not in attrs_then:
                attrs_updated[key] = value
            elif id(attrs_now[key]) != id(attrs_then[key]):
                attrs_updated[key] = value
        self.__spec__.loader.exec_module(self)
        # If exec_module() was used directly there is no guarantee the module
        # object was put into sys.modules.
        if original_name in sys.modules:
            if id(self) != id(sys.modules[original_name]):
                raise ValueError(f"module object for {original_name!r} "
                                  "substituted in sys.modules during a lazy "
                                  "load")
        # Update after loading since that's what would happen in an eager
        # loading situation.
        self.__dict__.update(attrs_updated)
        return getattr(self, attr)

    def __delattr__(self, attr):
        """Trigger the load and then perform the deletion."""
        # To trigger the load and raise an exception if the attribute
        # doesn't exist.
        self.__getattribute__(attr)
        delattr(self, attr)


class LazyLoader(Loader):

    """A loader that creates a module which defers loading until
    attribute access."""

    @staticmethod
    def __check_eager_loader(loader):
        if not hasattr(loader, 'exec_module'):
            raise TypeError('loader must define exec_module()')

    @classmethod
    def factory(cls, loader):
        """Construct a callable which returns the eager loader made lazy."""
        cls.__check_eager_loader(loader)
        return lambda *args, **kwargs: cls(loader(*args, **kwargs))

    def __init__(self, loader):
        self.__check_eager_loader(loader)
        self.loader = loader

    def create_module(self, spec):
        return self.loader.create_module(spec)

    def exec_module(self, module):
        """Make the module load lazily."""
        module.__spec__.loader = self.loader
        module.__loader__ = self.loader
        # Don't need to worry about deep-copying as trying to set an attribute
        # on an object would have triggered the load,
        # e.g. ``module.__spec__.loader = None`` would trigger a load from
        # trying to access module.__spec__.
        loader_state = {}
        loader_state['__dict__'] = module.__dict__.copy()
        loader_state['__class__'] = module.__class__
        module.__spec__.loader_state = loader_state
        module.__class__ = _LazyModule
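# --- Editor's example (added; not part of the original module) ---
# The standard lazy-import recipe built on LazyLoader, as described in the
# importlib docs; 'json' is just a stand-in module name.
def _example_lazy_import(name='json'):
    spec = find_spec(name)
    loader = LazyLoader(spec.loader)
    spec.loader = loader
    module = module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)
    return module  # the module body runs on first attribute access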
# ===== usr/lib/python3.10/multiprocessing/util.py =====
#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger

# Abstract socket support

def _platform_supports_abstract_sockets():
    if sys.platform == "linux":
        return True
    if hasattr(sys, 'getandroidapilevel'):
        return True
    return False

def is_abstract_socket_namespace(address):
    if not address:
        return False
    if isinstance(address, bytes):
        return address[0] == 0
    elif isinstance(address, str):
        return address[0] == "\0"
    raise TypeError(f'address type of {address!r} unrecognized')

abstract_sockets_supported = _platform_supports_abstract_sockets()

#
# Function returning a temp directory which will be removed on exit
#

def _remove_temp_dir(rmtree, tempdir):
    rmtree(tempdir)

    current_process = process.current_process()
    # current_process() can be None if the finalizer is called
    # late during Python finalization
    if current_process is not None:
        current_process._config['tempdir'] = None

def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        # keep a strong reference to shutil.rmtree(), since the finalizer
        # can be called late during Python shutdown
        Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
                 exitpriority=-100)
        process.current_process()._config['tempdir'] = tempdir
    return tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
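# --- Editor's example (added; not part of the original module) ---
# A sketch of register_after_fork(): the lambda reinitializes the object in
# each forked child.  The _ExampleCache class is hypothetical.
class _ExampleCache:
    def __init__(self):
        self._store = {}
        # empty the cache in forked children rather than inheriting it
        register_after_fork(self, lambda obj: obj._store.clear())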
#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        if (exitpriority is not None) and not isinstance(exitpriority, int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<%s object, dead>' % self.__class__.__name__

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
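# --- Editor's example (added; not part of the original module) ---
# Registering a finalizer against an object; the _Resource class and the
# print callback are hypothetical.
def _example_finalize():
    class _Resource:
        pass
    res = _Resource()
    fin = Finalize(res, print, args=('resource finalized',), exitpriority=0)
    fin()                      # runs the callback once; later calls are no-ops
    return fin.still_active()  # False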
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p: p[0] is not None
    else:
        f = lambda p: p[0] is not None and p[0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).

    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    keys = [key for key in list(_finalizer_registry) if f(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # key may have been removed from the registry
        if finalizer is not None:
            sub_debug('calling %s', finalizer)
            try:
                finalizer()
            except Exception:
                import traceback
                traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and #9207.

            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    def __init__(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release
        register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)

    def _at_fork_reinit(self):
        self._lock._at_fork_reinit()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj: obj.__dict__.clear())

    def __reduce__(self):
        return type(self), ()

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for i in range(len(fds) - 1):
        os.closerange(fds[i]+1, fds[i+1])

#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            sys.stdin = open(fd, encoding="utf-8", closefd=False)
        except:
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass

#
# Flush standard streams, if any
#

def _flush_std_streams():
    try:
        sys.stdout.flush()
    except (AttributeError, ValueError):
        pass
    try:
        sys.stderr.flush()
    except (AttributeError, ValueError):
        pass
#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    import _posixsubprocess
    passfds = tuple(sorted(map(int, passfds)))
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None, None, None, -1, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)


def close_fds(*fds):
    """Close each file descriptor given as an argument"""
    for fd in fds:
        os.close(fd)


def _cleanup_tests():
    """Cleanup multiprocessing resources when multiprocessing tests
    completed."""
    from test import support

    # cleanup multiprocessing
    process._cleanup()

    # Stop the ForkServer process if it's running
    from multiprocessing import forkserver
    forkserver._forkserver._stop()

    # Stop the ResourceTracker process if it's running
    from multiprocessing import resource_tracker
    resource_tracker._resource_tracker._stop()

    # bpo-37421: Explicitly call _run_finalizers() to remove immediately
    # temporary directories created by multiprocessing.util.get_temp_dir().
    _run_finalizers()
    support.gc_collect()

    support.reap_children()


# ===== usr/lib/python3.10/ctypes/util.py =====

import os
import shutil
import subprocess
import sys

# find_library(name) returns the pathname of a library, or None.
if os.name == "nt":

    def _get_build_version():
        """Return the version of MSVC that was used to build Python.

        For Python 2.3 and up, the version number is included in
        sys.version.  For earlier versions, assume the compiler is MSVC 6.
        """
        # This function was copied from Lib/distutils/msvccompiler.py
        prefix = "MSC v."
        i = sys.version.find(prefix)
        if i == -1:
            return 6
        i = i + len(prefix)
        s, rest = sys.version[i:].split(" ", 1)
        majorVersion = int(s[:-2]) - 6
        if majorVersion >= 13:
            majorVersion += 1
        minorVersion = int(s[2:3]) / 10.0
        # I don't think paths are affected by minor version in version 6
        if majorVersion == 6:
            minorVersion = 0
        if majorVersion >= 6:
            return majorVersion + minorVersion
        # else we don't know what version of the compiler this is
        return None

    def find_msvcrt():
        """Return the name of the VC runtime dll"""
        version = _get_build_version()
        if version is None:
            # better be safe than sorry
            return None
        if version <= 6:
            clibname = 'msvcrt'
        elif version <= 13:
            clibname = 'msvcr%d' % (version * 10)
        else:
            # CRT is no longer directly loadable.  See issue23606 for the
            # discussion about alternative approaches.
            return None

        # If python was built in debug mode
        import importlib.machinery
        if '_d.pyd' in importlib.machinery.EXTENSION_SUFFIXES:
            clibname += 'd'
        return clibname+'.dll'

    def find_library(name):
        if name in ('c', 'm'):
            return find_msvcrt()
        # See MSDN for the REAL search order.
        for directory in os.environ['PATH'].split(os.pathsep):
            fname = os.path.join(directory, name)
            if os.path.isfile(fname):
                return fname
            if fname.lower().endswith(".dll"):
                continue
            fname = fname + ".dll"
            if os.path.isfile(fname):
                return fname
        return None

elif os.name == "posix" and sys.platform == "darwin":
    from ctypes.macholib.dyld import dyld_find as _dyld_find
    def find_library(name):
        possible = ['lib%s.dylib' % name,
                    '%s.dylib' % name,
                    '%s.framework/%s' % (name, name)]
        for name in possible:
            try:
                return _dyld_find(name)
            except ValueError:
                continue
        return None

elif sys.platform.startswith("aix"):
    # AIX has two styles of storing shared libraries
    # GNU auto_tools refer to these as svr4 and aix
    # svr4 (System V Release 4) is a regular file, often with .so as suffix
    # AIX style uses an archive (suffix .a) with members (e.g., shr.o, libssl.so)
    # see issue#26439 and _aix.py for more details

    from ctypes._aix import find_library

elif os.name == "posix":
    # Andreas Degert's find functions, using gcc, /sbin/ldconfig, objdump
    import re, tempfile

    def _is_elf(filename):
        "Return True if the given file is an ELF file"
        elf_header = b'\x7fELF'
        with open(filename, 'br') as thefile:
            return thefile.read(4) == elf_header

    def _findLib_gcc(name):
        # Run GCC's linker with the -t (aka --trace) option and examine the
        # library name it prints out.  The GCC command will fail because we
        # haven't supplied a proper program with main(), but that does not
        # matter.
        expr = os.fsencode(r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name))

        c_compiler = shutil.which('gcc')
        if not c_compiler:
            c_compiler = shutil.which('cc')
        if not c_compiler:
            # No C compiler available, give up
            return None

        temp = tempfile.NamedTemporaryFile()
        try:
            args = [c_compiler, '-Wl,-t', '-o', temp.name, '-l' + name]

            env = dict(os.environ)
            env['LC_ALL'] = 'C'
            env['LANG'] = 'C'
            try:
                proc = subprocess.Popen(args,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT,
                                        env=env)
            except OSError:  # E.g. bad executable
                return None
            with proc:
                trace = proc.stdout.read()
        finally:
            try:
                temp.close()
            except FileNotFoundError:
                # Raised if the file was already removed, which is the normal
                # behaviour of GCC if linking fails
                pass
        res = re.findall(expr, trace)
        if not res:
            return None

        for file in res:
            # Check if the given file is an elf file: gcc can report
            # some files that are linker scripts and not actual
            # shared objects.  See bpo-41976 for more details
            if not _is_elf(file):
                continue
            return os.fsdecode(file)


    if sys.platform == "sunos5":
        # use /usr/ccs/bin/dump on solaris
        def _get_soname(f):
            if not f:
                return None

            try:
                proc = subprocess.Popen(("/usr/ccs/bin/dump", "-Lpv", f),
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.DEVNULL)
            except OSError:  # E.g. command not found
                return None
            with proc:
                data = proc.stdout.read()
            res = re.search(br'\[.*\]\sSONAME\s+([^\s]+)', data)
            if not res:
                return None
            return os.fsdecode(res.group(1))
    else:
        def _get_soname(f):
            # assuming GNU binutils / ELF
            if not f:
                return None
            objdump = shutil.which('objdump')
            if not objdump:
                # objdump is not available, give up
                return None

            try:
                proc = subprocess.Popen((objdump, '-p', '-j', '.dynamic', f),
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.DEVNULL)
            except OSError:  # E.g. bad executable
                return None
            with proc:
                dump = proc.stdout.read()
            res = re.search(br'\sSONAME\s+([^\s]+)', dump)
            if not res:
                return None
            return os.fsdecode(res.group(1))

    if sys.platform.startswith(("freebsd", "openbsd", "dragonfly")):

        def _num_version(libname):
            # "libxyz.so.MAJOR.MINOR" => [ MAJOR, MINOR ]
            parts = libname.split(b".")
            nums = []
            try:
                while parts:
                    nums.insert(0, int(parts.pop()))
            except ValueError:
                pass
            return nums or [sys.maxsize]

        def find_library(name):
            ename = re.escape(name)
            expr = r':-l%s\.\S+ => \S*/(lib%s\.\S+)' % (ename, ename)
            expr = os.fsencode(expr)

            try:
                proc = subprocess.Popen(('/sbin/ldconfig', '-r'),
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.DEVNULL)
            except OSError:  # E.g. command not found
                data = b''
            else:
                with proc:
                    data = proc.stdout.read()

            res = re.findall(expr, data)
            if not res:
                return _get_soname(_findLib_gcc(name))
            res.sort(key=_num_version)
            return os.fsdecode(res[-1])

    elif sys.platform == "sunos5":

        def _findLib_crle(name, is64):
            if not os.path.exists('/usr/bin/crle'):
                return None

            env = dict(os.environ)
            env['LC_ALL'] = 'C'

            if is64:
                args = ('/usr/bin/crle', '-64')
            else:
                args = ('/usr/bin/crle',)

            paths = None
            try:
                proc = subprocess.Popen(args,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.DEVNULL,
                                        env=env)
            except OSError:  # E.g. bad executable
                return None
            with proc:
                for line in proc.stdout:
                    line = line.strip()
                    if line.startswith(b'Default Library Path (ELF):'):
                        paths = os.fsdecode(line).split()[4]

            if not paths:
                return None

            for dir in paths.split(":"):
                libfile = os.path.join(dir, "lib%s.so" % name)
                if os.path.exists(libfile):
                    return libfile

            return None

        def find_library(name, is64=False):
            return _get_soname(_findLib_crle(name, is64) or _findLib_gcc(name))

    else:

        def _findSoname_ldconfig(name):
            import struct
            # XXX this code assumes that we know all unames and that a single
            # ABI is supported per uname; instead we should find what the
            # ABI is (e.g. check ABI of current process) or simply ask libc
            # to load the library for us
            uname = os.uname()
            # ARM has a variety of unames, e.g. armv7l
            if uname.machine.startswith("arm"):
                machine = "arm"
            else:
                machine = uname.machine
            if struct.calcsize('l') == 4:
                machine = machine + '-32'
            else:
                machine = machine + '-64'
            mach_map = {
                'x86_64-64': 'libc6,x86-64',
                'ppc64-64': 'libc6,64bit',
                'sparc64-64': 'libc6,64bit',
                's390x-64': 'libc6,64bit',
                'ia64-64': 'libc6,IA-64',
                # this actually breaks on biarch or multiarch as the first
                # library wins; uname doesn't tell us which ABI we're using
                'arm-32': 'libc6(,hard-float)?',
                }
            abi_type = mach_map.get(machine, 'libc6')

            # XXX assuming GLIBC's ldconfig (with option -p)
            regex = r'\s+(lib%s\.[^\s]+)\s+\(%s'
            regex = os.fsencode(regex % (re.escape(name), abi_type))
            try:
                with subprocess.Popen(['/sbin/ldconfig', '-p'],
                                      stdin=subprocess.DEVNULL,
                                      stderr=subprocess.DEVNULL,
                                      stdout=subprocess.PIPE,
                                      env={'LC_ALL': 'C', 'LANG': 'C'}) as p:
                    res = re.search(regex, p.stdout.read())
                    if res:
                        return os.fsdecode(res.group(1))
            except OSError:
                pass

        def _findLib_ld(name):
            # See issue #9998 for why this is needed
            expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
            cmd = ['ld', '-t']
            libpath = os.environ.get('LD_LIBRARY_PATH')
            if libpath:
                for d in libpath.split(':'):
                    cmd.extend(['-L', d])
            cmd.extend(['-o', os.devnull, '-l%s' % name])
            result = None
            try:
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE,
                                     universal_newlines=True)
                out, _ = p.communicate()
                res = re.findall(expr, os.fsdecode(out))
                for file in res:
                    # Check if the given file is an elf file: gcc can report
                    # some files that are linker scripts and not actual
                    # shared objects.  See bpo-41976 for more details
                    if not _is_elf(file):
                        continue
                    return os.fsdecode(file)
            except Exception:
                pass  # result will be None
            return result

        def find_library(name):
            # See issue #9998
            return _findSoname_ldconfig(name) or \
                   _get_soname(_findLib_gcc(name)) or _get_soname(_findLib_ld(name))

################################################################
# test code

def test():
    from ctypes import cdll
    if os.name == "nt":
        print(cdll.msvcrt)
        print(cdll.load("msvcrt"))
        print(find_library("msvcrt"))

    if os.name == "posix":
        # find and load_version
        print(find_library("m"))
        print(find_library("c"))
        print(find_library("bz2"))

        # load
        if sys.platform == "darwin":
            print(cdll.LoadLibrary("libm.dylib"))
            print(cdll.LoadLibrary("libcrypto.dylib"))
            print(cdll.LoadLibrary("libSystem.dylib"))
            print(cdll.LoadLibrary("System.framework/System"))
        # issue-26439 - fix broken test call for AIX
        elif sys.platform.startswith("aix"):
            from ctypes import CDLL
            if sys.maxsize < 2**32:
                print(f"Using CDLL(name, os.RTLD_MEMBER): {CDLL('libc.a(shr.o)', os.RTLD_MEMBER)}")
                print(f"Using cdll.LoadLibrary(): {cdll.LoadLibrary('libc.a(shr.o)')}")
                # librpm.so is only available as 32-bit shared library
                print(find_library("rpm"))
                print(cdll.LoadLibrary("librpm.so"))
            else:
                print(f"Using CDLL(name, os.RTLD_MEMBER): {CDLL('libc.a(shr_64.o)', os.RTLD_MEMBER)}")
                print(f"Using cdll.LoadLibrary(): {cdll.LoadLibrary('libc.a(shr_64.o)')}")
            print(f"crypt\t:: {find_library('crypt')}")
            print(f"crypt\t:: {cdll.LoadLibrary(find_library('crypt'))}")
            print(f"crypto\t:: {find_library('crypto')}")
            print(f"crypto\t:: {cdll.LoadLibrary(find_library('crypto'))}")
        else:
            print(cdll.LoadLibrary("libm.so"))
            print(cdll.LoadLibrary("libcrypt.so"))
            print(find_library("crypt"))

if __name__ == "__main__":
    test()
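# --- Editor's example (added; not part of the original module) ---
# find_library() takes a bare name (no 'lib' prefix or suffix) and returns a
# filename or None; 'm' is the C math library.
def _example_find_library():
    from ctypes import CDLL
    path = find_library('m')        # e.g. 'libm.so.6' on glibc Linux
    return CDLL(path) if path else None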