from test.libregrtest.cmdline import _parse_args, Namespace
from test.libregrtest.runtest import (
findtests, split_test_packages, run_single_test, abs_module_name,
- PROGRESS_MIN_TIME, State, RunTests, TestResult, HuntRefleak,
- FilterTuple, FilterDict, TestList, StrPath, StrJSON, TestName)
+ PROGRESS_MIN_TIME, State, RunTests, HuntRefleak,
+ FilterTuple, TestList, StrPath, StrJSON, TestName)
from test.libregrtest.setup import setup_tests, setup_test_dir
from test.libregrtest.pgo import setup_pgo_tests
+from test.libregrtest.results import TestResults
from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
printlist, get_build_info)
from test import support
-from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0
-EXITCODE_BAD_TEST = 2
-EXITCODE_ENV_CHANGED = 3
-EXITCODE_NO_TESTS_RAN = 4
-EXITCODE_RERUN_FAIL = 5
-EXITCODE_INTERRUPTED = 130
-
class Regrtest:
"""Execute a test suite.
# tests
self.tests = []
- self.selected = []
+ self.selected: TestList = []
self.first_runtests: RunTests | None = None
# test results
- self.good: TestList = []
- self.bad: TestList = []
- self.rerun_bad: TestList = []
- self.skipped: TestList = []
- self.resource_denied: TestList = []
- self.environment_changed: TestList = []
- self.run_no_tests: TestList = []
- self.rerun: TestList = []
-
- self.need_rerun: list[TestResult] = []
+ self.results: TestResults = TestResults()
+
self.first_state: str | None = None
- self.interrupted = False
- self.total_stats = TestStats()
# used by --slowest
- self.test_times: list[tuple[float, TestName]] = []
self.print_slowest: bool = ns.print_slow
# used to display the progress bar "[ 3/100]"
self.next_single_test: TestName | None = None
self.next_single_filename: StrPath | None = None
- # used by --junit-xml
- self.testsuite_xml = None
-
# misc
self.win_load_tracker = None
- def get_executed(self):
- return (set(self.good) | set(self.bad) | set(self.skipped)
- | set(self.resource_denied) | set(self.environment_changed)
- | set(self.run_no_tests))
-
- def accumulate_result(self, result, rerun=False):
- test_name = result.test_name
-
- match result.state:
- case State.PASSED:
- self.good.append(test_name)
- case State.ENV_CHANGED:
- self.environment_changed.append(test_name)
- case State.SKIPPED:
- self.skipped.append(test_name)
- case State.RESOURCE_DENIED:
- self.resource_denied.append(test_name)
- case State.INTERRUPTED:
- self.interrupted = True
- case State.DID_NOT_RUN:
- self.run_no_tests.append(test_name)
- case _:
- if result.is_failed(self.fail_env_changed):
- self.bad.append(test_name)
- self.need_rerun.append(result)
- else:
- raise ValueError(f"invalid test state: {result.state!r}")
-
- if result.has_meaningful_duration() and not rerun:
- self.test_times.append((result.duration, test_name))
- if result.stats is not None:
- self.total_stats.accumulate(result.stats)
- if rerun:
- self.rerun.append(test_name)
-
- xml_data = result.xml_data
- if xml_data:
- import xml.etree.ElementTree as ET
- for e in xml_data:
- try:
- self.testsuite_xml.append(ET.fromstring(e))
- except ET.ParseError:
- print(xml_data, file=sys.__stderr__)
- raise
-
def log(self, line=''):
empty = not line
# "[ 51/405/1] test_tcl passed"
line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
- fails = len(self.bad) + len(self.environment_changed)
+ fails = len(self.results.bad) + len(self.results.env_changed)
if fails and not self.pgo:
line = f"{line}/{fails}"
self.log(f"[{line}] {text}")
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)
- @staticmethod
- def get_rerun_match(rerun_list) -> FilterDict:
- rerun_match_tests = {}
- for result in rerun_list:
- match_tests = result.get_rerun_match_tests()
- # ignore empty match list
- if match_tests:
- rerun_match_tests[result.test_name] = match_tests
- return rerun_match_tests
-
- def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
+ def _rerun_failed_tests(self, runtests: RunTests):
# Configure the runner to re-run tests
if self.num_workers == 0:
self.num_workers = 1
- # Get tests to re-run
- tests = [result.test_name for result in need_rerun]
- match_tests_dict = self.get_rerun_match(need_rerun)
-
- # Clear previously failed tests
- self.rerun_bad.extend(self.bad)
- self.bad.clear()
- self.need_rerun.clear()
+ tests, match_tests_dict = self.results.prepare_rerun()
# Re-run failed tests
self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
runtests = runtests.copy(
- tests=tuple(tests),
+ tests=tests,
rerun=True,
verbose=True,
forever=False,
self._run_tests_mp(runtests, self.num_workers)
return runtests
- def rerun_failed_tests(self, need_rerun, runtests: RunTests):
+ def rerun_failed_tests(self, runtests: RunTests):
if self.python_cmd:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
)
return
- self.first_state = self.get_tests_state()
+ self.first_state = self.get_state()
print()
- rerun_runtests = self._rerun_failed_tests(need_rerun, runtests)
+ rerun_runtests = self._rerun_failed_tests(runtests)
- if self.bad:
- print(count(len(self.bad), 'test'), "failed again:")
- printlist(self.bad)
+ if self.results.bad:
+ print(count(len(self.results.bad), 'test'), "failed again:")
+ printlist(self.results.bad)
self.display_result(rerun_runtests)
def display_result(self, runtests):
- pgo = runtests.pgo
-
# If running the test suite for PGO then no one cares about results.
- if pgo:
+ if runtests.pgo:
return
+ state = self.get_state()
print()
- print("== Tests result: %s ==" % self.get_tests_state())
-
- if self.interrupted:
- print("Test suite interrupted by signal SIGINT.")
-
- omitted = set(self.selected) - self.get_executed()
- if omitted:
- print()
- print(count(len(omitted), "test"), "omitted:")
- printlist(omitted)
-
- if self.good and not self.quiet:
- print()
- if (not self.bad
- and not self.skipped
- and not self.interrupted
- and len(self.good) > 1):
- print("All", end=' ')
- print(count(len(self.good), "test"), "OK.")
-
- if self.print_slowest:
- self.test_times.sort(reverse=True)
- print()
- print("10 slowest tests:")
- for test_time, test in self.test_times[:10]:
- print("- %s: %s" % (test, format_duration(test_time)))
-
- if self.bad:
- print()
- print(count(len(self.bad), "test"), "failed:")
- printlist(self.bad)
-
- if self.environment_changed:
- print()
- print("{} altered the execution environment:".format(
- count(len(self.environment_changed), "test")))
- printlist(self.environment_changed)
-
- if self.skipped and not self.quiet:
- print()
- print(count(len(self.skipped), "test"), "skipped:")
- printlist(self.skipped)
-
- if self.resource_denied and not self.quiet:
- print()
- print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
- printlist(self.resource_denied)
-
- if self.rerun:
- print()
- print("%s:" % count(len(self.rerun), "re-run test"))
- printlist(self.rerun)
-
- if self.run_no_tests:
- print()
- print(count(len(self.run_no_tests), "test"), "run no tests:")
- printlist(self.run_no_tests)
+ print(f"== Tests result: {state} ==")
+
+ self.results.display_result(self.selected, self.quiet, self.print_slowest)
def run_test(self, test_name: TestName, runtests: RunTests, tracer):
if tracer is not None:
else:
result = run_single_test(test_name, runtests)
- self.accumulate_result(result)
+ self.results.accumulate_result(result, runtests)
return result
if sanitizer and options is not None:
print(f"== {env_var}={options!r}")
- def no_tests_run(self):
- return not any((self.good, self.bad, self.skipped, self.interrupted,
- self.environment_changed))
-
- def get_tests_state(self):
- result = []
- if self.bad:
- result.append("FAILURE")
- elif self.fail_env_changed and self.environment_changed:
- result.append("ENV CHANGED")
- elif self.no_tests_run():
- result.append("NO TESTS RAN")
-
- if self.interrupted:
- result.append("INTERRUPTED")
-
- if not result:
- result.append("SUCCESS")
-
- result = ', '.join(result)
+ def get_state(self):
+ state = self.results.get_state(self.fail_env_changed)
if self.first_state:
- result = '%s then %s' % (self.first_state, result)
- return result
+ state = f'{self.first_state} then {state}'
+ return state
def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None:
from test.libregrtest.runtest_mp import RunWorkers
if self.want_run_leaks:
os.system("leaks %d" % os.getpid())
- self.save_xml_result()
+ if self.junit_filename:
+ self.results.write_junit(self.junit_filename)
def display_summary(self):
duration = time.perf_counter() - self.start_time
print()
print("Total duration: %s" % format_duration(duration))
- # Total tests
- total = self.total_stats
- text = f'run={total.tests_run:,}'
- if filtered:
- text = f"{text} (filtered)"
- stats = [text]
- if total.failures:
- stats.append(f'failures={total.failures:,}')
- if total.skipped:
- stats.append(f'skipped={total.skipped:,}')
- print(f"Total tests: {' '.join(stats)}")
-
- # Total test files
- all_tests = [self.good, self.bad, self.rerun,
- self.skipped,
- self.environment_changed, self.run_no_tests]
- run = sum(map(len, all_tests))
- text = f'run={run}'
- if not self.first_runtests.forever:
- ntest = len(self.first_runtests.tests)
- text = f"{text}/{ntest}"
- if filtered:
- text = f"{text} (filtered)"
- report = [text]
- for name, tests in (
- ('failed', self.bad),
- ('env_changed', self.environment_changed),
- ('skipped', self.skipped),
- ('resource_denied', self.resource_denied),
- ('rerun', self.rerun),
- ('run_no_tests', self.run_no_tests),
- ):
- if tests:
- report.append(f'{name}={len(tests)}')
- print(f"Total test files: {' '.join(report)}")
+ self.results.display_summary(self.first_runtests, filtered)
# Result
- result = self.get_tests_state()
- print(f"Result: {result}")
-
- def save_xml_result(self):
- if not self.junit_filename and not self.testsuite_xml:
- return
-
- import xml.etree.ElementTree as ET
- root = ET.Element("testsuites")
-
- # Manually count the totals for the overall summary
- totals = {'tests': 0, 'errors': 0, 'failures': 0}
- for suite in self.testsuite_xml:
- root.append(suite)
- for k in totals:
- try:
- totals[k] += int(suite.get(k, 0))
- except ValueError:
- pass
-
- for k, v in totals.items():
- root.set(k, str(v))
-
- xmlpath = os.path.join(os_helper.SAVEDCWD, self.junit_filename)
- with open(xmlpath, 'wb') as f:
- for s in ET.tostringlist(root):
- f.write(s)
+ state = self.get_state()
+ print(f"Result: {state}")
@staticmethod
def fix_umask():
os_helper.unlink(name)
def main(self, tests: TestList | None = None):
- self.tests = tests
+ if self.junit_filename and not os.path.isabs(self.junit_filename):
+ self.junit_filename = os.path.abspath(self.junit_filename)
- if self.junit_filename:
- support.junit_xml_list = self.testsuite_xml = []
+ self.tests = tests
strip_py_suffix(self.cmdline_args)
return None
- def get_exitcode(self):
- exitcode = 0
- if self.bad:
- exitcode = EXITCODE_BAD_TEST
- elif self.interrupted:
- exitcode = EXITCODE_INTERRUPTED
- elif self.fail_env_changed and self.environment_changed:
- exitcode = EXITCODE_ENV_CHANGED
- elif self.no_tests_run():
- exitcode = EXITCODE_NO_TESTS_RAN
- elif self.rerun and self.fail_rerun:
- exitcode = EXITCODE_RERUN_FAIL
- return exitcode
-
def action_run_tests(self):
if self.hunt_refleak and self.hunt_refleak.warmups < 3:
msg = ("WARNING: Running tests with --huntrleaks/-R and "
tracer = self.run_tests(runtests)
self.display_result(runtests)
- need_rerun = self.need_rerun
- if self.want_rerun and need_rerun:
- self.rerun_failed_tests(need_rerun, runtests)
+ if self.want_rerun and self.results.need_rerun():
+ self.rerun_failed_tests(runtests)
self.display_summary()
self.finalize_tests(tracer)
self.list_cases()
else:
self.action_run_tests()
- exitcode = self.get_exitcode()
+ exitcode = self.results.get_exitcode(self.fail_env_changed,
+ self.fail_rerun)
sys.exit(exitcode)
--- /dev/null
+import sys
+from test.support import TestStats
+
+from test.libregrtest.runtest import (
+ TestName, TestTuple, TestList, FilterDict, StrPath, State,
+ TestResult, RunTests)
+from test.libregrtest.utils import printlist, count, format_duration
+
+
+EXITCODE_BAD_TEST = 2
+EXITCODE_ENV_CHANGED = 3
+EXITCODE_NO_TESTS_RAN = 4
+EXITCODE_RERUN_FAIL = 5
+EXITCODE_INTERRUPTED = 130
+
+
+class TestResults:
+ def __init__(self):
+ self.bad: TestList = []
+ self.good: TestList = []
+ self.rerun_bad: TestList = []
+ self.skipped: TestList = []
+ self.resource_denied: TestList = []
+ self.env_changed: TestList = []
+ self.run_no_tests: TestList = []
+ self.rerun: TestList = []
+ self.bad_results: list[TestResult] = []
+
+ self.interrupted: bool = False
+ self.test_times: list[tuple[float, TestName]] = []
+ self.stats = TestStats()
+ # used by --junit-xml
+ self.testsuite_xml: list[str] = []
+
+ def get_executed(self):
+ return (set(self.good) | set(self.bad) | set(self.skipped)
+ | set(self.resource_denied) | set(self.env_changed)
+ | set(self.run_no_tests))
+
+ def no_tests_run(self):
+ return not any((self.good, self.bad, self.skipped, self.interrupted,
+ self.env_changed))
+
+ def get_state(self, fail_env_changed):
+ state = []
+ if self.bad:
+ state.append("FAILURE")
+ elif fail_env_changed and self.env_changed:
+ state.append("ENV CHANGED")
+ elif self.no_tests_run():
+ state.append("NO TESTS RAN")
+
+ if self.interrupted:
+ state.append("INTERRUPTED")
+ if not state:
+ state.append("SUCCESS")
+
+ return ', '.join(state)
+
+ def get_exitcode(self, fail_env_changed, fail_rerun):
+ exitcode = 0
+ if self.bad:
+ exitcode = EXITCODE_BAD_TEST
+ elif self.interrupted:
+ exitcode = EXITCODE_INTERRUPTED
+ elif fail_env_changed and self.env_changed:
+ exitcode = EXITCODE_ENV_CHANGED
+ elif self.no_tests_run():
+ exitcode = EXITCODE_NO_TESTS_RAN
+ elif fail_rerun and self.rerun:
+ exitcode = EXITCODE_RERUN_FAIL
+ return exitcode
+
+ def accumulate_result(self, result: TestResult, runtests: RunTests):
+ test_name = result.test_name
+ rerun = runtests.rerun
+ fail_env_changed = runtests.fail_env_changed
+
+ match result.state:
+ case State.PASSED:
+ self.good.append(test_name)
+ case State.ENV_CHANGED:
+ self.env_changed.append(test_name)
+ case State.SKIPPED:
+ self.skipped.append(test_name)
+ case State.RESOURCE_DENIED:
+ self.resource_denied.append(test_name)
+ case State.INTERRUPTED:
+ self.interrupted = True
+ case State.DID_NOT_RUN:
+ self.run_no_tests.append(test_name)
+ case _:
+ if result.is_failed(fail_env_changed):
+ self.bad.append(test_name)
+ self.bad_results.append(result)
+ else:
+ raise ValueError(f"invalid test state: {result.state!r}")
+
+ if result.has_meaningful_duration() and not rerun:
+ self.test_times.append((result.duration, test_name))
+ if result.stats is not None:
+ self.stats.accumulate(result.stats)
+ if rerun:
+ self.rerun.append(test_name)
+
+ xml_data = result.xml_data
+ if xml_data:
+ self.add_junit(result.xml_data)
+
+ def need_rerun(self):
+ return bool(self.bad_results)
+
+ def prepare_rerun(self) -> (TestTuple, FilterDict):
+ tests: TestList = []
+ match_tests_dict = {}
+ for result in self.bad_results:
+ tests.append(result.test_name)
+
+ match_tests = result.get_rerun_match_tests()
+ # ignore empty match list
+ if match_tests:
+ match_tests_dict[result.test_name] = match_tests
+
+ # Clear previously failed tests
+ self.rerun_bad.extend(self.bad)
+ self.bad.clear()
+ self.bad_results.clear()
+
+ return (tuple(tests), match_tests_dict)
+
+ def add_junit(self, xml_data: list[str]):
+ import xml.etree.ElementTree as ET
+ for e in xml_data:
+ try:
+ self.testsuite_xml.append(ET.fromstring(e))
+ except ET.ParseError:
+ print(xml_data, file=sys.__stderr__)
+ raise
+
+ def write_junit(self, filename: StrPath):
+ if not self.testsuite_xml:
+ # Don't create empty XML file
+ return
+
+ import xml.etree.ElementTree as ET
+ root = ET.Element("testsuites")
+
+ # Manually count the totals for the overall summary
+ totals = {'tests': 0, 'errors': 0, 'failures': 0}
+ for suite in self.testsuite_xml:
+ root.append(suite)
+ for k in totals:
+ try:
+ totals[k] += int(suite.get(k, 0))
+ except ValueError:
+ pass
+
+ for k, v in totals.items():
+ root.set(k, str(v))
+
+ with open(filename, 'wb') as f:
+ for s in ET.tostringlist(root):
+ f.write(s)
+
+ def display_result(self, tests: TestList, quiet: bool, print_slowest: bool):
+ if self.interrupted:
+ print("Test suite interrupted by signal SIGINT.")
+
+ omitted = set(tests) - self.get_executed()
+ if omitted:
+ print()
+ print(count(len(omitted), "test"), "omitted:")
+ printlist(omitted)
+
+ if self.good and not quiet:
+ print()
+ if (not self.bad
+ and not self.skipped
+ and not self.interrupted
+ and len(self.good) > 1):
+ print("All", end=' ')
+ print(count(len(self.good), "test"), "OK.")
+
+ if print_slowest:
+ self.test_times.sort(reverse=True)
+ print()
+ print("10 slowest tests:")
+ for test_time, test in self.test_times[:10]:
+ print("- %s: %s" % (test, format_duration(test_time)))
+
+ if self.bad:
+ print()
+ print(count(len(self.bad), "test"), "failed:")
+ printlist(self.bad)
+
+ if self.env_changed:
+ print()
+ print("{} altered the execution environment:".format(
+ count(len(self.env_changed), "test")))
+ printlist(self.env_changed)
+
+ if self.skipped and not quiet:
+ print()
+ print(count(len(self.skipped), "test"), "skipped:")
+ printlist(self.skipped)
+
+ if self.resource_denied and not quiet:
+ print()
+ print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
+ printlist(self.resource_denied)
+
+ if self.rerun:
+ print()
+ print("%s:" % count(len(self.rerun), "re-run test"))
+ printlist(self.rerun)
+
+ if self.run_no_tests:
+ print()
+ print(count(len(self.run_no_tests), "test"), "run no tests:")
+ printlist(self.run_no_tests)
+
+ def display_summary(self, first_runtests: RunTests, filtered: bool):
+ # Total tests
+ stats = self.stats
+ text = f'run={stats.tests_run:,}'
+ if filtered:
+ text = f"{text} (filtered)"
+ report = [text]
+ if stats.failures:
+ report.append(f'failures={stats.failures:,}')
+ if stats.skipped:
+ report.append(f'skipped={stats.skipped:,}')
+ report = ' '.join(report)
+ print(f"Total tests: {report}")
+
+ # Total test files
+ all_tests = [self.good, self.bad, self.rerun,
+ self.skipped,
+ self.env_changed, self.run_no_tests]
+ run = sum(map(len, all_tests))
+ text = f'run={run}'
+ if not first_runtests.forever:
+ ntest = len(first_runtests.tests)
+ text = f"{text}/{ntest}"
+ if filtered:
+ text = f"{text} (filtered)"
+ report = [text]
+ for name, tests in (
+ ('failed', self.bad),
+ ('env_changed', self.env_changed),
+ ('skipped', self.skipped),
+ ('resource_denied', self.resource_denied),
+ ('rerun', self.rerun),
+ ('run_no_tests', self.run_no_tests),
+ ):
+ if tests:
+ report.append(f'{name}={len(tests)}')
+ report = ' '.join(report)
+ print(f"Total test files: {report}")