File manager - Edit - /home/newsbmcs.com/public_html/static/img/logo/progress.zip
Back
PK �A�Z��<�( �( text.pynu �[��� # Copyright (c) 2009 Julian Andres Klode <jak@debian.org> # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA """Progress reporting for text interfaces.""" from __future__ import print_function import io import os import signal import sys import types from typing import Callable, Optional, Union import apt_pkg from apt.progress import base __all__ = ['AcquireProgress', 'CdromProgress', 'OpProgress'] def _(msg): # type: (str) -> str """Translate the message, also try apt if translation is missing.""" res = apt_pkg.gettext(msg) if res == msg: res = apt_pkg.gettext(msg, "apt") return res class TextProgress(object): """Internal Base class for text progress classes.""" def __init__(self, outfile=None): # type: (Optional[io.TextIOBase]) -> None self._file = outfile or sys.stdout self._width = 0 def _write(self, msg, newline=True, maximize=False): # type: (str, bool, bool) -> None """Write the message on the terminal, fill remaining space.""" self._file.write("\r") self._file.write(msg) # Fill remaining stuff with whitespace if self._width > len(msg): self._file.write((self._width - len(msg)) * ' ') elif maximize: # Needed for OpProgress. self._width = max(self._width, len(msg)) if newline: self._file.write("\n") else: #self._file.write("\r") self._file.flush() class OpProgress(base.OpProgress, TextProgress): """Operation progress reporting. This closely resembles OpTextProgress in libapt-pkg. """ def __init__(self, outfile=None): # type: (Optional[io.TextIOBase]) -> None TextProgress.__init__(self, outfile) base.OpProgress.__init__(self) self.old_op = "" def update(self, percent=None): # type: (Optional[float]) -> None """Called periodically to update the user interface.""" base.OpProgress.update(self, percent) if self.major_change and self.old_op: self._write(self.old_op) self._write("%s... %i%%\r" % (self.op, self.percent), False, True) self.old_op = self.op def done(self): # type: () -> None """Called once an operation has been completed.""" base.OpProgress.done(self) if self.old_op: self._write(_("%c%s... Done") % ('\r', self.old_op), True, True) self.old_op = "" class AcquireProgress(base.AcquireProgress, TextProgress): """AcquireProgress for the text interface.""" def __init__(self, outfile=None): # type: (Optional[io.TextIOBase]) -> None TextProgress.__init__(self, outfile) base.AcquireProgress.__init__(self) self._signal = None # type: Union[Callable[[int, Optional[types.FrameType]], None], int, signal.Handlers, None] # noqa self._width = 80 self._id = 1 def start(self): # type: () -> None """Start an Acquire progress. In this case, the function sets up a signal handler for SIGWINCH, i.e. window resize signals. And it also sets id to 1. """ base.AcquireProgress.start(self) self._signal = signal.signal(signal.SIGWINCH, self._winch) # Get the window size. self._winch() self._id = 1 def _winch(self, *dummy): # type: (object) -> None """Signal handler for window resize signals.""" if hasattr(self._file, "fileno") and os.isatty(self._file.fileno()): import fcntl import termios import struct buf = fcntl.ioctl(self._file, termios.TIOCGWINSZ, 8 * b' ') # noqa dummy, col, dummy, dummy = struct.unpack('hhhh', buf) self._width = col - 1 # 1 for the cursor def ims_hit(self, item): # type: (apt_pkg.AcquireItemDesc) -> None """Called when an item is update (e.g. not modified on the server).""" base.AcquireProgress.ims_hit(self, item) line = _('Hit ') + item.description if item.owner.filesize: line += ' [%sB]' % apt_pkg.size_to_str(item.owner.filesize) self._write(line) def fail(self, item): # type: (apt_pkg.AcquireItemDesc) -> None """Called when an item is failed.""" base.AcquireProgress.fail(self, item) if item.owner.status == item.owner.STAT_DONE: self._write(_("Ign ") + item.description) else: self._write(_("Err ") + item.description) self._write(" %s" % item.owner.error_text) def fetch(self, item): # type: (apt_pkg.AcquireItemDesc) -> None """Called when some of the item's data is fetched.""" base.AcquireProgress.fetch(self, item) # It's complete already (e.g. Hit) if item.owner.complete: return item.owner.id = self._id self._id += 1 line = _("Get:") + "%s %s" % (item.owner.id, item.description) if item.owner.filesize: line += (" [%sB]" % apt_pkg.size_to_str(item.owner.filesize)) self._write(line) def pulse(self, owner): # type: (apt_pkg.Acquire) -> bool """Periodically invoked while the Acquire process is underway. Return False if the user asked to cancel the whole Acquire process.""" base.AcquireProgress.pulse(self, owner) # only show progress on a tty to not clutter log files etc if (hasattr(self._file, "fileno") and not os.isatty(self._file.fileno())): return True # calculate progress percent = (((self.current_bytes + self.current_items) * 100.0) / float(self.total_bytes + self.total_items)) shown = False tval = '%i%%' % percent end = "" if self.current_cps: eta = int(float(self.total_bytes - self.current_bytes) / self.current_cps) end = " %sB/s %s" % (apt_pkg.size_to_str(self.current_cps), apt_pkg.time_to_str(eta)) for worker in owner.workers: val = '' if not worker.current_item: if worker.status: val = ' [%s]' % worker.status if len(tval) + len(val) + len(end) >= self._width: break tval += val shown = True continue shown = True if worker.current_item.owner.id: val += " [%i %s" % (worker.current_item.owner.id, worker.current_item.shortdesc) else: val += ' [%s' % worker.current_item.description if worker.current_item.owner.active_subprocess: val += ' %s' % worker.current_item.owner.active_subprocess val += ' %sB' % apt_pkg.size_to_str(worker.current_size) # Add the total size and percent if worker.total_size and not worker.current_item.owner.complete: val += "/%sB %i%%" % ( apt_pkg.size_to_str(worker.total_size), worker.current_size * 100.0 / worker.total_size) val += ']' if len(tval) + len(val) + len(end) >= self._width: # Display as many items as screen width break else: tval += val if not shown: tval += _(" [Working]") if self.current_cps: tval += (self._width - len(end) - len(tval)) * ' ' + end self._write(tval, False) return True def media_change(self, medium, drive): # type: (str, str) -> bool """Prompt the user to change the inserted removable media.""" base.AcquireProgress.media_change(self, medium, drive) self._write(_("Media change: please insert the disc labeled\n" " '%s'\n" "in the drive '%s' and press enter\n") % (medium, drive)) return input() not in ('c', 'C') def stop(self): # type: () -> None """Invoked when the Acquire process stops running.""" base.AcquireProgress.stop(self) # Trick for getting a translation from apt self._write((_("Fetched %sB in %s (%sB/s)\n") % ( apt_pkg.size_to_str(self.fetched_bytes), apt_pkg.time_to_str(self.elapsed_time), apt_pkg.size_to_str(self.current_cps))).rstrip("\n")) # Delete the signal again. import signal signal.signal(signal.SIGWINCH, self._signal) class CdromProgress(base.CdromProgress, TextProgress): """Text CD-ROM progress.""" def ask_cdrom_name(self): # type: () -> Optional[str] """Ask the user to provide a name for the disc.""" base.CdromProgress.ask_cdrom_name(self) self._write(_("Please provide a name for this medium, such as " "'Debian 2.1r1 Disk 1'"), False) try: return str(input(":")) except KeyboardInterrupt: return None def update(self, text, current): # type: (str, int) -> None """Set the current progress.""" base.CdromProgress.update(self, text, current) if text: self._write(text, False) def change_cdrom(self): # type: () -> bool """Ask the user to change the CD-ROM.""" base.CdromProgress.change_cdrom(self) self._write(_("Please insert an installation medium and press enter"), False) try: return bool(input() == '') except KeyboardInterrupt: return False PK �A�Z1��*W1 W1 base.pynu �[��� # apt/progress/base.py - Base classes for progress reporting. # # Copyright (C) 2009 Julian Andres Klode <jak@debian.org> # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA # pylint: disable-msg = R0201 """Base classes for progress reporting. Custom progress classes should inherit from these classes. They can also be used as dummy progress classes which simply do nothing. """ from __future__ import print_function import errno import fcntl import io import os import re import select import sys from typing import Optional, Union import apt_pkg __all__ = ['AcquireProgress', 'CdromProgress', 'InstallProgress', 'OpProgress'] class AcquireProgress(object): """Monitor object for downloads controlled by the Acquire class. This is an mostly abstract class. You should subclass it and implement the methods to get something useful. """ current_bytes = current_cps = fetched_bytes = last_bytes = total_bytes \ = 0.0 current_items = elapsed_time = total_items = 0 def done(self, item): # type: (apt_pkg.AcquireItemDesc) -> None """Invoked when an item is successfully and completely fetched.""" def fail(self, item): # type: (apt_pkg.AcquireItemDesc) -> None """Invoked when an item could not be fetched.""" def fetch(self, item): # type: (apt_pkg.AcquireItemDesc) -> None """Invoked when some of the item's data is fetched.""" def ims_hit(self, item): # type: (apt_pkg.AcquireItemDesc) -> None """Invoked when an item is confirmed to be up-to-date. Invoked when an item is confirmed to be up-to-date. For instance, when an HTTP download is informed that the file on the server was not modified. """ def media_change(self, media, drive): # type: (str, str) -> bool """Prompt the user to change the inserted removable media. The parameter 'media' decribes the name of the media type that should be changed, whereas the parameter 'drive' should be the identifying name of the drive whose media should be changed. This method should not return until the user has confirmed to the user interface that the media change is complete. It must return True if the user confirms the media change, or False to cancel it. """ return False def pulse(self, owner): # type: (apt_pkg.Acquire) -> bool """Periodically invoked while the Acquire process is underway. This method gets invoked while the Acquire progress given by the parameter 'owner' is underway. It should display information about the current state. This function returns a boolean value indicating whether the acquisition should be continued (True) or cancelled (False). """ return True def start(self): # type: () -> None """Invoked when the Acquire process starts running.""" # Reset all our values. self.current_bytes = 0.0 self.current_cps = 0.0 self.current_items = 0 self.elapsed_time = 0 self.fetched_bytes = 0.0 self.last_bytes = 0.0 self.total_bytes = 0.0 self.total_items = 0 def stop(self): # type: () -> None """Invoked when the Acquire process stops running.""" class CdromProgress(object): """Base class for reporting the progress of adding a cdrom. Can be used with apt_pkg.Cdrom to produce an utility like apt-cdrom. The attribute 'total_steps' defines the total number of steps and can be used in update() to display the current progress. """ total_steps = 0 def ask_cdrom_name(self): # type: () -> Optional[str] """Ask for the name of the cdrom. If a name has been provided, return it. Otherwise, return None to cancel the operation. """ def change_cdrom(self): # type: () -> bool """Ask for the CD-ROM to be changed. Return True once the cdrom has been changed or False to cancel the operation. """ def update(self, text, current): # type: (str, int) -> None """Periodically invoked to update the interface. The string 'text' defines the text which should be displayed. The integer 'current' defines the number of completed steps. """ class InstallProgress(object): """Class to report the progress of installing packages.""" child_pid, percent, select_timeout, status = 0, 0.0, 0.1, "" def __init__(self): # type: () -> None (self.statusfd, self.writefd) = os.pipe() # These will leak fds, but fixing this safely requires API changes. self.write_stream = os.fdopen(self.writefd, "w") # type: io.TextIOBase self.status_stream = os.fdopen(self.statusfd, "r") # type: io.TextIOBase # noqa fcntl.fcntl(self.statusfd, fcntl.F_SETFL, os.O_NONBLOCK) def start_update(self): # type: () -> None """(Abstract) Start update.""" def finish_update(self): # type: () -> None """(Abstract) Called when update has finished.""" def __enter__(self): # type: () -> InstallProgress return self def __exit__(self, type, value, traceback): # type: (object, object, object) -> None self.write_stream.close() self.status_stream.close() def error(self, pkg, errormsg): # type: (str, str) -> None """(Abstract) Called when a error is detected during the install.""" def conffile(self, current, new): # type: (str, str) -> None """(Abstract) Called when a conffile question from dpkg is detected.""" def status_change(self, pkg, percent, status): # type: (str, float, str) -> None """(Abstract) Called when the APT status changed.""" def dpkg_status_change(self, pkg, status): # type: (str, str) -> None """(Abstract) Called when the dpkg status changed.""" def processing(self, pkg, stage): # type: (str, str) -> None """(Abstract) Sent just before a processing stage starts. The parameter 'stage' is one of "upgrade", "install" (both sent before unpacking), "configure", "trigproc", "remove", "purge". This method is used for dpkg only. """ def run(self, obj): # type: (Union[apt_pkg.PackageManager, Union[bytes, str]]) -> int """Install using the object 'obj'. This functions runs install actions. The parameter 'obj' may either be a PackageManager object in which case its do_install() method is called or the path to a deb file. If the object is a PackageManager, the functions returns the result of calling its do_install() method. Otherwise, the function returns the exit status of dpkg. In both cases, 0 means that there were no problems. """ pid = self.fork() if pid == 0: try: # PEP-446 implemented in Python 3.4 made all descriptors # CLOEXEC, but we need to be able to pass writefd to dpkg # when we spawn it os.set_inheritable(self.writefd, True) except AttributeError: # if we don't have os.set_inheritable() pass # pm.do_install might raise a exception, # when this happens, we need to catch # it, otherwise os._exit() is not run # and the execution continues in the # parent code leading to very confusing bugs try: os._exit(obj.do_install(self.write_stream.fileno())) # type: ignore # noqa except AttributeError: os._exit(os.spawnlp(os.P_WAIT, "dpkg", "dpkg", "--status-fd", str(self.write_stream.fileno()), "-i", obj)) # type: ignore # noqa except Exception as e: sys.stderr.write("%s\n" % e) os._exit(apt_pkg.PackageManager.RESULT_FAILED) self.child_pid = pid res = self.wait_child() return os.WEXITSTATUS(res) def fork(self): # type: () -> int """Fork.""" return os.fork() def update_interface(self): # type: () -> None """Update the interface.""" try: line = self.status_stream.readline() except IOError as err: # resource temporarly unavailable is ignored if err.errno != errno.EAGAIN and err.errno != errno.EWOULDBLOCK: print(err.strerror) return pkgname = status = status_str = percent = base = "" if line.startswith('pm'): try: (status, pkgname, percent, status_str) = line.split(":", 3) except ValueError: # silently ignore lines that can't be parsed return elif line.startswith('status'): try: (base, pkgname, status, status_str) = line.split(":", 3) except ValueError: (base, pkgname, status) = line.split(":", 2) elif line.startswith('processing'): (status, status_str, pkgname) = line.split(":", 2) self.processing(pkgname.strip(), status_str.strip()) # Always strip the status message pkgname = pkgname.strip() status_str = status_str.strip() status = status.strip() if status == 'pmerror' or status == 'error': self.error(pkgname, status_str) elif status == 'conffile-prompt' or status == 'pmconffile': match = re.match("\\s*\'(.*)\'\\s*\'(.*)\'.*", status_str) if match: self.conffile(match.group(1), match.group(2)) elif status == "pmstatus": # FIXME: Float comparison if float(percent) != self.percent or status_str != self.status: self.status_change(pkgname, float(percent), status_str.strip()) self.percent = float(percent) self.status = status_str.strip() elif base == "status": self.dpkg_status_change(pkgname, status) def wait_child(self): # type: () -> int """Wait for child progress to exit. This method is responsible for calling update_interface() from time to time. It exits once the child has exited. The return values is the full status returned from os.waitpid() (not only the return code). """ (pid, res) = (0, 0) while True: try: select.select([self.status_stream], [], [], self.select_timeout) except select.error as error: (errno_, _errstr) = error.args if errno_ != errno.EINTR: raise self.update_interface() try: (pid, res) = os.waitpid(self.child_pid, os.WNOHANG) if pid == self.child_pid: break except OSError as err: if err.errno == errno.ECHILD: break if err.errno != errno.EINTR: raise return res class OpProgress(object): """Monitor objects for operations. Display the progress of operations such as opening the cache.""" major_change, op, percent, subop = False, "", 0.0, "" def update(self, percent=None): # type: (Optional[float]) -> None """Called periodically to update the user interface. You may use the optional argument 'percent' to set the attribute 'percent' in this call. """ if percent is not None: self.percent = percent def done(self): # type: () -> None """Called once an operation has been completed.""" PK �A�Z=�6� � __pycache__/text.cpython-310.pycnu �[��� o 4��f�( � @ s� d Z ddlmZ ddlZddlZddlZddlZddlZddlm Z m Z mZ ddlZddl mZ g d�Zdd� ZG d d � d e�ZG dd� deje�ZG d d� deje�ZG dd� deje�ZdS )z'Progress reporting for text interfaces.� )�print_functionN)�Callable�Optional�Union)�base)�AcquireProgress� CdromProgress� OpProgressc C s"