* Rename runtest() to run_single_test().
* Pass runtests to run_single_test().
* Add type annotations to Regrtest attributes. Add missing attributes
to Namespace.
* Add attributes to Regrtest and RunTests:
* fail_fast
* ignore_tests
* match_tests
* output_on_failure
* pgo
* pgo_extended
* timeout
* Get pgo from 'runtests', rather than from 'ns'.
* Remove WorkerJob.match_tests.
* setup_support() now gets pgo_extended from runtests.
* save_env(): change parameter order, pass test_name first.
* Add setup_test_dir() function.
* Pass runtests to setup_tests(). (A short usage sketch of the refactored
  API follows this list.)
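For reference, a minimal sketch of how the refactored API fits together. It is
illustrative only: the test name, the option values, and the empty command line
passed to _parse_args() are assumptions, not taken from this change; only the
signatures and attribute names follow the diff below.

    from test.libregrtest.cmdline import _parse_args
    from test.libregrtest.runtest import RunTests, run_single_test
    from test.libregrtest.setup import setup_tests, setup_test_dir

    # Hypothetical options Namespace (an empty command line is an assumption).
    ns = _parse_args([])

    # RunTests now carries the per-run options that used to be read from ns.
    runtests = RunTests(
        tests=('test_os',),        # hypothetical test selection
        fail_fast=False,
        match_tests=None,
        ignore_tests=None,
        pgo=False,
        pgo_extended=False,
        output_on_failure=False,
        timeout=None)

    setup_test_dir(ns.testdir)     # new helper split out of setup_tests()
    setup_tests(runtests, ns)      # setup_tests() now takes runtests as well
    result = run_single_test('test_os', runtests, ns)   # was: runtest(ns, 'test_os')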
self.verbose = 0
self.quiet = False
self.exclude = False
+ self.cleanup = False
+ self.wait = False
+ self.list_cases = False
+ self.list_tests = False
self.single = False
self.randomize = False
self.fromfile = None
self.pgo = False
self.pgo_extended = False
self.worker_json = None
+ self.start = None
+ self.timeout = None
super().__init__(**kwargs)
import unittest
from test.libregrtest.cmdline import _parse_args, Namespace
from test.libregrtest.runtest import (
- findtests, split_test_packages, runtest, abs_module_name,
+ findtests, split_test_packages, run_single_test, abs_module_name,
PROGRESS_MIN_TIME, State, RunTests, TestResult,
FilterTuple, FilterDict, TestList)
-from test.libregrtest.setup import setup_tests
+from test.libregrtest.setup import setup_tests, setup_test_dir
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
printlist, get_build_info)
self.ns: Namespace = ns
# Actions
- self.want_header = ns.header
- self.want_list_tests = ns.list_tests
- self.want_list_cases = ns.list_cases
- self.want_wait = ns.wait
- self.want_cleanup = ns.cleanup
+ self.want_header: bool = ns.header
+ self.want_list_tests: bool = ns.list_tests
+ self.want_list_cases: bool = ns.list_cases
+ self.want_wait: bool = ns.wait
+ self.want_cleanup: bool = ns.cleanup
# Select tests
        if ns.match_tests:
            self.match_tests: FilterTuple | None = tuple(ns.match_tests)
        else:
            self.match_tests = None
        if ns.ignore_tests:
            self.ignore_tests: FilterTuple | None = tuple(ns.ignore_tests)
        else:
            self.ignore_tests = None
- self.exclude = ns.exclude
- self.fromfile = ns.fromfile
- self.starting_test = ns.start
+ self.exclude: bool = ns.exclude
+ self.fromfile: str | None = ns.fromfile
+ self.starting_test: str | None = ns.start
# Options to run tests
- self.forever = ns.forever
- self.randomize = ns.randomize
- self.random_seed = ns.random_seed
+ self.fail_fast: bool = ns.failfast
+ self.forever: bool = ns.forever
+ self.randomize: bool = ns.randomize
+ self.random_seed: int | None = ns.random_seed
+ self.pgo: bool = ns.pgo
+ self.pgo_extended: bool = ns.pgo_extended
+ self.output_on_failure: bool = ns.verbose3
+ self.timeout: float | None = ns.timeout
# tests
self.tests = []
def display_progress(self, test_index, text):
quiet = self.ns.quiet
- pgo = self.ns.pgo
if quiet:
return
# "[ 51/405/1] test_tcl passed"
line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
fails = len(self.bad) + len(self.environment_changed)
- if fails and not pgo:
+ if fails and not self.pgo:
line = f"{line}/{fails}"
self.log(f"[{line}] {text}")
def find_tests(self):
ns = self.ns
single = ns.single
- pgo = ns.pgo
test_dir = ns.testdir
if single:
strip_py_suffix(self.tests)
- if pgo:
+ if self.pgo:
# add default PGO tests if no tests are specified
setup_pgo_tests(ns)
# Configure the runner to re-run tests
ns = self.ns
ns.verbose = True
- ns.failfast = False
- ns.verbose3 = False
if ns.use_mp is None:
ns.use_mp = 1
# Re-run failed tests
self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
- runtests = runtests.copy(tests=tuple(tests),
- match_tests_dict=match_tests_dict,
- rerun=True,
- forever=False)
+ runtests = runtests.copy(
+ tests=tuple(tests),
+ rerun=True,
+ forever=False,
+ fail_fast=False,
+ match_tests_dict=match_tests_dict,
+ output_on_failure=False)
self.set_tests(runtests)
self._run_tests_mp(runtests)
+ return runtests
def rerun_failed_tests(self, need_rerun, runtests: RunTests):
if self.ns.python:
self.first_state = self.get_tests_state()
print()
- self._rerun_failed_tests(need_rerun, runtests)
+ rerun_runtests = self._rerun_failed_tests(need_rerun, runtests)
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
printlist(self.bad)
- self.display_result()
+ self.display_result(rerun_runtests)
- def display_result(self):
- pgo = self.ns.pgo
+ def display_result(self, runtests):
+ pgo = runtests.pgo
quiet = self.ns.quiet
print_slow = self.ns.print_slow
if tracer is not None:
            # If we're tracing code coverage, then we don't exit with status
            # on a false return value from main.
- cmd = ('result = runtest(self.ns, test_name)')
+ cmd = ('result = run_single_test(test_name, runtests, self.ns)')
ns = dict(locals())
tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
- result = runtest(self.ns, test_name)
+ result = run_single_test(test_name, runtests, self.ns)
self.accumulate_result(result)
def run_tests_sequentially(self, runtests):
ns = self.ns
coverage = ns.trace
- fail_fast = ns.failfast
fail_env_changed = ns.fail_env_changed
- timeout = ns.timeout
if coverage:
import trace
save_modules = sys.modules.keys()
msg = "Run tests sequentially"
- if timeout:
- msg += " (timeout: %s)" % format_duration(timeout)
+ if runtests.timeout:
+ msg += " (timeout: %s)" % format_duration(runtests.timeout)
self.log(msg)
previous_test = None
if module not in save_modules and module.startswith("test."):
support.unload(module)
- if result.must_stop(fail_fast, fail_env_changed):
+ if result.must_stop(self.fail_fast, fail_env_changed):
break
previous_test = str(result)
# For a partial run, we do not need to clutter the output.
if (self.want_header
- or not(self.ns.pgo or self.ns.quiet or self.ns.single
+ or not(self.pgo or self.ns.quiet or self.ns.single
or self.tests or self.ns.args)):
self.display_header()
if self.randomize:
print("Using random seed", self.random_seed)
- runtests = RunTests(tuple(self.selected), forever=self.forever)
+ runtests = RunTests(
+ tuple(self.selected),
+ fail_fast=self.fail_fast,
+ match_tests=self.match_tests,
+ ignore_tests=self.ignore_tests,
+ forever=self.forever,
+ pgo=self.pgo,
+ pgo_extended=self.pgo_extended,
+ output_on_failure=self.output_on_failure,
+ timeout=self.timeout)
+
+ setup_tests(runtests, self.ns)
+
tracer = self.run_tests(runtests)
- self.display_result()
+ self.display_result(runtests)
need_rerun = self.need_rerun
if self.ns.rerun and need_rerun:
if self.want_wait:
input("Press any key to continue...")
- setup_tests(self.ns)
+ setup_test_dir(self.ns.testdir)
self.find_tests()
exitcode = 0
@dataclasses.dataclass(slots=True, frozen=True)
class RunTests:
tests: TestTuple
+ fail_fast: bool = False
+ match_tests: FilterTuple | None = None
+ ignore_tests: FilterTuple | None = None
match_tests_dict: FilterDict | None = None
rerun: bool = False
forever: bool = False
+ pgo: bool = False
+ pgo_extended: bool = False
+ output_on_failure: bool = False
+ timeout: float | None = None
def copy(self, **override):
state = dataclasses.asdict(self)
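The copy() body is cut off by this excerpt. A minimal sketch of an
override-merging copy, assuming it simply rebuilds the frozen dataclass from
its field dict (the update/return lines are an assumption, not necessarily the
actual implementation):

    def copy(self, **override):
        # Assumed completion: dump the frozen dataclass to a dict, apply the
        # keyword overrides, then build a fresh RunTests from the result.
        state = dataclasses.asdict(self)
        state.update(override)
        return RunTests(**state)

This is what lets the rerun path above flip rerun, fail_fast and
output_on_failure while keeping the other options intact.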
return 'test.' + test_name
-def setup_support(ns: Namespace):
- support.PGO = ns.pgo
- support.PGO_EXTENDED = ns.pgo_extended
- support.set_match_tests(ns.match_tests, ns.ignore_tests)
- support.failfast = ns.failfast
+def setup_support(runtests: RunTests, ns: Namespace):
+ support.PGO = runtests.pgo
+ support.PGO_EXTENDED = runtests.pgo_extended
+ support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
+ support.failfast = runtests.fail_fast
support.verbose = ns.verbose
if ns.xmlpath:
support.junit_xml_list = []
support.junit_xml_list = None
-def _runtest(result: TestResult, ns: Namespace) -> None:
+def _runtest(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
verbose = ns.verbose
- output_on_failure = ns.verbose3
- timeout = ns.timeout
+ output_on_failure = runtests.output_on_failure
+ timeout = runtests.timeout
use_timeout = (
timeout is not None and threading_helper.can_start_thread
faulthandler.dump_traceback_later(timeout, exit=True)
try:
- setup_support(ns)
+ setup_support(runtests, ns)
if output_on_failure:
support.verbose = True
# warnings will be written to sys.stderr below.
print_warning.orig_stderr = stream
- _runtest_env_changed_exc(result, ns, display_failure=False)
+ _runtest_env_changed_exc(result, runtests, ns, display_failure=False)
# Ignore output if the test passed successfully
if result.state != State.PASSED:
output = stream.getvalue()
else:
# Tell tests to be moderately quiet
support.verbose = verbose
- _runtest_env_changed_exc(result, ns, display_failure=not verbose)
+ _runtest_env_changed_exc(result, runtests, ns, display_failure=not verbose)
xml_list = support.junit_xml_list
if xml_list:
support.junit_xml_list = None
-def runtest(ns: Namespace, test_name: str) -> TestResult:
+def run_single_test(test_name: str, runtests: RunTests, ns: Namespace) -> TestResult:
"""Run a single test.
ns -- regrtest namespace of options
"""
start_time = time.perf_counter()
result = TestResult(test_name)
+ pgo = runtests.pgo
try:
- _runtest(result, ns)
+ _runtest(result, runtests, ns)
except:
- if not ns.pgo:
+ if not pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
return support.run_unittest(tests)
-def save_env(ns: Namespace, test_name: str):
- return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
+def save_env(test_name: str, runtests: RunTests, ns: Namespace):
+ return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=runtests.pgo)
def regrtest_runner(result, test_func, ns) -> None:
FOUND_GARBAGE = []
-def _load_run_test(result: TestResult, ns: Namespace) -> None:
+def _load_run_test(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
# Load the test function, run the test function.
module_name = abs_module_name(result.test_name, ns.testdir)
return run_unittest(test_mod)
try:
- with save_env(ns, result.test_name):
+ with save_env(result.test_name, runtests, ns):
regrtest_runner(result, test_func, ns)
finally:
# First kill any dangling references to open files etc.
support.reap_children()
-def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
+def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
+ ns: Namespace,
display_failure: bool = True) -> None:
# Detect environment changes, handle exceptions.
# the environment
support.environment_altered = False
- if ns.pgo:
+ pgo = runtests.pgo
+ if pgo:
display_failure = False
test_name = result.test_name
clear_caches()
support.gc_collect()
- with save_env(ns, test_name):
- _load_run_test(result, ns)
+ with save_env(test_name, runtests, ns):
+ _load_run_test(result, runtests, ns)
except support.ResourceDenied as msg:
- if not ns.quiet and not ns.pgo:
+ if not ns.quiet and not pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
result.state = State.RESOURCE_DENIED
return
except unittest.SkipTest as msg:
- if not ns.quiet and not ns.pgo:
+ if not ns.quiet and not pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
result.state = State.SKIPPED
return
result.state = State.INTERRUPTED
return
except:
- if not ns.pgo:
+ if not pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
import threading
import time
import traceback
-from typing import NamedTuple, NoReturn, Literal, Any, TextIO
+from typing import NoReturn, Literal, Any, TextIO
from test import support
from test.support import os_helper
from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
- runtest, TestResult, State, PROGRESS_MIN_TIME,
+ run_single_test, TestResult, State, PROGRESS_MIN_TIME,
FilterTuple, RunTests)
-from test.libregrtest.setup import setup_tests
+from test.libregrtest.setup import setup_tests, setup_test_dir
from test.libregrtest.utils import format_duration, print_warning
if sys.platform == 'win32':
class WorkerJob:
runtests: RunTests
namespace: Namespace
- match_tests: FilterTuple | None = None
class _EncodeWorkerJob(json.JSONEncoder):
runtests = worker_job.runtests
ns = worker_job.namespace
test_name = runtests.tests[0]
- match_tests: FilterTuple | None = worker_job.match_tests
+ match_tests: FilterTuple | None = runtests.match_tests
- setup_tests(ns)
+ setup_test_dir(ns.testdir)
+ setup_tests(runtests, ns)
if runtests.rerun:
if match_tests:
print(f"Re-running {test_name} in verbose mode", flush=True)
ns.verbose = True
- if match_tests is not None:
- ns.match_tests = match_tests
-
- result = runtest(ns, test_name)
+ result = run_single_test(test_name, runtests, ns)
print() # Force a newline (just in case)
# Serialize TestResult as dict in JSON
match_tests = self.runtests.get_match_tests(test_name)
else:
match_tests = None
- worker_runtests = self.runtests.copy(tests=tests)
+ kwargs = {}
+ if match_tests:
+ kwargs['match_tests'] = match_tests
+ worker_runtests = self.runtests.copy(tests=tests, **kwargs)
worker_job = WorkerJob(
worker_runtests,
- namespace=self.ns,
- match_tests=match_tests)
+ namespace=self.ns)
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
return MultiprocessResult(result, stdout)
def run(self) -> None:
- fail_fast = self.ns.failfast
+ fail_fast = self.runtests.fail_fast
fail_env_changed = self.ns.fail_env_changed
while not self._stopped:
try:
class MultiprocessTestRunner:
def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
ns = regrtest.ns
- timeout = ns.timeout
self.regrtest = regrtest
self.runtests = runtests
self.output: queue.Queue[QueueOutput] = queue.Queue()
tests_iter = runtests.iter_tests()
self.pending = MultiprocessIterator(tests_iter)
- if timeout is not None:
+ self.timeout = runtests.timeout
+ if self.timeout is not None:
            # Rely on faulthandler to kill a worker process. This timeout is used
# when faulthandler fails to kill a worker process. Give a maximum
# of 5 minutes to faulthandler to kill the worker.
- self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60)
+ self.worker_timeout = min(self.timeout * 1.5, self.timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers = None
def start_workers(self) -> None:
use_mp = self.ns.use_mp
- timeout = self.ns.timeout
self.workers = [TestWorkerProcess(index, self)
for index in range(1, use_mp + 1)]
msg = f"Run tests in parallel using {len(self.workers)} child processes"
- if timeout:
+ if self.timeout:
msg += (" (timeout: %s, worker timeout: %s)"
- % (format_duration(timeout),
+ % (format_duration(self.timeout),
format_duration(self.worker_timeout)))
self.log(msg)
for worker in self.workers:
worker.wait_stopped(start_time)
def _get_result(self) -> QueueOutput | None:
- pgo = self.ns.pgo
- use_faulthandler = (self.ns.timeout is not None)
- timeout = PROGRESS_UPDATE
+ pgo = self.runtests.pgo
+ use_faulthandler = (self.timeout is not None)
# bpo-46205: check the status of workers every iteration to avoid
# waiting forever on an empty queue.
# wait for a thread
try:
- return self.output.get(timeout=timeout)
+ return self.output.get(timeout=PROGRESS_UPDATE)
except queue.Empty:
pass
def display_result(self, mp_result: MultiprocessResult) -> None:
result = mp_result.result
- pgo = self.ns.pgo
+ pgo = self.runtests.pgo
text = str(result)
if mp_result.err_msg:
return result
def run_tests(self) -> None:
- fail_fast = self.ns.failfast
+ fail_fast = self.runtests.fail_fast
fail_env_changed = self.ns.fail_env_changed
- timeout = self.ns.timeout
self.start_workers()
print()
self.regrtest.interrupted = True
finally:
- if timeout is not None:
+ if self.timeout is not None:
faulthandler.cancel_dump_traceback_later()
# Always ensure that all worker processes are no longer
UNICODE_GUARD_ENV = "PYTHONREGRTEST_UNICODE_GUARD"
-def setup_tests(ns):
+def setup_test_dir(testdir):
+ if testdir:
+        # Prepend test directory to sys.path, so run_single_test() will be
+        # able to locate tests
+ sys.path.insert(0, os.path.abspath(testdir))
+
+
+def setup_tests(runtests, ns):
try:
stderr_fd = sys.__stderr__.fileno()
except (ValueError, AttributeError):
replace_stdout()
support.record_original_stdout(sys.stdout)
- if ns.testdir:
- # Prepend test directory to sys.path, so runtest() will be able
- # to locate tests
- sys.path.insert(0, os.path.abspath(ns.testdir))
-
    # Sometimes __path__ and __file__ are not absolute (e.g. while running from
# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
# imports might fail. This affects only the modules imported before os.chdir().
setup_unraisable_hook()
setup_threading_excepthook()
- if ns.timeout is not None:
+ timeout = runtests.timeout
+ if timeout is not None:
# For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT
- support.SHORT_TIMEOUT = max(support.SHORT_TIMEOUT, ns.timeout / 40)
- support.LONG_TIMEOUT = max(support.LONG_TIMEOUT, ns.timeout / 4)
+ support.SHORT_TIMEOUT = max(support.SHORT_TIMEOUT, timeout / 40)
+ support.LONG_TIMEOUT = max(support.LONG_TIMEOUT, timeout / 4)
# If --timeout is short: reduce timeouts
- support.LOOPBACK_TIMEOUT = min(support.LOOPBACK_TIMEOUT, ns.timeout)
- support.INTERNET_TIMEOUT = min(support.INTERNET_TIMEOUT, ns.timeout)
- support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, ns.timeout)
- support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, ns.timeout)
+ support.LOOPBACK_TIMEOUT = min(support.LOOPBACK_TIMEOUT, timeout)
+ support.INTERNET_TIMEOUT = min(support.INTERNET_TIMEOUT, timeout)
+ support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, timeout)
+ support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, timeout)
if ns.xmlpath:
from test.support.testresult import RegressionTestResult