File manager - Edit - /home/newsbmcs.com/public_html/static/img/logo/utils.tar
Back
fnmatch.py 0000644 00000006337 15027746032 0006555 0 ustar 00 """Filename matching with shell patterns. fnmatch(FILENAME, PATTERN) matches according to the local convention. fnmatchcase(FILENAME, PATTERN) always takes case in account. The functions operate by translating the pattern into a regular expression. They cache the compiled regular expressions for speed. The function translate(PATTERN) returns a regular expression corresponding to PATTERN. (It does not compile it.) """ import re __all__ = ["fnmatch", "fnmatchcase", "translate"] _cache = {} _MAXCACHE = 100 def _purge(): """Clear the pattern cache""" _cache.clear() def fnmatch(name, pat): """Test whether FILENAME matches PATTERN. Patterns are Unix shell style: * matches everything ? matches any single character [seq] matches any character in seq [!seq] matches any char not in seq An initial period in FILENAME is not special. Both FILENAME and PATTERN are first case-normalized if the operating system requires it. If you don't want this, use fnmatchcase(FILENAME, PATTERN). """ name = name.lower() pat = pat.lower() return fnmatchcase(name, pat) def fnmatchcase(name, pat): """Test whether FILENAME matches PATTERN, including case. This is a version of fnmatch() which doesn't case-normalize its arguments. """ try: re_pat = _cache[pat] except KeyError: res = translate(pat) if len(_cache) >= _MAXCACHE: _cache.clear() _cache[pat] = re_pat = re.compile(res) return re_pat.match(name) is not None def translate(pat): """Translate a shell PATTERN to a regular expression. There is no way to quote meta-characters. """ i, n = 0, len(pat) res = '^' while i < n: c = pat[i] i = i + 1 if c == '*': if i < n and pat[i] == '*': # is some flavor of "**" i = i + 1 # Treat **/ as ** so eat the "/" if i < n and pat[i] == '/': i = i + 1 if i >= n: # is "**EOF" - to align with .gitignore just accept all res = f"{res}.*" else: # is "**" # Note that this allows for any # of /'s (even 0) because # the .* will eat everything, even /'s res = f"{res}(.*/)?" else: # is "*" so map it to anything but "/" res = f"{res}[^/]*" elif c == '?': # "?" is any char except "/" res = f"{res}[^/]" elif c == '[': j = i if j < n and pat[j] == '!': j = j + 1 if j < n and pat[j] == ']': j = j + 1 while j < n and pat[j] != ']': j = j + 1 if j >= n: res = f"{res}\\[" else: stuff = pat[i:j].replace('\\', '\\\\') i = j + 1 if stuff[0] == '!': stuff = f"^{stuff[1:]}" elif stuff[0] == '^': stuff = f"\\{stuff}" res = f'{res}[{stuff}]' else: res = res + re.escape(c) return f"{res}$" socket.py 0000644 00000011721 15027746032 0006416 0 ustar 00 import errno import os import select import socket as pysocket import struct try: from ..transport import NpipeSocket except ImportError: NpipeSocket = type(None) STDOUT = 1 STDERR = 2 class SocketError(Exception): pass # NpipeSockets have their own error types # pywintypes.error: (109, 'ReadFile', 'The pipe has been ended.') NPIPE_ENDED = 109 def read(socket, n=4096): """ Reads at most n bytes from socket """ recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK) if not isinstance(socket, NpipeSocket): if not hasattr(select, "poll"): # Limited to 1024 select.select([socket], [], []) else: poll = select.poll() poll.register(socket, select.POLLIN | select.POLLPRI) poll.poll() try: if hasattr(socket, 'recv'): return socket.recv(n) if isinstance(socket, pysocket.SocketIO): return socket.read(n) return os.read(socket.fileno(), n) except OSError as e: if e.errno not in recoverable_errors: raise except Exception as e: is_pipe_ended = (isinstance(socket, NpipeSocket) and len(e.args) > 0 and e.args[0] == NPIPE_ENDED) if is_pipe_ended: # npipes don't support duplex sockets, so we interpret # a PIPE_ENDED error as a close operation (0-length read). return '' raise def read_exactly(socket, n): """ Reads exactly n bytes from socket Raises SocketError if there isn't enough data """ data = b"" while len(data) < n: next_data = read(socket, n - len(data)) if not next_data: raise SocketError("Unexpected EOF") data += next_data return data def next_frame_header(socket): """ Returns the stream and size of the next frame of data waiting to be read from socket, according to the protocol defined here: https://docs.docker.com/engine/api/v1.24/#attach-to-a-container """ try: data = read_exactly(socket, 8) except SocketError: return (-1, -1) stream, actual = struct.unpack('>BxxxL', data) return (stream, actual) def frames_iter(socket, tty): """ Return a generator of frames read from socket. A frame is a tuple where the first item is the stream number and the second item is a chunk of data. If the tty setting is enabled, the streams are multiplexed into the stdout stream. """ if tty: return ((STDOUT, frame) for frame in frames_iter_tty(socket)) else: return frames_iter_no_tty(socket) def frames_iter_no_tty(socket): """ Returns a generator of data read from the socket when the tty setting is not enabled. """ while True: (stream, n) = next_frame_header(socket) if n < 0: break while n > 0: result = read(socket, n) if result is None: continue data_length = len(result) if data_length == 0: # We have reached EOF return n -= data_length yield (stream, result) def frames_iter_tty(socket): """ Return a generator of data read from the socket when the tty setting is enabled. """ while True: result = read(socket) if len(result) == 0: # We have reached EOF return yield result def consume_socket_output(frames, demux=False): """ Iterate through frames read from the socket and return the result. Args: demux (bool): If False, stdout and stderr are multiplexed, and the result is the concatenation of all the frames. If True, the streams are demultiplexed, and the result is a 2-tuple where each item is the concatenation of frames belonging to the same stream. """ if demux is False: # If the streams are multiplexed, the generator returns strings, that # we just need to concatenate. return b"".join(frames) # If the streams are demultiplexed, the generator yields tuples # (stdout, stderr) out = [None, None] for frame in frames: # It is guaranteed that for each frame, one and only one stream # is not None. assert frame != (None, None) if frame[0] is not None: if out[0] is None: out[0] = frame[0] else: out[0] += frame[0] else: if out[1] is None: out[1] = frame[1] else: out[1] += frame[1] return tuple(out) def demux_adaptor(stream_id, data): """ Utility to demultiplex stdout and stderr when reading frames from the socket. """ if stream_id == STDOUT: return (data, None) elif stream_id == STDERR: return (None, data) else: raise ValueError(f'{stream_id} is not a valid stream') proxy.py 0000644 00000004306 15027746032 0006310 0 ustar 00 from .utils import format_environment class ProxyConfig(dict): ''' Hold the client's proxy configuration ''' @property def http(self): return self.get('http') @property def https(self): return self.get('https') @property def ftp(self): return self.get('ftp') @property def no_proxy(self): return self.get('no_proxy') @staticmethod def from_dict(config): ''' Instantiate a new ProxyConfig from a dictionary that represents a client configuration, as described in `the documentation`_. .. _the documentation: https://docs.docker.com/network/proxy/#configure-the-docker-client ''' return ProxyConfig( http=config.get('httpProxy'), https=config.get('httpsProxy'), ftp=config.get('ftpProxy'), no_proxy=config.get('noProxy'), ) def get_environment(self): ''' Return a dictionary representing the environment variables used to set the proxy settings. ''' env = {} if self.http: env['http_proxy'] = env['HTTP_PROXY'] = self.http if self.https: env['https_proxy'] = env['HTTPS_PROXY'] = self.https if self.ftp: env['ftp_proxy'] = env['FTP_PROXY'] = self.ftp if self.no_proxy: env['no_proxy'] = env['NO_PROXY'] = self.no_proxy return env def inject_proxy_environment(self, environment): ''' Given a list of strings representing environment variables, prepend the environment variables corresponding to the proxy settings. ''' if not self: return environment proxy_env = format_environment(self.get_environment()) if not environment: return proxy_env # It is important to prepend our variables, because we want the # variables defined in "environment" to take precedence. return proxy_env + environment def __str__(self): return ( 'ProxyConfig(' f'http={self.http}, https={self.https}, ' f'ftp={self.ftp}, no_proxy={self.no_proxy}' ')' ) ports.py 0000644 00000005357 15027746032 0006305 0 ustar 00 import re PORT_SPEC = re.compile( "^" # Match full string "(" # External part r"(\[?(?P<host>[a-fA-F\d.:]+)\]?:)?" # Address r"(?P<ext>[\d]*)(-(?P<ext_end>[\d]+))?:" # External range ")?" r"(?P<int>[\d]+)(-(?P<int_end>[\d]+))?" # Internal range "(?P<proto>/(udp|tcp|sctp))?" # Protocol "$" # Match full string ) def add_port_mapping(port_bindings, internal_port, external): if internal_port in port_bindings: port_bindings[internal_port].append(external) else: port_bindings[internal_port] = [external] def add_port(port_bindings, internal_port_range, external_range): if external_range is None: for internal_port in internal_port_range: add_port_mapping(port_bindings, internal_port, None) else: ports = zip(internal_port_range, external_range) for internal_port, external_port in ports: add_port_mapping(port_bindings, internal_port, external_port) def build_port_bindings(ports): port_bindings = {} for port in ports: internal_port_range, external_range = split_port(port) add_port(port_bindings, internal_port_range, external_range) return port_bindings def _raise_invalid_port(port): raise ValueError('Invalid port "%s", should be ' '[[remote_ip:]remote_port[-remote_port]:]' 'port[/protocol]' % port) def port_range(start, end, proto, randomly_available_port=False): if not start: return start if not end: return [start + proto] if randomly_available_port: return [f"{start}-{end}{proto}"] return [str(port) + proto for port in range(int(start), int(end) + 1)] def split_port(port): if hasattr(port, 'legacy_repr'): # This is the worst hack, but it prevents a bug in Compose 1.14.0 # https://github.com/docker/docker-py/issues/1668 # TODO: remove once fixed in Compose stable port = port.legacy_repr() port = str(port) match = PORT_SPEC.match(port) if match is None: _raise_invalid_port(port) parts = match.groupdict() host = parts['host'] proto = parts['proto'] or '' internal = port_range(parts['int'], parts['int_end'], proto) external = port_range( parts['ext'], parts['ext_end'], '', len(internal) == 1) if host is None: if external is not None and len(internal) != len(external): raise ValueError('Port ranges don\'t match in length') return internal, external else: if not external: external = [None] * len(internal) elif len(internal) != len(external): raise ValueError('Port ranges don\'t match in length') return internal, [(host, ext_port) for ext_port in external] decorators.py 0000644 00000002724 15027746032 0007276 0 ustar 00 import functools from .. import errors from . import utils def check_resource(resource_name): def decorator(f): @functools.wraps(f) def wrapped(self, resource_id=None, *args, **kwargs): if resource_id is None and kwargs.get(resource_name): resource_id = kwargs.pop(resource_name) if isinstance(resource_id, dict): resource_id = resource_id.get('Id', resource_id.get('ID')) if not resource_id: raise errors.NullResource( 'Resource ID was not provided' ) return f(self, resource_id, *args, **kwargs) return wrapped return decorator def minimum_version(version): def decorator(f): @functools.wraps(f) def wrapper(self, *args, **kwargs): if utils.version_lt(self._version, version): raise errors.InvalidVersion( f'{f.__name__} is not available for version < {version}', ) return f(self, *args, **kwargs) return wrapper return decorator def update_headers(f): def inner(self, *args, **kwargs): if 'HttpHeaders' in self._general_configs: if not kwargs.get('headers'): kwargs['headers'] = self._general_configs['HttpHeaders'] else: kwargs['headers'].update(self._general_configs['HttpHeaders']) return f(self, *args, **kwargs) return inner __pycache__/decorators.cpython-310.pyc 0000644 00000003505 15027746032 0013633 0 ustar 00 o �h� � @ s<