# Copyright 2021-2022 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
Process statistics PyTest plugin interface.
"""
import os
from collections import OrderedDict
from typing import Dict
from typing import ItemsView
from typing import Iterator
from typing import Optional
from typing import Union
import attr
import psutil
import pytest
from pytestskipmarkers.utils import platform
if False: # TYPE_CHECKING # pylint: disable=using-constant-test
# pylint: disable=import-error,unused-import,no-name-in-module
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import SubRequest
from _pytest.main import Session
from _pytest.reporter import TerminalReporter
from _pytest.reports import TestReport
# pylint: enable=import-error,unused-import,no-name-in-module
[docs]@attr.s(kw_only=True, slots=True, hash=True)
class StatsProcesses:
"""
Class which holds the processes being tracked.
"""
processes = attr.ib(
init=False, default=attr.Factory(OrderedDict), hash=False
) # type: Dict[str, psutil.Process]
[docs] def add(self, display_name: str, process: Union[int, psutil.Process]) -> None:
"""
Add a process to track.
"""
if isinstance(process, int):
# This is a process pid
process = psutil.Process(process)
self.processes[display_name] = process
[docs] def remove(self, display_name: str) -> None:
"""
Remove tracked process.
"""
self.processes.pop(display_name, None)
[docs] def items(self) -> ItemsView[str, psutil.Process]:
"""
Return the tracked items.
"""
return self.processes.items()
def __iter__(self) -> Iterator[str]:
"""
Iterate over tracked processes.
"""
return iter(self.processes)
[docs]@attr.s(kw_only=True, slots=True, hash=True)
class SystemStatsReporter:
"""
Tracked processes pytest reporter.
"""
config = attr.ib(repr=False, hash=False) # type: "Config"
stats_processes = attr.ib(repr=False, hash=False) # type: Optional[StatsProcesses]
terminalreporter = attr.ib(repr=False, hash=False) # type: "TerminalReporter"
show_sys_stats = attr.ib(init=False) # type: bool
sys_stats_no_children = attr.ib(init=False) # type: bool
sys_stats_mem_type = attr.ib(init=False) # type: str
def __attrs_post_init__(self) -> None:
"""
Initialization routines, port attrs initialization.
"""
self.show_sys_stats = (
self.config.getoption("--sys-stats") is True
and self.config.getoption("--no-sys-stats") is False
)
self.sys_stats_no_children = self.config.getoption("--sys-stats-no-children") is True
if self.config.getoption("--sys-stats-uss-mem") is True:
self.sys_stats_mem_type = "uss"
if platform.is_freebsd():
# FreeBSD doesn't apparently support uss
self.sys_stats_mem_type = "rss"
else:
self.sys_stats_mem_type = "rss"
[docs] @pytest.hookimpl(trylast=True) # type: ignore[misc]
def pytest_runtest_logreport(self, report: "TestReport") -> None:
"""
Pytest logreport hook.
"""
if self.terminalreporter.verbosity <= 0:
return
if report.when != "call":
return
if self.show_sys_stats is False:
return
if self.terminalreporter.verbosity > 1:
assert self.stats_processes # Make mypy happy
remove_from_stats = set()
self.terminalreporter.ensure_newline()
self.terminalreporter.section("Processes Statistics", sep="-", bold=True)
left_padding = len(max(["System"] + list(self.stats_processes), key=len))
template = (
" ...{dots} {name} - CPU: {cpu:6.2f} % MEM: {mem:6.2f} % (Virtual Memory)"
)
stats = {
"name": "System",
"dots": "." * (left_padding - len("System")),
"cpu": psutil.cpu_percent(),
"mem": psutil.virtual_memory().percent,
}
swap = psutil.swap_memory().percent
if swap > 0:
template += " SWAP: {swap:6.2f} %"
stats["swap"] = swap
template += "\n"
self.terminalreporter.write(template.format(**stats))
template = " ...{dots} {name} - CPU: {cpu:6.2f} % MEM: {mem:6.2f} % ({m_type})"
children_template = (
template + " MEM SUM: {c_mem} % ({m_type}) CHILD PROCS: {c_count}\n"
)
no_children_template = template + "\n"
for name, psproc in self.stats_processes.items():
template = no_children_template
dots = "." * (left_padding - len(name))
pids = []
try:
with psproc.oneshot():
stats = {
"name": name,
"dots": dots,
"cpu": psproc.cpu_percent(),
"mem": psproc.memory_percent(self.sys_stats_mem_type),
"m_type": self.sys_stats_mem_type.upper(),
}
if self.sys_stats_no_children is False:
pids.append(psproc.pid)
children = psproc.children(recursive=True)
if children:
template = children_template
stats["c_count"] = 0
c_mem = stats["mem"]
for child in children:
if child.pid in pids: # pragma: no cover
continue
pids.append(child.pid)
if not psutil.pid_exists(child.pid): # pragma: no cover
remove_from_stats.add(name)
continue
try:
c_mem += child.memory_percent(self.sys_stats_mem_type)
stats["c_count"] += 1
except (
psutil.AccessDenied,
psutil.NoSuchProcess,
): # pragma: no cover
continue
if stats["c_count"]:
stats["c_mem"] = "{:6.2f}".format(c_mem)
else: # pragma: no cover
template = no_children_template
self.terminalreporter.write(template.format(**stats))
except psutil.NoSuchProcess: # pragma: no cover
remove_from_stats.add(name)
continue
if remove_from_stats: # pragma: no cover
for name in remove_from_stats:
self.stats_processes.remove(name)
[docs]def pytest_addoption(parser: "Parser") -> None:
"""
Register argparse-style options and ini-style config values.
"""
output_options_group = parser.getgroup("Output Options")
output_options_group.addoption(
"--sys-stats",
default=False,
action="store_true",
help="Print System CPU and MEM statistics after each test execution.",
)
output_options_group.addoption(
"--no-sys-stats",
default=False,
action="store_true",
help="Do not print System CPU and MEM statistics after each test execution.",
)
output_options_group.addoption(
"--sys-stats-no-children",
default=False,
action="store_true",
help="Don't include child processes memory statistics.",
)
output_options_group.addoption(
"--sys-stats-uss-mem",
default=False,
action="store_true",
help='Use the USS("Unique Set Size", memory unique to a process which would be freed if the process was '
"terminated) memory instead which is more expensive to calculate.",
)
[docs]@pytest.hookimpl(trylast=True) # type: ignore[misc]
def pytest_sessionstart(session: "Session") -> None:
"""
Pytest session start routines.
"""
if (
session.config.getoption("--sys-stats") is True
and session.config.getoption("--no-sys-stats") is False
):
stats_processes_instance = StatsProcesses()
stats_processes_instance.add("Test Suite Run", os.getpid())
else:
stats_processes_instance = None
session.config.pluginmanager.register(stats_processes_instance, "sysstats-processes")
terminalreporter = session.config.pluginmanager.getplugin(
"terminalreporter"
) # type: "TerminalReporter"
sys_stats_reporter = SystemStatsReporter(
config=session.config,
stats_processes=stats_processes_instance,
terminalreporter=terminalreporter,
)
session.config.pluginmanager.register(sys_stats_reporter, "sysstats-reporter")
[docs]@pytest.fixture(scope="session") # type: ignore[misc]
def stats_processes(request: "SubRequest") -> StatsProcesses:
"""
Session scoped process statistics tracker.
"""
plugin = request.config.pluginmanager.get_plugin("sysstats-processes") # type: StatsProcesses
return plugin