from test.libregrtest.cmdline import _parse_args, Namespace
from test.libregrtest.runtest import (
findtests, split_test_packages, run_single_test, abs_module_name,
- PROGRESS_MIN_TIME, State, RunTests, TestResult,
+ PROGRESS_MIN_TIME, State, RunTests, TestResult, HuntRefleak,
FilterTuple, FilterDict, TestList)
from test.libregrtest.setup import setup_tests, setup_test_dir
from test.libregrtest.pgo import setup_pgo_tests
self.pgo_extended: bool = ns.pgo_extended
self.output_on_failure: bool = ns.verbose3
self.timeout: float | None = ns.timeout
+ self.verbose: bool = ns.verbose
+ self.quiet: bool = ns.quiet
+ if ns.huntrleaks:
+ self.hunt_refleak: HuntRefleak = HuntRefleak(*ns.huntrleaks)
+ else:
+ self.hunt_refleak = None
+ self.test_dir: str | None = ns.testdir
+ self.junit_filename: str | None = ns.xmlpath
# tests
self.tests = []
print(line, flush=True)
def display_progress(self, test_index, text):
- quiet = self.ns.quiet
- if quiet:
+ if self.quiet:
return
# "[ 51/405/1] test_tcl passed"
def find_tests(self):
ns = self.ns
single = ns.single
- test_dir = ns.testdir
if single:
self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
exclude_tests.add(arg)
ns.args = []
- alltests = findtests(testdir=test_dir, exclude=exclude_tests)
+ alltests = findtests(testdir=self.test_dir,
+ exclude=exclude_tests)
if not self.fromfile:
self.selected = self.tests or ns.args
print(test.id())
def list_cases(self):
- ns = self.ns
- test_dir = ns.testdir
support.verbose = False
support.set_match_tests(self.match_tests, self.ignore_tests)
skipped = []
for test_name in self.selected:
- module_name = abs_module_name(test_name, test_dir)
+ module_name = abs_module_name(test_name, self.test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
self._list_cases(suite)
def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
# Configure the runner to re-run tests
ns = self.ns
- ns.verbose = True
if ns.use_mp is None:
ns.use_mp = 1
runtests = runtests.copy(
tests=tuple(tests),
rerun=True,
+ verbose=True,
forever=False,
fail_fast=False,
match_tests_dict=match_tests_dict,
def display_result(self, runtests):
pgo = runtests.pgo
- quiet = self.ns.quiet
print_slow = self.ns.print_slow
# If running the test suite for PGO then no one cares about results.
print(count(len(omitted), "test"), "omitted:")
printlist(omitted)
- if self.good and not quiet:
+ if self.good and not self.quiet:
print()
if (not self.bad
and not self.skipped
count(len(self.environment_changed), "test")))
printlist(self.environment_changed)
- if self.skipped and not quiet:
+ if self.skipped and not self.quiet:
print()
print(count(len(self.skipped), "test"), "skipped:")
printlist(self.skipped)
- if self.resource_denied and not quiet:
+ if self.resource_denied and not self.quiet:
print()
print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
printlist(self.resource_denied)
print(f"Result: {result}")
def save_xml_result(self):
- if not self.ns.xmlpath and not self.testsuite_xml:
+ if not self.junit_filename and not self.testsuite_xml:
return
import xml.etree.ElementTree as ET
for k, v in totals.items():
root.set(k, str(v))
- xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
+ xmlpath = os.path.join(os_helper.SAVEDCWD, self.junit_filename)
with open(xmlpath, 'wb') as f:
for s in ET.tostringlist(root):
f.write(s)
ns = self.ns
self.tests = tests
- if ns.xmlpath:
+ if self.junit_filename:
support.junit_xml_list = self.testsuite_xml = []
strip_py_suffix(ns.args)
return exitcode
def action_run_tests(self):
- if self.ns.huntrleaks:
- warmup, repetitions, _ = self.ns.huntrleaks
- if warmup < 3:
- msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
- "3 warmup repetitions can give false positives!")
- print(msg, file=sys.stdout, flush=True)
+ if self.hunt_refleak and self.hunt_refleak.warmups < 3:
+ msg = ("WARNING: Running tests with --huntrleaks/-R and "
+ "less than 3 warmup repetitions can give false positives!")
+ print(msg, file=sys.stdout, flush=True)
# For a partial run, we do not need to clutter the output.
if (self.want_header
- or not(self.pgo or self.ns.quiet or self.ns.single
+ or not(self.pgo or self.quiet or self.ns.single
or self.tests or self.ns.args)):
self.display_header()
pgo=self.pgo,
pgo_extended=self.pgo_extended,
output_on_failure=self.output_on_failure,
- timeout=self.timeout)
+ timeout=self.timeout,
+ verbose=self.verbose,
+ quiet=self.quiet,
+ hunt_refleak=self.hunt_refleak,
+ test_dir=self.test_dir,
+ junit_filename=self.junit_filename)
setup_tests(runtests, self.ns)
if self.want_wait:
input("Press any key to continue...")
- setup_test_dir(self.ns.testdir)
+ setup_test_dir(self.test_dir)
self.find_tests()
exitcode = 0
from inspect import isabstract
from test import support
from test.support import os_helper
+from test.libregrtest.runtest import HuntRefleak
from test.libregrtest.utils import clear_caches
try:
cls._abc_negative_cache, cls._abc_negative_cache_version)
-def dash_R(ns, test_name, test_func):
+def runtest_refleak(test_name, test_func,
+ hunt_refleak: HuntRefleak,
+ quiet: bool):
"""Run a test multiple times, looking for reference leaks.
Returns:
def get_pooled_int(value):
return int_pool.setdefault(value, value)
- nwarmup, ntracked, fname = ns.huntrleaks
- fname = os.path.join(os_helper.SAVEDCWD, fname)
- repcount = nwarmup + ntracked
+ warmups = hunt_refleak.warmups
+ runs = hunt_refleak.runs
+ filename = hunt_refleak.filename
+ filename = os.path.join(os_helper.SAVEDCWD, filename)
+ repcount = warmups + runs
# Pre-allocate to ensure that the loop doesn't allocate anything new
rep_range = list(range(repcount))
# initialize variables to make pyflakes quiet
rc_before = alloc_before = fd_before = interned_before = 0
- if not ns.quiet:
+ if not quiet:
print("beginning", repcount, "repetitions", file=sys.stderr)
print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr,
flush=True)
rc_after = gettotalrefcount() - interned_after * 2
fd_after = fd_count()
- if not ns.quiet:
+ if not quiet:
print('.', end='', file=sys.stderr, flush=True)
rc_deltas[i] = get_pooled_int(rc_after - rc_before)
fd_before = fd_after
interned_before = interned_after
- if not ns.quiet:
+ if not quiet:
print(file=sys.stderr)
# These checkers return False on success, True on failure
(fd_deltas, 'file descriptors', check_fd_deltas)
]:
# ignore warmup runs
- deltas = deltas[nwarmup:]
+ deltas = deltas[warmups:]
if checker(deltas):
msg = '%s leaked %s %s, sum=%s' % (
test_name, deltas, item_name, sum(deltas))
print(msg, file=sys.stderr, flush=True)
- with open(fname, "a", encoding="utf-8") as refrep:
+ with open(filename, "a", encoding="utf-8") as refrep:
print(msg, file=refrep)
refrep.flush()
failed = True
FilterDict = dict[str, FilterTuple]
+@dataclasses.dataclass(slots=True, frozen=True)
+class HuntRefleak:
+ warmups: int
+ runs: int
+ filename: str
+
+
# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
PASSED = "PASSED"
pgo_extended: bool = False
output_on_failure: bool = False
timeout: float | None = None
+ verbose: bool = False
+ quiet: bool = False
+ hunt_refleak: HuntRefleak | None = None
+ test_dir: str | None = None
+ junit_filename: str | None = None
def copy(self, **override):
state = dataclasses.asdict(self)
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-def findtests(*, testdir=None, exclude=(),
+def findtests(*, testdir: str | None =None, exclude=(),
split_test_dirs=SPLITTESTDIRS, base_mod=""):
"""Return a list of all applicable test modules."""
testdir = findtestdir(testdir)
return sorted(tests)
-def split_test_packages(tests, *, testdir=None, exclude=(),
+def split_test_packages(tests, *, testdir: str | None = None, exclude=(),
split_test_dirs=SPLITTESTDIRS):
testdir = findtestdir(testdir)
splitted = []
support.PGO_EXTENDED = runtests.pgo_extended
support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
support.failfast = runtests.fail_fast
- support.verbose = ns.verbose
- if ns.xmlpath:
+ support.verbose = runtests.verbose
+ if runtests.junit_filename:
support.junit_xml_list = []
else:
support.junit_xml_list = None
def _runtest(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
- verbose = ns.verbose
+ verbose = runtests.verbose
output_on_failure = runtests.output_on_failure
timeout = runtests.timeout
else:
# Tell tests to be moderately quiet
support.verbose = verbose
- _runtest_env_changed_exc(result, runtests, ns, display_failure=not verbose)
+ _runtest_env_changed_exc(result, runtests, ns,
+ display_failure=not verbose)
xml_list = support.junit_xml_list
if xml_list:
Returns a TestResult.
- If ns.xmlpath is not None, xml_data is a list containing each
+ If runtests.junit_filename is not None, xml_data is a list containing each
generated testsuite element.
"""
start_time = time.perf_counter()
return support.run_unittest(tests)
-def save_env(test_name: str, runtests: RunTests, ns: Namespace):
- return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=runtests.pgo)
+def save_env(test_name: str, runtests: RunTests):
+ return saved_test_environment(test_name, runtests.verbose, runtests.quiet,
+ pgo=runtests.pgo)
-def regrtest_runner(result, test_func, ns) -> None:
+def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
# Run test_func(), collect statistics, and detect reference and memory
# leaks.
- if ns.huntrleaks:
- from test.libregrtest.refleak import dash_R
- refleak, test_result = dash_R(ns, result.test_name, test_func)
+ if runtests.hunt_refleak:
+ from test.libregrtest.refleak import runtest_refleak
+ refleak, test_result = runtest_refleak(result.test_name, test_func,
+ runtests.hunt_refleak,
+ runtests.quiet)
else:
test_result = test_func()
refleak = False
def _load_run_test(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
# Load the test function, run the test function.
- module_name = abs_module_name(result.test_name, ns.testdir)
+ module_name = abs_module_name(result.test_name, runtests.test_dir)
# Remove the module from sys.module to reload it if it was already imported
sys.modules.pop(module_name, None)
return run_unittest(test_mod)
try:
- with save_env(result.test_name, runtests, ns):
- regrtest_runner(result, test_func, ns)
+ with save_env(result.test_name, runtests):
+ regrtest_runner(result, test_func, runtests)
finally:
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# failures.
support.gc_collect()
- remove_testfn(result.test_name, ns.verbose)
+ remove_testfn(result.test_name, runtests.verbose)
if gc.garbage:
support.environment_altered = True
pgo = runtests.pgo
if pgo:
display_failure = False
+ quiet = runtests.quiet
test_name = result.test_name
try:
clear_caches()
support.gc_collect()
- with save_env(test_name, runtests, ns):
+ with save_env(test_name, runtests):
_load_run_test(result, runtests, ns)
except support.ResourceDenied as msg:
- if not ns.quiet and not pgo:
+ if not quiet and not pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
result.state = State.RESOURCE_DENIED
return
except unittest.SkipTest as msg:
- if not ns.quiet and not pgo:
+ if not quiet and not pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
result.state = State.SKIPPED
return
test_name = runtests.tests[0]
match_tests: FilterTuple | None = runtests.match_tests
- setup_test_dir(ns.testdir)
+ setup_test_dir(runtests.test_dir)
setup_tests(runtests, ns)
if runtests.rerun:
print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
else:
print(f"Re-running {test_name} in verbose mode", flush=True)
- ns.verbose = True
result = run_single_test(test_name, runtests, ns)
print() # Force a newline (just in case)
UNICODE_GUARD_ENV = "PYTHONREGRTEST_UNICODE_GUARD"
-def setup_test_dir(testdir):
+def setup_test_dir(testdir: str | None) -> None:
if testdir:
# Prepend test directory to sys.path, so runtest() will be able
# to locate tests
if getattr(module, '__file__', None):
module.__file__ = os.path.abspath(module.__file__)
- if ns.huntrleaks:
+ if runtests.hunt_refleak:
unittest.BaseTestSuite._cleanup = False
if ns.memlimit is not None:
if ns.threshold is not None:
gc.set_threshold(ns.threshold)
- support.suppress_msvcrt_asserts(ns.verbose and ns.verbose >= 2)
+ support.suppress_msvcrt_asserts(runtests.verbose and runtests.verbose >= 2)
support.use_resources = ns.use_resources
support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, timeout)
support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, timeout)
- if ns.xmlpath:
+ if runtests.junit_filename:
from test.support.testresult import RegressionTestResult
RegressionTestResult.USE_XML = True