import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
- findtests, runtest, get_abs_module, is_failed,
- STDTESTS, NOTTESTS, PROGRESS_MIN_TIME,
- Passed, Failed, EnvChanged, Skipped, ResourceDenied, Interrupted,
- ChildError, DidNotRun)
+ findtests, split_test_packages, runtest, get_abs_module,
+ PROGRESS_MIN_TIME, State)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
-from test.libregrtest.utils import removepy, count, format_duration, printlist
+from test.libregrtest.utils import (removepy, count, format_duration,
+ printlist, get_build_info)
from test import support
+from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
self.good = []
self.bad = []
self.skipped = []
- self.resource_denieds = []
+ self.resource_denied = []
self.environment_changed = []
self.run_no_tests = []
self.need_rerun = []
self.rerun = []
self.first_result = None
self.interrupted = False
+ self.stats_dict: dict[str, TestStats] = {}
# used by --slow
self.test_times = []
self.tracer = None
# used to display the progress bar "[ 3/100]"
- self.start_time = time.monotonic()
+ self.start_time = time.perf_counter()
self.test_count = ''
self.test_count_width = 1
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
- | set(self.resource_denieds) | set(self.environment_changed)
+ | set(self.resource_denied) | set(self.environment_changed)
| set(self.run_no_tests))
def accumulate_result(self, result, rerun=False):
- test_name = result.name
-
- if not isinstance(result, (ChildError, Interrupted)) and not rerun:
- self.test_times.append((result.duration_sec, test_name))
-
- if isinstance(result, Passed):
- self.good.append(test_name)
- elif isinstance(result, ResourceDenied):
- self.skipped.append(test_name)
- self.resource_denieds.append(test_name)
- elif isinstance(result, Skipped):
- self.skipped.append(test_name)
- elif isinstance(result, EnvChanged):
- self.environment_changed.append(test_name)
- elif isinstance(result, Failed):
- if not rerun:
- self.bad.append(test_name)
- self.need_rerun.append(result)
- elif isinstance(result, DidNotRun):
- self.run_no_tests.append(test_name)
- elif isinstance(result, Interrupted):
- self.interrupted = True
- else:
- raise ValueError("invalid test result: %r" % result)
+ test_name = result.test_name
+
+ if result.has_meaningful_duration() and not rerun:
+ self.test_times.append((result.duration, test_name))
- if rerun and not isinstance(result, (Failed, Interrupted)):
+ match result.state:
+ case State.PASSED:
+ self.good.append(test_name)
+ case State.ENV_CHANGED:
+ self.environment_changed.append(test_name)
+ case State.SKIPPED:
+ self.skipped.append(test_name)
+ case State.RESOURCE_DENIED:
+ self.skipped.append(test_name)
+ self.resource_denied.append(test_name)
+ case State.INTERRUPTED:
+ self.interrupted = True
+ case State.DID_NOT_RUN:
+ self.run_no_tests.append(test_name)
+ case _:
+ if result.is_failed(self.ns.fail_env_changed):
+ if not rerun:
+ self.bad.append(test_name)
+ self.need_rerun.append(result)
+ else:
+                    raise ValueError(f"invalid test state: {result.state!r}")
+
+ if result.stats is not None:
+ self.stats_dict[result.test_name] = result.stats
+
+        if rerun and not (result.is_failed(False) or result.state == State.INTERRUPTED):
self.bad.remove(test_name)
xml_data = result.xml_data
line = f"load avg: {load_avg:.2f} {line}"
# add the timestamp prefix: "0:01:05 "
- test_time = time.monotonic() - self.start_time
+ test_time = time.perf_counter() - self.start_time
mins, secs = divmod(int(test_time), 60)
hours, mins = divmod(mins, 60)
# add default PGO tests if no tests are specified
setup_pgo_tests(self.ns)
- stdtests = STDTESTS[:]
- nottests = NOTTESTS.copy()
+ exclude = set()
if self.ns.exclude:
for arg in self.ns.args:
- if arg in stdtests:
- stdtests.remove(arg)
- nottests.add(arg)
+ exclude.add(arg)
self.ns.args = []
- # if testdir is set, then we are not running the python tests suite, so
- # don't add default tests to be executed or skipped (pass empty values)
- if self.ns.testdir:
- alltests = findtests(self.ns.testdir, list(), set())
- else:
- alltests = findtests(self.ns.testdir, stdtests, nottests)
+ alltests = findtests(testdir=self.ns.testdir, exclude=exclude)
if not self.ns.fromfile:
- self.selected = self.tests or self.ns.args or alltests
+ self.selected = self.tests or self.ns.args
+ if self.selected:
+ self.selected = split_test_packages(self.selected)
+ else:
+ self.selected = alltests
else:
self.selected = self.tests
+
if self.ns.single:
self.selected = self.selected[:1]
try:
rerun_list = list(self.need_rerun)
self.need_rerun.clear()
for result in rerun_list:
- test_name = result.name
+ test_name = result.test_name
self.rerun.append(test_name)
errors = result.errors or []
self.accumulate_result(result, rerun=True)
- if isinstance(result, Interrupted):
+ if result.state == State.INTERRUPTED:
break
if self.bad:
previous_test = None
for test_index, test_name in enumerate(self.tests, 1):
- start_time = time.monotonic()
+ start_time = time.perf_counter()
text = test_name
if previous_test:
result = runtest(self.ns, test_name)
self.accumulate_result(result)
- if isinstance(result, Interrupted):
+ if result.state == State.INTERRUPTED:
break
previous_test = str(result)
- test_time = time.monotonic() - start_time
+ test_time = time.perf_counter() - start_time
if test_time >= PROGRESS_MIN_TIME:
previous_test = "%s in %s" % (previous_test, format_duration(test_time))
- elif isinstance(result, Passed):
+ elif result.state == State.PASSED:
# be quiet: say nothing if the test passed shortly
previous_test = None
if module not in save_modules and module.startswith("test."):
support.unload(module)
- if self.ns.failfast and is_failed(result, self.ns):
+ if self.ns.failfast and result.is_failed(self.ns.fail_env_changed):
break
if previous_test:
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
+ print("== Python build:", ' '.join(get_build_info()))
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))
+ self.display_sanitizers()
+
+ def display_sanitizers(self):
+ # This makes it easier to remember what to set in your local
+ # environment when trying to reproduce a sanitizer failure.
+ asan = support.check_sanitizer(address=True)
+ msan = support.check_sanitizer(memory=True)
+ ubsan = support.check_sanitizer(ub=True)
+ sanitizers = []
+ if asan:
+ sanitizers.append("address")
+ if msan:
+ sanitizers.append("memory")
+ if ubsan:
+ sanitizers.append("undefined behavior")
+ if not sanitizers:
+ return
+
+ print(f"== sanitizers: {', '.join(sanitizers)}")
+ for sanitizer, env_var in (
+ (asan, "ASAN_OPTIONS"),
+ (msan, "MSAN_OPTIONS"),
+ (ubsan, "UBSAN_OPTIONS"),
+ ):
+            options = os.environ.get(env_var)
+ if sanitizer and options is not None:
+ print(f"== {env_var}={options!r}")
+
+ def no_tests_run(self):
+ return not any((self.good, self.bad, self.skipped, self.interrupted,
+ self.environment_changed))
def get_tests_result(self):
result = []
result.append("FAILURE")
elif self.ns.fail_env_changed and self.environment_changed:
result.append("ENV CHANGED")
- elif not any((self.good, self.bad, self.skipped, self.interrupted,
- self.environment_changed)):
- result.append("NO TEST RUN")
+ elif self.no_tests_run():
+ result.append("NO TESTS RAN")
if self.interrupted:
result.append("INTERRUPTED")
coverdir=self.ns.coverdir)
print()
- duration = time.monotonic() - self.start_time
- print("Total duration: %s" % format_duration(duration))
- print("Tests result: %s" % self.get_tests_result())
+ self.display_summary()
if self.ns.runleaks:
os.system("leaks %d" % os.getpid())
+ def display_summary(self):
+ duration = time.perf_counter() - self.start_time
+
+ # Total duration
+ print("Total duration: %s" % format_duration(duration))
+
+ # Total tests
+ total = TestStats()
+ for stats in self.stats_dict.values():
+ total.accumulate(stats)
+ stats = [f'run={total.tests_run:,}']
+ if total.failures:
+ stats.append(f'failures={total.failures:,}')
+ if total.skipped:
+ stats.append(f'skipped={total.skipped:,}')
+ print(f"Total tests: {' '.join(stats)}")
+
+ # Total test files
+ report = [f'success={len(self.good)}']
+ if self.bad:
+ report.append(f'failed={len(self.bad)}')
+ if self.environment_changed:
+ report.append(f'env_changed={len(self.environment_changed)}')
+ if self.skipped:
+ report.append(f'skipped={len(self.skipped)}')
+ if self.resource_denied:
+ report.append(f'resource_denied={len(self.resource_denied)}')
+ if self.rerun:
+ report.append(f'rerun={len(self.rerun)}')
+ if self.run_no_tests:
+ report.append(f'run_no_tests={len(self.run_no_tests)}')
+ print(f"Total test files: {' '.join(report)}")
+
+ # Result
+ result = self.get_tests_result()
+ print(f"Result: {result}")
+
def save_xml_result(self):
if not self.ns.xmlpath and not self.testsuite_xml:
return
self.save_xml_result()
if self.bad:
- sys.exit(2)
+ sys.exit(EXITCODE_BAD_TEST)
if self.interrupted:
- sys.exit(130)
+ sys.exit(EXITCODE_INTERRUPTED)
if self.ns.fail_env_changed and self.environment_changed:
- sys.exit(3)
+ sys.exit(EXITCODE_ENV_CHANGED)
+ if self.no_tests_run():
+ sys.exit(EXITCODE_NO_TESTS_RAN)
sys.exit(0)
print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr,
flush=True)
+ results = None
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
for i in rep_range:
- test_func()
+ results = test_func()
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
print(msg, file=refrep)
refrep.flush()
failed = True
- return failed
+ return (failed, results)
def dash_R_cleanup(fs, ps, pic, zdc, abcs):
+import dataclasses
+import doctest
import faulthandler
import functools
import gc
import unittest
from test import support
+from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
from test.libregrtest.cmdline import Namespace
from test.libregrtest.utils import clear_caches, format_duration, print_warning
+# Avoid enum.Enum to reduce the number of imports when tests are run
+class State:
+ PASSED = "PASSED"
+ FAILED = "FAILED"
+ SKIPPED = "SKIPPED"
+ UNCAUGHT_EXC = "UNCAUGHT_EXC"
+ REFLEAK = "REFLEAK"
+ ENV_CHANGED = "ENV_CHANGED"
+ RESOURCE_DENIED = "RESOURCE_DENIED"
+ INTERRUPTED = "INTERRUPTED"
+ MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
+ DID_NOT_RUN = "DID_NOT_RUN"
+ TIMEOUT = "TIMEOUT"
+
+ @staticmethod
+ def is_failed(state):
+ return state in {
+ State.FAILED,
+ State.UNCAUGHT_EXC,
+ State.REFLEAK,
+ State.MULTIPROCESSING_ERROR,
+ State.TIMEOUT}
+
+ @staticmethod
+ def has_meaningful_duration(state):
+ # Consider that the duration is meaningless for these cases.
+ # For example, if a whole test file is skipped, its duration
+ # is unlikely to be the duration of executing its tests,
+ # but just the duration to execute code which skips the test.
+ return state not in {
+ State.SKIPPED,
+ State.RESOURCE_DENIED,
+ State.INTERRUPTED,
+ State.MULTIPROCESSING_ERROR,
+ State.DID_NOT_RUN}
+
+
+@dataclasses.dataclass(slots=True)
class TestResult:
- def __init__(
- self,
- name: str,
- duration_sec: float = 0.0,
- xml_data: list[str] | None = None,
- ) -> None:
- self.name = name
- self.duration_sec = duration_sec
- self.xml_data = xml_data
-
- def __str__(self) -> str:
- return f"{self.name} finished"
-
-
-class Passed(TestResult):
- def __str__(self) -> str:
- return f"{self.name} passed"
-
-
-class Failed(TestResult):
- def __init__(
- self,
- name: str,
- duration_sec: float = 0.0,
- xml_data: list[str] | None = None,
- errors: list[tuple[str, str]] | None = None,
- failures: list[tuple[str, str]] | None = None,
- ) -> None:
- super().__init__(name, duration_sec=duration_sec, xml_data=xml_data)
- self.errors = errors
- self.failures = failures
-
- def __str__(self) -> str:
+ test_name: str
+ state: str | None = None
+ # Test duration in seconds
+ duration: float | None = None
+ xml_data: list[str] | None = None
+ stats: TestStats | None = None
+
+ # errors and failures copied from support.TestFailedWithDetails
+ errors: list[tuple[str, str]] | None = None
+ failures: list[tuple[str, str]] | None = None
+
+ def is_failed(self, fail_env_changed: bool) -> bool:
+ if self.state == State.ENV_CHANGED:
+ return fail_env_changed
+ return State.is_failed(self.state)
+
+ def _format_failed(self):
if self.errors and self.failures:
le = len(self.errors)
lf = len(self.failures)
error_s = "error" + ("s" if le > 1 else "")
failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.name} failed ({le} {error_s}, {lf} {failure_s})"
+ return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
if self.errors:
le = len(self.errors)
error_s = "error" + ("s" if le > 1 else "")
- return f"{self.name} failed ({le} {error_s})"
+ return f"{self.test_name} failed ({le} {error_s})"
if self.failures:
lf = len(self.failures)
failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.name} failed ({lf} {failure_s})"
-
- return f"{self.name} failed"
-
-
-class UncaughtException(Failed):
- def __str__(self) -> str:
- return f"{self.name} failed (uncaught exception)"
-
-
-class EnvChanged(Failed):
- def __str__(self) -> str:
- return f"{self.name} failed (env changed)"
-
-
-class RefLeak(Failed):
- def __str__(self) -> str:
- return f"{self.name} failed (reference leak)"
-
-
-class Skipped(TestResult):
- def __str__(self) -> str:
- return f"{self.name} skipped"
-
-
-class ResourceDenied(Skipped):
- def __str__(self) -> str:
- return f"{self.name} skipped (resource denied)"
-
-
-class Interrupted(TestResult):
- def __str__(self) -> str:
- return f"{self.name} interrupted"
-
-
-class ChildError(Failed):
- def __str__(self) -> str:
- return f"{self.name} crashed"
-
-
-class DidNotRun(TestResult):
- def __str__(self) -> str:
- return f"{self.name} ran no tests"
+ return f"{self.test_name} failed ({lf} {failure_s})"
+ return f"{self.test_name} failed"
-class Timeout(Failed):
def __str__(self) -> str:
- return f"{self.name} timed out ({format_duration(self.duration_sec)})"
+ match self.state:
+ case State.PASSED:
+ return f"{self.test_name} passed"
+ case State.FAILED:
+ return self._format_failed()
+ case State.SKIPPED:
+ return f"{self.test_name} skipped"
+ case State.UNCAUGHT_EXC:
+ return f"{self.test_name} failed (uncaught exception)"
+ case State.REFLEAK:
+ return f"{self.test_name} failed (reference leak)"
+ case State.ENV_CHANGED:
+ return f"{self.test_name} failed (env changed)"
+ case State.RESOURCE_DENIED:
+ return f"{self.test_name} skipped (resource denied)"
+ case State.INTERRUPTED:
+ return f"{self.test_name} interrupted"
+ case State.MULTIPROCESSING_ERROR:
+ return f"{self.test_name} process crashed"
+ case State.DID_NOT_RUN:
+ return f"{self.test_name} ran no tests"
+ case State.TIMEOUT:
+ return f"{self.test_name} timed out ({format_duration(self.duration)})"
+ case _:
+                raise ValueError(f"unknown result state: {self.state!r}")
+
+ def has_meaningful_duration(self):
+ return State.has_meaningful_duration(self.state)
+
+ def set_env_changed(self):
+ if self.state is None or self.state == State.PASSED:
+ self.state = State.ENV_CHANGED
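# Illustrative sketch, not part of the patch ("test_example" is a made-up
# test name): behaviour is now selected by the `state` string of the single
# TestResult dataclass instead of by a TestResult subclass.
if __name__ == "__main__":
    from test.libregrtest.runtest import TestResult, State

    result = TestResult("test_example", state=State.ENV_CHANGED)
    assert str(result) == "test_example failed (env changed)"
    assert not result.is_failed(fail_env_changed=False)
    assert result.is_failed(fail_env_changed=True)
    assert result.has_meaningful_duration()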
# Minimum duration of a test to display its duration or to mention that
# the test is running in background
PROGRESS_MIN_TIME = 30.0 # seconds
-# small set of tests to determine if we have a basically functioning interpreter
-# (i.e. if any of these fail, then anything else is likely to follow)
-STDTESTS = [
- 'test_grammar',
- 'test_opcodes',
- 'test_dict',
- 'test_builtin',
- 'test_exceptions',
- 'test_types',
- 'test_unittest',
- 'test_doctest',
- 'test_doctest2',
- 'test_support'
-]
-
-# set of tests that we don't want to be executed when using regrtest
-NOTTESTS = set()
+# If these test directories are encountered, recurse into them and treat each
+# test_*.py file or directory as a separate test module. This can increase
+# parallelism. Beware: this can't generally be done for any directory with
+# sub-tests, as the __init__.py may do things which alter what tests are to
+# be run.
+SPLITTESTDIRS = {
+ "test_asyncio",
+}
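# Illustrative sketch, not part of the patch: how split_test_packages()
# (defined later in this file) uses SPLITTESTDIRS to expand a test package
# into one entry per submodule, so that workers can run them in parallel.
# The exact submodule names depend on the checkout; the ones mentioned in
# the comment below are only examples.
if __name__ == "__main__":
    from test.libregrtest.runtest import split_test_packages

    selected = split_test_packages(["test_netrc", "test_asyncio"])
    # "test_netrc" is kept as-is, while "test_asyncio" becomes entries such
    # as "test_asyncio.test_futures" and "test_asyncio.test_tasks".
    print(selected)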
# Storage of uncollectable objects
FOUND_GARBAGE = []
-def is_failed(result: TestResult, ns: Namespace) -> bool:
- if isinstance(result, EnvChanged):
- return ns.fail_env_changed
- return isinstance(result, Failed)
-
-
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
+def findtests(*, testdir=None, exclude=(),
+ split_test_dirs=SPLITTESTDIRS, base_mod=""):
"""Return a list of all applicable test modules."""
testdir = findtestdir(testdir)
- names = os.listdir(testdir)
tests = []
- others = set(stdtests) | nottests
- for name in names:
+ for name in os.listdir(testdir):
mod, ext = os.path.splitext(name)
- if mod[:5] == "test_" and ext in (".py", "") and mod not in others:
- tests.append(mod)
- return stdtests + sorted(tests)
+ if (not mod.startswith("test_")) or (mod in exclude):
+ continue
+ if mod in split_test_dirs:
+ subdir = os.path.join(testdir, mod)
+ mod = f"{base_mod or 'test'}.{mod}"
+ tests.extend(findtests(testdir=subdir, exclude=exclude,
+ split_test_dirs=split_test_dirs, base_mod=mod))
+ elif ext in (".py", ""):
+ tests.append(f"{base_mod}.{mod}" if base_mod else mod)
+ return sorted(tests)
+
+
+def split_test_packages(tests, *, testdir=None, exclude=(),
+ split_test_dirs=SPLITTESTDIRS):
+ testdir = findtestdir(testdir)
+ splitted = []
+ for name in tests:
+ if name in split_test_dirs:
+ subdir = os.path.join(testdir, name)
+ splitted.extend(findtests(testdir=subdir, exclude=exclude,
+ split_test_dirs=split_test_dirs,
+ base_mod=name))
+ else:
+ splitted.append(name)
+ return splitted
def get_abs_module(ns: Namespace, test_name: str) -> str:
return 'test.' + test_name
-def _runtest(ns: Namespace, test_name: str) -> TestResult:
- # Handle faulthandler timeout, capture stdout+stderr, XML serialization
- # and measure time.
+def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
+ # Capture stdout and stderr, set faulthandler timeout,
+ # and create JUnit XML report.
output_on_failure = ns.verbose3
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
- start_time = time.perf_counter()
try:
support.set_match_tests(ns.match_tests, ns.ignore_tests)
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
# warnings will be written to sys.stderr below.
print_warning.orig_stderr = stream
- result = _runtest_inner(ns, test_name,
- display_failure=False)
- if not isinstance(result, Passed):
+ _runtest_env_changed_exc(result, ns, display_failure=False)
+ # Ignore output if the test passed successfully
+ if result.state != State.PASSED:
output = stream.getvalue()
finally:
sys.stdout = orig_stdout
# Tell tests to be moderately quiet
support.verbose = ns.verbose
- result = _runtest_inner(ns, test_name,
- display_failure=not ns.verbose)
+ _runtest_env_changed_exc(result, ns,
+ display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
- result.xml_data = [
- ET.tostring(x).decode('us-ascii')
- for x in xml_list
- ]
-
- result.duration_sec = time.perf_counter() - start_time
- return result
+ result.xml_data = [ET.tostring(x).decode('us-ascii')
+ for x in xml_list]
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
ns -- regrtest namespace of options
test_name -- the name of the test
- Returns a TestResult sub-class depending on the kind of result received.
+ Returns a TestResult.
If ns.xmlpath is not None, xml_data is a list containing each
generated testsuite element.
"""
+ start_time = time.perf_counter()
+ result = TestResult(test_name)
try:
- return _runtest(ns, test_name)
+ _runtest_capture_output_timeout_junit(result, ns)
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
- return Failed(test_name)
+ result.state = State.UNCAUGHT_EXC
+ result.duration = time.perf_counter() - start_time
+ return result
def _test_module(the_module):
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
- support.run_unittest(tests)
+ return support.run_unittest(tests)
def save_env(ns: Namespace, test_name: str):
return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
-def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
- # Load the test function, run the test function, handle huntrleaks
- # to detect leaks.
+def regrtest_runner(result, test_func, ns) -> None:
+ # Run test_func(), collect statistics, and detect reference and memory
+ # leaks.
+
+ if ns.huntrleaks:
+ from test.libregrtest.refleak import dash_R
+ refleak, test_result = dash_R(ns, result.test_name, test_func)
+ else:
+ test_result = test_func()
+ refleak = False
+
+ if refleak:
+ result.state = State.REFLEAK
+
+ match test_result:
+ case TestStats():
+ stats = test_result
+ case unittest.TestResult():
+ stats = TestStats.from_unittest(test_result)
+ case doctest.TestResults():
+ stats = TestStats.from_doctest(test_result)
+ case None:
+ print_warning(f"{result.test_name} test runner returned None: {test_func}")
+ stats = None
+ case _:
+ print_warning(f"Unknown test result type: {type(test_result)}")
+ stats = None
+
+ result.stats = stats
+
- abstest = get_abs_module(ns, test_name)
+def _load_run_test(result: TestResult, ns: Namespace) -> None:
+ # Load the test function, run the test function.
+
+ abstest = get_abs_module(ns, result.test_name)
# remove the module from sys.module to reload it if it was already imported
try:
the_module = importlib.import_module(abstest)
- if ns.huntrleaks:
- from test.libregrtest.refleak import dash_R
-
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
- test_runner = getattr(the_module, "test_main", None)
- if test_runner is None:
- test_runner = functools.partial(_test_module, the_module)
+ test_func = getattr(the_module, "test_main", None)
+ if test_func is None:
+ test_func = functools.partial(_test_module, the_module)
try:
- with save_env(ns, test_name):
- if ns.huntrleaks:
- # Return True if the test leaked references
- refleak = dash_R(ns, test_name, test_runner)
- else:
- test_runner()
- refleak = False
+ with save_env(ns, result.test_name):
+ regrtest_runner(result, test_func, ns)
finally:
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# failures.
support.gc_collect()
- cleanup_test_droppings(test_name, ns.verbose)
+ cleanup_test_droppings(result.test_name, ns.verbose)
if gc.garbage:
support.environment_altered = True
- print_warning(f"{test_name} created {len(gc.garbage)} "
+ print_warning(f"{result.test_name} created {len(gc.garbage)} "
f"uncollectable object(s).")
# move the uncollectable objects somewhere,
support.reap_children()
- return refleak
-
-def _runtest_inner(
- ns: Namespace, test_name: str, display_failure: bool = True
-) -> TestResult:
+def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
+ display_failure: bool = True) -> None:
# Detect environment changes, handle exceptions.
# Reset the environment_altered flag to detect if a test altered
if ns.pgo:
display_failure = False
+ test_name = result.test_name
try:
clear_caches()
support.gc_collect()
with save_env(ns, test_name):
- refleak = _runtest_inner2(ns, test_name)
+ _load_run_test(result, ns)
except support.ResourceDenied as msg:
if not ns.quiet and not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
- return ResourceDenied(test_name)
+ result.state = State.RESOURCE_DENIED
+ return
except unittest.SkipTest as msg:
if not ns.quiet and not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
- return Skipped(test_name)
+ result.state = State.SKIPPED
+ return
except support.TestFailedWithDetails as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
- return Failed(test_name, errors=exc.errors, failures=exc.failures)
+ result.state = State.FAILED
+ result.errors = exc.errors
+ result.failures = exc.failures
+ result.stats = exc.stats
+ return
except support.TestFailed as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
- return Failed(test_name)
+ result.state = State.FAILED
+ result.stats = exc.stats
+ return
except support.TestDidNotRun:
- return DidNotRun(test_name)
+ result.state = State.DID_NOT_RUN
+ return
except KeyboardInterrupt:
print()
- return Interrupted(test_name)
+ result.state = State.INTERRUPTED
+ return
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
- return UncaughtException(test_name)
+ result.state = State.UNCAUGHT_EXC
+ return
- if refleak:
- return RefLeak(test_name)
if support.environment_altered:
- return EnvChanged(test_name)
- return Passed(test_name)
+ result.set_env_changed()
+ # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
+ if result.state is None:
+ result.state = State.PASSED
def cleanup_test_droppings(test_name: str, verbose: int) -> None:
+import dataclasses
import faulthandler
import json
-import os
+import os.path
import queue
import signal
import subprocess
from test import support
from test.support import os_helper
+from test.support import TestStats
from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
- runtest, is_failed, TestResult, Interrupted, Timeout, ChildError, PROGRESS_MIN_TIME)
+ runtest, TestResult, State,
+ PROGRESS_MIN_TIME)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning
def must_stop(result: TestResult, ns: Namespace) -> bool:
- if isinstance(result, Interrupted):
+ if result.state == State.INTERRUPTED:
return True
- if ns.failfast and is_failed(result, ns):
+ if ns.failfast and result.is_failed(ns.fail_env_changed):
return True
return False
return (ns, test_name)
-def run_test_in_subprocess(testname: str, ns: Namespace, stdout_fh: TextIO) -> subprocess.Popen:
+def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh: TextIO) -> subprocess.Popen:
ns_dict = vars(ns)
worker_args = (ns_dict, testname)
worker_args = json.dumps(worker_args)
'-m', 'test.regrtest',
'--worker-args', worker_args]
+ env = dict(os.environ)
+ if tmp_dir is not None:
+ env['TMPDIR'] = tmp_dir
+ env['TEMP'] = tmp_dir
+ env['TMP'] = tmp_dir
+
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
kw = dict(
+ env=env,
stdout=stdout_fh,
# bpo-45410: Write stderr into stdout to keep messages order
stderr=stdout_fh,
class MultiprocessResult(NamedTuple):
result: TestResult
# bpo-45410: stderr is written into stdout to keep messages order
- stdout: str
- error_msg: str
+ worker_stdout: str | None = None
+ err_msg: str | None = None
ExcStr = str
def mp_result_error(
self,
test_result: TestResult,
- stdout: str = '',
+ stdout: str | None = None,
err_msg=None
) -> MultiprocessResult:
- test_result.duration_sec = time.monotonic() - self.start_time
return MultiprocessResult(test_result, stdout, err_msg)
- def _run_process(self, test_name: str, stdout_fh: TextIO) -> int:
- self.start_time = time.monotonic()
-
+ def _run_process(self, test_name: str, tmp_dir: str, stdout_fh: TextIO) -> int:
self.current_test_name = test_name
try:
- popen = run_test_in_subprocess(test_name, self.ns, stdout_fh)
+ popen = run_test_in_subprocess(test_name, self.ns, tmp_dir, stdout_fh)
self._killed = False
self._popen = popen
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
- retcode = self._run_process(test_name, stdout_fh)
+ if not support.is_wasi:
+ # Don't check for leaked temporary files and directories if Python is
+                # run on WASI. WASI doesn't pass environment variables like TMPDIR to
+ # worker processes.
+ tmp_dir = tempfile.mkdtemp(prefix="test_python_")
+ tmp_dir = os.path.abspath(tmp_dir)
+ try:
+ retcode = self._run_process(test_name, tmp_dir, stdout_fh)
+ finally:
+ tmp_files = os.listdir(tmp_dir)
+ os_helper.rmtree(tmp_dir)
+ else:
+ retcode = self._run_process(test_name, None, stdout_fh)
+ tmp_files = ()
stdout_fh.seek(0)
try:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Cannot read process stdout: {exc}"
- return self.mp_result_error(ChildError(test_name), '', err_msg)
+ result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
+ return self.mp_result_error(result, err_msg=err_msg)
if retcode is None:
- return self.mp_result_error(Timeout(test_name), stdout)
+ result = TestResult(test_name, state=State.TIMEOUT)
+ return self.mp_result_error(result, stdout)
err_msg = None
if retcode != 0:
err_msg = "Exit code %s" % retcode
else:
- stdout, _, result = stdout.rpartition("\n")
+ stdout, _, worker_json = stdout.rpartition("\n")
stdout = stdout.rstrip()
- if not result:
+ if not worker_json:
err_msg = "Failed to parse worker stdout"
else:
try:
# deserialize run_tests_worker() output
- result = json.loads(result, object_hook=decode_test_result)
+ result = json.loads(worker_json,
+ object_hook=decode_test_result)
except Exception as exc:
err_msg = "Failed to parse worker JSON: %s" % exc
- if err_msg is not None:
- return self.mp_result_error(ChildError(test_name), stdout, err_msg)
+ if err_msg:
+ result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
+ return self.mp_result_error(result, stdout, err_msg)
+
+ if tmp_files:
+ msg = (f'\n\n'
+ f'Warning -- {test_name} leaked temporary files '
+ f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
+ stdout += msg
+ result.set_env_changed()
- return MultiprocessResult(result, stdout, err_msg)
+ return MultiprocessResult(result, stdout)
def run(self) -> None:
while not self._stopped:
except StopIteration:
break
+ self.start_time = time.monotonic()
mp_result = self._runtest(test_name)
+ mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
if must_stop(mp_result.result, self.ns):
result = mp_result.result
text = str(result)
- if mp_result.error_msg is not None:
- # CHILD_ERROR
- text += ' (%s)' % mp_result.error_msg
- elif (result.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo):
- text += ' (%s)' % format_duration(result.duration_sec)
+ if mp_result.err_msg:
+ # MULTIPROCESSING_ERROR
+ text += ' (%s)' % mp_result.err_msg
+ elif (result.duration >= PROGRESS_MIN_TIME and not self.ns.pgo):
+ text += ' (%s)' % format_duration(result.duration)
running = get_running(self.workers)
if running and not self.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
# Thread got an exception
format_exc = item[1]
print_warning(f"regrtest worker thread failed: {format_exc}")
- result = ChildError("<regrtest worker>")
+ result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
self.regrtest.accumulate_result(result)
return True
self.regrtest.accumulate_result(mp_result.result)
self.display_result(mp_result)
- if mp_result.stdout:
- print(mp_result.stdout, flush=True)
+ if mp_result.worker_stdout:
+ print(mp_result.worker_stdout, flush=True)
if must_stop(mp_result.result, self.ns):
return True
def default(self, o: Any) -> dict[str, Any]:
if isinstance(o, TestResult):
- result = vars(o)
+ result = dataclasses.asdict(o)
result["__test_result__"] = o.__class__.__name__
return result
return super().default(o)
-def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
+def decode_test_result(d: dict[str, Any]) -> TestResult | TestStats | dict[str, Any]:
"""Decode a TestResult (sub)class object from a JSON dict."""
if "__test_result__" not in d:
return d
- cls_name = d.pop("__test_result__")
- for cls in get_all_test_result_classes():
- if cls.__name__ == cls_name:
- return cls(**d)
-
-
-def get_all_test_result_classes() -> set[type[TestResult]]:
- prev_count = 0
- classes = {TestResult}
- while len(classes) > prev_count:
- prev_count = len(classes)
- to_add = []
- for cls in classes:
- to_add.extend(cls.__subclasses__())
- classes.update(to_add)
- return classes
+ d.pop('__test_result__')
+ if d['stats'] is not None:
+ d['stats'] = TestStats(**d['stats'])
+ return TestResult(**d)
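# Illustrative sketch, not part of the patch: a worker serializes its
# TestResult to JSON with the encoder above (whose class name is outside
# this hunk), and the parent rebuilds the dataclass via decode_test_result().
# The test name and statistics below are made up; TestResult, TestStats and
# decode_test_result are assumed to be in scope, as they are in this module.
if __name__ == "__main__":
    import json

    payload = {
        "__test_result__": "TestResult",
        "test_name": "test_example",
        "state": "PASSED",
        "duration": 1.5,
        "xml_data": None,
        "stats": {"tests_run": 3, "failures": 0, "skipped": 1},
        "errors": None,
        "failures": None,
    }
    result = json.loads(json.dumps(payload), object_hook=decode_test_result)
    assert isinstance(result, TestResult)
    assert isinstance(result.stats, TestStats)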
class saved_test_environment:
"""Save bits of the test environment and restore them at block exit.
- with saved_test_environment(testname, verbose, quiet):
+ with saved_test_environment(test_name, verbose, quiet):
#stuff
Unless quiet is True, a warning is printed to stderr if any of
items is also printed.
"""
- def __init__(self, testname, verbose=0, quiet=False, *, pgo=False):
- self.testname = testname
+ def __init__(self, test_name, verbose=0, quiet=False, *, pgo=False):
+ self.test_name = test_name
self.verbose = verbose
self.quiet = quiet
self.pgo = pgo
restore(original)
if not self.quiet and not self.pgo:
print_warning(
- f"{name} was modified by {self.testname}\n"
+ f"{name} was modified by {self.test_name}\n"
f" Before: {original}\n"
f" After: {current} ")
return False
import math
import os.path
import sys
+import sysconfig
import textwrap
from test import support
else:
for f in typing._cleanups:
f()
+
+ try:
+ fractions = sys.modules['fractions']
+ except KeyError:
+ pass
+ else:
+ fractions._hash_algorithm.cache_clear()
+
+
+def get_build_info():
+ # Get most important configure and build options as a list of strings.
+ # Example: ['debug', 'ASAN+MSAN'] or ['release', 'LTO+PGO'].
+
+ config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
+ cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
+ cflags_nodist = sysconfig.get_config_var('PY_CFLAGS_NODIST') or ''
+ ldflags_nodist = sysconfig.get_config_var('PY_LDFLAGS_NODIST') or ''
+
+ build = []
+
+ # --disable-gil
+ if sysconfig.get_config_var('Py_NOGIL'):
+ build.append("nogil")
+
+    if hasattr(sys, 'gettotalrefcount'):
+        # --with-pydebug
+        build.append('debug')
+
+        if '-DNDEBUG' in (cflags + cflags_nodist):
+            build.append('without_assert')
+    else:
+        build.append('release')
+
+        if '--with-assertions' in config_args:
+            build.append('with_assert')
+        elif '-DNDEBUG' not in (cflags + cflags_nodist):
+            build.append('with_assert')
+
+ # --enable-framework=name
+ framework = sysconfig.get_config_var('PYTHONFRAMEWORK')
+ if framework:
+ build.append(f'framework={framework}')
+
+ # --enable-shared
+ shared = int(sysconfig.get_config_var('PY_ENABLE_SHARED') or '0')
+ if shared:
+ build.append('shared')
+
+ # --with-lto
+ optimizations = []
+ if '-flto=thin' in ldflags_nodist:
+ optimizations.append('ThinLTO')
+ elif '-flto' in ldflags_nodist:
+ optimizations.append('LTO')
+
+ # --enable-optimizations
+ pgo_options = (
+ # GCC
+ '-fprofile-use',
+ # clang: -fprofile-instr-use=code.profclangd
+ '-fprofile-instr-use',
+ # ICC
+ "-prof-use",
+ )
+ if any(option in cflags_nodist for option in pgo_options):
+ optimizations.append('PGO')
+ if optimizations:
+ build.append('+'.join(optimizations))
+
+ # --with-address-sanitizer
+ sanitizers = []
+ if support.check_sanitizer(address=True):
+ sanitizers.append("ASAN")
+ # --with-memory-sanitizer
+ if support.check_sanitizer(memory=True):
+ sanitizers.append("MSAN")
+ # --with-undefined-behavior-sanitizer
+ if support.check_sanitizer(ub=True):
+ sanitizers.append("UBSAN")
+ if sanitizers:
+ build.append('+'.join(sanitizers))
+
+ # --with-trace-refs
+ if hasattr(sys, 'getobjects'):
+ build.append("TraceRefs")
+ # --enable-pystats
+ if hasattr(sys, '_stats_on'):
+ build.append("pystats")
+ # --with-valgrind
+ if sysconfig.get_config_var('WITH_VALGRIND'):
+ build.append("valgrind")
+ # --with-dtrace
+ if sysconfig.get_config_var('WITH_DTRACE'):
+ build.append("dtrace")
+
+ return build
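# Illustrative usage, not part of the patch: regrtest's main.py joins this
# list for its header line, printing for example "== Python build: debug" or
# "== Python build: release LTO+PGO" depending on how Python was configured.
if __name__ == "__main__":
    print("== Python build:", ' '.join(get_build_info()))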
"_PYTHON_PROJECT_BASE",
"_PYTHON_SYSCONFIGDATA_NAME",
"__PYVENV_LAUNCHER__",
+
+ # Sanitizer options
+ "ASAN_OPTIONS",
+ "LSAN_OPTIONS",
+ "MSAN_OPTIONS",
+ "TSAN_OPTIONS",
+ "UBSAN_OPTIONS",
))
for name, value in os.environ.items():
uname = name.upper()
'PY_STDMODULE_CFLAGS',
'Py_DEBUG',
'Py_ENABLE_SHARED',
+ 'Py_NOGIL',
'SHELL',
'SOABI',
'prefix',
raise ImportError('support must be imported from the test package')
import contextlib
+import dataclasses
import functools
import getpass
import os
class TestFailed(Error):
"""Test failed."""
+ def __init__(self, msg, *args, stats=None):
+ self.msg = msg
+ self.stats = stats
+ super().__init__(msg, *args)
+
+ def __str__(self):
+ return self.msg
class TestFailedWithDetails(TestFailed):
"""Test failed."""
- def __init__(self, msg, errors, failures):
- self.msg = msg
+ def __init__(self, msg, errors, failures, stats):
self.errors = errors
self.failures = failures
- super().__init__(msg, errors, failures)
-
- def __str__(self):
- return self.msg
+ super().__init__(msg, errors, failures, stats=stats)
class TestDidNotRun(Error):
"""Test did not run any subtests."""
raise ValueError('At least one of address, memory, or ub must be True')
- _cflags = sysconfig.get_config_var('CFLAGS') or ''
- _config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
+ cflags = sysconfig.get_config_var('CFLAGS') or ''
+ config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
memory_sanitizer = (
- '-fsanitize=memory' in _cflags or
- '--with-memory-sanitizer' in _config_args
+ '-fsanitize=memory' in cflags or
+ '--with-memory-sanitizer' in config_args
)
address_sanitizer = (
- '-fsanitize=address' in _cflags or
- '--with-memory-sanitizer' in _config_args
+ '-fsanitize=address' in cflags or
+ '--with-address-sanitizer' in config_args
)
ub_sanitizer = (
- '-fsanitize=undefined' in _cflags or
- '--with-undefined-behavior-sanitizer' in _config_args
+ '-fsanitize=undefined' in cflags or
+ '--with-undefined-behavior-sanitizer' in config_args
)
return (
(memory and memory_sanitizer) or
newtests.append(test)
suite._tests = newtests
+@dataclasses.dataclass(slots=True)
+class TestStats:
+ tests_run: int = 0
+ failures: int = 0
+ skipped: int = 0
+
+ @staticmethod
+ def from_unittest(result):
+ return TestStats(result.testsRun,
+ len(result.failures),
+ len(result.skipped))
+
+ @staticmethod
+ def from_doctest(results):
+ return TestStats(results.attempted,
+ results.failed)
+
+ def accumulate(self, stats):
+ self.tests_run += stats.tests_run
+ self.failures += stats.failures
+ self.skipped += stats.skipped
+
+
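# Illustrative sketch, not part of the patch (the numbers are made up):
# per-file TestStats are merged with accumulate() to produce the
# "Total tests:" line of the regrtest summary.
if __name__ == "__main__":
    from test.support import TestStats

    total = TestStats()
    total.accumulate(TestStats(tests_run=12, failures=1, skipped=2))
    total.accumulate(TestStats(tests_run=3))
    assert (total.tests_run, total.failures, total.skipped) == (15, 1, 2)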
def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
runner = get_test_runner(sys.stdout,
if not result.testsRun and not result.skipped and not result.errors:
raise TestDidNotRun
if not result.wasSuccessful():
+ stats = TestStats.from_unittest(result)
if len(result.errors) == 1 and not result.failures:
err = result.errors[0][1]
elif len(result.failures) == 1 and not result.errors:
if not verbose: err += "; run in verbose mode for details"
errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
- raise TestFailedWithDetails(err, errors, failures)
+ raise TestFailedWithDetails(err, errors, failures, stats=stats)
+ return result
# By default, don't filter tests
else:
suite.addTest(loader.loadTestsFromTestCase(cls))
_filter_suite(suite, match_test)
- _run_suite(suite)
+ return _run_suite(suite)
#=======================================================================
# Check for the presence of docstrings.
else:
verbosity = None
- f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
- if f:
- raise TestFailed("%d of %d doctests failed" % (f, t))
+ results = doctest.testmod(module,
+ verbose=verbosity,
+ optionflags=optionflags)
+ if results.failed:
+ stats = TestStats.from_doctest(results)
+ raise TestFailed(f"{results.failed} of {results.attempted} "
+ f"doctests failed",
+ stats=stats)
if verbose:
print('doctest (%s) ... %d tests with zero failures' %
- (module.__name__, t))
- return f, t
+ (module.__name__, results.attempted))
+ return results
#=======================================================================
('anonymous', '', 'pass'))
def test_main():
- run_unittest(NetrcTestCase)
+ return run_unittest(NetrcTestCase)
if __name__ == "__main__":
test_main()
def test_main(verbose=False):
from test import support
from test import test_pep646_syntax
- support.run_doctest(test_pep646_syntax, verbose)
+ return support.run_doctest(test_pep646_syntax, verbose)
if __name__ == "__main__":
test_main(verbose=True)
import unittest
from test import libregrtest
from test import support
-from test.support import os_helper
+from test.support import os_helper, TestStats
from test.libregrtest import utils, setup
if not support.has_subprocess_support:
self.fail("%r not found in %r" % (regex, output))
return match
- def check_line(self, output, regex):
+ def check_line(self, output, regex, full=False):
+ if full:
+ regex += '\n'
regex = re.compile(r'^' + regex, re.MULTILINE)
self.assertRegex(output, regex)
def check_executed_tests(self, output, tests, skipped=(), failed=(),
env_changed=(), omitted=(),
- rerun={}, no_test_ran=(),
+ rerun={}, run_no_tests=(),
+ resource_denied=(),
randomize=False, interrupted=False,
- fail_env_changed=False):
+ fail_env_changed=False,
+ *, stats):
if isinstance(tests, str):
tests = [tests]
if isinstance(skipped, str):
skipped = [skipped]
+ if isinstance(resource_denied, str):
+ resource_denied = [resource_denied]
if isinstance(failed, str):
failed = [failed]
if isinstance(env_changed, str):
env_changed = [env_changed]
if isinstance(omitted, str):
omitted = [omitted]
- if isinstance(no_test_ran, str):
- no_test_ran = [no_test_ran]
+ if isinstance(run_no_tests, str):
+ run_no_tests = [run_no_tests]
+ if isinstance(stats, int):
+ stats = TestStats(stats)
executed = self.parse_executed_tests(output)
if randomize:
regex = LOG_PREFIX + f"Re-running {name} in verbose mode \\(matching: {match}\\)"
self.check_line(output, regex)
- if no_test_ran:
- regex = list_regex('%s test%s run no tests', no_test_ran)
+ if run_no_tests:
+ regex = list_regex('%s test%s run no tests', run_no_tests)
self.check_line(output, regex)
good = (len(tests) - len(skipped) - len(failed)
- - len(omitted) - len(env_changed) - len(no_test_ran))
+ - len(omitted) - len(env_changed) - len(run_no_tests))
if good:
regex = r'%s test%s OK\.$' % (good, plural(good))
if not skipped and not failed and good > 1:
if interrupted:
self.check_line(output, 'Test suite interrupted by signal SIGINT.')
+ # Total tests
+ parts = [f'run={stats.tests_run:,}']
+ if stats.failures:
+ parts.append(f'failures={stats.failures:,}')
+ if stats.skipped:
+ parts.append(f'skipped={stats.skipped:,}')
+ line = fr'Total tests: {" ".join(parts)}'
+ self.check_line(output, line, full=True)
+
+ # Total test files
+ report = [f'success={good}']
+ if failed:
+ report.append(f'failed={len(failed)}')
+ if env_changed:
+ report.append(f'env_changed={len(env_changed)}')
+ if skipped:
+ report.append(f'skipped={len(skipped)}')
+ if resource_denied:
+ report.append(f'resource_denied={len(resource_denied)}')
+ if rerun:
+ report.append(f'rerun={len(rerun)}')
+ if run_no_tests:
+ report.append(f'run_no_tests={len(run_no_tests)}')
+ line = fr'Total test files: {" ".join(report)}'
+ self.check_line(output, line, full=True)
+
+ # Result
result = []
if failed:
result.append('FAILURE')
result.append('INTERRUPTED')
if not any((good, result, failed, interrupted, skipped,
env_changed, fail_env_changed)):
- result.append("NO TEST RUN")
+ result.append("NO TESTS RAN")
elif not result:
result.append('SUCCESS')
result = ', '.join(result)
if rerun:
- self.check_line(output, 'Tests result: FAILURE')
result = 'FAILURE then %s' % result
-
- self.check_line(output, 'Tests result: %s' % result)
+ self.check_line(output, f'Result: {result}', full=True)
def parse_random_seed(self, output):
match = self.regex_search(r'Using random seed ([0-9]+)', output)
def check_output(self, output):
self.parse_random_seed(output)
- self.check_executed_tests(output, self.tests, randomize=True)
+ self.check_executed_tests(output, self.tests,
+ randomize=True, stats=len(self.tests))
def run_tests(self, args):
output = self.run_python(args)
test_failing = self.create_test('failing', code=code)
tests = [test_ok, test_failing]
- output = self.run_tests(*tests, exitcode=2)
- self.check_executed_tests(output, tests, failed=test_failing)
+ output = self.run_tests(*tests, exitcode=EXITCODE_BAD_TEST)
+ self.check_executed_tests(output, tests, failed=test_failing,
+ stats=TestStats(2, 1))
def test_resources(self):
# test -u command line option
# -u all: 2 resources enabled
output = self.run_tests('-u', 'all', *test_names)
- self.check_executed_tests(output, test_names)
+ self.check_executed_tests(output, test_names, stats=2)
# -u audio: 1 resource enabled
output = self.run_tests('-uaudio', *test_names)
self.check_executed_tests(output, test_names,
- skipped=tests['network'])
+ skipped=tests['network'],
+ resource_denied=tests['network'],
+ stats=1)
# no option: 0 resources enabled
output = self.run_tests(*test_names)
self.check_executed_tests(output, test_names,
- skipped=test_names)
+ skipped=test_names,
+ resource_denied=test_names,
+ stats=0)
def test_random(self):
# test -r and --randseed command line option
test = self.create_test('random', code)
# first run to get the output with the random seed
- output = self.run_tests('-r', test)
+ output = self.run_tests('-r', test, exitcode=EXITCODE_NO_TESTS_RAN)
randseed = self.parse_random_seed(output)
match = self.regex_search(r'TESTRANDOM: ([0-9]+)', output)
test_random = int(match.group(1))
# try to reproduce with the random seed
- output = self.run_tests('-r', '--randseed=%s' % randseed, test)
+ output = self.run_tests('-r', '--randseed=%s' % randseed, test,
+ exitcode=EXITCODE_NO_TESTS_RAN)
randseed2 = self.parse_random_seed(output)
self.assertEqual(randseed2, randseed)
previous = name
output = self.run_tests('--fromfile', filename)
- self.check_executed_tests(output, tests)
+ stats = len(tests)
+ self.check_executed_tests(output, tests, stats=stats)
# test format '[2/7] test_opcodes'
with open(filename, "w") as fp:
print("[%s/%s] %s" % (index, len(tests), name), file=fp)
output = self.run_tests('--fromfile', filename)
- self.check_executed_tests(output, tests)
+ self.check_executed_tests(output, tests, stats=stats)
# test format 'test_opcodes'
with open(filename, "w") as fp:
print(name, file=fp)
output = self.run_tests('--fromfile', filename)
- self.check_executed_tests(output, tests)
+ self.check_executed_tests(output, tests, stats=stats)
# test format 'Lib/test/test_opcodes.py'
with open(filename, "w") as fp:
print('Lib/test/%s.py' % name, file=fp)
output = self.run_tests('--fromfile', filename)
- self.check_executed_tests(output, tests)
+ self.check_executed_tests(output, tests, stats=stats)
def test_interrupted(self):
code = TEST_INTERRUPTED
test = self.create_test('sigint', code=code)
- output = self.run_tests(test, exitcode=130)
+ output = self.run_tests(test, exitcode=EXITCODE_INTERRUPTED)
self.check_executed_tests(output, test, omitted=test,
- interrupted=True)
+ interrupted=True, stats=0)
def test_slowest(self):
# test --slowest
tests = [self.create_test() for index in range(3)]
output = self.run_tests("--slowest", *tests)
- self.check_executed_tests(output, tests)
+ self.check_executed_tests(output, tests, stats=len(tests))
regex = ('10 slowest tests:\n'
'(?:- %s: .*\n){%s}'
% (self.TESTNAME_REGEX, len(tests)))
args = ("--slowest", "-j2", test)
else:
args = ("--slowest", test)
- output = self.run_tests(*args, exitcode=130)
+ output = self.run_tests(*args, exitcode=EXITCODE_INTERRUPTED)
self.check_executed_tests(output, test,
- omitted=test, interrupted=True)
+ omitted=test, interrupted=True,
+ stats=0)
regex = ('10 slowest tests:\n')
self.check_line(output, regex)
# test --coverage
test = self.create_test('coverage')
output = self.run_tests("--coverage", test)
- self.check_executed_tests(output, [test])
+ self.check_executed_tests(output, [test], stats=1)
regex = (r'lines +cov% +module +\(path\)\n'
r'(?: *[0-9]+ *[0-9]{1,2}% *[^ ]+ +\([^)]+\)+)+')
self.check_line(output, regex)
builtins.__dict__['RUN'] = 1
""")
test = self.create_test('forever', code=code)
- output = self.run_tests('--forever', test, exitcode=2)
- self.check_executed_tests(output, [test]*3, failed=test)
+ output = self.run_tests('--forever', test, exitcode=EXITCODE_BAD_TEST)
+ self.check_executed_tests(output, [test]*3, failed=test,
+ stats=TestStats(1, 1))
def check_leak(self, code, what):
test = self.create_test('huntrleaks', code=code)
filename = 'reflog.txt'
self.addCleanup(os_helper.unlink, filename)
output = self.run_tests('--huntrleaks', '3:3:', test,
- exitcode=2,
+ exitcode=EXITCODE_BAD_TEST,
stderr=subprocess.STDOUT)
- self.check_executed_tests(output, [test], failed=test)
+ self.check_executed_tests(output, [test], failed=test, stats=1)
line = 'beginning 6 repetitions\n123456\n......\n'
self.check_line(output, re.escape(line))
crash_test = self.create_test(name="crash", code=code)
tests = [crash_test]
- output = self.run_tests("-j2", *tests, exitcode=2)
+ output = self.run_tests("-j2", *tests, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, tests, failed=crash_test,
- randomize=True)
+ randomize=True, stats=0)
def parse_methods(self, output):
regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE)
# don't fail by default
output = self.run_tests(testname)
- self.check_executed_tests(output, [testname], env_changed=testname)
+ self.check_executed_tests(output, [testname],
+ env_changed=testname, stats=1)
# fail with --fail-env-changed
- output = self.run_tests("--fail-env-changed", testname, exitcode=3)
+ output = self.run_tests("--fail-env-changed", testname,
+ exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname], env_changed=testname,
- fail_env_changed=True)
+ fail_env_changed=True, stats=1)
def test_rerun_fail(self):
# FAILURE then FAILURE
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=2)
+ output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
- failed=testname, rerun={testname: "test_fail_always"})
+ failed=testname,
+ rerun={testname: "test_fail_always"},
+ stats=TestStats(1, 1))
def test_rerun_success(self):
# FAILURE then SUCCESS
output = self.run_tests("-w", testname, exitcode=0)
self.check_executed_tests(output, [testname],
- rerun={testname: "test_fail_once"})
+ rerun={testname: "test_fail_once"},
+ stats=1)
def test_rerun_setup_class_hook_failure(self):
# FAILURE then FAILURE
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "ExampleTests"})
+ rerun={testname: "ExampleTests"},
+ stats=0)
def test_rerun_teardown_class_hook_failure(self):
# FAILURE then FAILURE
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "ExampleTests"})
+ rerun={testname: "ExampleTests"},
+ stats=1)
def test_rerun_setup_module_hook_failure(self):
# FAILURE then FAILURE
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: testname})
+ rerun={testname: testname},
+ stats=0)
def test_rerun_teardown_module_hook_failure(self):
# FAILURE then FAILURE
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: testname})
+ rerun={testname: testname},
+ stats=1)
def test_rerun_setup_hook_failure(self):
# FAILURE then FAILURE
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"})
+ rerun={testname: "test_success"},
+ stats=1)
def test_rerun_teardown_hook_failure(self):
# FAILURE then FAILURE
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"})
+ rerun={testname: "test_success"},
+ stats=1)
def test_rerun_async_setup_hook_failure(self):
# FAILURE then FAILURE
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"})
+ rerun={testname: "test_success"},
+ stats=1)
def test_rerun_async_teardown_hook_failure(self):
# FAILURE then FAILURE
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"})
+ rerun={testname: "test_success"},
+ stats=1)
def test_no_tests_ran(self):
code = textwrap.dedent("""
""")
testname = self.create_test(code=code)
- output = self.run_tests(testname, "-m", "nosuchtest", exitcode=0)
- self.check_executed_tests(output, [testname], no_test_ran=testname)
+ output = self.run_tests(testname, "-m", "nosuchtest",
+ exitcode=EXITCODE_NO_TESTS_RAN)
+ self.check_executed_tests(output, [testname],
+ run_no_tests=testname,
+ stats=0)
def test_no_tests_ran_skip(self):
code = textwrap.dedent("""
""")
testname = self.create_test(code=code)
- output = self.run_tests(testname, exitcode=0)
- self.check_executed_tests(output, [testname])
+ output = self.run_tests(testname)
+ self.check_executed_tests(output, [testname],
+ stats=TestStats(1, skipped=1))
def test_no_tests_ran_multiple_tests_nonexistent(self):
code = textwrap.dedent("""
testname = self.create_test(code=code)
testname2 = self.create_test(code=code)
- output = self.run_tests(testname, testname2, "-m", "nosuchtest", exitcode=0)
+ output = self.run_tests(testname, testname2, "-m", "nosuchtest",
+ exitcode=EXITCODE_NO_TESTS_RAN)
self.check_executed_tests(output, [testname, testname2],
- no_test_ran=[testname, testname2])
+ run_no_tests=[testname, testname2],
+ stats=0)
def test_no_test_ran_some_test_exist_some_not(self):
code = textwrap.dedent("""
output = self.run_tests(testname, testname2, "-m", "nosuchtest",
"-m", "test_other_bug", exitcode=0)
self.check_executed_tests(output, [testname, testname2],
- no_test_ran=[testname])
+ run_no_tests=[testname],
+ stats=1)
@support.cpython_only
def test_uncollectable(self):
""")
testname = self.create_test(code=code)
- output = self.run_tests("--fail-env-changed", testname, exitcode=3)
+ output = self.run_tests("--fail-env-changed", testname,
+ exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
- fail_env_changed=True)
+ fail_env_changed=True,
+ stats=1)
def test_multiprocessing_timeout(self):
code = textwrap.dedent(r"""
""")
testname = self.create_test(code=code)
- output = self.run_tests("-j2", "--timeout=1.0", testname, exitcode=2)
+ output = self.run_tests("-j2", "--timeout=1.0", testname,
+ exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
- failed=testname)
+ failed=testname, stats=0)
self.assertRegex(output,
re.compile('%s timed out' % testname, re.MULTILINE))
""")
testname = self.create_test(code=code)
- output = self.run_tests("--fail-env-changed", "-v", testname, exitcode=3)
+ output = self.run_tests("--fail-env-changed", "-v", testname,
+ exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
- fail_env_changed=True)
+ fail_env_changed=True,
+ stats=1)
self.assertIn("Warning -- Unraisable exception", output)
self.assertIn("Exception: weakref callback bug", output)
""")
testname = self.create_test(code=code)
- output = self.run_tests("--fail-env-changed", "-v", testname, exitcode=3)
+ output = self.run_tests("--fail-env-changed", "-v", testname,
+ exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
- fail_env_changed=True)
+ fail_env_changed=True,
+ stats=1)
self.assertIn("Warning -- Uncaught thread exception", output)
self.assertIn("Exception: bug in thread", output)
for option in ("-v", "-W"):
with self.subTest(option=option):
cmd = ["--fail-env-changed", option, testname]
- output = self.run_tests(*cmd, exitcode=3)
+ output = self.run_tests(*cmd, exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
- fail_env_changed=True)
+ fail_env_changed=True,
+ stats=1)
self.assertRegex(output, regex)
def test_unicode_guard_env(self):
for name in names:
self.assertFalse(os.path.exists(name), name)
+ @unittest.skipIf(support.is_wasi,
+ 'checking temp files is not implemented on WASI')
+ def test_leak_tmp_file(self):
+ code = textwrap.dedent(r"""
+ import os.path
+ import tempfile
+ import unittest
+
+ class FileTests(unittest.TestCase):
+ def test_leak_tmp_file(self):
+ filename = os.path.join(tempfile.gettempdir(), 'mytmpfile')
+ with open(filename, "wb") as fp:
+ fp.write(b'content')
+ """)
+ testnames = [self.create_test(code=code) for _ in range(3)]
+
+ output = self.run_tests("--fail-env-changed", "-v", "-j2", *testnames,
+ exitcode=EXITCODE_ENV_CHANGED)
+ self.check_executed_tests(output, testnames,
+ env_changed=testnames,
+ fail_env_changed=True,
+ randomize=True,
+ stats=len(testnames))
+ for testname in testnames:
+ self.assertIn(f"Warning -- {testname} leaked temporary "
+ f"files (1): mytmpfile",
+ output)
+
def test_mp_decode_error(self):
# gh-101634: If a worker stdout cannot be decoded, report a failed test
# and a non-zero exit code.
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=[testname],
- randomize=True)
+ randomize=True,
+ stats=0)
+
+ def test_doctest(self):
+ code = textwrap.dedent(fr'''
+ import doctest
+ import sys
+ from test import support
+
+ def my_function():
+ """
+ Pass:
+
+ >>> 1 + 1
+ 2
+
+ Failure:
+
+ >>> 2 + 3
+ 23
+ >>> 1 + 1
+ 11
+
+ Skipped test (ignored):
+
+ >>> id(1.0) # doctest: +SKIP
+ 7948648
+ """
+
+ def test_main():
+ testmod = sys.modules[__name__]
+ return support.run_doctest(testmod)
+ ''')
+ testname = self.create_test(code=code)
+
+ output = self.run_tests("--fail-env-changed", "-v", "-j1", testname,
+ exitcode=EXITCODE_BAD_TEST)
+ self.check_executed_tests(output, [testname],
+ failed=[testname],
+ randomize=True,
+ stats=TestStats(3, 2, 0))
class TestUtils(unittest.TestCase):
old_factories = None
try:
- support.run_unittest(*test_classes)
+ return support.run_unittest(*test_classes)
finally:
from xml.etree import ElementPath
# Restore mapping and path cache
--- /dev/null
+regrtest now checks whether a test leaks temporary files or directories when
+run with the ``-jN`` option. Patch by Victor Stinner.
--- /dev/null
+On Windows, when the Python test suite is run with the ``-jN`` option, the
+ANSI code page is now used as the encoding for the stdout temporary file,
+rather than using UTF-8 which can lead to decoding errors. Patch by Victor
+Stinner.
--- /dev/null
+The Python test suite now fails with exit code 4 if no tests ran. It should
+help detect typos in test names and test methods.
--- /dev/null
+The Python test runner (libregrtest) now logs Python build information like
+"debug" vs "release" build, or LTO and PGO optimizations. Patch by Victor
+Stinner.
--- /dev/null
+When running the Python test suite with the ``-jN`` option, if a worker stdout
+cannot be decoded from the locale encoding, report a failed test so that the
+exit code is non-zero. Patch by Victor Stinner.
--- /dev/null
+``regrtest`` now computes statistics on all tests: successes, failures and
+skipped. ``test_netrc``, ``test_pep646_syntax`` and ``test_xml_etree`` now
+return results in their ``test_main()`` function. Patch by Victor Stinner
+and Alex Waygood.
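
A minimal, hypothetical sketch of the ``test_main()`` pattern that the last
entry refers to (module and class names are made up): the function now
returns the value of ``support.run_unittest()`` so that regrtest can collect
per-test statistics.

    import unittest
    from test import support

    class ExampleTests(unittest.TestCase):
        def test_ok(self):
            self.assertTrue(True)

    def test_main():
        return support.run_unittest(ExampleTests)

    if __name__ == "__main__":
        test_main()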