File manager - Edit - /home/newsbmcs.com/public_html/static/img/logo/psutil.tar
Back
tests/test_misc.py 0000644 00000071750 15030211765 0010264 0 ustar 00 #!/usr/bin/env python3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Miscellaneous tests.""" import collections import contextlib import io import json import os import pickle import socket import sys from unittest import mock import psutil import psutil.tests from psutil import WINDOWS from psutil._common import bcat from psutil._common import cat from psutil._common import debug from psutil._common import isfile_strict from psutil._common import memoize from psutil._common import memoize_when_activated from psutil._common import parse_environ_block from psutil._common import supports_ipv6 from psutil._common import wrap_numbers from psutil.tests import HAS_NET_IO_COUNTERS from psutil.tests import PsutilTestCase from psutil.tests import process_namespace from psutil.tests import pytest from psutil.tests import reload_module from psutil.tests import system_namespace # =================================================================== # --- Test classes' repr(), str(), ... # =================================================================== class TestSpecialMethods(PsutilTestCase): def test_check_pid_range(self): with pytest.raises(OverflowError): psutil._psplatform.cext.check_pid_range(2**128) with pytest.raises(psutil.NoSuchProcess): psutil.Process(2**128) def test_process__repr__(self, func=repr): p = psutil.Process(self.spawn_testproc().pid) r = func(p) assert "psutil.Process" in r assert f"pid={p.pid}" in r assert f"name='{p.name()}'" in r.replace("name=u'", "name='") assert "status=" in r assert "exitcode=" not in r p.terminate() p.wait() r = func(p) assert "status='terminated'" in r assert "exitcode=" in r with mock.patch.object( psutil.Process, "name", side_effect=psutil.ZombieProcess(os.getpid()), ): p = psutil.Process() r = func(p) assert f"pid={p.pid}" in r assert "status='zombie'" in r assert "name=" not in r with mock.patch.object( psutil.Process, "name", side_effect=psutil.NoSuchProcess(os.getpid()), ): p = psutil.Process() r = func(p) assert f"pid={p.pid}" in r assert "terminated" in r assert "name=" not in r with mock.patch.object( psutil.Process, "name", side_effect=psutil.AccessDenied(os.getpid()), ): p = psutil.Process() r = func(p) assert f"pid={p.pid}" in r assert "name=" not in r def test_process__str__(self): self.test_process__repr__(func=str) def test_error__repr__(self): assert repr(psutil.Error()) == "psutil.Error()" def test_error__str__(self): assert str(psutil.Error()) == "" def test_no_such_process__repr__(self): assert ( repr(psutil.NoSuchProcess(321)) == "psutil.NoSuchProcess(pid=321, msg='process no longer exists')" ) assert ( repr(psutil.NoSuchProcess(321, name="name", msg="msg")) == "psutil.NoSuchProcess(pid=321, name='name', msg='msg')" ) def test_no_such_process__str__(self): assert ( str(psutil.NoSuchProcess(321)) == "process no longer exists (pid=321)" ) assert ( str(psutil.NoSuchProcess(321, name="name", msg="msg")) == "msg (pid=321, name='name')" ) def test_zombie_process__repr__(self): assert ( repr(psutil.ZombieProcess(321)) == 'psutil.ZombieProcess(pid=321, msg="PID still ' 'exists but it\'s a zombie")' ) assert ( repr(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo")) == "psutil.ZombieProcess(pid=321, ppid=320, name='name'," " msg='foo')" ) def test_zombie_process__str__(self): assert ( str(psutil.ZombieProcess(321)) == "PID still exists but it's a zombie (pid=321)" ) assert ( str(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo")) == "foo (pid=321, ppid=320, name='name')" ) def test_access_denied__repr__(self): assert repr(psutil.AccessDenied(321)) == "psutil.AccessDenied(pid=321)" assert ( repr(psutil.AccessDenied(321, name="name", msg="msg")) == "psutil.AccessDenied(pid=321, name='name', msg='msg')" ) def test_access_denied__str__(self): assert str(psutil.AccessDenied(321)) == "(pid=321)" assert ( str(psutil.AccessDenied(321, name="name", msg="msg")) == "msg (pid=321, name='name')" ) def test_timeout_expired__repr__(self): assert ( repr(psutil.TimeoutExpired(5)) == "psutil.TimeoutExpired(seconds=5, msg='timeout after 5" " seconds')" ) assert ( repr(psutil.TimeoutExpired(5, pid=321, name="name")) == "psutil.TimeoutExpired(pid=321, name='name', seconds=5, " "msg='timeout after 5 seconds')" ) def test_timeout_expired__str__(self): assert str(psutil.TimeoutExpired(5)) == "timeout after 5 seconds" assert ( str(psutil.TimeoutExpired(5, pid=321, name="name")) == "timeout after 5 seconds (pid=321, name='name')" ) def test_process__eq__(self): p1 = psutil.Process() p2 = psutil.Process() assert p1 == p2 p2._ident = (0, 0) assert p1 != p2 assert p1 != 'foo' def test_process__hash__(self): s = {psutil.Process(), psutil.Process()} assert len(s) == 1 # =================================================================== # --- Misc, generic, corner cases # =================================================================== class TestMisc(PsutilTestCase): def test__all__(self): dir_psutil = dir(psutil) for name in dir_psutil: if name in { 'debug', 'tests', 'test', 'PermissionError', 'ProcessLookupError', }: continue if not name.startswith('_'): try: __import__(name) except ImportError: if name not in psutil.__all__: fun = getattr(psutil, name) if fun is None: continue if ( fun.__doc__ is not None and 'deprecated' not in fun.__doc__.lower() ): raise self.fail(f"{name!r} not in psutil.__all__") # Import 'star' will break if __all__ is inconsistent, see: # https://github.com/giampaolo/psutil/issues/656 # Can't do `from psutil import *` as it won't work # so we simply iterate over __all__. for name in psutil.__all__: assert name in dir_psutil def test_version(self): assert ( '.'.join([str(x) for x in psutil.version_info]) == psutil.__version__ ) def test_process_as_dict_no_new_names(self): # See https://github.com/giampaolo/psutil/issues/813 p = psutil.Process() p.foo = '1' assert 'foo' not in p.as_dict() def test_serialization(self): def check(ret): json.loads(json.dumps(ret)) a = pickle.dumps(ret) b = pickle.loads(a) assert ret == b # --- process APIs proc = psutil.Process() check(psutil.Process().as_dict()) ns = process_namespace(proc) for fun, name in ns.iter(ns.getters, clear_cache=True): with self.subTest(proc=proc, name=name): try: ret = fun() except psutil.Error: pass else: check(ret) # --- system APIs ns = system_namespace() for fun, name in ns.iter(ns.getters): if name in {"win_service_iter", "win_service_get"}: continue with self.subTest(name=name): try: ret = fun() except psutil.AccessDenied: pass else: check(ret) # --- exception classes b = pickle.loads( pickle.dumps( psutil.NoSuchProcess(pid=4567, name='name', msg='msg') ) ) assert isinstance(b, psutil.NoSuchProcess) assert b.pid == 4567 assert b.name == 'name' assert b.msg == 'msg' b = pickle.loads( pickle.dumps( psutil.ZombieProcess(pid=4567, name='name', ppid=42, msg='msg') ) ) assert isinstance(b, psutil.ZombieProcess) assert b.pid == 4567 assert b.ppid == 42 assert b.name == 'name' assert b.msg == 'msg' b = pickle.loads( pickle.dumps(psutil.AccessDenied(pid=123, name='name', msg='msg')) ) assert isinstance(b, psutil.AccessDenied) assert b.pid == 123 assert b.name == 'name' assert b.msg == 'msg' b = pickle.loads( pickle.dumps( psutil.TimeoutExpired(seconds=33, pid=4567, name='name') ) ) assert isinstance(b, psutil.TimeoutExpired) assert b.seconds == 33 assert b.pid == 4567 assert b.name == 'name' def test_ad_on_process_creation(self): # We are supposed to be able to instantiate Process also in case # of zombie processes or access denied. with mock.patch.object( psutil.Process, '_get_ident', side_effect=psutil.AccessDenied ) as meth: psutil.Process() assert meth.called with mock.patch.object( psutil.Process, '_get_ident', side_effect=psutil.ZombieProcess(1) ) as meth: psutil.Process() assert meth.called with mock.patch.object( psutil.Process, '_get_ident', side_effect=ValueError ) as meth: with pytest.raises(ValueError): psutil.Process() assert meth.called with mock.patch.object( psutil.Process, '_get_ident', side_effect=psutil.NoSuchProcess(1) ) as meth: with self.assertRaises(psutil.NoSuchProcess): psutil.Process() assert meth.called def test_sanity_version_check(self): # see: https://github.com/giampaolo/psutil/issues/564 with mock.patch( "psutil._psplatform.cext.version", return_value="0.0.0" ): with pytest.raises(ImportError) as cm: reload_module(psutil) assert "version conflict" in str(cm.value).lower() # =================================================================== # --- psutil/_common.py utils # =================================================================== class TestMemoizeDecorator(PsutilTestCase): def setUp(self): self.calls = [] tearDown = setUp def run_against(self, obj, expected_retval=None): # no args for _ in range(2): ret = obj() assert self.calls == [((), {})] if expected_retval is not None: assert ret == expected_retval # with args for _ in range(2): ret = obj(1) assert self.calls == [((), {}), ((1,), {})] if expected_retval is not None: assert ret == expected_retval # with args + kwargs for _ in range(2): ret = obj(1, bar=2) assert self.calls == [((), {}), ((1,), {}), ((1,), {'bar': 2})] if expected_retval is not None: assert ret == expected_retval # clear cache assert len(self.calls) == 3 obj.cache_clear() ret = obj() if expected_retval is not None: assert ret == expected_retval assert len(self.calls) == 4 # docstring assert obj.__doc__ == "My docstring." def test_function(self): @memoize def foo(*args, **kwargs): """My docstring.""" baseclass.calls.append((args, kwargs)) return 22 baseclass = self self.run_against(foo, expected_retval=22) def test_class(self): @memoize class Foo: """My docstring.""" def __init__(self, *args, **kwargs): baseclass.calls.append((args, kwargs)) def bar(self): return 22 baseclass = self self.run_against(Foo, expected_retval=None) assert Foo().bar() == 22 def test_class_singleton(self): # @memoize can be used against classes to create singletons @memoize class Bar: def __init__(self, *args, **kwargs): pass assert Bar() is Bar() assert id(Bar()) == id(Bar()) assert id(Bar(1)) == id(Bar(1)) assert id(Bar(1, foo=3)) == id(Bar(1, foo=3)) assert id(Bar(1)) != id(Bar(2)) def test_staticmethod(self): class Foo: @staticmethod @memoize def bar(*args, **kwargs): """My docstring.""" baseclass.calls.append((args, kwargs)) return 22 baseclass = self self.run_against(Foo().bar, expected_retval=22) def test_classmethod(self): class Foo: @classmethod @memoize def bar(cls, *args, **kwargs): """My docstring.""" baseclass.calls.append((args, kwargs)) return 22 baseclass = self self.run_against(Foo().bar, expected_retval=22) def test_original(self): # This was the original test before I made it dynamic to test it # against different types. Keeping it anyway. @memoize def foo(*args, **kwargs): """Foo docstring.""" calls.append(None) return (args, kwargs) calls = [] # no args for _ in range(2): ret = foo() expected = ((), {}) assert ret == expected assert len(calls) == 1 # with args for _ in range(2): ret = foo(1) expected = ((1,), {}) assert ret == expected assert len(calls) == 2 # with args + kwargs for _ in range(2): ret = foo(1, bar=2) expected = ((1,), {'bar': 2}) assert ret == expected assert len(calls) == 3 # clear cache foo.cache_clear() ret = foo() expected = ((), {}) assert ret == expected assert len(calls) == 4 # docstring assert foo.__doc__ == "Foo docstring." class TestCommonModule(PsutilTestCase): def test_memoize_when_activated(self): class Foo: @memoize_when_activated def foo(self): calls.append(None) f = Foo() calls = [] f.foo() f.foo() assert len(calls) == 2 # activate calls = [] f.foo.cache_activate(f) f.foo() f.foo() assert len(calls) == 1 # deactivate calls = [] f.foo.cache_deactivate(f) f.foo() f.foo() assert len(calls) == 2 def test_parse_environ_block(self): def k(s): return s.upper() if WINDOWS else s assert parse_environ_block("a=1\0") == {k("a"): "1"} assert parse_environ_block("a=1\0b=2\0\0") == { k("a"): "1", k("b"): "2", } assert parse_environ_block("a=1\0b=\0\0") == {k("a"): "1", k("b"): ""} # ignore everything after \0\0 assert parse_environ_block("a=1\0b=2\0\0c=3\0") == { k("a"): "1", k("b"): "2", } # ignore everything that is not an assignment assert parse_environ_block("xxx\0a=1\0") == {k("a"): "1"} assert parse_environ_block("a=1\0=b=2\0") == {k("a"): "1"} # do not fail if the block is incomplete assert parse_environ_block("a=1\0b=2") == {k("a"): "1"} def test_supports_ipv6(self): self.addCleanup(supports_ipv6.cache_clear) if supports_ipv6(): with mock.patch('psutil._common.socket') as s: s.has_ipv6 = False supports_ipv6.cache_clear() assert not supports_ipv6() supports_ipv6.cache_clear() with mock.patch( 'psutil._common.socket.socket', side_effect=OSError ) as s: assert not supports_ipv6() assert s.called supports_ipv6.cache_clear() with mock.patch( 'psutil._common.socket.socket', side_effect=socket.gaierror ) as s: assert not supports_ipv6() supports_ipv6.cache_clear() assert s.called supports_ipv6.cache_clear() with mock.patch( 'psutil._common.socket.socket.bind', side_effect=socket.gaierror, ) as s: assert not supports_ipv6() supports_ipv6.cache_clear() assert s.called else: with pytest.raises(OSError): sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) try: sock.bind(("::1", 0)) finally: sock.close() def test_isfile_strict(self): this_file = os.path.abspath(__file__) assert isfile_strict(this_file) assert not isfile_strict(os.path.dirname(this_file)) with mock.patch('psutil._common.os.stat', side_effect=PermissionError): with pytest.raises(OSError): isfile_strict(this_file) with mock.patch( 'psutil._common.os.stat', side_effect=FileNotFoundError ): assert not isfile_strict(this_file) with mock.patch('psutil._common.stat.S_ISREG', return_value=False): assert not isfile_strict(this_file) def test_debug(self): with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True): with contextlib.redirect_stderr(io.StringIO()) as f: debug("hello") sys.stderr.flush() msg = f.getvalue() assert msg.startswith("psutil-debug"), msg assert "hello" in msg assert __file__.replace('.pyc', '.py') in msg # supposed to use repr(exc) with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True): with contextlib.redirect_stderr(io.StringIO()) as f: debug(ValueError("this is an error")) msg = f.getvalue() assert "ignoring ValueError" in msg assert "'this is an error'" in msg # supposed to use str(exc), because of extra info about file name with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True): with contextlib.redirect_stderr(io.StringIO()) as f: exc = OSError(2, "no such file") exc.filename = "/foo" debug(exc) msg = f.getvalue() assert "no such file" in msg assert "/foo" in msg def test_cat_bcat(self): testfn = self.get_testfn() with open(testfn, "w") as f: f.write("foo") assert cat(testfn) == "foo" assert bcat(testfn) == b"foo" with pytest.raises(FileNotFoundError): cat(testfn + '-invalid') with pytest.raises(FileNotFoundError): bcat(testfn + '-invalid') assert cat(testfn + '-invalid', fallback="bar") == "bar" assert bcat(testfn + '-invalid', fallback="bar") == "bar" # =================================================================== # --- Tests for wrap_numbers() function. # =================================================================== nt = collections.namedtuple('foo', 'a b c') class TestWrapNumbers(PsutilTestCase): def setUp(self): wrap_numbers.cache_clear() tearDown = setUp def test_first_call(self): input = {'disk1': nt(5, 5, 5)} assert wrap_numbers(input, 'disk_io') == input def test_input_hasnt_changed(self): input = {'disk1': nt(5, 5, 5)} assert wrap_numbers(input, 'disk_io') == input assert wrap_numbers(input, 'disk_io') == input def test_increase_but_no_wrap(self): input = {'disk1': nt(5, 5, 5)} assert wrap_numbers(input, 'disk_io') == input input = {'disk1': nt(10, 15, 20)} assert wrap_numbers(input, 'disk_io') == input input = {'disk1': nt(20, 25, 30)} assert wrap_numbers(input, 'disk_io') == input input = {'disk1': nt(20, 25, 30)} assert wrap_numbers(input, 'disk_io') == input def test_wrap(self): # let's say 100 is the threshold input = {'disk1': nt(100, 100, 100)} assert wrap_numbers(input, 'disk_io') == input # first wrap restarts from 10 input = {'disk1': nt(100, 100, 10)} assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 110)} # then it remains the same input = {'disk1': nt(100, 100, 10)} assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 110)} # then it goes up input = {'disk1': nt(100, 100, 90)} assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 190)} # then it wraps again input = {'disk1': nt(100, 100, 20)} assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 210)} # and remains the same input = {'disk1': nt(100, 100, 20)} assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 210)} # now wrap another num input = {'disk1': nt(50, 100, 20)} assert wrap_numbers(input, 'disk_io') == {'disk1': nt(150, 100, 210)} # and again input = {'disk1': nt(40, 100, 20)} assert wrap_numbers(input, 'disk_io') == {'disk1': nt(190, 100, 210)} # keep it the same input = {'disk1': nt(40, 100, 20)} assert wrap_numbers(input, 'disk_io') == {'disk1': nt(190, 100, 210)} def test_changing_keys(self): # Emulate a case where the second call to disk_io() # (or whatever) provides a new disk, then the new disk # disappears on the third call. input = {'disk1': nt(5, 5, 5)} assert wrap_numbers(input, 'disk_io') == input input = {'disk1': nt(5, 5, 5), 'disk2': nt(7, 7, 7)} assert wrap_numbers(input, 'disk_io') == input input = {'disk1': nt(8, 8, 8)} assert wrap_numbers(input, 'disk_io') == input def test_changing_keys_w_wrap(self): input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)} assert wrap_numbers(input, 'disk_io') == input # disk 2 wraps input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 10)} assert wrap_numbers(input, 'disk_io') == { 'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 110), } # disk 2 disappears input = {'disk1': nt(50, 50, 50)} assert wrap_numbers(input, 'disk_io') == input # then it appears again; the old wrap is supposed to be # gone. input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)} assert wrap_numbers(input, 'disk_io') == input # remains the same input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)} assert wrap_numbers(input, 'disk_io') == input # and then wraps again input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 10)} assert wrap_numbers(input, 'disk_io') == { 'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 110), } def test_real_data(self): d = { 'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048), 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8), 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28), 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348), } assert wrap_numbers(d, 'disk_io') == d assert wrap_numbers(d, 'disk_io') == d # decrease this ↓ d = { 'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048), 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8), 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28), 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348), } out = wrap_numbers(d, 'disk_io') assert out['nvme0n1'][0] == 400 # --- cache tests def test_cache_first_call(self): input = {'disk1': nt(5, 5, 5)} wrap_numbers(input, 'disk_io') cache = wrap_numbers.cache_info() assert cache[0] == {'disk_io': input} assert cache[1] == {'disk_io': {}} assert cache[2] == {'disk_io': {}} def test_cache_call_twice(self): input = {'disk1': nt(5, 5, 5)} wrap_numbers(input, 'disk_io') input = {'disk1': nt(10, 10, 10)} wrap_numbers(input, 'disk_io') cache = wrap_numbers.cache_info() assert cache[0] == {'disk_io': input} assert cache[1] == { 'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0} } assert cache[2] == {'disk_io': {}} def test_cache_wrap(self): # let's say 100 is the threshold input = {'disk1': nt(100, 100, 100)} wrap_numbers(input, 'disk_io') # first wrap restarts from 10 input = {'disk1': nt(100, 100, 10)} wrap_numbers(input, 'disk_io') cache = wrap_numbers.cache_info() assert cache[0] == {'disk_io': input} assert cache[1] == { 'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100} } assert cache[2] == {'disk_io': {'disk1': {('disk1', 2)}}} def check_cache_info(): cache = wrap_numbers.cache_info() assert cache[1] == { 'disk_io': { ('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100, } } assert cache[2] == {'disk_io': {'disk1': {('disk1', 2)}}} # then it remains the same input = {'disk1': nt(100, 100, 10)} wrap_numbers(input, 'disk_io') cache = wrap_numbers.cache_info() assert cache[0] == {'disk_io': input} check_cache_info() # then it goes up input = {'disk1': nt(100, 100, 90)} wrap_numbers(input, 'disk_io') cache = wrap_numbers.cache_info() assert cache[0] == {'disk_io': input} check_cache_info() # then it wraps again input = {'disk1': nt(100, 100, 20)} wrap_numbers(input, 'disk_io') cache = wrap_numbers.cache_info() assert cache[0] == {'disk_io': input} assert cache[1] == { 'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190} } assert cache[2] == {'disk_io': {'disk1': {('disk1', 2)}}} def test_cache_changing_keys(self): input = {'disk1': nt(5, 5, 5)} wrap_numbers(input, 'disk_io') input = {'disk1': nt(5, 5, 5), 'disk2': nt(7, 7, 7)} wrap_numbers(input, 'disk_io') cache = wrap_numbers.cache_info() assert cache[0] == {'disk_io': input} assert cache[1] == { 'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0} } assert cache[2] == {'disk_io': {}} def test_cache_clear(self): input = {'disk1': nt(5, 5, 5)} wrap_numbers(input, 'disk_io') wrap_numbers(input, 'disk_io') wrap_numbers.cache_clear('disk_io') assert wrap_numbers.cache_info() == ({}, {}, {}) wrap_numbers.cache_clear('disk_io') wrap_numbers.cache_clear('?!?') @pytest.mark.skipif(not HAS_NET_IO_COUNTERS, reason="not supported") def test_cache_clear_public_apis(self): if not psutil.disk_io_counters() or not psutil.net_io_counters(): raise pytest.skip("no disks or NICs available") psutil.disk_io_counters() psutil.net_io_counters() caches = wrap_numbers.cache_info() for cache in caches: assert 'psutil.disk_io_counters' in cache assert 'psutil.net_io_counters' in cache psutil.disk_io_counters.cache_clear() caches = wrap_numbers.cache_info() for cache in caches: assert 'psutil.net_io_counters' in cache assert 'psutil.disk_io_counters' not in cache psutil.net_io_counters.cache_clear() caches = wrap_numbers.cache_info() assert caches == ({}, {}, {}) tests/test_scripts.py 0000644 00000017055 15030211765 0011016 0 ustar 00 #!/usr/bin/env python3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Test various scripts.""" import ast import os import shutil import stat import subprocess import pytest from psutil import POSIX from psutil import WINDOWS from psutil.tests import CI_TESTING from psutil.tests import HAS_BATTERY from psutil.tests import HAS_MEMORY_MAPS from psutil.tests import HAS_SENSORS_BATTERY from psutil.tests import HAS_SENSORS_FANS from psutil.tests import HAS_SENSORS_TEMPERATURES from psutil.tests import PYTHON_EXE from psutil.tests import PYTHON_EXE_ENV from psutil.tests import ROOT_DIR from psutil.tests import SCRIPTS_DIR from psutil.tests import PsutilTestCase from psutil.tests import import_module_by_path from psutil.tests import psutil from psutil.tests import sh INTERNAL_SCRIPTS_DIR = os.path.join(SCRIPTS_DIR, "internal") SETUP_PY = os.path.join(ROOT_DIR, 'setup.py') # =================================================================== # --- Tests scripts in scripts/ directory # =================================================================== @pytest.mark.skipif( CI_TESTING and not os.path.exists(SCRIPTS_DIR), reason="can't find scripts/ directory", ) class TestExampleScripts(PsutilTestCase): @staticmethod def assert_stdout(exe, *args, **kwargs): kwargs.setdefault("env", PYTHON_EXE_ENV) exe = os.path.join(SCRIPTS_DIR, exe) cmd = [PYTHON_EXE, exe] for arg in args: cmd.append(arg) try: out = sh(cmd, **kwargs).strip() except RuntimeError as err: if 'AccessDenied' in str(err): return str(err) else: raise assert out, out return out @staticmethod def assert_syntax(exe): exe = os.path.join(SCRIPTS_DIR, exe) with open(exe, encoding="utf8") as f: src = f.read() ast.parse(src) def test_coverage(self): # make sure all example scripts have a test method defined meths = dir(self) for name in os.listdir(SCRIPTS_DIR): if name.endswith('.py'): if 'test_' + os.path.splitext(name)[0] not in meths: # self.assert_stdout(name) raise self.fail( "no test defined for" f" {os.path.join(SCRIPTS_DIR, name)!r} script" ) @pytest.mark.skipif(not POSIX, reason="POSIX only") def test_executable(self): for root, dirs, files in os.walk(SCRIPTS_DIR): for file in files: if file.endswith('.py'): path = os.path.join(root, file) if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]: raise self.fail(f"{path!r} is not executable") def test_disk_usage(self): self.assert_stdout('disk_usage.py') def test_free(self): self.assert_stdout('free.py') def test_meminfo(self): self.assert_stdout('meminfo.py') def test_procinfo(self): self.assert_stdout('procinfo.py', str(os.getpid())) @pytest.mark.skipif(CI_TESTING and not psutil.users(), reason="no users") def test_who(self): self.assert_stdout('who.py') def test_ps(self): self.assert_stdout('ps.py') def test_pstree(self): self.assert_stdout('pstree.py') def test_netstat(self): self.assert_stdout('netstat.py') def test_ifconfig(self): self.assert_stdout('ifconfig.py') @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported") def test_pmap(self): self.assert_stdout('pmap.py', str(os.getpid())) def test_procsmem(self): if 'uss' not in psutil.Process().memory_full_info()._fields: raise pytest.skip("not supported") self.assert_stdout('procsmem.py') def test_killall(self): self.assert_syntax('killall.py') def test_nettop(self): self.assert_syntax('nettop.py') def test_top(self): self.assert_syntax('top.py') def test_iotop(self): self.assert_syntax('iotop.py') def test_pidof(self): output = self.assert_stdout('pidof.py', psutil.Process().name()) assert str(os.getpid()) in output @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only") def test_winservices(self): self.assert_stdout('winservices.py') def test_cpu_distribution(self): self.assert_syntax('cpu_distribution.py') @pytest.mark.skipif(not HAS_SENSORS_TEMPERATURES, reason="not supported") def test_temperatures(self): if not psutil.sensors_temperatures(): raise pytest.skip("no temperatures") self.assert_stdout('temperatures.py') @pytest.mark.skipif(not HAS_SENSORS_FANS, reason="not supported") def test_fans(self): if not psutil.sensors_fans(): raise pytest.skip("no fans") self.assert_stdout('fans.py') @pytest.mark.skipif(not HAS_SENSORS_BATTERY, reason="not supported") @pytest.mark.skipif(not HAS_BATTERY, reason="no battery") def test_battery(self): self.assert_stdout('battery.py') @pytest.mark.skipif(not HAS_SENSORS_BATTERY, reason="not supported") @pytest.mark.skipif(not HAS_BATTERY, reason="no battery") def test_sensors(self): self.assert_stdout('sensors.py') # =================================================================== # --- Tests scripts in scripts/internal/ directory # =================================================================== @pytest.mark.skipif( CI_TESTING and not os.path.exists(INTERNAL_SCRIPTS_DIR), reason="can't find scripts/internal/ directory", ) class TestInternalScripts(PsutilTestCase): @staticmethod def ls(): for name in os.listdir(INTERNAL_SCRIPTS_DIR): if name.endswith(".py"): yield os.path.join(INTERNAL_SCRIPTS_DIR, name) def test_syntax_all(self): for path in self.ls(): with open(path, encoding="utf8") as f: data = f.read() ast.parse(data) @pytest.mark.skipif(CI_TESTING, reason="not on CI") def test_import_all(self): for path in self.ls(): try: import_module_by_path(path) except SystemExit: pass # =================================================================== # --- Tests for setup.py script # =================================================================== @pytest.mark.skipif( CI_TESTING and not os.path.exists(SETUP_PY), reason="can't find setup.py" ) class TestSetupScript(PsutilTestCase): def test_invocation(self): module = import_module_by_path(SETUP_PY) with pytest.raises(SystemExit): module.setup() assert module.get_version() == psutil.__version__ @pytest.mark.skipif( not shutil.which("python2.7"), reason="python2.7 not installed" ) def test_python2(self): # There's a duplicate of this test in scripts/internal # directory, which is only executed by CI. We replicate it here # to run it when developing locally. p = subprocess.Popen( [shutil.which("python2.7"), SETUP_PY], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) stdout, stderr = p.communicate() assert p.wait() == 1 assert not stdout assert "psutil no longer supports Python 2.7" in stderr assert "Latest version supporting Python 2.7 is" in stderr tests/test_memleaks.py 0000644 00000035421 15030211765 0011122 0 ustar 00 #!/usr/bin/env python3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Tests for detecting function memory leaks (typically the ones implemented in C). It does so by calling a function many times and checking whether process memory usage keeps increasing between calls or over time. Note that this may produce false positives (especially on Windows for some reason). PyPy appears to be completely unstable for this framework, probably because of how its JIT handles memory, so tests are skipped. """ import functools import os import platform import psutil import psutil._common from psutil import LINUX from psutil import MACOS from psutil import OPENBSD from psutil import POSIX from psutil import SUNOS from psutil import WINDOWS from psutil.tests import HAS_CPU_AFFINITY from psutil.tests import HAS_CPU_FREQ from psutil.tests import HAS_ENVIRON from psutil.tests import HAS_IONICE from psutil.tests import HAS_MEMORY_MAPS from psutil.tests import HAS_NET_IO_COUNTERS from psutil.tests import HAS_PROC_CPU_NUM from psutil.tests import HAS_PROC_IO_COUNTERS from psutil.tests import HAS_RLIMIT from psutil.tests import HAS_SENSORS_BATTERY from psutil.tests import HAS_SENSORS_FANS from psutil.tests import HAS_SENSORS_TEMPERATURES from psutil.tests import TestMemoryLeak from psutil.tests import create_sockets from psutil.tests import get_testfn from psutil.tests import process_namespace from psutil.tests import pytest from psutil.tests import skip_on_access_denied from psutil.tests import spawn_testproc from psutil.tests import system_namespace from psutil.tests import terminate cext = psutil._psplatform.cext thisproc = psutil.Process() FEW_TIMES = 5 def fewtimes_if_linux(): """Decorator for those Linux functions which are implemented in pure Python, and which we want to run faster. """ def decorator(fun): @functools.wraps(fun) def wrapper(self, *args, **kwargs): if LINUX: before = self.__class__.times try: self.__class__.times = FEW_TIMES return fun(self, *args, **kwargs) finally: self.__class__.times = before else: return fun(self, *args, **kwargs) return wrapper return decorator # =================================================================== # Process class # =================================================================== class TestProcessObjectLeaks(TestMemoryLeak): """Test leaks of Process class methods.""" proc = thisproc def test_coverage(self): ns = process_namespace(None) ns.test_class_coverage(self, ns.getters + ns.setters) @fewtimes_if_linux() def test_name(self): self.execute(self.proc.name) @fewtimes_if_linux() def test_cmdline(self): self.execute(self.proc.cmdline) @fewtimes_if_linux() def test_exe(self): self.execute(self.proc.exe) @fewtimes_if_linux() def test_ppid(self): self.execute(self.proc.ppid) @pytest.mark.skipif(not POSIX, reason="POSIX only") @fewtimes_if_linux() def test_uids(self): self.execute(self.proc.uids) @pytest.mark.skipif(not POSIX, reason="POSIX only") @fewtimes_if_linux() def test_gids(self): self.execute(self.proc.gids) @fewtimes_if_linux() def test_status(self): self.execute(self.proc.status) def test_nice(self): self.execute(self.proc.nice) def test_nice_set(self): niceness = thisproc.nice() self.execute(lambda: self.proc.nice(niceness)) @pytest.mark.skipif(not HAS_IONICE, reason="not supported") def test_ionice(self): self.execute(self.proc.ionice) @pytest.mark.skipif(not HAS_IONICE, reason="not supported") def test_ionice_set(self): if WINDOWS: value = thisproc.ionice() self.execute(lambda: self.proc.ionice(value)) else: self.execute(lambda: self.proc.ionice(psutil.IOPRIO_CLASS_NONE)) fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0) self.execute_w_exc(OSError, fun) @pytest.mark.skipif(not HAS_PROC_IO_COUNTERS, reason="not supported") @fewtimes_if_linux() def test_io_counters(self): self.execute(self.proc.io_counters) @pytest.mark.skipif(POSIX, reason="worthless on POSIX") def test_username(self): # always open 1 handle on Windows (only once) psutil.Process().username() self.execute(self.proc.username) @fewtimes_if_linux() def test_create_time(self): self.execute(self.proc.create_time) @fewtimes_if_linux() @skip_on_access_denied(only_if=OPENBSD) def test_num_threads(self): self.execute(self.proc.num_threads) @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only") def test_num_handles(self): self.execute(self.proc.num_handles) @pytest.mark.skipif(not POSIX, reason="POSIX only") @fewtimes_if_linux() def test_num_fds(self): self.execute(self.proc.num_fds) @fewtimes_if_linux() def test_num_ctx_switches(self): self.execute(self.proc.num_ctx_switches) @fewtimes_if_linux() @skip_on_access_denied(only_if=OPENBSD) def test_threads(self): self.execute(self.proc.threads) @fewtimes_if_linux() def test_cpu_times(self): self.execute(self.proc.cpu_times) @fewtimes_if_linux() @pytest.mark.skipif(not HAS_PROC_CPU_NUM, reason="not supported") def test_cpu_num(self): self.execute(self.proc.cpu_num) @fewtimes_if_linux() def test_memory_info(self): self.execute(self.proc.memory_info) @fewtimes_if_linux() def test_memory_full_info(self): self.execute(self.proc.memory_full_info) @pytest.mark.skipif(not POSIX, reason="POSIX only") @fewtimes_if_linux() def test_terminal(self): self.execute(self.proc.terminal) def test_resume(self): times = FEW_TIMES if POSIX else self.times self.execute(self.proc.resume, times=times) @fewtimes_if_linux() def test_cwd(self): self.execute(self.proc.cwd) @pytest.mark.skipif(not HAS_CPU_AFFINITY, reason="not supported") def test_cpu_affinity(self): self.execute(self.proc.cpu_affinity) @pytest.mark.skipif(not HAS_CPU_AFFINITY, reason="not supported") def test_cpu_affinity_set(self): affinity = thisproc.cpu_affinity() self.execute(lambda: self.proc.cpu_affinity(affinity)) self.execute_w_exc(ValueError, lambda: self.proc.cpu_affinity([-1])) @fewtimes_if_linux() def test_open_files(self): with open(get_testfn(), 'w'): self.execute(self.proc.open_files) @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported") @fewtimes_if_linux() def test_memory_maps(self): self.execute(self.proc.memory_maps) @pytest.mark.skipif(not LINUX, reason="LINUX only") @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported") def test_rlimit(self): self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE)) @pytest.mark.skipif(not LINUX, reason="LINUX only") @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported") def test_rlimit_set(self): limit = thisproc.rlimit(psutil.RLIMIT_NOFILE) self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE, limit)) self.execute_w_exc((OSError, ValueError), lambda: self.proc.rlimit(-1)) @fewtimes_if_linux() # Windows implementation is based on a single system-wide # function (tested later). @pytest.mark.skipif(WINDOWS, reason="worthless on WINDOWS") def test_net_connections(self): # TODO: UNIX sockets are temporarily implemented by parsing # 'pfiles' cmd output; we don't want that part of the code to # be executed. with create_sockets(): kind = 'inet' if SUNOS else 'all' self.execute(lambda: self.proc.net_connections(kind)) @pytest.mark.skipif(not HAS_ENVIRON, reason="not supported") def test_environ(self): self.execute(self.proc.environ) @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only") def test_proc_info(self): self.execute(lambda: cext.proc_info(os.getpid())) class TestTerminatedProcessLeaks(TestProcessObjectLeaks): """Repeat the tests above looking for leaks occurring when dealing with terminated processes raising NoSuchProcess exception. The C functions are still invoked but will follow different code paths. We'll check those code paths. """ @classmethod def setUpClass(cls): super().setUpClass() cls.subp = spawn_testproc() cls.proc = psutil.Process(cls.subp.pid) cls.proc.kill() cls.proc.wait() @classmethod def tearDownClass(cls): super().tearDownClass() terminate(cls.subp) def call(self, fun): try: fun() except psutil.NoSuchProcess: pass if WINDOWS: def test_kill(self): self.execute(self.proc.kill) def test_terminate(self): self.execute(self.proc.terminate) def test_suspend(self): self.execute(self.proc.suspend) def test_resume(self): self.execute(self.proc.resume) def test_wait(self): self.execute(self.proc.wait) def test_proc_info(self): # test dual implementation def call(): try: return cext.proc_info(self.proc.pid) except ProcessLookupError: pass self.execute(call) @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only") class TestProcessDualImplementation(TestMemoryLeak): def test_cmdline_peb_true(self): self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=True)) def test_cmdline_peb_false(self): self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=False)) # =================================================================== # system APIs # =================================================================== class TestModuleFunctionsLeaks(TestMemoryLeak): """Test leaks of psutil module functions.""" def test_coverage(self): ns = system_namespace() ns.test_class_coverage(self, ns.all) # --- cpu @fewtimes_if_linux() def test_cpu_count(self): # logical self.execute(lambda: psutil.cpu_count(logical=True)) @fewtimes_if_linux() def test_cpu_count_cores(self): self.execute(lambda: psutil.cpu_count(logical=False)) @fewtimes_if_linux() def test_cpu_times(self): self.execute(psutil.cpu_times) @fewtimes_if_linux() def test_per_cpu_times(self): self.execute(lambda: psutil.cpu_times(percpu=True)) @fewtimes_if_linux() def test_cpu_stats(self): self.execute(psutil.cpu_stats) @fewtimes_if_linux() # TODO: remove this once 1892 is fixed @pytest.mark.skipif( MACOS and platform.machine() == 'arm64', reason="skipped due to #1892" ) @pytest.mark.skipif(not HAS_CPU_FREQ, reason="not supported") def test_cpu_freq(self): self.execute(psutil.cpu_freq) @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only") def test_getloadavg(self): psutil.getloadavg() self.execute(psutil.getloadavg) # --- mem def test_virtual_memory(self): self.execute(psutil.virtual_memory) # TODO: remove this skip when this gets fixed @pytest.mark.skipif(SUNOS, reason="worthless on SUNOS (uses a subprocess)") def test_swap_memory(self): self.execute(psutil.swap_memory) def test_pid_exists(self): times = FEW_TIMES if POSIX else self.times self.execute(lambda: psutil.pid_exists(os.getpid()), times=times) # --- disk def test_disk_usage(self): times = FEW_TIMES if POSIX else self.times self.execute(lambda: psutil.disk_usage('.'), times=times) def test_disk_partitions(self): self.execute(psutil.disk_partitions) @pytest.mark.skipif( LINUX and not os.path.exists('/proc/diskstats'), reason="/proc/diskstats not available on this Linux version", ) @fewtimes_if_linux() def test_disk_io_counters(self): self.execute(lambda: psutil.disk_io_counters(nowrap=False)) # --- proc @fewtimes_if_linux() def test_pids(self): self.execute(psutil.pids) # --- net @fewtimes_if_linux() @pytest.mark.skipif(not HAS_NET_IO_COUNTERS, reason="not supported") def test_net_io_counters(self): self.execute(lambda: psutil.net_io_counters(nowrap=False)) @fewtimes_if_linux() @pytest.mark.skipif(MACOS and os.getuid() != 0, reason="need root access") def test_net_connections(self): # always opens and handle on Windows() (once) psutil.net_connections(kind='all') with create_sockets(): self.execute(lambda: psutil.net_connections(kind='all')) def test_net_if_addrs(self): # Note: verified that on Windows this was a false positive. tolerance = 80 * 1024 if WINDOWS else self.tolerance self.execute(psutil.net_if_addrs, tolerance=tolerance) def test_net_if_stats(self): self.execute(psutil.net_if_stats) # --- sensors @fewtimes_if_linux() @pytest.mark.skipif(not HAS_SENSORS_BATTERY, reason="not supported") def test_sensors_battery(self): self.execute(psutil.sensors_battery) @fewtimes_if_linux() @pytest.mark.skipif(not HAS_SENSORS_TEMPERATURES, reason="not supported") def test_sensors_temperatures(self): self.execute(psutil.sensors_temperatures) @fewtimes_if_linux() @pytest.mark.skipif(not HAS_SENSORS_FANS, reason="not supported") def test_sensors_fans(self): self.execute(psutil.sensors_fans) # --- others @fewtimes_if_linux() def test_boot_time(self): self.execute(psutil.boot_time) def test_users(self): self.execute(psutil.users) def test_set_debug(self): self.execute(lambda: psutil._set_debug(False)) if WINDOWS: # --- win services def test_win_service_iter(self): self.execute(cext.winservice_enumerate) def test_win_service_get(self): pass def test_win_service_get_config(self): name = next(psutil.win_service_iter()).name() self.execute(lambda: cext.winservice_query_config(name)) def test_win_service_get_status(self): name = next(psutil.win_service_iter()).name() self.execute(lambda: cext.winservice_query_status(name)) def test_win_service_get_description(self): name = next(psutil.win_service_iter()).name() self.execute(lambda: cext.winservice_query_descr(name)) tests/test_posix.py 0000644 00000041443 15030211765 0010467 0 ustar 00 #!/usr/bin/env python3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """POSIX specific tests.""" import datetime import errno import os import re import shutil import subprocess import time from unittest import mock import psutil from psutil import AIX from psutil import BSD from psutil import LINUX from psutil import MACOS from psutil import OPENBSD from psutil import POSIX from psutil import SUNOS from psutil.tests import AARCH64 from psutil.tests import HAS_NET_IO_COUNTERS from psutil.tests import PYTHON_EXE from psutil.tests import PsutilTestCase from psutil.tests import pytest from psutil.tests import retry_on_failure from psutil.tests import sh from psutil.tests import skip_on_access_denied from psutil.tests import spawn_testproc from psutil.tests import terminate if POSIX: import mmap import resource from psutil._psutil_posix import getpagesize def ps(fmt, pid=None): """Wrapper for calling the ps command with a little bit of cross-platform support for a narrow range of features. """ cmd = ['ps'] if LINUX: cmd.append('--no-headers') if pid is not None: cmd.extend(['-p', str(pid)]) elif SUNOS or AIX: cmd.append('-A') else: cmd.append('ax') if SUNOS: fmt = fmt.replace("start", "stime") cmd.extend(['-o', fmt]) output = sh(cmd) output = output.splitlines() if LINUX else output.splitlines()[1:] all_output = [] for line in output: line = line.strip() try: line = int(line) except ValueError: pass all_output.append(line) if pid is None: return all_output else: return all_output[0] # ps "-o" field names differ wildly between platforms. # "comm" means "only executable name" but is not available on BSD platforms. # "args" means "command with all its arguments", and is also not available # on BSD platforms. # "command" is like "args" on most platforms, but like "comm" on AIX, # and not available on SUNOS. # so for the executable name we can use "comm" on Solaris and split "command" # on other platforms. # to get the cmdline (with args) we have to use "args" on AIX and # Solaris, and can use "command" on all others. def ps_name(pid): field = "command" if SUNOS: field = "comm" command = ps(field, pid).split() return command[0] def ps_args(pid): field = "command" if AIX or SUNOS: field = "args" out = ps(field, pid) # observed on BSD + Github CI: '/usr/local/bin/python3 -E -O (python3.9)' out = re.sub(r"\(python.*?\)$", "", out) return out.strip() def ps_rss(pid): field = "rss" if AIX: field = "rssize" return ps(field, pid) def ps_vsz(pid): field = "vsz" if AIX: field = "vsize" return ps(field, pid) def df(device): try: out = sh(f"df -k {device}").strip() except RuntimeError as err: if "device busy" in str(err).lower(): raise pytest.skip("df returned EBUSY") raise line = out.split('\n')[1] fields = line.split() sys_total = int(fields[1]) * 1024 sys_used = int(fields[2]) * 1024 sys_free = int(fields[3]) * 1024 sys_percent = float(fields[4].replace('%', '')) return (sys_total, sys_used, sys_free, sys_percent) @pytest.mark.skipif(not POSIX, reason="POSIX only") class TestProcess(PsutilTestCase): """Compare psutil results against 'ps' command line utility (mainly).""" @classmethod def setUpClass(cls): cls.pid = spawn_testproc( [PYTHON_EXE, "-E", "-O"], stdin=subprocess.PIPE ).pid @classmethod def tearDownClass(cls): terminate(cls.pid) def test_ppid(self): ppid_ps = ps('ppid', self.pid) ppid_psutil = psutil.Process(self.pid).ppid() assert ppid_ps == ppid_psutil def test_uid(self): uid_ps = ps('uid', self.pid) uid_psutil = psutil.Process(self.pid).uids().real assert uid_ps == uid_psutil def test_gid(self): gid_ps = ps('rgid', self.pid) gid_psutil = psutil.Process(self.pid).gids().real assert gid_ps == gid_psutil def test_username(self): username_ps = ps('user', self.pid) username_psutil = psutil.Process(self.pid).username() assert username_ps == username_psutil def test_username_no_resolution(self): # Emulate a case where the system can't resolve the uid to # a username in which case psutil is supposed to return # the stringified uid. p = psutil.Process() with mock.patch("psutil.pwd.getpwuid", side_effect=KeyError) as fun: assert p.username() == str(p.uids().real) assert fun.called @skip_on_access_denied() @retry_on_failure() def test_rss_memory(self): # give python interpreter some time to properly initialize # so that the results are the same time.sleep(0.1) rss_ps = ps_rss(self.pid) rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024 assert rss_ps == rss_psutil @skip_on_access_denied() @retry_on_failure() def test_vsz_memory(self): # give python interpreter some time to properly initialize # so that the results are the same time.sleep(0.1) vsz_ps = ps_vsz(self.pid) vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024 assert vsz_ps == vsz_psutil def test_name(self): name_ps = ps_name(self.pid) # remove path if there is any, from the command name_ps = os.path.basename(name_ps).lower() name_psutil = psutil.Process(self.pid).name().lower() # ...because of how we calculate PYTHON_EXE; on MACOS this may # be "pythonX.Y". name_ps = re.sub(r"\d.\d", "", name_ps) name_psutil = re.sub(r"\d.\d", "", name_psutil) # ...may also be "python.X" name_ps = re.sub(r"\d", "", name_ps) name_psutil = re.sub(r"\d", "", name_psutil) assert name_ps == name_psutil def test_name_long(self): # On UNIX the kernel truncates the name to the first 15 # characters. In such a case psutil tries to determine the # full name from the cmdline. name = "long-program-name" cmdline = ["long-program-name-extended", "foo", "bar"] with mock.patch("psutil._psplatform.Process.name", return_value=name): with mock.patch( "psutil._psplatform.Process.cmdline", return_value=cmdline ): p = psutil.Process() assert p.name() == "long-program-name-extended" def test_name_long_cmdline_ad_exc(self): # Same as above but emulates a case where cmdline() raises # AccessDenied in which case psutil is supposed to return # the truncated name instead of crashing. name = "long-program-name" with mock.patch("psutil._psplatform.Process.name", return_value=name): with mock.patch( "psutil._psplatform.Process.cmdline", side_effect=psutil.AccessDenied(0, ""), ): p = psutil.Process() assert p.name() == "long-program-name" def test_name_long_cmdline_nsp_exc(self): # Same as above but emulates a case where cmdline() raises NSP # which is supposed to propagate. name = "long-program-name" with mock.patch("psutil._psplatform.Process.name", return_value=name): with mock.patch( "psutil._psplatform.Process.cmdline", side_effect=psutil.NoSuchProcess(0, ""), ): p = psutil.Process() with pytest.raises(psutil.NoSuchProcess): p.name() @pytest.mark.skipif(MACOS or BSD, reason="ps -o start not available") def test_create_time(self): time_ps = ps('start', self.pid) time_psutil = psutil.Process(self.pid).create_time() time_psutil_tstamp = datetime.datetime.fromtimestamp( time_psutil ).strftime("%H:%M:%S") # sometimes ps shows the time rounded up instead of down, so we check # for both possible values round_time_psutil = round(time_psutil) round_time_psutil_tstamp = datetime.datetime.fromtimestamp( round_time_psutil ).strftime("%H:%M:%S") assert time_ps in {time_psutil_tstamp, round_time_psutil_tstamp} def test_exe(self): ps_pathname = ps_name(self.pid) psutil_pathname = psutil.Process(self.pid).exe() try: assert ps_pathname == psutil_pathname except AssertionError: # certain platforms such as BSD are more accurate returning: # "/usr/local/bin/python3.7" # ...instead of: # "/usr/local/bin/python" # We do not want to consider this difference in accuracy # an error. adjusted_ps_pathname = ps_pathname[: len(ps_pathname)] assert ps_pathname == adjusted_ps_pathname # On macOS the official python installer exposes a python wrapper that # executes a python executable hidden inside an application bundle inside # the Python framework. # There's a race condition between the ps call & the psutil call below # depending on the completion of the execve call so let's retry on failure @retry_on_failure() def test_cmdline(self): ps_cmdline = ps_args(self.pid) psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline()) if AARCH64 and len(ps_cmdline) < len(psutil_cmdline): assert psutil_cmdline.startswith(ps_cmdline) else: assert ps_cmdline == psutil_cmdline # On SUNOS "ps" reads niceness /proc/pid/psinfo which returns an # incorrect value (20); the real deal is getpriority(2) which # returns 0; psutil relies on it, see: # https://github.com/giampaolo/psutil/issues/1082 # AIX has the same issue @pytest.mark.skipif(SUNOS, reason="not reliable on SUNOS") @pytest.mark.skipif(AIX, reason="not reliable on AIX") def test_nice(self): ps_nice = ps('nice', self.pid) psutil_nice = psutil.Process().nice() assert ps_nice == psutil_nice @pytest.mark.skipif(not POSIX, reason="POSIX only") class TestSystemAPIs(PsutilTestCase): """Test some system APIs.""" @retry_on_failure() def test_pids(self): # Note: this test might fail if the OS is starting/killing # other processes in the meantime pids_ps = sorted(ps("pid")) pids_psutil = psutil.pids() # on MACOS and OPENBSD ps doesn't show pid 0 if MACOS or (OPENBSD and 0 not in pids_ps): pids_ps.insert(0, 0) # There will often be one more process in pids_ps for ps itself if len(pids_ps) - len(pids_psutil) > 1: difference = [x for x in pids_psutil if x not in pids_ps] + [ x for x in pids_ps if x not in pids_psutil ] raise self.fail("difference: " + str(difference)) # for some reason ifconfig -a does not report all interfaces # returned by psutil @pytest.mark.skipif(SUNOS, reason="unreliable on SUNOS") @pytest.mark.skipif(not shutil.which("ifconfig"), reason="no ifconfig cmd") @pytest.mark.skipif(not HAS_NET_IO_COUNTERS, reason="not supported") def test_nic_names(self): output = sh("ifconfig -a") for nic in psutil.net_io_counters(pernic=True): for line in output.split(): if line.startswith(nic): break else: raise self.fail( f"couldn't find {nic} nic in 'ifconfig -a'" f" output\n{output}" ) # @pytest.mark.skipif(CI_TESTING and not psutil.users(), # reason="unreliable on CI") @retry_on_failure() def test_users(self): out = sh("who -u") if not out.strip(): raise pytest.skip("no users on this system") lines = out.split('\n') users = [x.split()[0] for x in lines] terminals = [x.split()[1] for x in lines] assert len(users) == len(psutil.users()) with self.subTest(psutil=psutil.users(), who=out): for idx, u in enumerate(psutil.users()): assert u.name == users[idx] assert u.terminal == terminals[idx] if u.pid is not None: # None on OpenBSD psutil.Process(u.pid) @retry_on_failure() def test_users_started(self): out = sh("who -u") if not out.strip(): raise pytest.skip("no users on this system") tstamp = None # '2023-04-11 09:31' (Linux) started = re.findall(r"\d\d\d\d-\d\d-\d\d \d\d:\d\d", out) if started: tstamp = "%Y-%m-%d %H:%M" else: # 'Apr 10 22:27' (macOS) started = re.findall(r"[A-Z][a-z][a-z] \d\d \d\d:\d\d", out) if started: tstamp = "%b %d %H:%M" else: # 'Apr 10' started = re.findall(r"[A-Z][a-z][a-z] \d\d", out) if started: tstamp = "%b %d" else: # 'apr 10' (sunOS) started = re.findall(r"[a-z][a-z][a-z] \d\d", out) if started: tstamp = "%b %d" started = [x.capitalize() for x in started] if not tstamp: raise pytest.skip(f"cannot interpret tstamp in who output\n{out}") with self.subTest(psutil=psutil.users(), who=out): for idx, u in enumerate(psutil.users()): psutil_value = datetime.datetime.fromtimestamp( u.started ).strftime(tstamp) assert psutil_value == started[idx] def test_pid_exists_let_raise(self): # According to "man 2 kill" possible error values for kill # are (EINVAL, EPERM, ESRCH). Test that any other errno # results in an exception. with mock.patch( "psutil._psposix.os.kill", side_effect=OSError(errno.EBADF, "") ) as m: with pytest.raises(OSError): psutil._psposix.pid_exists(os.getpid()) assert m.called def test_os_waitpid_let_raise(self): # os.waitpid() is supposed to catch EINTR and ECHILD only. # Test that any other errno results in an exception. with mock.patch( "psutil._psposix.os.waitpid", side_effect=OSError(errno.EBADF, "") ) as m: with pytest.raises(OSError): psutil._psposix.wait_pid(os.getpid()) assert m.called def test_os_waitpid_eintr(self): # os.waitpid() is supposed to "retry" on EINTR. with mock.patch( "psutil._psposix.os.waitpid", side_effect=OSError(errno.EINTR, "") ) as m: with pytest.raises(psutil._psposix.TimeoutExpired): psutil._psposix.wait_pid(os.getpid(), timeout=0.01) assert m.called def test_os_waitpid_bad_ret_status(self): # Simulate os.waitpid() returning a bad status. with mock.patch( "psutil._psposix.os.waitpid", return_value=(1, -1) ) as m: with pytest.raises(ValueError): psutil._psposix.wait_pid(os.getpid()) assert m.called # AIX can return '-' in df output instead of numbers, e.g. for /proc @pytest.mark.skipif(AIX, reason="unreliable on AIX") @retry_on_failure() def test_disk_usage(self): tolerance = 4 * 1024 * 1024 # 4MB for part in psutil.disk_partitions(all=False): usage = psutil.disk_usage(part.mountpoint) try: sys_total, sys_used, sys_free, sys_percent = df(part.device) except RuntimeError as err: # see: # https://travis-ci.org/giampaolo/psutil/jobs/138338464 # https://travis-ci.org/giampaolo/psutil/jobs/138343361 err = str(err).lower() if ( "no such file or directory" in err or "raw devices not supported" in err or "permission denied" in err ): continue raise else: assert abs(usage.total - sys_total) < tolerance assert abs(usage.used - sys_used) < tolerance assert abs(usage.free - sys_free) < tolerance assert abs(usage.percent - sys_percent) <= 1 @pytest.mark.skipif(not POSIX, reason="POSIX only") class TestMisc(PsutilTestCase): def test_getpagesize(self): pagesize = getpagesize() assert pagesize > 0 assert pagesize == resource.getpagesize() assert pagesize == mmap.PAGESIZE tests/test_unicode.py 0000644 00000024230 15030211765 0010746 0 ustar 00 #!/usr/bin/env python3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Notes about unicode handling in psutil ======================================. Starting from version 5.3.0 psutil adds unicode support, see: https://github.com/giampaolo/psutil/issues/1040 The notes below apply to *any* API returning a string such as process exe(), cwd() or username(): * all strings are encoded by using the OS filesystem encoding (sys.getfilesystemencoding()) which varies depending on the platform (e.g. "UTF-8" on macOS, "mbcs" on Win) * no API call is supposed to crash with UnicodeDecodeError * instead, in case of badly encoded data returned by the OS, the following error handlers are used to replace the corrupted characters in the string: * sys.getfilesystemencodeerrors() or "surrogatescape" on POSIX and "replace" on Windows. For a detailed explanation of how psutil handles unicode see #1040. Tests ===== List of APIs returning or dealing with a string: ('not tested' means they are not tested to deal with non-ASCII strings): * Process.cmdline() * Process.cwd() * Process.environ() * Process.exe() * Process.memory_maps() * Process.name() * Process.net_connections('unix') * Process.open_files() * Process.username() (not tested) * disk_io_counters() (not tested) * disk_partitions() (not tested) * disk_usage(str) * net_connections('unix') * net_if_addrs() (not tested) * net_if_stats() (not tested) * net_io_counters() (not tested) * sensors_fans() (not tested) * sensors_temperatures() (not tested) * users() (not tested) * WindowsService.binpath() (not tested) * WindowsService.description() (not tested) * WindowsService.display_name() (not tested) * WindowsService.name() (not tested) * WindowsService.status() (not tested) * WindowsService.username() (not tested) In here we create a unicode path with a funky non-ASCII name and (where possible) make psutil return it back (e.g. on name(), exe(), open_files(), etc.) and make sure that: * psutil never crashes with UnicodeDecodeError * the returned path matches """ import os import shutil import warnings from contextlib import closing import psutil from psutil import BSD from psutil import POSIX from psutil import WINDOWS from psutil.tests import ASCII_FS from psutil.tests import CI_TESTING from psutil.tests import HAS_ENVIRON from psutil.tests import HAS_MEMORY_MAPS from psutil.tests import HAS_NET_CONNECTIONS_UNIX from psutil.tests import INVALID_UNICODE_SUFFIX from psutil.tests import PYPY from psutil.tests import TESTFN_PREFIX from psutil.tests import UNICODE_SUFFIX from psutil.tests import PsutilTestCase from psutil.tests import bind_unix_socket from psutil.tests import chdir from psutil.tests import copyload_shared_lib from psutil.tests import create_py_exe from psutil.tests import get_testfn from psutil.tests import pytest from psutil.tests import safe_mkdir from psutil.tests import safe_rmpath from psutil.tests import skip_on_access_denied from psutil.tests import spawn_testproc from psutil.tests import terminate def try_unicode(suffix): """Return True if both the fs and the subprocess module can deal with a unicode file name. """ sproc = None testfn = get_testfn(suffix=suffix) try: safe_rmpath(testfn) create_py_exe(testfn) sproc = spawn_testproc(cmd=[testfn]) shutil.copyfile(testfn, testfn + '-2') safe_rmpath(testfn + '-2') except (UnicodeEncodeError, OSError): return False else: return True finally: if sproc is not None: terminate(sproc) safe_rmpath(testfn) # =================================================================== # FS APIs # =================================================================== class BaseUnicodeTest(PsutilTestCase): funky_suffix = None @classmethod def setUpClass(cls): super().setUpClass() cls.skip_tests = False cls.funky_name = None if cls.funky_suffix is not None: if not try_unicode(cls.funky_suffix): cls.skip_tests = True else: cls.funky_name = get_testfn(suffix=cls.funky_suffix) create_py_exe(cls.funky_name) def setUp(self): super().setUp() if self.skip_tests: raise pytest.skip("can't handle unicode str") @pytest.mark.xdist_group(name="serial") @pytest.mark.skipif(ASCII_FS, reason="ASCII fs") class TestFSAPIs(BaseUnicodeTest): """Test FS APIs with a funky, valid, UTF8 path name.""" funky_suffix = UNICODE_SUFFIX def expect_exact_path_match(self): with warnings.catch_warnings(): warnings.simplefilter("ignore") return self.funky_name in os.listdir(".") # --- def test_proc_exe(self): cmd = [ self.funky_name, "-c", "import time; [time.sleep(0.1) for x in range(100)]", ] subp = self.spawn_testproc(cmd) p = psutil.Process(subp.pid) exe = p.exe() assert isinstance(exe, str) if self.expect_exact_path_match(): assert os.path.normcase(exe) == os.path.normcase(self.funky_name) def test_proc_name(self): cmd = [ self.funky_name, "-c", "import time; [time.sleep(0.1) for x in range(100)]", ] subp = self.spawn_testproc(cmd) name = psutil.Process(subp.pid).name() assert isinstance(name, str) if self.expect_exact_path_match(): assert name == os.path.basename(self.funky_name) def test_proc_cmdline(self): cmd = [ self.funky_name, "-c", "import time; [time.sleep(0.1) for x in range(100)]", ] subp = self.spawn_testproc(cmd) p = psutil.Process(subp.pid) cmdline = p.cmdline() for part in cmdline: assert isinstance(part, str) if self.expect_exact_path_match(): assert cmdline == cmd def test_proc_cwd(self): dname = self.funky_name + "2" self.addCleanup(safe_rmpath, dname) safe_mkdir(dname) with chdir(dname): p = psutil.Process() cwd = p.cwd() assert isinstance(p.cwd(), str) if self.expect_exact_path_match(): assert cwd == dname @pytest.mark.skipif(PYPY and WINDOWS, reason="fails on PYPY + WINDOWS") def test_proc_open_files(self): p = psutil.Process() start = set(p.open_files()) with open(self.funky_name, 'rb'): new = set(p.open_files()) path = (new - start).pop().path assert isinstance(path, str) if BSD and not path: # XXX - see https://github.com/giampaolo/psutil/issues/595 raise pytest.skip("open_files on BSD is broken") if self.expect_exact_path_match(): assert os.path.normcase(path) == os.path.normcase(self.funky_name) @pytest.mark.skipif(not POSIX, reason="POSIX only") def test_proc_net_connections(self): name = self.get_testfn(suffix=self.funky_suffix) sock = bind_unix_socket(name) with closing(sock): conn = psutil.Process().net_connections('unix')[0] assert isinstance(conn.laddr, str) assert conn.laddr == name @pytest.mark.skipif(not POSIX, reason="POSIX only") @pytest.mark.skipif( not HAS_NET_CONNECTIONS_UNIX, reason="can't list UNIX sockets" ) @skip_on_access_denied() def test_net_connections(self): def find_sock(cons): for conn in cons: if os.path.basename(conn.laddr).startswith(TESTFN_PREFIX): return conn raise ValueError("connection not found") name = self.get_testfn(suffix=self.funky_suffix) sock = bind_unix_socket(name) with closing(sock): cons = psutil.net_connections(kind='unix') conn = find_sock(cons) assert isinstance(conn.laddr, str) assert conn.laddr == name def test_disk_usage(self): dname = self.funky_name + "2" self.addCleanup(safe_rmpath, dname) safe_mkdir(dname) psutil.disk_usage(dname) @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported") @pytest.mark.skipif(PYPY, reason="unstable on PYPY") def test_memory_maps(self): with copyload_shared_lib(suffix=self.funky_suffix) as funky_path: def normpath(p): return os.path.realpath(os.path.normcase(p)) libpaths = [ normpath(x.path) for x in psutil.Process().memory_maps() ] # ...just to have a clearer msg in case of failure libpaths = [x for x in libpaths if TESTFN_PREFIX in x] assert normpath(funky_path) in libpaths for path in libpaths: assert isinstance(path, str) @pytest.mark.skipif(CI_TESTING, reason="unreliable on CI") class TestFSAPIsWithInvalidPath(TestFSAPIs): """Test FS APIs with a funky, invalid path name.""" funky_suffix = INVALID_UNICODE_SUFFIX def expect_exact_path_match(self): return True # =================================================================== # Non fs APIs # =================================================================== class TestNonFSAPIS(BaseUnicodeTest): """Unicode tests for non fs-related APIs.""" funky_suffix = UNICODE_SUFFIX @pytest.mark.skipif(not HAS_ENVIRON, reason="not supported") @pytest.mark.skipif(PYPY and WINDOWS, reason="segfaults on PYPY + WINDOWS") def test_proc_environ(self): # Note: differently from others, this test does not deal # with fs paths. env = os.environ.copy() env['FUNNY_ARG'] = self.funky_suffix sproc = self.spawn_testproc(env=env) p = psutil.Process(sproc.pid) env = p.environ() for k, v in env.items(): assert isinstance(k, str) assert isinstance(v, str) assert env['FUNNY_ARG'] == self.funky_suffix tests/test_osx.py 0000644 00000014253 15030211765 0010135 0 ustar 00 #!/usr/bin/env python3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """macOS specific tests.""" import platform import re import time import psutil from psutil import MACOS from psutil import POSIX from psutil.tests import CI_TESTING from psutil.tests import HAS_BATTERY from psutil.tests import TOLERANCE_DISK_USAGE from psutil.tests import TOLERANCE_SYS_MEM from psutil.tests import PsutilTestCase from psutil.tests import pytest from psutil.tests import retry_on_failure from psutil.tests import sh from psutil.tests import spawn_testproc from psutil.tests import terminate if POSIX: from psutil._psutil_posix import getpagesize def sysctl(cmdline): """Expects a sysctl command with an argument and parse the result returning only the value of interest. """ out = sh(cmdline) result = out.split()[1] try: return int(result) except ValueError: return result def vm_stat(field): """Wrapper around 'vm_stat' cmdline utility.""" out = sh('vm_stat') for line in out.split('\n'): if field in line: break else: raise ValueError("line not found") return int(re.search(r'\d+', line).group(0)) * getpagesize() @pytest.mark.skipif(not MACOS, reason="MACOS only") class TestProcess(PsutilTestCase): @classmethod def setUpClass(cls): cls.pid = spawn_testproc().pid @classmethod def tearDownClass(cls): terminate(cls.pid) def test_process_create_time(self): output = sh(f"ps -o lstart -p {self.pid}") start_ps = output.replace('STARTED', '').strip() hhmmss = start_ps.split(' ')[-2] year = start_ps.split(' ')[-1] start_psutil = psutil.Process(self.pid).create_time() assert hhmmss == time.strftime( "%H:%M:%S", time.localtime(start_psutil) ) assert year == time.strftime("%Y", time.localtime(start_psutil)) @pytest.mark.skipif(not MACOS, reason="MACOS only") class TestSystemAPIs(PsutilTestCase): # --- disk @retry_on_failure() def test_disks(self): # test psutil.disk_usage() and psutil.disk_partitions() # against "df -a" def df(path): out = sh(f'df -k "{path}"').strip() lines = out.split('\n') lines.pop(0) line = lines.pop(0) dev, total, used, free = line.split()[:4] if dev == 'none': dev = '' total = int(total) * 1024 used = int(used) * 1024 free = int(free) * 1024 return dev, total, used, free for part in psutil.disk_partitions(all=False): usage = psutil.disk_usage(part.mountpoint) dev, total, used, free = df(part.mountpoint) assert part.device == dev assert usage.total == total assert abs(usage.free - free) < TOLERANCE_DISK_USAGE assert abs(usage.used - used) < TOLERANCE_DISK_USAGE # --- cpu def test_cpu_count_logical(self): num = sysctl("sysctl hw.logicalcpu") assert num == psutil.cpu_count(logical=True) def test_cpu_count_cores(self): num = sysctl("sysctl hw.physicalcpu") assert num == psutil.cpu_count(logical=False) # TODO: remove this once 1892 is fixed @pytest.mark.skipif( MACOS and platform.machine() == 'arm64', reason="skipped due to #1892" ) def test_cpu_freq(self): freq = psutil.cpu_freq() assert freq.current * 1000 * 1000 == sysctl("sysctl hw.cpufrequency") assert freq.min * 1000 * 1000 == sysctl("sysctl hw.cpufrequency_min") assert freq.max * 1000 * 1000 == sysctl("sysctl hw.cpufrequency_max") # --- virtual mem def test_vmem_total(self): sysctl_hwphymem = sysctl('sysctl hw.memsize') assert sysctl_hwphymem == psutil.virtual_memory().total @pytest.mark.skipif( CI_TESTING and MACOS and platform.machine() == 'arm64', reason="skipped on MACOS + ARM64 + CI_TESTING", ) @retry_on_failure() def test_vmem_free(self): vmstat_val = vm_stat("free") psutil_val = psutil.virtual_memory().free assert abs(psutil_val - vmstat_val) < TOLERANCE_SYS_MEM @retry_on_failure() def test_vmem_active(self): vmstat_val = vm_stat("active") psutil_val = psutil.virtual_memory().active assert abs(psutil_val - vmstat_val) < TOLERANCE_SYS_MEM @retry_on_failure() def test_vmem_inactive(self): vmstat_val = vm_stat("inactive") psutil_val = psutil.virtual_memory().inactive assert abs(psutil_val - vmstat_val) < TOLERANCE_SYS_MEM @retry_on_failure() def test_vmem_wired(self): vmstat_val = vm_stat("wired") psutil_val = psutil.virtual_memory().wired assert abs(psutil_val - vmstat_val) < TOLERANCE_SYS_MEM # --- swap mem @retry_on_failure() def test_swapmem_sin(self): vmstat_val = vm_stat("Pageins") psutil_val = psutil.swap_memory().sin assert abs(psutil_val - vmstat_val) < TOLERANCE_SYS_MEM @retry_on_failure() def test_swapmem_sout(self): vmstat_val = vm_stat("Pageout") psutil_val = psutil.swap_memory().sout assert abs(psutil_val - vmstat_val) < TOLERANCE_SYS_MEM # --- network def test_net_if_stats(self): for name, stats in psutil.net_if_stats().items(): try: out = sh(f"ifconfig {name}") except RuntimeError: pass else: assert stats.isup == ('RUNNING' in out), out assert stats.mtu == int(re.findall(r'mtu (\d+)', out)[0]) # --- sensors_battery @pytest.mark.skipif(not HAS_BATTERY, reason="no battery") def test_sensors_battery(self): out = sh("pmset -g batt") percent = re.search(r"(\d+)%", out).group(1) drawing_from = re.search(r"Now drawing from '([^']+)'", out).group(1) power_plugged = drawing_from == "AC Power" psutil_result = psutil.sensors_battery() assert psutil_result.power_plugged == power_plugged assert psutil_result.percent == int(percent) tests/__pycache__/test_aix.cpython-310.pyc 0000644 00000005771 15030211765 0014451 0 ustar 00 o �h8 � @ sj d Z ddlZddlZddlmZ ddlmZ ddlmZ ddlmZ ejj e dd�G d d � d e��Z dS )zAIX specific tests.� N)�AIX)�PsutilTestCase)�pytest)�shzAIX only)�reasonc @ s4 e Zd Zdd� Zdd� Zdd� Zdd� Zd d � ZdS )�AIXSpecificTestCasec C s� t d�}d}dD ] }|d|� d�7 }qt�||�}|d usJ �d}t|�d��| }t|�d��| }t|�d ��| }t|�d ��| } t�� } d| | }| j|ksVJ �t| j | �|k saJ �t| j | �|k slJ �t| j| �|k swJ �d S )Nz/usr/bin/svmon -O unit=KBz memory\s*)�size�inuse�free�pin�virtual� available�mmode�(?P<�>\S+)\s+i r r r r � )r �re�search�int�group�psutil�virtual_memory�total�abs�usedr r )�self�out� re_pattern�field�matchobj�KBr r r r � psutil_result�TOLERANCE_SYS_MEM� r# �H/usr/local/CyberCP/lib/python3.10/site-packages/psutil/tests/test_aix.py�test_virtual_memory s"