]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-109162: libregrtest: add single.py and result.py (#109243)
authorVictor Stinner <vstinner@python.org>
Mon, 11 Sep 2023 00:07:18 +0000 (02:07 +0200)
committerGitHub <noreply@github.com>
Mon, 11 Sep 2023 00:07:18 +0000 (00:07 +0000)
* Add single.py and result.py files.
* Rename runtest.py to runtests.py.
* Move run_single_test() function and its helper functions to
  single.py.
* Move remove_testfn(), abs_module_name() and normalize_test_name()
  to utils.py.
* Move setup_support() to setup.py.
* Move type hints like TestName to utils.py.
* Rename runtest.py to runtests.py.

14 files changed:
Lib/test/libregrtest/findtests.py [new file with mode: 0644]
Lib/test/libregrtest/logger.py
Lib/test/libregrtest/main.py
Lib/test/libregrtest/refleak.py
Lib/test/libregrtest/result.py [new file with mode: 0644]
Lib/test/libregrtest/results.py
Lib/test/libregrtest/runtest.py [deleted file]
Lib/test/libregrtest/runtest_mp.py
Lib/test/libregrtest/runtests.py [new file with mode: 0644]
Lib/test/libregrtest/setup.py
Lib/test/libregrtest/single.py [new file with mode: 0644]
Lib/test/libregrtest/utils.py
Lib/test/libregrtest/worker.py
Lib/test/test_regrtest.py

diff --git a/Lib/test/libregrtest/findtests.py b/Lib/test/libregrtest/findtests.py
new file mode 100644 (file)
index 0000000..88570be
--- /dev/null
@@ -0,0 +1,58 @@
+import os
+
+from test.libregrtest.utils import StrPath, TestName, TestList
+
+
+# If these test directories are encountered recurse into them and treat each
+# "test_*.py" file or each sub-directory as a separate test module. This can
+# increase parallelism.
+#
+# Beware this can't generally be done for any directory with sub-tests as the
+# __init__.py may do things which alter what tests are to be run.
+SPLITTESTDIRS: set[TestName] = {
+    "test_asyncio",
+    "test_concurrent_futures",
+    "test_multiprocessing_fork",
+    "test_multiprocessing_forkserver",
+    "test_multiprocessing_spawn",
+}
+
+
+def findtestdir(path=None):
+    return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
+
+
+def findtests(*, testdir: StrPath | None = None, exclude=(),
+              split_test_dirs: set[TestName] = SPLITTESTDIRS,
+              base_mod: str = "") -> TestList:
+    """Return a list of all applicable test modules."""
+    testdir = findtestdir(testdir)
+    tests = []
+    for name in os.listdir(testdir):
+        mod, ext = os.path.splitext(name)
+        if (not mod.startswith("test_")) or (mod in exclude):
+            continue
+        if mod in split_test_dirs:
+            subdir = os.path.join(testdir, mod)
+            mod = f"{base_mod or 'test'}.{mod}"
+            tests.extend(findtests(testdir=subdir, exclude=exclude,
+                                   split_test_dirs=split_test_dirs,
+                                   base_mod=mod))
+        elif ext in (".py", ""):
+            tests.append(f"{base_mod}.{mod}" if base_mod else mod)
+    return sorted(tests)
+
+
+def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
+                        split_test_dirs=SPLITTESTDIRS):
+    testdir = findtestdir(testdir)
+    splitted = []
+    for name in tests:
+        if name in split_test_dirs:
+            subdir = os.path.join(testdir, name)
+            splitted.extend(findtests(testdir=subdir, exclude=exclude,
+                                      split_test_dirs=split_test_dirs,
+                                      base_mod=name))
+        else:
+            splitted.append(name)
+    return splitted
index c4498a43545545e1c180fcec9791b575ea13828c..05b9307c97ded11f07b349032a219e61133b8cd1 100644 (file)
@@ -1,7 +1,7 @@
 import os
 import time
 
-from test.libregrtest.runtest import RunTests
+from test.libregrtest.runtests import RunTests
 from test.libregrtest.utils import print_warning, MS_WINDOWS
 
 if MS_WINDOWS:
index ed0813d6f30c10b1b6c161e5a2c29e6af93c3bb1..0227a2d1fc612e9b1e11b3fc83773b0bfc485cf0 100644 (file)
@@ -11,17 +11,19 @@ from test import support
 from test.support import os_helper
 
 from test.libregrtest.cmdline import _parse_args, Namespace
+from test.libregrtest.findtests import findtests, split_test_packages
 from test.libregrtest.logger import Logger
-from test.libregrtest.runtest import (
-    findtests, split_test_packages, run_single_test, abs_module_name,
-    PROGRESS_MIN_TIME, State, RunTests, HuntRefleak,
-    FilterTuple, TestList, StrJSON, TestName)
+from test.libregrtest.result import State
+from test.libregrtest.runtests import RunTests, HuntRefleak
 from test.libregrtest.setup import setup_tests, setup_test_dir
+from test.libregrtest.single import run_single_test, PROGRESS_MIN_TIME
 from test.libregrtest.pgo import setup_pgo_tests
 from test.libregrtest.results import TestResults
 from test.libregrtest.utils import (
-    strip_py_suffix, count, format_duration, StrPath,
-    printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout)
+    StrPath, StrJSON, TestName, TestList, FilterTuple,
+    strip_py_suffix, count, format_duration,
+    printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout,
+     abs_module_name)
 
 
 class Regrtest:
index 81f163c47e56656ee96622f5ea0fb018797c06d4..6b1d7082ba46ba391faf570b74f94313e4d4a009 100644 (file)
@@ -1,10 +1,11 @@
-import os
 import sys
 import warnings
 from inspect import isabstract
+
 from test import support
 from test.support import os_helper
-from test.libregrtest.runtest import HuntRefleak
+
+from test.libregrtest.runtests import HuntRefleak
 from test.libregrtest.utils import clear_caches
 
 try:
diff --git a/Lib/test/libregrtest/result.py b/Lib/test/libregrtest/result.py
new file mode 100644 (file)
index 0000000..4a68872
--- /dev/null
@@ -0,0 +1,184 @@
+import dataclasses
+import json
+from typing import Any
+
+from test.support import TestStats
+
+from test.libregrtest.utils import (
+    TestName, FilterTuple,
+    format_duration, normalize_test_name, print_warning)
+
+
+# Avoid enum.Enum to reduce the number of imports when tests are run
+class State:
+    PASSED = "PASSED"
+    FAILED = "FAILED"
+    SKIPPED = "SKIPPED"
+    UNCAUGHT_EXC = "UNCAUGHT_EXC"
+    REFLEAK = "REFLEAK"
+    ENV_CHANGED = "ENV_CHANGED"
+    RESOURCE_DENIED = "RESOURCE_DENIED"
+    INTERRUPTED = "INTERRUPTED"
+    MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
+    DID_NOT_RUN = "DID_NOT_RUN"
+    TIMEOUT = "TIMEOUT"
+
+    @staticmethod
+    def is_failed(state):
+        return state in {
+            State.FAILED,
+            State.UNCAUGHT_EXC,
+            State.REFLEAK,
+            State.MULTIPROCESSING_ERROR,
+            State.TIMEOUT}
+
+    @staticmethod
+    def has_meaningful_duration(state):
+        # Consider that the duration is meaningless for these cases.
+        # For example, if a whole test file is skipped, its duration
+        # is unlikely to be the duration of executing its tests,
+        # but just the duration to execute code which skips the test.
+        return state not in {
+            State.SKIPPED,
+            State.RESOURCE_DENIED,
+            State.INTERRUPTED,
+            State.MULTIPROCESSING_ERROR,
+            State.DID_NOT_RUN}
+
+    @staticmethod
+    def must_stop(state):
+        return state in {
+            State.INTERRUPTED,
+            State.MULTIPROCESSING_ERROR}
+
+
+@dataclasses.dataclass(slots=True)
+class TestResult:
+    test_name: TestName
+    state: str | None = None
+    # Test duration in seconds
+    duration: float | None = None
+    xml_data: list[str] | None = None
+    stats: TestStats | None = None
+
+    # errors and failures copied from support.TestFailedWithDetails
+    errors: list[tuple[str, str]] | None = None
+    failures: list[tuple[str, str]] | None = None
+
+    def is_failed(self, fail_env_changed: bool) -> bool:
+        if self.state == State.ENV_CHANGED:
+            return fail_env_changed
+        return State.is_failed(self.state)
+
+    def _format_failed(self):
+        if self.errors and self.failures:
+            le = len(self.errors)
+            lf = len(self.failures)
+            error_s = "error" + ("s" if le > 1 else "")
+            failure_s = "failure" + ("s" if lf > 1 else "")
+            return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
+
+        if self.errors:
+            le = len(self.errors)
+            error_s = "error" + ("s" if le > 1 else "")
+            return f"{self.test_name} failed ({le} {error_s})"
+
+        if self.failures:
+            lf = len(self.failures)
+            failure_s = "failure" + ("s" if lf > 1 else "")
+            return f"{self.test_name} failed ({lf} {failure_s})"
+
+        return f"{self.test_name} failed"
+
+    def __str__(self) -> str:
+        match self.state:
+            case State.PASSED:
+                return f"{self.test_name} passed"
+            case State.FAILED:
+                return self._format_failed()
+            case State.SKIPPED:
+                return f"{self.test_name} skipped"
+            case State.UNCAUGHT_EXC:
+                return f"{self.test_name} failed (uncaught exception)"
+            case State.REFLEAK:
+                return f"{self.test_name} failed (reference leak)"
+            case State.ENV_CHANGED:
+                return f"{self.test_name} failed (env changed)"
+            case State.RESOURCE_DENIED:
+                return f"{self.test_name} skipped (resource denied)"
+            case State.INTERRUPTED:
+                return f"{self.test_name} interrupted"
+            case State.MULTIPROCESSING_ERROR:
+                return f"{self.test_name} process crashed"
+            case State.DID_NOT_RUN:
+                return f"{self.test_name} ran no tests"
+            case State.TIMEOUT:
+                return f"{self.test_name} timed out ({format_duration(self.duration)})"
+            case _:
+                raise ValueError("unknown result state: {state!r}")
+
+    def has_meaningful_duration(self):
+        return State.has_meaningful_duration(self.state)
+
+    def set_env_changed(self):
+        if self.state is None or self.state == State.PASSED:
+            self.state = State.ENV_CHANGED
+
+    def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
+        if State.must_stop(self.state):
+            return True
+        if fail_fast and self.is_failed(fail_env_changed):
+            return True
+        return False
+
+    def get_rerun_match_tests(self) -> FilterTuple | None:
+        match_tests = []
+
+        errors = self.errors or []
+        failures = self.failures or []
+        for error_list, is_error in (
+            (errors, True),
+            (failures, False),
+        ):
+            for full_name, *_ in error_list:
+                match_name = normalize_test_name(full_name, is_error=is_error)
+                if match_name is None:
+                    # 'setUpModule (test.test_sys)': don't filter tests
+                    return None
+                if not match_name:
+                    error_type = "ERROR" if is_error else "FAIL"
+                    print_warning(f"rerun failed to parse {error_type} test name: "
+                                  f"{full_name!r}: don't filter tests")
+                    return None
+                match_tests.append(match_name)
+
+        if not match_tests:
+            return None
+        return tuple(match_tests)
+
+    def write_json(self, file) -> None:
+        json.dump(self, file, cls=_EncodeTestResult)
+
+    @staticmethod
+    def from_json(worker_json) -> 'TestResult':
+        return json.loads(worker_json, object_hook=_decode_test_result)
+
+
+class _EncodeTestResult(json.JSONEncoder):
+    def default(self, o: Any) -> dict[str, Any]:
+        if isinstance(o, TestResult):
+            result = dataclasses.asdict(o)
+            result["__test_result__"] = o.__class__.__name__
+            return result
+        else:
+            return super().default(o)
+
+
+def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
+    if "__test_result__" in data:
+        data.pop('__test_result__')
+        if data['stats'] is not None:
+            data['stats'] = TestStats(**data['stats'])
+        return TestResult(**data)
+    else:
+        return data
index e44301938c6527c6dfab85b0fe36eb2fa34d668a..b7a044eae25aaeaa596d7c6d44a5f128fb2c2201 100644 (file)
@@ -1,11 +1,11 @@
 import sys
 from test.support import TestStats
 
-from test.libregrtest.runtest import (
-    TestName, TestTuple, TestList, FilterDict, State,
-    TestResult, RunTests)
+from test.libregrtest.runtests import RunTests
+from test.libregrtest.result import State, TestResult
 from test.libregrtest.utils import (
-    printlist, count, format_duration, StrPath)
+    StrPath, TestName, TestTuple, TestList, FilterDict,
+    printlist, count, format_duration)
 
 
 EXITCODE_BAD_TEST = 2
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
deleted file mode 100644 (file)
index a12c7fc..0000000
+++ /dev/null
@@ -1,676 +0,0 @@
-import dataclasses
-import doctest
-import faulthandler
-import gc
-import importlib
-import io
-import json
-import os
-import sys
-import time
-import traceback
-import unittest
-from typing import Any
-
-from test import support
-from test.support import TestStats
-from test.support import os_helper
-from test.support import threading_helper
-from test.libregrtest.save_env import saved_test_environment
-from test.libregrtest.utils import (
-    clear_caches, format_duration, print_warning, StrPath)
-
-
-StrJSON = str
-TestName = str
-TestTuple = tuple[TestName, ...]
-TestList = list[TestName]
-
-# --match and --ignore options: list of patterns
-# ('*' joker character can be used)
-FilterTuple = tuple[TestName, ...]
-FilterDict = dict[TestName, FilterTuple]
-
-
-@dataclasses.dataclass(slots=True, frozen=True)
-class HuntRefleak:
-    warmups: int
-    runs: int
-    filename: StrPath
-
-
-# Avoid enum.Enum to reduce the number of imports when tests are run
-class State:
-    PASSED = "PASSED"
-    FAILED = "FAILED"
-    SKIPPED = "SKIPPED"
-    UNCAUGHT_EXC = "UNCAUGHT_EXC"
-    REFLEAK = "REFLEAK"
-    ENV_CHANGED = "ENV_CHANGED"
-    RESOURCE_DENIED = "RESOURCE_DENIED"
-    INTERRUPTED = "INTERRUPTED"
-    MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
-    DID_NOT_RUN = "DID_NOT_RUN"
-    TIMEOUT = "TIMEOUT"
-
-    @staticmethod
-    def is_failed(state):
-        return state in {
-            State.FAILED,
-            State.UNCAUGHT_EXC,
-            State.REFLEAK,
-            State.MULTIPROCESSING_ERROR,
-            State.TIMEOUT}
-
-    @staticmethod
-    def has_meaningful_duration(state):
-        # Consider that the duration is meaningless for these cases.
-        # For example, if a whole test file is skipped, its duration
-        # is unlikely to be the duration of executing its tests,
-        # but just the duration to execute code which skips the test.
-        return state not in {
-            State.SKIPPED,
-            State.RESOURCE_DENIED,
-            State.INTERRUPTED,
-            State.MULTIPROCESSING_ERROR,
-            State.DID_NOT_RUN}
-
-    @staticmethod
-    def must_stop(state):
-        return state in {
-            State.INTERRUPTED,
-            State.MULTIPROCESSING_ERROR}
-
-
-# gh-90681: When rerunning tests, we might need to rerun the whole
-# class or module suite if some its life-cycle hooks fail.
-# Test level hooks are not affected.
-_TEST_LIFECYCLE_HOOKS = frozenset((
-    'setUpClass', 'tearDownClass',
-    'setUpModule', 'tearDownModule',
-))
-
-def normalize_test_name(test_full_name, *, is_error=False):
-    short_name = test_full_name.split(" ")[0]
-    if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
-        if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
-            # if setUpModule() or tearDownModule() failed, don't filter
-            # tests with the test file name, don't use use filters.
-            return None
-
-        # This means that we have a failure in a life-cycle hook,
-        # we need to rerun the whole module or class suite.
-        # Basically the error looks like this:
-        #    ERROR: setUpClass (test.test_reg_ex.RegTest)
-        # or
-        #    ERROR: setUpModule (test.test_reg_ex)
-        # So, we need to parse the class / module name.
-        lpar = test_full_name.index('(')
-        rpar = test_full_name.index(')')
-        return test_full_name[lpar + 1: rpar].split('.')[-1]
-    return short_name
-
-
-@dataclasses.dataclass(slots=True)
-class TestResult:
-    test_name: TestName
-    state: str | None = None
-    # Test duration in seconds
-    duration: float | None = None
-    xml_data: list[str] | None = None
-    stats: TestStats | None = None
-
-    # errors and failures copied from support.TestFailedWithDetails
-    errors: list[tuple[str, str]] | None = None
-    failures: list[tuple[str, str]] | None = None
-
-    def is_failed(self, fail_env_changed: bool) -> bool:
-        if self.state == State.ENV_CHANGED:
-            return fail_env_changed
-        return State.is_failed(self.state)
-
-    def _format_failed(self):
-        if self.errors and self.failures:
-            le = len(self.errors)
-            lf = len(self.failures)
-            error_s = "error" + ("s" if le > 1 else "")
-            failure_s = "failure" + ("s" if lf > 1 else "")
-            return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
-
-        if self.errors:
-            le = len(self.errors)
-            error_s = "error" + ("s" if le > 1 else "")
-            return f"{self.test_name} failed ({le} {error_s})"
-
-        if self.failures:
-            lf = len(self.failures)
-            failure_s = "failure" + ("s" if lf > 1 else "")
-            return f"{self.test_name} failed ({lf} {failure_s})"
-
-        return f"{self.test_name} failed"
-
-    def __str__(self) -> str:
-        match self.state:
-            case State.PASSED:
-                return f"{self.test_name} passed"
-            case State.FAILED:
-                return self._format_failed()
-            case State.SKIPPED:
-                return f"{self.test_name} skipped"
-            case State.UNCAUGHT_EXC:
-                return f"{self.test_name} failed (uncaught exception)"
-            case State.REFLEAK:
-                return f"{self.test_name} failed (reference leak)"
-            case State.ENV_CHANGED:
-                return f"{self.test_name} failed (env changed)"
-            case State.RESOURCE_DENIED:
-                return f"{self.test_name} skipped (resource denied)"
-            case State.INTERRUPTED:
-                return f"{self.test_name} interrupted"
-            case State.MULTIPROCESSING_ERROR:
-                return f"{self.test_name} process crashed"
-            case State.DID_NOT_RUN:
-                return f"{self.test_name} ran no tests"
-            case State.TIMEOUT:
-                return f"{self.test_name} timed out ({format_duration(self.duration)})"
-            case _:
-                raise ValueError("unknown result state: {state!r}")
-
-    def has_meaningful_duration(self):
-        return State.has_meaningful_duration(self.state)
-
-    def set_env_changed(self):
-        if self.state is None or self.state == State.PASSED:
-            self.state = State.ENV_CHANGED
-
-    def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
-        if State.must_stop(self.state):
-            return True
-        if fail_fast and self.is_failed(fail_env_changed):
-            return True
-        return False
-
-    def get_rerun_match_tests(self) -> FilterTuple | None:
-        match_tests = []
-
-        errors = self.errors or []
-        failures = self.failures or []
-        for error_list, is_error in (
-            (errors, True),
-            (failures, False),
-        ):
-            for full_name, *_ in error_list:
-                match_name = normalize_test_name(full_name, is_error=is_error)
-                if match_name is None:
-                    # 'setUpModule (test.test_sys)': don't filter tests
-                    return None
-                if not match_name:
-                    error_type = "ERROR" if is_error else "FAIL"
-                    print_warning(f"rerun failed to parse {error_type} test name: "
-                                  f"{full_name!r}: don't filter tests")
-                    return None
-                match_tests.append(match_name)
-
-        if not match_tests:
-            return None
-        return tuple(match_tests)
-
-    def write_json(self, file) -> None:
-        json.dump(self, file, cls=_EncodeTestResult)
-
-    @staticmethod
-    def from_json(worker_json) -> 'TestResult':
-        return json.loads(worker_json, object_hook=_decode_test_result)
-
-
-class _EncodeTestResult(json.JSONEncoder):
-    def default(self, o: Any) -> dict[str, Any]:
-        if isinstance(o, TestResult):
-            result = dataclasses.asdict(o)
-            result["__test_result__"] = o.__class__.__name__
-            return result
-        else:
-            return super().default(o)
-
-
-def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
-    if "__test_result__" in data:
-        data.pop('__test_result__')
-        if data['stats'] is not None:
-            data['stats'] = TestStats(**data['stats'])
-        return TestResult(**data)
-    else:
-        return data
-
-
-@dataclasses.dataclass(slots=True, frozen=True)
-class RunTests:
-    tests: TestTuple
-    fail_fast: bool = False
-    fail_env_changed: bool = False
-    match_tests: FilterTuple | None = None
-    ignore_tests: FilterTuple | None = None
-    match_tests_dict: FilterDict | None = None
-    rerun: bool = False
-    forever: bool = False
-    pgo: bool = False
-    pgo_extended: bool = False
-    output_on_failure: bool = False
-    timeout: float | None = None
-    verbose: bool = False
-    quiet: bool = False
-    hunt_refleak: HuntRefleak | None = None
-    test_dir: StrPath | None = None
-    use_junit: bool = False
-    memory_limit: str | None = None
-    gc_threshold: int | None = None
-    use_resources: list[str] = None
-    python_cmd: list[str] | None = None
-
-    def copy(self, **override):
-        state = dataclasses.asdict(self)
-        state.update(override)
-        return RunTests(**state)
-
-    def get_match_tests(self, test_name) -> FilterTuple | None:
-        if self.match_tests_dict is not None:
-            return self.match_tests_dict.get(test_name, None)
-        else:
-            return None
-
-    def iter_tests(self):
-        if self.forever:
-            while True:
-                yield from self.tests
-        else:
-            yield from self.tests
-
-    def as_json(self) -> StrJSON:
-        return json.dumps(self, cls=_EncodeRunTests)
-
-    @staticmethod
-    def from_json(worker_json: StrJSON) -> 'RunTests':
-        return json.loads(worker_json, object_hook=_decode_runtests)
-
-
-class _EncodeRunTests(json.JSONEncoder):
-    def default(self, o: Any) -> dict[str, Any]:
-        if isinstance(o, RunTests):
-            result = dataclasses.asdict(o)
-            result["__runtests__"] = True
-            return result
-        else:
-            return super().default(o)
-
-
-def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
-    if "__runtests__" in data:
-        data.pop('__runtests__')
-        if data['hunt_refleak']:
-            data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
-        return RunTests(**data)
-    else:
-        return data
-
-
-# Minimum duration of a test to display its duration or to mention that
-# the test is running in background
-PROGRESS_MIN_TIME = 30.0   # seconds
-
-#If these test directories are encountered recurse into them and treat each
-# test_ .py or dir as a separate test module. This can increase parallelism.
-# Beware this can't generally be done for any directory with sub-tests as the
-# __init__.py may do things which alter what tests are to be run.
-
-SPLITTESTDIRS: set[TestName] = {
-    "test_asyncio",
-    "test_concurrent_futures",
-    "test_multiprocessing_fork",
-    "test_multiprocessing_forkserver",
-    "test_multiprocessing_spawn",
-}
-
-
-def findtestdir(path=None):
-    return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-
-
-def findtests(*, testdir: StrPath | None = None, exclude=(),
-              split_test_dirs: set[TestName] = SPLITTESTDIRS,
-              base_mod: str = "") -> TestList:
-    """Return a list of all applicable test modules."""
-    testdir = findtestdir(testdir)
-    tests = []
-    for name in os.listdir(testdir):
-        mod, ext = os.path.splitext(name)
-        if (not mod.startswith("test_")) or (mod in exclude):
-            continue
-        if mod in split_test_dirs:
-            subdir = os.path.join(testdir, mod)
-            mod = f"{base_mod or 'test'}.{mod}"
-            tests.extend(findtests(testdir=subdir, exclude=exclude,
-                                   split_test_dirs=split_test_dirs,
-                                   base_mod=mod))
-        elif ext in (".py", ""):
-            tests.append(f"{base_mod}.{mod}" if base_mod else mod)
-    return sorted(tests)
-
-
-def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
-                        split_test_dirs=SPLITTESTDIRS):
-    testdir = findtestdir(testdir)
-    splitted = []
-    for name in tests:
-        if name in split_test_dirs:
-            subdir = os.path.join(testdir, name)
-            splitted.extend(findtests(testdir=subdir, exclude=exclude,
-                                      split_test_dirs=split_test_dirs,
-                                      base_mod=name))
-        else:
-            splitted.append(name)
-    return splitted
-
-
-def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
-    if test_name.startswith('test.') or test_dir:
-        return test_name
-    else:
-        # Import it from the test package
-        return 'test.' + test_name
-
-
-def setup_support(runtests: RunTests):
-    support.PGO = runtests.pgo
-    support.PGO_EXTENDED = runtests.pgo_extended
-    support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
-    support.failfast = runtests.fail_fast
-    support.verbose = runtests.verbose
-    if runtests.use_junit:
-        support.junit_xml_list = []
-    else:
-        support.junit_xml_list = None
-
-
-def _runtest(result: TestResult, runtests: RunTests) -> None:
-    # Capture stdout and stderr, set faulthandler timeout,
-    # and create JUnit XML report.
-    verbose = runtests.verbose
-    output_on_failure = runtests.output_on_failure
-    timeout = runtests.timeout
-
-    use_timeout = (
-        timeout is not None and threading_helper.can_start_thread
-    )
-    if use_timeout:
-        faulthandler.dump_traceback_later(timeout, exit=True)
-
-    try:
-        setup_support(runtests)
-
-        if output_on_failure:
-            support.verbose = True
-
-            stream = io.StringIO()
-            orig_stdout = sys.stdout
-            orig_stderr = sys.stderr
-            print_warning = support.print_warning
-            orig_print_warnings_stderr = print_warning.orig_stderr
-
-            output = None
-            try:
-                sys.stdout = stream
-                sys.stderr = stream
-                # print_warning() writes into the temporary stream to preserve
-                # messages order. If support.environment_altered becomes true,
-                # warnings will be written to sys.stderr below.
-                print_warning.orig_stderr = stream
-
-                _runtest_env_changed_exc(result, runtests, display_failure=False)
-                # Ignore output if the test passed successfully
-                if result.state != State.PASSED:
-                    output = stream.getvalue()
-            finally:
-                sys.stdout = orig_stdout
-                sys.stderr = orig_stderr
-                print_warning.orig_stderr = orig_print_warnings_stderr
-
-            if output is not None:
-                sys.stderr.write(output)
-                sys.stderr.flush()
-        else:
-            # Tell tests to be moderately quiet
-            support.verbose = verbose
-            _runtest_env_changed_exc(result, runtests,
-                                     display_failure=not verbose)
-
-        xml_list = support.junit_xml_list
-        if xml_list:
-            import xml.etree.ElementTree as ET
-            result.xml_data = [ET.tostring(x).decode('us-ascii')
-                               for x in xml_list]
-    finally:
-        if use_timeout:
-            faulthandler.cancel_dump_traceback_later()
-        support.junit_xml_list = None
-
-
-def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
-    """Run a single test.
-
-    test_name -- the name of the test
-
-    Returns a TestResult.
-
-    If runtests.use_junit, xml_data is a list containing each generated
-    testsuite element.
-    """
-    start_time = time.perf_counter()
-    result = TestResult(test_name)
-    pgo = runtests.pgo
-    try:
-        _runtest(result, runtests)
-    except:
-        if not pgo:
-            msg = traceback.format_exc()
-            print(f"test {test_name} crashed -- {msg}",
-                  file=sys.stderr, flush=True)
-        result.state = State.UNCAUGHT_EXC
-    result.duration = time.perf_counter() - start_time
-    return result
-
-
-def run_unittest(test_mod):
-    loader = unittest.TestLoader()
-    tests = loader.loadTestsFromModule(test_mod)
-    for error in loader.errors:
-        print(error, file=sys.stderr)
-    if loader.errors:
-        raise Exception("errors while loading tests")
-    return support.run_unittest(tests)
-
-
-def save_env(test_name: TestName, runtests: RunTests):
-    return saved_test_environment(test_name, runtests.verbose, runtests.quiet,
-                                  pgo=runtests.pgo)
-
-
-def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
-    # Run test_func(), collect statistics, and detect reference and memory
-    # leaks.
-    if runtests.hunt_refleak:
-        from test.libregrtest.refleak import runtest_refleak
-        refleak, test_result = runtest_refleak(result.test_name, test_func,
-                                               runtests.hunt_refleak,
-                                               runtests.quiet)
-    else:
-        test_result = test_func()
-        refleak = False
-
-    if refleak:
-        result.state = State.REFLEAK
-
-    match test_result:
-        case TestStats():
-            stats = test_result
-        case unittest.TestResult():
-            stats = TestStats.from_unittest(test_result)
-        case doctest.TestResults():
-            stats = TestStats.from_doctest(test_result)
-        case None:
-            print_warning(f"{result.test_name} test runner returned None: {test_func}")
-            stats = None
-        case _:
-            print_warning(f"Unknown test result type: {type(test_result)}")
-            stats = None
-
-    result.stats = stats
-
-
-# Storage of uncollectable objects
-FOUND_GARBAGE = []
-
-
-def _load_run_test(result: TestResult, runtests: RunTests) -> None:
-    # Load the test function, run the test function.
-    module_name = abs_module_name(result.test_name, runtests.test_dir)
-
-    # Remove the module from sys.module to reload it if it was already imported
-    sys.modules.pop(module_name, None)
-
-    test_mod = importlib.import_module(module_name)
-
-    if hasattr(test_mod, "test_main"):
-        # https://github.com/python/cpython/issues/89392
-        raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
-    def test_func():
-        return run_unittest(test_mod)
-
-    try:
-        with save_env(result.test_name, runtests):
-            regrtest_runner(result, test_func, runtests)
-    finally:
-        # First kill any dangling references to open files etc.
-        # This can also issue some ResourceWarnings which would otherwise get
-        # triggered during the following test run, and possibly produce
-        # failures.
-        support.gc_collect()
-
-        remove_testfn(result.test_name, runtests.verbose)
-
-    if gc.garbage:
-        support.environment_altered = True
-        print_warning(f"{result.test_name} created {len(gc.garbage)} "
-                      f"uncollectable object(s)")
-
-        # move the uncollectable objects somewhere,
-        # so we don't see them again
-        FOUND_GARBAGE.extend(gc.garbage)
-        gc.garbage.clear()
-
-    support.reap_children()
-
-
-def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
-                             display_failure: bool = True) -> None:
-    # Detect environment changes, handle exceptions.
-
-    # Reset the environment_altered flag to detect if a test altered
-    # the environment
-    support.environment_altered = False
-
-    pgo = runtests.pgo
-    if pgo:
-        display_failure = False
-    quiet = runtests.quiet
-
-    test_name = result.test_name
-    try:
-        clear_caches()
-        support.gc_collect()
-
-        with save_env(test_name, runtests):
-            _load_run_test(result, runtests)
-    except support.ResourceDenied as msg:
-        if not quiet and not pgo:
-            print(f"{test_name} skipped -- {msg}", flush=True)
-        result.state = State.RESOURCE_DENIED
-        return
-    except unittest.SkipTest as msg:
-        if not quiet and not pgo:
-            print(f"{test_name} skipped -- {msg}", flush=True)
-        result.state = State.SKIPPED
-        return
-    except support.TestFailedWithDetails as exc:
-        msg = f"test {test_name} failed"
-        if display_failure:
-            msg = f"{msg} -- {exc}"
-        print(msg, file=sys.stderr, flush=True)
-        result.state = State.FAILED
-        result.errors = exc.errors
-        result.failures = exc.failures
-        result.stats = exc.stats
-        return
-    except support.TestFailed as exc:
-        msg = f"test {test_name} failed"
-        if display_failure:
-            msg = f"{msg} -- {exc}"
-        print(msg, file=sys.stderr, flush=True)
-        result.state = State.FAILED
-        result.stats = exc.stats
-        return
-    except support.TestDidNotRun:
-        result.state = State.DID_NOT_RUN
-        return
-    except KeyboardInterrupt:
-        print()
-        result.state = State.INTERRUPTED
-        return
-    except:
-        if not pgo:
-            msg = traceback.format_exc()
-            print(f"test {test_name} crashed -- {msg}",
-                  file=sys.stderr, flush=True)
-        result.state = State.UNCAUGHT_EXC
-        return
-
-    if support.environment_altered:
-        result.set_env_changed()
-    # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
-    if result.state is None:
-        result.state = State.PASSED
-
-
-def remove_testfn(test_name: TestName, verbose: int) -> None:
-    # Try to clean up os_helper.TESTFN if left behind.
-    #
-    # While tests shouldn't leave any files or directories behind, when a test
-    # fails that can be tedious for it to arrange.  The consequences can be
-    # especially nasty on Windows, since if a test leaves a file open, it
-    # cannot be deleted by name (while there's nothing we can do about that
-    # here either, we can display the name of the offending test, which is a
-    # real help).
-    name = os_helper.TESTFN
-    if not os.path.exists(name):
-        return
-
-    if os.path.isdir(name):
-        import shutil
-        kind, nuker = "directory", shutil.rmtree
-    elif os.path.isfile(name):
-        kind, nuker = "file", os.unlink
-    else:
-        raise RuntimeError(f"os.path says {name!r} exists but is neither "
-                           f"directory nor file")
-
-    if verbose:
-        print_warning(f"{test_name} left behind {kind} {name!r}")
-        support.environment_altered = True
-
-    try:
-        import stat
-        # fix possible permissions problems that might prevent cleanup
-        os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
-        nuker(name)
-    except Exception as exc:
-        print_warning(f"{test_name} left behind {kind} {name!r} "
-                      f"and it couldn't be removed: {exc}")
index c1bd911c43e2c48c18473b7e048e40388e0e53fa..f576d49e85db93ded577b04a3c2377caf7e6333b 100644 (file)
@@ -15,12 +15,13 @@ from test import support
 from test.support import os_helper
 
 from test.libregrtest.main import Regrtest
-from test.libregrtest.runtest import (
-    TestResult, State, PROGRESS_MIN_TIME,
-    RunTests, TestName)
+from test.libregrtest.result import TestResult, State
 from test.libregrtest.results import TestResults
+from test.libregrtest.runtests import RunTests
+from test.libregrtest.single import PROGRESS_MIN_TIME
 from test.libregrtest.utils import (
-    format_duration, print_warning, StrPath)
+    StrPath, TestName,
+    format_duration, print_warning)
 from test.libregrtest.worker import create_worker_process, USE_PROCESS_GROUP
 
 if sys.platform == 'win32':
diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py
new file mode 100644 (file)
index 0000000..366c6f1
--- /dev/null
@@ -0,0 +1,83 @@
+import dataclasses
+import json
+from typing import Any
+
+from test.libregrtest.utils import (
+    StrPath, StrJSON, TestTuple, FilterTuple, FilterDict)
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class HuntRefleak:
+    warmups: int
+    runs: int
+    filename: StrPath
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class RunTests:
+    tests: TestTuple
+    fail_fast: bool = False
+    fail_env_changed: bool = False
+    match_tests: FilterTuple | None = None
+    ignore_tests: FilterTuple | None = None
+    match_tests_dict: FilterDict | None = None
+    rerun: bool = False
+    forever: bool = False
+    pgo: bool = False
+    pgo_extended: bool = False
+    output_on_failure: bool = False
+    timeout: float | None = None
+    verbose: bool = False
+    quiet: bool = False
+    hunt_refleak: HuntRefleak | None = None
+    test_dir: StrPath | None = None
+    use_junit: bool = False
+    memory_limit: str | None = None
+    gc_threshold: int | None = None
+    use_resources: list[str] = None
+    python_cmd: list[str] | None = None
+
+    def copy(self, **override):
+        state = dataclasses.asdict(self)
+        state.update(override)
+        return RunTests(**state)
+
+    def get_match_tests(self, test_name) -> FilterTuple | None:
+        if self.match_tests_dict is not None:
+            return self.match_tests_dict.get(test_name, None)
+        else:
+            return None
+
+    def iter_tests(self):
+        if self.forever:
+            while True:
+                yield from self.tests
+        else:
+            yield from self.tests
+
+    def as_json(self) -> StrJSON:
+        return json.dumps(self, cls=_EncodeRunTests)
+
+    @staticmethod
+    def from_json(worker_json: StrJSON) -> 'RunTests':
+        return json.loads(worker_json, object_hook=_decode_runtests)
+
+
+class _EncodeRunTests(json.JSONEncoder):
+    def default(self, o: Any) -> dict[str, Any]:
+        if isinstance(o, RunTests):
+            result = dataclasses.asdict(o)
+            result["__runtests__"] = True
+            return result
+        else:
+            return super().default(o)
+
+
+def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
+    if "__runtests__" in data:
+        data.pop('__runtests__')
+        if data['hunt_refleak']:
+            data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
+        return RunTests(**data)
+    else:
+        return data
index 48eb8b6800af1edcb01f4341bff7de6bce75223f..20ef3dc38cbd04083e1a935d8fa7155314a1d590 100644 (file)
@@ -11,6 +11,7 @@ try:
 except ImportError:
     gc = None
 
+from test.libregrtest.runtests import RunTests
 from test.libregrtest.utils import (
     setup_unraisable_hook, setup_threading_excepthook, fix_umask)
 
@@ -25,6 +26,18 @@ def setup_test_dir(testdir: str | None) -> None:
         sys.path.insert(0, os.path.abspath(testdir))
 
 
+def setup_support(runtests: RunTests):
+    support.PGO = runtests.pgo
+    support.PGO_EXTENDED = runtests.pgo_extended
+    support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
+    support.failfast = runtests.fail_fast
+    support.verbose = runtests.verbose
+    if runtests.use_junit:
+        support.junit_xml_list = []
+    else:
+        support.junit_xml_list = None
+
+
 def setup_tests(runtests):
     fix_umask()
 
diff --git a/Lib/test/libregrtest/single.py b/Lib/test/libregrtest/single.py
new file mode 100644 (file)
index 0000000..bb33387
--- /dev/null
@@ -0,0 +1,275 @@
+import doctest
+import faulthandler
+import gc
+import importlib
+import io
+import sys
+import time
+import traceback
+import unittest
+
+from test import support
+from test.support import TestStats
+from test.support import threading_helper
+
+from test.libregrtest.result import State, TestResult
+from test.libregrtest.runtests import RunTests
+from test.libregrtest.save_env import saved_test_environment
+from test.libregrtest.setup import setup_support
+from test.libregrtest.utils import (
+    TestName,
+    clear_caches, remove_testfn, abs_module_name, print_warning)
+
+
+# Minimum duration of a test to display its duration or to mention that
+# the test is running in background
+PROGRESS_MIN_TIME = 30.0   # seconds
+
+
+def run_unittest(test_mod):
+    loader = unittest.TestLoader()
+    tests = loader.loadTestsFromModule(test_mod)
+    for error in loader.errors:
+        print(error, file=sys.stderr)
+    if loader.errors:
+        raise Exception("errors while loading tests")
+    return support.run_unittest(tests)
+
+
+def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
+    # Run test_func(), collect statistics, and detect reference and memory
+    # leaks.
+    if runtests.hunt_refleak:
+        from test.libregrtest.refleak import runtest_refleak
+        refleak, test_result = runtest_refleak(result.test_name, test_func,
+                                               runtests.hunt_refleak,
+                                               runtests.quiet)
+    else:
+        test_result = test_func()
+        refleak = False
+
+    if refleak:
+        result.state = State.REFLEAK
+
+    match test_result:
+        case TestStats():
+            stats = test_result
+        case unittest.TestResult():
+            stats = TestStats.from_unittest(test_result)
+        case doctest.TestResults():
+            stats = TestStats.from_doctest(test_result)
+        case None:
+            print_warning(f"{result.test_name} test runner returned None: {test_func}")
+            stats = None
+        case _:
+            print_warning(f"Unknown test result type: {type(test_result)}")
+            stats = None
+
+    result.stats = stats
+
+
+def save_env(test_name: TestName, runtests: RunTests):
+    return saved_test_environment(test_name, runtests.verbose, runtests.quiet,
+                                  pgo=runtests.pgo)
+
+
+# Storage of uncollectable GC objects (gc.garbage)
+GC_GARBAGE = []
+
+
+def _load_run_test(result: TestResult, runtests: RunTests) -> None:
+    # Load the test function, run the test function.
+    module_name = abs_module_name(result.test_name, runtests.test_dir)
+
+    # Remove the module from sys.module to reload it if it was already imported
+    sys.modules.pop(module_name, None)
+
+    test_mod = importlib.import_module(module_name)
+
+    if hasattr(test_mod, "test_main"):
+        # https://github.com/python/cpython/issues/89392
+        raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
+    def test_func():
+        return run_unittest(test_mod)
+
+    try:
+        with save_env(result.test_name, runtests):
+            regrtest_runner(result, test_func, runtests)
+    finally:
+        # First kill any dangling references to open files etc.
+        # This can also issue some ResourceWarnings which would otherwise get
+        # triggered during the following test run, and possibly produce
+        # failures.
+        support.gc_collect()
+
+        remove_testfn(result.test_name, runtests.verbose)
+
+    if gc.garbage:
+        support.environment_altered = True
+        print_warning(f"{result.test_name} created {len(gc.garbage)} "
+                      f"uncollectable object(s)")
+
+        # move the uncollectable objects somewhere,
+        # so we don't see them again
+        GC_GARBAGE.extend(gc.garbage)
+        gc.garbage.clear()
+
+    support.reap_children()
+
+
+def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
+                             display_failure: bool = True) -> None:
+    # Detect environment changes, handle exceptions.
+
+    # Reset the environment_altered flag to detect if a test altered
+    # the environment
+    support.environment_altered = False
+
+    pgo = runtests.pgo
+    if pgo:
+        display_failure = False
+    quiet = runtests.quiet
+
+    test_name = result.test_name
+    try:
+        clear_caches()
+        support.gc_collect()
+
+        with save_env(test_name, runtests):
+            _load_run_test(result, runtests)
+    except support.ResourceDenied as msg:
+        if not quiet and not pgo:
+            print(f"{test_name} skipped -- {msg}", flush=True)
+        result.state = State.RESOURCE_DENIED
+        return
+    except unittest.SkipTest as msg:
+        if not quiet and not pgo:
+            print(f"{test_name} skipped -- {msg}", flush=True)
+        result.state = State.SKIPPED
+        return
+    except support.TestFailedWithDetails as exc:
+        msg = f"test {test_name} failed"
+        if display_failure:
+            msg = f"{msg} -- {exc}"
+        print(msg, file=sys.stderr, flush=True)
+        result.state = State.FAILED
+        result.errors = exc.errors
+        result.failures = exc.failures
+        result.stats = exc.stats
+        return
+    except support.TestFailed as exc:
+        msg = f"test {test_name} failed"
+        if display_failure:
+            msg = f"{msg} -- {exc}"
+        print(msg, file=sys.stderr, flush=True)
+        result.state = State.FAILED
+        result.stats = exc.stats
+        return
+    except support.TestDidNotRun:
+        result.state = State.DID_NOT_RUN
+        return
+    except KeyboardInterrupt:
+        print()
+        result.state = State.INTERRUPTED
+        return
+    except:
+        if not pgo:
+            msg = traceback.format_exc()
+            print(f"test {test_name} crashed -- {msg}",
+                  file=sys.stderr, flush=True)
+        result.state = State.UNCAUGHT_EXC
+        return
+
+    if support.environment_altered:
+        result.set_env_changed()
+    # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
+    if result.state is None:
+        result.state = State.PASSED
+
+
+def _runtest(result: TestResult, runtests: RunTests) -> None:
+    # Capture stdout and stderr, set faulthandler timeout,
+    # and create JUnit XML report.
+    verbose = runtests.verbose
+    output_on_failure = runtests.output_on_failure
+    timeout = runtests.timeout
+
+    use_timeout = (
+        timeout is not None and threading_helper.can_start_thread
+    )
+    if use_timeout:
+        faulthandler.dump_traceback_later(timeout, exit=True)
+
+    try:
+        setup_support(runtests)
+
+        if output_on_failure:
+            support.verbose = True
+
+            stream = io.StringIO()
+            orig_stdout = sys.stdout
+            orig_stderr = sys.stderr
+            print_warning = support.print_warning
+            orig_print_warnings_stderr = print_warning.orig_stderr
+
+            output = None
+            try:
+                sys.stdout = stream
+                sys.stderr = stream
+                # print_warning() writes into the temporary stream to preserve
+                # messages order. If support.environment_altered becomes true,
+                # warnings will be written to sys.stderr below.
+                print_warning.orig_stderr = stream
+
+                _runtest_env_changed_exc(result, runtests, display_failure=False)
+                # Ignore output if the test passed successfully
+                if result.state != State.PASSED:
+                    output = stream.getvalue()
+            finally:
+                sys.stdout = orig_stdout
+                sys.stderr = orig_stderr
+                print_warning.orig_stderr = orig_print_warnings_stderr
+
+            if output is not None:
+                sys.stderr.write(output)
+                sys.stderr.flush()
+        else:
+            # Tell tests to be moderately quiet
+            support.verbose = verbose
+            _runtest_env_changed_exc(result, runtests,
+                                     display_failure=not verbose)
+
+        xml_list = support.junit_xml_list
+        if xml_list:
+            import xml.etree.ElementTree as ET
+            result.xml_data = [ET.tostring(x).decode('us-ascii')
+                               for x in xml_list]
+    finally:
+        if use_timeout:
+            faulthandler.cancel_dump_traceback_later()
+        support.junit_xml_list = None
+
+
+def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
+    """Run a single test.
+
+    test_name -- the name of the test
+
+    Returns a TestResult.
+
+    If runtests.use_junit, xml_data is a list containing each generated
+    testsuite element.
+    """
+    start_time = time.perf_counter()
+    result = TestResult(test_name)
+    pgo = runtests.pgo
+    try:
+        _runtest(result, runtests)
+    except:
+        if not pgo:
+            msg = traceback.format_exc()
+            print(f"test {test_name} crashed -- {msg}",
+                  file=sys.stderr, flush=True)
+        result.state = State.UNCAUGHT_EXC
+    result.duration = time.perf_counter() - start_time
+    return result
index e77772cc2577fee038e596fdfe48b1cca154a221..011d287e1674cd94e41045e298037f6883710521 100644 (file)
@@ -21,7 +21,16 @@ MS_WINDOWS = (sys.platform == 'win32')
 EXIT_TIMEOUT = 120.0
 
 
+# Types for types hints
 StrPath = str
+TestName = str
+StrJSON = str
+TestTuple = tuple[TestName, ...]
+TestList = list[TestName]
+# --match and --ignore options: list of patterns
+# ('*' joker character can be used)
+FilterTuple = tuple[TestName, ...]
+FilterDict = dict[TestName, FilterTuple]
 
 
 def format_duration(seconds):
@@ -389,3 +398,76 @@ def exit_timeout():
         if threading_helper.can_start_thread:
             faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
         sys.exit(exc.code)
+
+
+def remove_testfn(test_name: TestName, verbose: int) -> None:
+    # Try to clean up os_helper.TESTFN if left behind.
+    #
+    # While tests shouldn't leave any files or directories behind, when a test
+    # fails that can be tedious for it to arrange.  The consequences can be
+    # especially nasty on Windows, since if a test leaves a file open, it
+    # cannot be deleted by name (while there's nothing we can do about that
+    # here either, we can display the name of the offending test, which is a
+    # real help).
+    name = os_helper.TESTFN
+    if not os.path.exists(name):
+        return
+
+    if os.path.isdir(name):
+        import shutil
+        kind, nuker = "directory", shutil.rmtree
+    elif os.path.isfile(name):
+        kind, nuker = "file", os.unlink
+    else:
+        raise RuntimeError(f"os.path says {name!r} exists but is neither "
+                           f"directory nor file")
+
+    if verbose:
+        print_warning(f"{test_name} left behind {kind} {name!r}")
+        support.environment_altered = True
+
+    try:
+        import stat
+        # fix possible permissions problems that might prevent cleanup
+        os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+        nuker(name)
+    except Exception as exc:
+        print_warning(f"{test_name} left behind {kind} {name!r} "
+                      f"and it couldn't be removed: {exc}")
+
+
+def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
+    if test_name.startswith('test.') or test_dir:
+        return test_name
+    else:
+        # Import it from the test package
+        return 'test.' + test_name
+
+
+# gh-90681: When rerunning tests, we might need to rerun the whole
+# class or module suite if some its life-cycle hooks fail.
+# Test level hooks are not affected.
+_TEST_LIFECYCLE_HOOKS = frozenset((
+    'setUpClass', 'tearDownClass',
+    'setUpModule', 'tearDownModule',
+))
+
+def normalize_test_name(test_full_name, *, is_error=False):
+    short_name = test_full_name.split(" ")[0]
+    if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
+        if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
+            # if setUpModule() or tearDownModule() failed, don't filter
+            # tests with the test file name, don't use use filters.
+            return None
+
+        # This means that we have a failure in a life-cycle hook,
+        # we need to rerun the whole module or class suite.
+        # Basically the error looks like this:
+        #    ERROR: setUpClass (test.test_reg_ex.RegTest)
+        # or
+        #    ERROR: setUpModule (test.test_reg_ex)
+        # So, we need to parse the class / module name.
+        lpar = test_full_name.index('(')
+        rpar = test_full_name.index(')')
+        return test_full_name[lpar + 1: rpar].split('.')[-1]
+    return short_name
index 033a0a3ff6226044c59d5a2d716e1c55e74e204d..24251c35cdd2f1bde3cd97bbc3c662f2cb9a86e5 100644 (file)
@@ -7,9 +7,11 @@ from test import support
 from test.support import os_helper
 
 from test.libregrtest.setup import setup_tests, setup_test_dir
-from test.libregrtest.runtest import (
-    run_single_test, StrJSON, FilterTuple, RunTests)
-from test.libregrtest.utils import get_work_dir, exit_timeout, StrPath
+from test.libregrtest.runtests import RunTests
+from test.libregrtest.single import run_single_test
+from test.libregrtest.utils import (
+    StrPath, StrJSON, FilterTuple,
+    get_work_dir, exit_timeout)
 
 
 USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
index a5ee4c2155536e079eca647c8b8e526db40ed03e..21babb5185548f5a65ba485e13b9a273951e6a18 100644 (file)
@@ -22,7 +22,7 @@ from test import libregrtest
 from test import support
 from test.support import os_helper, TestStats
 from test.libregrtest import utils, setup
-from test.libregrtest.runtest import normalize_test_name
+from test.libregrtest.utils import normalize_test_name
 
 if not support.has_subprocess_support:
     raise unittest.SkipTest("test module requires subprocess")