import os
import sys
import unittest
+from collections.abc import Container
from test import support
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-def findtests(*, testdir: StrPath | None = None, exclude=(),
+def findtests(*, testdir: StrPath | None = None, exclude: Container[str] = (),
split_test_dirs: set[TestName] = SPLITTESTDIRS,
base_mod: str = "") -> TestList:
"""Return a list of all applicable test modules."""
return sorted(tests)
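# Editorial sketch (not part of this patch): Container[str] only requires
# __contains__, so all of these exclude arguments now type-check:
#
#     findtests(exclude=('test_asyncio',))             # tuple (default type)
#     findtests(exclude={'test_asyncio'})              # set
#     findtests(exclude=['test_gdb', 'test_asyncio'])  # list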
-def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
- split_test_dirs=SPLITTESTDIRS):
+def split_test_packages(tests, *, testdir: StrPath | None = None,
+ exclude: Container[str] = (),
+ split_test_dirs=SPLITTESTDIRS) -> list[TestName]:
testdir = findtestdir(testdir)
splitted = []
for name in tests:
return splitted
-def _list_cases(suite):
+def _list_cases(suite: unittest.TestSuite) -> None:
for test in suite:
- if isinstance(test, unittest.loader._FailedTest):
+ if isinstance(test, unittest.loader._FailedTest): # type: ignore[attr-defined]
continue
if isinstance(test, unittest.TestSuite):
_list_cases(test)
def list_cases(tests: TestTuple, *,
match_tests: TestFilter | None = None,
- test_dir: StrPath | None = None):
+ test_dir: StrPath | None = None) -> None:
support.verbose = False
set_match_tests(match_tests)
import sysconfig
import time
import trace
+from typing import NoReturn
from test.support import os_helper, MS_WINDOWS, flush_std_streams
self.next_single_test: TestName | None = None
self.next_single_filename: StrPath | None = None
- def log(self, line=''):
+ def log(self, line: str = '') -> None:
self.logger.log(line)
def find_tests(self, tests: TestList | None = None) -> tuple[TestTuple, TestList | None]:
return (tuple(selected), tests)
@staticmethod
- def list_tests(tests: TestTuple):
+ def list_tests(tests: TestTuple) -> None:
for name in tests:
print(name)
- def _rerun_failed_tests(self, runtests: RunTests):
+ def _rerun_failed_tests(self, runtests: RunTests) -> RunTests:
# Configure the runner to re-run tests
if self.num_workers == 0 and not self.single_process:
# Always run tests in fresh processes to have more deterministic
self.run_tests_sequentially(runtests)
return runtests
- def rerun_failed_tests(self, runtests: RunTests):
+ def rerun_failed_tests(self, runtests: RunTests) -> None:
if self.python_cmd:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
if not self._run_bisect(runtests, name, progress):
return
- def display_result(self, runtests):
+ def display_result(self, runtests: RunTests) -> None:
# If running the test suite for PGO then no one cares about results.
if runtests.pgo:
return
return result
- def run_tests_sequentially(self, runtests) -> None:
+ def run_tests_sequentially(self, runtests: RunTests) -> None:
if self.coverage:
tracer = trace.Trace(trace=False, count=True)
else:
if previous_test:
print(previous_test)
- def get_state(self):
+ def get_state(self) -> str:
state = self.results.get_state(self.fail_env_changed)
if self.first_state:
state = f'{self.first_state} then {state}'
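# Editorial note: when tests were re-run, get_state() composes both phases,
# e.g. "FAILURE then SUCCESS" if the failed tests pass on the second run.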
if self.junit_filename:
self.results.write_junit(self.junit_filename)
- def display_summary(self):
+ def display_summary(self) -> None:
duration = time.perf_counter() - self.logger.start_time
filtered = bool(self.match_tests)
state = self.get_state()
print(f"Result: {state}")
- def create_run_tests(self, tests: TestTuple):
+ def create_run_tests(self, tests: TestTuple) -> RunTests:
return RunTests(
tests,
fail_fast=self.fail_fast,
f"Command: {cmd_text}")
# continue executing main()
- def _add_python_opts(self):
- python_opts = []
- regrtest_opts = []
+ def _add_python_opts(self) -> None:
+ python_opts: list[str] = []
+ regrtest_opts: list[str] = []
environ, keep_environ = self._add_cross_compile_opts(regrtest_opts)
if self.ci_mode:
self.tmp_dir = get_temp_dir(self.tmp_dir)
- def main(self, tests: TestList | None = None):
+ def main(self, tests: TestList | None = None) -> NoReturn:
if self.want_add_python_opts:
self._add_python_opts()
sys.exit(exitcode)
-def main(tests=None, _add_python_opts=False, **kwargs):
+def main(tests=None, _add_python_opts=False, **kwargs) -> NoReturn:
"""Run the Python suite."""
ns = _parse_args(sys.argv[1:], **kwargs)
Regrtest(ns, _add_python_opts=_add_python_opts).main(tests=tests)
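# Editorial note: NoReturn is accurate for both main() functions above
# because each always terminates via sys.exit(), so type checkers treat any
# code following a call to them as unreachable.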
'test_xml_etree_c',
]
-def setup_pgo_tests(cmdline_args, pgo_extended: bool):
+def setup_pgo_tests(cmdline_args, pgo_extended: bool) -> None:
if not cmdline_args and not pgo_extended:
# run default set of tests for PGO training
cmdline_args[:] = PGO_TESTS[:]
sys._clear_internal_caches()
-def warm_caches():
+def warm_caches() -> None:
# char cache
s = bytes(range(256))
for i in range(256):
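# Editorial note: the elided loop body touches each one-byte slice of s,
# which pre-populates CPython's cache of single-byte bytes objects before
# any timed test work begins.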
case State.DID_NOT_RUN:
return f"{self.test_name} ran no tests"
case State.TIMEOUT:
+ assert self.duration is not None, "self.duration is None"
return f"{self.test_name} timed out ({format_duration(self.duration)})"
case _:
raise ValueError("unknown result state: {state!r}")
return ', '.join(state)
- def get_exitcode(self, fail_env_changed, fail_rerun):
+ def get_exitcode(self, fail_env_changed: bool, fail_rerun: bool) -> int:
exitcode = 0
if self.bad:
exitcode = EXITCODE_BAD_TEST
exitcode = EXITCODE_BAD_TEST
return exitcode
- def accumulate_result(self, result: TestResult, runtests: RunTests):
+ def accumulate_result(self, result: TestResult, runtests: RunTests) -> None:
test_name = result.test_name
rerun = runtests.rerun
fail_env_changed = runtests.fail_env_changed
counts = {loc: 1 for loc in self.covered_lines}
return trace.CoverageResults(counts=counts)
- def need_rerun(self):
+ def need_rerun(self) -> bool:
return bool(self.rerun_results)
def prepare_rerun(self, *, clear: bool = True) -> tuple[TestTuple, FilterDict]:
return (tuple(tests), match_tests_dict)
- def add_junit(self, xml_data: list[str]):
+ def add_junit(self, xml_data: list[str]) -> None:
import xml.etree.ElementTree as ET
for e in xml_data:
try:
print(xml_data, file=sys.__stderr__)
raise
- def write_junit(self, filename: StrPath):
+ def write_junit(self, filename: StrPath) -> None:
if not self.testsuite_xml:
# Don't create empty XML file
return
for s in ET.tostringlist(root):
f.write(s)
- def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool):
+ def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool) -> None:
if print_slowest:
self.test_times.sort(reverse=True)
print()
print()
print("Test suite interrupted by signal SIGINT.")
- def display_summary(self, first_runtests: RunTests, filtered: bool):
+ def display_summary(self, first_runtests: RunTests, filtered: bool) -> None:
# Total tests
stats = self.stats
text = f'run={stats.tests_run:,}'
import shlex
import subprocess
import sys
-from typing import Any
+from typing import Any, Iterator
from test import support
from .utils import (
- StrPath, StrJSON, TestTuple, TestFilter, FilterTuple, FilterDict)
+ StrPath, StrJSON, TestTuple, TestName, TestFilter, FilterTuple, FilterDict)
class JsonFileType:
popen_kwargs['startupinfo'] = startupinfo
@contextlib.contextmanager
- def inherit_subprocess(self):
- if self.file_type == JsonFileType.WINDOWS_HANDLE:
+ def inherit_subprocess(self) -> Iterator[None]:
+ if sys.platform == 'win32' and self.file_type == JsonFileType.WINDOWS_HANDLE:
os.set_handle_inheritable(self.file, True)
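# Editorial note: the added sys.platform == 'win32' check is what makes this
# call pass mypy; typeshed defines os.set_handle_inheritable() only in its
# Windows branch, and mypy narrows on literal sys.platform comparisons.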
try:
yield
state.update(override)
return RunTests(**state)
- def create_worker_runtests(self, **override):
+ def create_worker_runtests(self, **override) -> WorkerRunTests:
state = dataclasses.asdict(self)
state.update(override)
return WorkerRunTests(**state)
- def get_match_tests(self, test_name) -> FilterTuple | None:
+ def get_match_tests(self, test_name: TestName) -> FilterTuple | None:
if self.match_tests_dict is not None:
return self.match_tests_dict.get(test_name, None)
else:
return None
- def get_jobs(self):
+ def get_jobs(self) -> int | None:
# Number of run_single_test() calls needed to run all tests.
# None means that there is no limit (--forever option).
if self.forever:
return None
return len(self.tests)
- def iter_tests(self):
+ def iter_tests(self) -> Iterator[TestName]:
if self.forever:
while True:
yield from self.tests
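# Editorial sketch (not part of this patch): how the two methods cooperate
# in a hypothetical consumer loop.
#
#     jobs = runtests.get_jobs()          # len(tests), or None with --forever
#     for name in runtests.iter_tests():  # endless cycle when forever=True
#         run_single_test(name, ...)      # runner entry point (elided here)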
sys.path.insert(0, os.path.abspath(testdir))
-def setup_process():
+def setup_process() -> None:
fix_umask()
+ assert sys.__stderr__ is not None, "sys.__stderr__ is None"
try:
stderr_fd = sys.__stderr__.fileno()
except (ValueError, AttributeError):
# and ValueError on a closed stream.
#
# Catch AttributeError for stderr being None.
- stderr_fd = None
+ pass
else:
# Display the Python traceback on fatal errors (e.g. segfault)
faulthandler.enable(all_threads=True, file=stderr_fd)
for index, path in enumerate(module.__path__):
module.__path__[index] = os.path.abspath(path)
if getattr(module, '__file__', None):
- module.__file__ = os.path.abspath(module.__file__)
+ module.__file__ = os.path.abspath(module.__file__) # type: ignore[type-var]
if hasattr(sys, 'addaudithook'):
# Add an auditing hook for all tests to ensure PySys_Audit is tested
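# Editorial sketch (not part of this patch): the elided hook is a no-op
# callable, roughly:
def _sketch_audit_hook(event: str, args: tuple) -> None:  # hypothetical name
    pass  # must never raise; auditing must not change test behavior
sys.addaudithook(_sketch_audit_hook)  # every PySys_Audit() call now fires it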
os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII)
-def setup_tests(runtests: RunTests):
+def setup_tests(runtests: RunTests) -> None:
support.verbose = runtests.verbose
support.failfast = runtests.fail_fast
support.PGO = runtests.pgo
]
-def setup_tsan_tests(cmdline_args):
+def setup_tsan_tests(cmdline_args) -> None:
if not cmdline_args:
cmdline_args[:] = TSAN_TESTS[:]
FilterDict = dict[TestName, FilterTuple]
-def format_duration(seconds):
+def format_duration(seconds: float) -> str:
ms = math.ceil(seconds * 1e3)
seconds, ms = divmod(ms, 1000)
minutes, seconds = divmod(seconds, 60)
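# Editorial worked example: for seconds=90.5, ms = ceil(90.5 * 1e3) = 90500;
# divmod(90500, 1000) -> (90, 500); divmod(90, 60) -> (1, 30); the elided
# formatting below would then render "1 min 30 sec".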
names[idx] = basename
-def plural(n, singular, plural=None):
+def plural(n: int, singular: str, plural: str | None = None) -> str:
if n == 1:
return singular
elif plural is not None:
return plural
else:
return singular + 's'
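# Editorial note: plural(1, "test") -> "test"; plural(3, "test") -> "tests";
# plural(3, "match", "matches") -> "matches" (irregular form supplied).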
-def count(n, word):
+def count(n: int, word: str) -> str:
if n == 1:
return f"{n} {word}"
else:
file=file)
-def print_warning(msg):
+def print_warning(msg: str) -> None:
support.print_warning(msg)
-orig_unraisablehook = None
+orig_unraisablehook: Callable[..., None] | None = None
-def regrtest_unraisable_hook(unraisable):
+def regrtest_unraisable_hook(unraisable) -> None:
global orig_unraisablehook
support.environment_altered = True
support.print_warning("Unraisable exception")
try:
support.flush_std_streams()
sys.stderr = support.print_warning.orig_stderr
+ assert orig_unraisablehook is not None, "orig_unraisablehook not set"
orig_unraisablehook(unraisable)
sys.stderr.flush()
finally:
sys.stderr = old_stderr
-def setup_unraisable_hook():
+def setup_unraisable_hook() -> None:
global orig_unraisablehook
orig_unraisablehook = sys.unraisablehook
sys.unraisablehook = regrtest_unraisable_hook
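# Editorial sketch (not part of this patch): the kind of event the hook
# intercepts; an exception escaping __del__ becomes "unraisable":
class _SketchLeaky:  # hypothetical class
    def __del__(self) -> None:
        raise RuntimeError("boom")  # routed to sys.unraisablehook
_SketchLeaky()  # dropped immediately; regrtest_unraisable_hook would fire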
-orig_threading_excepthook = None
+orig_threading_excepthook: Callable[..., None] | None = None
-def regrtest_threading_excepthook(args):
+def regrtest_threading_excepthook(args) -> None:
global orig_threading_excepthook
support.environment_altered = True
support.print_warning(f"Uncaught thread exception: {args.exc_type.__name__}")
try:
support.flush_std_streams()
sys.stderr = support.print_warning.orig_stderr
+ assert orig_threading_excepthook is not None, "orig_threading_excepthook not set"
orig_threading_excepthook(args)
sys.stderr.flush()
finally:
sys.stderr = old_stderr
-def setup_threading_excepthook():
+def setup_threading_excepthook() -> None:
global orig_threading_excepthook
import threading
orig_threading_excepthook = threading.excepthook
return os.path.abspath(tmp_dir)
-def fix_umask():
+def fix_umask() -> None:
if support.is_emscripten:
# Emscripten has default umask 0o777, which breaks some tests.
# see https://github.com/emscripten-core/emscripten/issues/17269
'setUpModule', 'tearDownModule',
))
-def normalize_test_name(test_full_name, *, is_error=False):
+def normalize_test_name(test_full_name: str, *,
+ is_error: bool = False) -> str | None:
short_name = test_full_name.split(" ")[0]
if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
return short_name
-def adjust_rlimit_nofile():
+def adjust_rlimit_nofile() -> None:
"""
On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256)
for our test suite to succeed. Raise it to something more reasonable. 1024
f"{new_fd_limit}: {err}.")
-def get_host_runner():
+def get_host_runner() -> str:
if (hostrunner := os.environ.get("_PYTHON_HOSTRUNNER")) is None:
hostrunner = sysconfig.get_config_var("HOSTRUNNER")
return hostrunner
-def is_cross_compiled():
+def is_cross_compiled() -> bool:
return ('_PYTHON_HOST_PLATFORM' in os.environ)
-def format_resources(use_resources: Iterable[str]):
+def format_resources(use_resources: Iterable[str]) -> str:
use_resources = set(use_resources)
all_resources = set(ALL_RESOURCES)
def display_header(use_resources: tuple[str, ...],
- python_cmd: tuple[str, ...] | None):
+ python_cmd: tuple[str, ...] | None) -> None:
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
print(flush=True)
-def cleanup_temp_dir(tmp_dir: StrPath):
+def cleanup_temp_dir(tmp_dir: StrPath) -> None:
import glob
path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
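# Editorial note: glob.escape() neutralizes metacharacters in tmp_dir, e.g.
# glob.escape("/tmp/py[1]") == "/tmp/py[[]1]", so only literal TMP_PREFIX*
# entries inside that exact directory match the pattern.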
return ''.join(f'\\x{ord(ch):02x}' if ch <= '\xff' else ascii(ch)[1:-1]
for ch in text)
-def sanitize_xml(text):
+def sanitize_xml(text: str) -> str:
return ILLEGAL_XML_CHARS_RE.sub(_sanitize_xml_replace, text)
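# Editorial worked example: NUL is illegal in XML 1.0, so assuming the
# elided ILLEGAL_XML_CHARS_RE matches it, sanitize_xml("a\x00b") returns
# "a\\x00b" (the four literal characters backslash, x, 0, 0 in the middle).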
sys.exit(0)
-def main():
+def main() -> NoReturn:
if len(sys.argv) != 2:
print("usage: python -m test.libregrtest.worker JSON")
sys.exit(1)
# Requirements file for external linters and checks we run on
# Tools/clinic, Tools/cases_generator/, and Tools/peg_generator/ in CI
-mypy==1.12
+mypy==1.13
# needed for peg_generator:
types-psutil==6.0.0.20240901