File manager - Edit - /home/newsbmcs.com/public_html/static/img/logo/creator.py.tar
Back
usr/local/lib/python3.10/dist-packages/virtualenv/create/creator.py 0000644 00000021765 15030020104 0021226 0 ustar 00 from __future__ import annotations import json import logging import os import sys import textwrap from abc import ABC, abstractmethod from argparse import ArgumentTypeError from ast import literal_eval from collections import OrderedDict from pathlib import Path from virtualenv.discovery.cached_py_info import LogCmd from virtualenv.util.path import safe_delete from virtualenv.util.subprocess import run_cmd from virtualenv.version import __version__ from .pyenv_cfg import PyEnvCfg HERE = Path(os.path.abspath(__file__)).parent DEBUG_SCRIPT = HERE / "debug.py" LOGGER = logging.getLogger(__name__) class CreatorMeta: def __init__(self) -> None: self.error = None class Creator(ABC): """A class that given a python Interpreter creates a virtual environment.""" def __init__(self, options, interpreter) -> None: """ Construct a new virtual environment creator. :param options: the CLI option as parsed from :meth:`add_parser_arguments` :param interpreter: the interpreter to create virtual environment from """ self.interpreter = interpreter self._debug = None self.dest = Path(options.dest) self.clear = options.clear self.no_vcs_ignore = options.no_vcs_ignore self.pyenv_cfg = PyEnvCfg.from_folder(self.dest) self.app_data = options.app_data self.env = options.env def __repr__(self) -> str: return f"{self.__class__.__name__}({', '.join(f'{k}={v}' for k, v in self._args())})" def _args(self): return [ ("dest", str(self.dest)), ("clear", self.clear), ("no_vcs_ignore", self.no_vcs_ignore), ] @classmethod def can_create(cls, interpreter): # noqa: ARG003 """ Determine if we can create a virtual environment. :param interpreter: the interpreter in question :return: ``None`` if we can't create, any other object otherwise that will be forwarded to \ :meth:`add_parser_arguments` """ return True @classmethod def add_parser_arguments(cls, parser, interpreter, meta, app_data): # noqa: ARG003 """ Add CLI arguments for the creator. :param parser: the CLI parser :param app_data: the application data folder :param interpreter: the interpreter we're asked to create virtual environment for :param meta: value as returned by :meth:`can_create` """ parser.add_argument( "dest", help="directory to create virtualenv at", type=cls.validate_dest, ) parser.add_argument( "--clear", dest="clear", action="store_true", help="remove the destination directory if exist before starting (will overwrite files otherwise)", default=False, ) parser.add_argument( "--no-vcs-ignore", dest="no_vcs_ignore", action="store_true", help="don't create VCS ignore directive in the destination directory", default=False, ) @abstractmethod def create(self): """Perform the virtual environment creation.""" raise NotImplementedError @classmethod def validate_dest(cls, raw_value): # noqa: C901 """No path separator in the path, valid chars and must be write-able.""" def non_write_able(dest, value): common = Path(*os.path.commonprefix([value.parts, dest.parts])) msg = f"the destination {dest.relative_to(common)} is not write-able at {common}" raise ArgumentTypeError(msg) # the file system must be able to encode # note in newer CPython this is always utf-8 https://www.python.org/dev/peps/pep-0529/ encoding = sys.getfilesystemencoding() refused = OrderedDict() kwargs = {"errors": "ignore"} if encoding != "mbcs" else {} for char in str(raw_value): try: trip = char.encode(encoding, **kwargs).decode(encoding) if trip == char: continue raise ValueError(trip) # noqa: TRY301 except ValueError: refused[char] = None if refused: bad = "".join(refused.keys()) msg = f"the file system codec ({encoding}) cannot handle characters {bad!r} within {raw_value!r}" raise ArgumentTypeError(msg) if os.pathsep in raw_value: msg = ( f"destination {raw_value!r} must not contain the path separator ({os.pathsep})" f" as this would break the activation scripts" ) raise ArgumentTypeError(msg) value = Path(raw_value) if value.exists() and value.is_file(): msg = f"the destination {value} already exists and is a file" raise ArgumentTypeError(msg) dest = Path(os.path.abspath(str(value))).resolve() # on Windows absolute does not imply resolve so use both value = dest while dest: if dest.exists(): if os.access(str(dest), os.W_OK): break non_write_able(dest, value) base, _ = dest.parent, dest.name if base == dest: non_write_able(dest, value) # pragma: no cover dest = base return str(value) def run(self): if self.dest.exists() and self.clear: LOGGER.debug("delete %s", self.dest) safe_delete(self.dest) self.create() self.add_cachedir_tag() self.set_pyenv_cfg() if not self.no_vcs_ignore: self.setup_ignore_vcs() def add_cachedir_tag(self): """Generate a file indicating that this is not meant to be backed up.""" cachedir_tag_file = self.dest / "CACHEDIR.TAG" if not cachedir_tag_file.exists(): cachedir_tag_text = textwrap.dedent(""" Signature: 8a477f597d28d172789f06886806bc55 # This file is a cache directory tag created by Python virtualenv. # For information about cache directory tags, see: # https://bford.info/cachedir/ """).strip() cachedir_tag_file.write_text(cachedir_tag_text, encoding="utf-8") def set_pyenv_cfg(self): self.pyenv_cfg.content = OrderedDict() self.pyenv_cfg["home"] = os.path.dirname(os.path.abspath(self.interpreter.system_executable)) self.pyenv_cfg["implementation"] = self.interpreter.implementation self.pyenv_cfg["version_info"] = ".".join(str(i) for i in self.interpreter.version_info) self.pyenv_cfg["virtualenv"] = __version__ def setup_ignore_vcs(self): """Generate ignore instructions for version control systems.""" # mark this folder to be ignored by VCS, handle https://www.python.org/dev/peps/pep-0610/#registered-vcs git_ignore = self.dest / ".gitignore" if not git_ignore.exists(): git_ignore.write_text("# created by virtualenv automatically\n*\n", encoding="utf-8") # Mercurial - does not support the .hgignore file inside a subdirectory directly, but only if included via the # subinclude directive from root, at which point on might as well ignore the directory itself, see # https://www.selenic.com/mercurial/hgignore.5.html for more details # Bazaar - does not support ignore files in sub-directories, only at root level via .bzrignore # Subversion - does not support ignore files, requires direct manipulation with the svn tool @property def debug(self): """:return: debug information about the virtual environment (only valid after :meth:`create` has run)""" if self._debug is None and self.exe is not None: self._debug = get_env_debug_info(self.exe, self.debug_script(), self.app_data, self.env) return self._debug @staticmethod def debug_script(): return DEBUG_SCRIPT def get_env_debug_info(env_exe, debug_script, app_data, env): env = env.copy() env.pop("PYTHONPATH", None) with app_data.ensure_extracted(debug_script) as debug_script_extracted: cmd = [str(env_exe), str(debug_script_extracted)] LOGGER.debug("debug via %r", LogCmd(cmd)) code, out, err = run_cmd(cmd) try: if code != 0: if out: result = literal_eval(out) else: if code == 2 and "file" in err: # noqa: PLR2004 # Re-raise FileNotFoundError from `run_cmd()` raise OSError(err) # noqa: TRY301 raise Exception(err) # noqa: TRY002, TRY301 else: result = json.loads(out) if err: result["err"] = err except Exception as exception: # noqa: BLE001 return {"out": out, "err": err, "returncode": code, "exception": repr(exception)} if "sys" in result and "path" in result["sys"]: del result["sys"]["path"][0] return result __all__ = [ "Creator", "CreatorMeta", ] usr/lib/python3/dist-packages/CommandNotFound/db/creator.py 0000644 00000027637 15030036462 0017715 0 ustar 00 #!/usr/bin/python3 import errno import json import logging import os import sqlite3 import subprocess import sys import time import apt_pkg apt_pkg.init() # TODO: # - add apt.conf.d snippet for download handling # - add apt::update::post-invoke-success handler component_priorities = { 'main': 120, 'universe': 100, 'contrib': 80, 'restricted': 60, 'non-free': 40, 'multiverse': 20, } # pkgnames in here are blacklisted create_db_sql=""" CREATE TABLE IF NOT EXISTS "commands" ( [cmdID] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, [pkgID] INTEGER NOT NULL, [command] TEXT, FOREIGN KEY ([pkgID]) REFERENCES "pkgs" ([pkgID]) ); CREATE TABLE IF NOT EXISTS "packages" ( [pkgID] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, [name] TEXT, [version] TEXT, [component] TEXT, [priority] INTEGER ); CREATE INDEX IF NOT EXISTS idx_commands_command ON commands (command); CREATE INDEX IF NOT EXISTS idx_packages_name ON packages (name); """ # FIXME: # - add support for foreign arch in DB (add foreign commands if there # is not native command) # - addd support for -backports: pkgname must be appended with /backports # and only be shown if there is no other command available # - do not re-create the DB everytime, only if sources.list changed # - add "last-mtime" into the DB, then we can skip all packages files # where the mtime is older and we only need to update the DB class measure: def __init__(self, what, stats): self.what = what self.stats = stats def __enter__(self): self.now = time.time() def __exit__(self, *args): if not self.what in self.stats: self.stats[self.what] = 0 self.stats[self.what] += time.time() - self.now def rm_f(path): try: os.remove(path) except OSError as e: if e.errno != errno.ENOENT: raise class DbCreator: def __init__(self, files): self.files = files self.primary_arch = apt_pkg.get_architectures()[0] self.stats = {"total": 0,"total_time": time.time()} def create(self, dbname): metadata_file = dbname+".metadata" if not self._db_update_needed(metadata_file): logging.info( "%s does not require an update (inputs unchanged)", dbname) return tmpdb = dbname+".tmp" with sqlite3.connect(tmpdb) as con: try: con.executescript(create_db_sql) self._fill_commands(con) except sqlite3.OperationalError as e: # There might be a parallel cnf-update-db process, updating the # tmpdb. Just stop execution in this case, as the other process # should produce the correct result. if str(e) == "database is locked": logging.warning( "%s is locked by another process. Ignoring.", tmpdb) sys.exit(0) else: raise e # remove now stale metadata rm_f(metadata_file) # put database in place os.rename(tmpdb, dbname) # add new metadata with open(metadata_file, "w") as fp: json.dump(self._calc_input_metadata(), fp) def _db_update_needed(self, metadata_file): if not os.path.exists(metadata_file): return True try: with open(metadata_file) as fp: meta = json.load(fp) return meta != self._calc_input_metadata() except Exception as e: logging.warning("cannot read %s: %s", metadata_file, e) return True def _calc_input_metadata(self): meta = {} for p in self.files: st = os.stat(p) meta[p] = { 'st_ino': st.st_ino, 'st_dev': st.st_dev, 'st_uid': st.st_uid, 'st_gid': st.st_gid, 'st_size': st.st_size, 'st_mtime': st.st_mtime, } return meta def _fill_commands(self, con): for f in self.files: with subprocess.Popen(["/usr/lib/apt/apt-helper", "cat-file", f], stdout=subprocess.PIPE) as sub: if "Contents" in f: self._parse_single_contents_file(con, f, sub.stdout) else: self._parse_single_commands_file(con, sub.stdout) if sub.wait() != 0: raise subprocess.CalledProcessError(returncode=sub.returncode, cmd="/usr/lib/apt/apt-helper cat-file {}".format(f)) self.stats["total_time"] = time.time() - self.stats["total_time"] logging.info("processed %i packages in %.2fs" % ( self.stats["total"], self.stats["total_time"])) def _in_db(self, con, command, pkgname): already_in_db = con.execute( """ SELECT packages.pkgID, name, version FROM commands INNER JOIN packages on packages.pkgID = commands.pkgID WHERE commands.command=? AND packages.name=?; """, (command, pkgname)).fetchone() return already_in_db def _delete_pkgid(self, con, pkgid): con.execute("DELETE FROM packages WHERE pkgID=?", (pkgid,) ) con.execute("DELETE FROM commands WHERE pkgID=?", (pkgid,) ) def _get_pkgid(self, con, pkgname): have_pkg = con.execute( "SELECT pkgID from packages WHERE name=?", (pkgname,)).fetchone() if have_pkg: return have_pkg[0] return None def _insert_package(self, con, pkgname, version, component, priority): cur=con.execute(""" INSERT INTO packages (name, version, component, priority) VALUES (?, ?, ?, ?); """, (pkgname, version, component, priority)) return cur.lastrowid def _insert_command(self, con, command, pkg_id): con.execute(""" INSERT INTO commands (command, pkgID) VALUES (?, ?); """, (command, pkg_id)) def _parse_single_commands_file(self, con, fp): tagf = apt_pkg.TagFile(fp) # file empty if not tagf.step(): return # read header suite=tagf.section["suite"] # FIXME: support backports if suite.endswith("-backports"): return component=tagf.section["component"] arch=tagf.section["arch"] # FIXME: add code for secondary arch handling! if arch != "all" and arch != self.primary_arch: return # step over the pkgs while tagf.step(): self.stats["total"] += 1 pkgname=tagf.section["name"] # allow to override the viisble pkgname to accomodate for # cases like "python2.7" which is part of python2.7-minimal # but users should just install python2.7 if tagf.section.get("visible-pkgname"): pkgname = tagf.section["visible-pkgname"] version=tagf.section.get("version", "") ignore_commands=set() if tagf.section.get("ignore-commands", ""): ignore_commands=set(tagf.section.get("ignore-commands", "").split(",")) for command in tagf.section["commands"].split(","): if command in ignore_commands: continue # see if we have the command already with measure("sql_already_db", self.stats): already_in_db=self._in_db(con, command, pkgname) if already_in_db: # we found a version that is higher what we have # in the DB -> remove current, insert higher if apt_pkg.version_compare(version, already_in_db[2]) > 0: logging.debug("replacing exiting %s in DB (higher version)" % command) with measure("sql_delete_already_in_db", self.stats): self._delete_pkgid(con, already_in_db[0]) else: logging.debug("skipping %s from %s (lower/same version)" % (command, suite)) continue logging.debug("adding %s from %s/%s (%s)" % ( command, pkgname, version, suite)) # insert new data with measure("sql_have_pkg", self.stats): pkg_id = self._get_pkgid(con, pkgname) if not pkg_id: priority = component_priorities[component] priority += int(tagf.section.get("priority-bonus", "0")) with measure("sql_insert_pkg", self.stats): pkg_id = self._insert_package(con, pkgname, version, component, priority) with measure("sql_insert_cmd", self.stats): self._insert_command(con, command, pkg_id) def _parse_single_contents_file(self, con, f, fp): # read header suite=None # FIXME for l in fp: l = l.decode("utf-8") if not (l.startswith('usr/sbin') or l.startswith('usr/bin') or l.startswith('bin') or l.startswith('sbin')): continue try: command, pkgnames = l.split(None, 1) except ValueError: continue command = os.path.basename(command) for pkgname in pkgnames.split(','): try: section, pkgname = pkgname.strip().rsplit('/', 1) except ValueError: pkgname = pkgname.strip() section = "unknown" if len(section.split('/')) == 2: component, section = section.split('/') else: component = 'main' # FIXME - Don't really know how. version = None # see if we have the command already with measure("sql_already_db", self.stats): already_in_db=self._in_db(con, command, pkgname) if already_in_db: # we found a version that is higher what we have # in the DB -> remove current, insert higher if False and apt_pkg.version_compare(version, already_in_db[2]) > 0: logging.debug("replacing exiting %s in DB (higher version)" % command) with measure("sql_delete_already_in_db", self.stats): self._delete_pkgid(con, already_in_db[0]) else: logging.debug("skipping %s from %s (lower/same version)" % (command, suite)) continue logging.debug("adding %s from %s/%s (%s)" % ( command, pkgname, version, suite)) # insert new data with measure("sql_have_pkg", self.stats): pkg_id = self._get_pkgid(con, pkgname) if not pkg_id: priority = component_priorities[component] with measure("sql_insert_pkg", self.stats): pkg_id = self._insert_package(con, pkgname, version, component, priority) with measure("sql_insert_cmd", self.stats): self._insert_command(con, command, pkg_id) if __name__ == "__main__": logging.basicConfig(level=logging.INFO) if len(sys.argv) < 3: print("usage: %s <output-db-path> <files...>" % sys.argv[0]) print(" e.g.: %s commands.db ./dists/*/*/*/Commands-*" % sys.argv[0]) print(" e.g.: %s /var/lib/command-not-found/commands.db /var/lib/apt/lists/*Commands-*", sys.argv[0]) sys.exit(1) col = DbCreator(sys.argv[2:]) col.create(sys.argv[1]) for stat, amount in col.stats.items(): logging.debug("%s: %s" % (stat, amount))
| ver. 1.4 |
Github
|
.
| PHP 8.2.28 | Generation time: 0.02 |
proxy
|
phpinfo
|
Settings