]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-127933: Add option to run regression tests in parallel (gh-128003)
authorSam Gross <colesbury@gmail.com>
Tue, 4 Feb 2025 22:44:59 +0000 (17:44 -0500)
committerGitHub <noreply@github.com>
Tue, 4 Feb 2025 22:44:59 +0000 (17:44 -0500)
This adds a new command line argument, `--parallel-threads` to the
regression test runner to allow it to run individual tests in multiple
threads in parallel in order to find multithreading bugs.

Some tests pass when run with `--parallel-threads`, but there's still
more work before the entire suite passes.

12 files changed:
Doc/library/test.rst
Lib/test/libregrtest/cmdline.py
Lib/test/libregrtest/main.py
Lib/test/libregrtest/parallel_case.py [new file with mode: 0644]
Lib/test/libregrtest/runtests.py
Lib/test/libregrtest/single.py
Lib/test/support/__init__.py
Lib/test/test_class.py
Lib/test/test_descr.py
Lib/test/test_operator.py
Lib/test/test_tokenize.py
Misc/NEWS.d/next/Tests/2024-12-16-19-15-10.gh-issue-128003.GVBrfa.rst [new file with mode: 0644]

index b5b6e442e218fda6d93d3f2ba38e26eccbbed018..def22f8bb8ab2d95aa67725727a4a860ae188de7 100644 (file)
@@ -792,6 +792,11 @@ The :mod:`test.support` module defines the following functions:
    Decorator for invoking :func:`check_impl_detail` on *guards*.  If that
    returns ``False``, then uses *msg* as the reason for skipping the test.
 
+.. decorator:: thread_unsafe(reason=None)
+
+   Decorator for marking tests as thread-unsafe.  This test always runs in one
+   thread even when invoked with ``--parallel-threads``.
+
 
 .. decorator:: no_tracing
 
index bf9a71efbdbff9a69326b6397f4d27eed6eddef6..1f3b2381c71d4554f8436480446f31e0bf096dbd 100644 (file)
@@ -160,6 +160,7 @@ class Namespace(argparse.Namespace):
         self.print_slow = False
         self.random_seed = None
         self.use_mp = None
+        self.parallel_threads = None
         self.forever = False
         self.header = False
         self.failfast = False
@@ -316,6 +317,10 @@ def _create_parser():
                             'a single process, ignore -jN option, '
                             'and failed tests are also rerun sequentially '
                             'in the same process')
+    group.add_argument('--parallel-threads', metavar='PARALLEL_THREADS',
+                       type=int,
+                       help='run copies of each test in PARALLEL_THREADS at '
+                            'once')
     group.add_argument('-T', '--coverage', action='store_true',
                        dest='trace',
                        help='turn on code coverage tracing using the trace '
index dcbcc6790c68d8a993bc0427d1c62ce296be4d15..de377f185f7ed9cbceb34cd1d9f98dd4bea3e39f 100644 (file)
@@ -142,6 +142,8 @@ class Regrtest:
         else:
             self.random_seed = ns.random_seed
 
+        self.parallel_threads = ns.parallel_threads
+
         # tests
         self.first_runtests: RunTests | None = None
 
@@ -506,6 +508,7 @@ class Regrtest:
             python_cmd=self.python_cmd,
             randomize=self.randomize,
             random_seed=self.random_seed,
+            parallel_threads=self.parallel_threads,
         )
 
     def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
diff --git a/Lib/test/libregrtest/parallel_case.py b/Lib/test/libregrtest/parallel_case.py
new file mode 100644 (file)
index 0000000..09d9d28
--- /dev/null
@@ -0,0 +1,79 @@
+"""Run a test case multiple times in parallel threads."""
+
+import copy
+import functools
+import threading
+import unittest
+
+from unittest import TestCase
+
+
+class ParallelTestCase(TestCase):
+    def __init__(self, test_case: TestCase, num_threads: int):
+        self.test_case = test_case
+        self.num_threads = num_threads
+        self._testMethodName = test_case._testMethodName
+        self._testMethodDoc = test_case._testMethodDoc
+
+    def __str__(self):
+        return f"{str(self.test_case)} [threads={self.num_threads}]"
+
+    def run_worker(self, test_case: TestCase, result: unittest.TestResult,
+                   barrier: threading.Barrier):
+        barrier.wait()
+        test_case.run(result)
+
+    def run(self, result=None):
+        if result is None:
+            result = test_case.defaultTestResult()
+            startTestRun = getattr(result, 'startTestRun', None)
+            stopTestRun = getattr(result, 'stopTestRun', None)
+            if startTestRun is not None:
+                startTestRun()
+        else:
+            stopTestRun = None
+
+        # Called at the beginning of each test. See TestCase.run.
+        result.startTest(self)
+
+        cases = [copy.copy(self.test_case) for _ in range(self.num_threads)]
+        results = [unittest.TestResult() for _ in range(self.num_threads)]
+
+        barrier = threading.Barrier(self.num_threads)
+        threads = []
+        for i, (case, r) in enumerate(zip(cases, results)):
+            thread = threading.Thread(target=self.run_worker,
+                                      args=(case, r, barrier),
+                                      name=f"{str(self.test_case)}-{i}",
+                                      daemon=True)
+            threads.append(thread)
+
+        for thread in threads:
+            thread.start()
+
+        for threads in threads:
+            threads.join()
+
+        # Aggregate test results
+        if all(r.wasSuccessful() for r in results):
+            result.addSuccess(self)
+
+        # Note: We can't call result.addError, result.addFailure, etc. because
+        # we no longer have the original exception, just the string format.
+        for r in results:
+            if len(r.errors) > 0 or len(r.failures) > 0:
+                result._mirrorOutput = True
+            result.errors.extend(r.errors)
+            result.failures.extend(r.failures)
+            result.skipped.extend(r.skipped)
+            result.expectedFailures.extend(r.expectedFailures)
+            result.unexpectedSuccesses.extend(r.unexpectedSuccesses)
+            result.collectedDurations.extend(r.collectedDurations)
+
+        if any(r.shouldStop for r in results):
+            result.stop()
+
+        # Test has finished running
+        result.stopTest(self)
+        if stopTestRun is not None:
+            stopTestRun()
index 130c036a62eefb089ca06dac38440ee365fb0cc8..759f24fc25e38ca19d2c29151d28bc8c38a2b147 100644 (file)
@@ -100,6 +100,7 @@ class RunTests:
     python_cmd: tuple[str, ...] | None
     randomize: bool
     random_seed: int | str
+    parallel_threads: int | None
 
     def copy(self, **override) -> 'RunTests':
         state = dataclasses.asdict(self)
@@ -184,6 +185,8 @@ class RunTests:
             args.extend(("--python", cmd))
         if self.randomize:
             args.append(f"--randomize")
+        if self.parallel_threads:
+            args.append(f"--parallel-threads={self.parallel_threads}")
         args.append(f"--randseed={self.random_seed}")
         return args
 
index 54df688bbc470e753f9f0cd5cdc1967cab725de8..57d7b649d2ef630a77b5db869d9d07eda4e23365 100644 (file)
@@ -17,6 +17,7 @@ from .runtests import RunTests
 from .save_env import saved_test_environment
 from .setup import setup_tests
 from .testresult import get_test_runner
+from .parallel_case import ParallelTestCase
 from .utils import (
     TestName,
     clear_caches, remove_testfn, abs_module_name, print_warning)
@@ -27,14 +28,17 @@ from .utils import (
 PROGRESS_MIN_TIME = 30.0   # seconds
 
 
-def run_unittest(test_mod):
+def run_unittest(test_mod, runtests: RunTests):
     loader = unittest.TestLoader()
     tests = loader.loadTestsFromModule(test_mod)
+
     for error in loader.errors:
         print(error, file=sys.stderr)
     if loader.errors:
         raise Exception("errors while loading tests")
     _filter_suite(tests, match_test)
+    if runtests.parallel_threads:
+        _parallelize_tests(tests, runtests.parallel_threads)
     return _run_suite(tests)
 
 def _filter_suite(suite, pred):
@@ -49,6 +53,28 @@ def _filter_suite(suite, pred):
                 newtests.append(test)
     suite._tests = newtests
 
+def _parallelize_tests(suite, parallel_threads: int):
+    def is_thread_unsafe(test):
+        test_method = getattr(test, test._testMethodName)
+        instance = test_method.__self__
+        return (getattr(test_method, "__unittest_thread_unsafe__", False) or
+                getattr(instance, "__unittest_thread_unsafe__", False))
+
+    newtests: list[object] = []
+    for test in suite._tests:
+        if isinstance(test, unittest.TestSuite):
+            _parallelize_tests(test, parallel_threads)
+            newtests.append(test)
+            continue
+
+        if is_thread_unsafe(test):
+            # Don't parallelize thread-unsafe tests
+            newtests.append(test)
+            continue
+
+        newtests.append(ParallelTestCase(test, parallel_threads))
+    suite._tests = newtests
+
 def _run_suite(suite):
     """Run tests from a unittest.TestSuite-derived class."""
     runner = get_test_runner(sys.stdout,
@@ -133,7 +159,7 @@ def _load_run_test(result: TestResult, runtests: RunTests) -> None:
         raise Exception(f"Module {test_name} defines test_main() which "
                         f"is no longer supported by regrtest")
     def test_func():
-        return run_unittest(test_mod)
+        return run_unittest(test_mod, runtests)
 
     try:
         regrtest_runner(result, test_func, runtests)
index 230bb240c89f7730c9aeb88ec426d0b84e28a067..f31d98bf731d679508d195c57b8082d0a57dee3b 100644 (file)
@@ -40,7 +40,7 @@ __all__ = [
     "anticipate_failure", "load_package_tests", "detect_api_mismatch",
     "check__all__", "skip_if_buggy_ucrt_strfptime",
     "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
-    "requires_limited_api", "requires_specialization",
+    "requires_limited_api", "requires_specialization", "thread_unsafe",
     # sys
     "MS_WINDOWS", "is_jython", "is_android", "is_emscripten", "is_wasi",
     "is_apple_mobile", "check_impl_detail", "unix_shell", "setswitchinterval",
@@ -382,6 +382,21 @@ def requires_mac_ver(*min_version):
     return decorator
 
 
+def thread_unsafe(reason):
+    """Mark a test as not thread safe. When the test runner is run with
+    --parallel-threads=N, the test will be run in a single thread."""
+    def decorator(test_item):
+        test_item.__unittest_thread_unsafe__ = True
+        # the reason is not currently used
+        test_item.__unittest_thread_unsafe__why__ = reason
+        return test_item
+    if isinstance(reason, types.FunctionType):
+        test_item = reason
+        reason = ''
+        return decorator(test_item)
+    return decorator
+
+
 def skip_if_buildbot(reason=None):
     """Decorator raising SkipTest if running on a buildbot."""
     import getpass
index e20e59944e9ce967c5044876d5424c01d49df0e2..017aca3c82850f47d400abecd0a5c01e28e84a05 100644 (file)
@@ -1,6 +1,7 @@
 "Test the functionality of Python classes implementing operators."
 
 import unittest
+from test import support
 from test.support import cpython_only, import_helper, script_helper, skip_emscripten_stack_overflow
 
 testmeths = [
@@ -134,6 +135,7 @@ for method in testmeths:
 AllTests = type("AllTests", (object,), d)
 del d, statictests, method, method_template
 
+@support.thread_unsafe("callLst is shared between threads")
 class ClassTests(unittest.TestCase):
     def setUp(self):
         callLst[:] = []
index a7ebc9e8be0294f2877542ad4ac1a9dc4cc285b9..f2f3d9469f8bab979c2a71f959535d99c82d07e5 100644 (file)
@@ -1103,6 +1103,7 @@ class ClassPropertiesAndMethods(unittest.TestCase):
         with self.assertRaises(TypeError):
             frozenset().__class__ = MyFrozenSet
 
+    @support.thread_unsafe
     def test_slots(self):
         # Testing __slots__...
         class C0(object):
@@ -5485,6 +5486,7 @@ class PicklingTests(unittest.TestCase):
                                      {pickle.dumps, pickle._dumps},
                                      {pickle.loads, pickle._loads}))
 
+    @support.thread_unsafe
     def test_pickle_slots(self):
         # Tests pickling of classes with __slots__.
 
@@ -5552,6 +5554,7 @@ class PicklingTests(unittest.TestCase):
                 y = pickle_copier.copy(x)
                 self._assert_is_copy(x, y)
 
+    @support.thread_unsafe
     def test_reduce_copying(self):
         # Tests pickling and copying new-style classes and objects.
         global C1
index 82578a0ef1e6f2fd2c69fdffb6038449fc920d16..1757824580e41669b6a5f19bd4cac26bd77d1107 100644 (file)
@@ -666,6 +666,7 @@ class COperatorTestCase(OperatorTestCase, unittest.TestCase):
     module = c_operator
 
 
+@support.thread_unsafe("swaps global operator module")
 class OperatorPickleTestCase:
     def copy(self, obj, proto):
         with support.swap_item(sys.modules, 'operator', self.module):
index 480bff743a9f8a32fa67ecdf004a66b2ea2b7b31..52d3341975088b1778ea5ddde10bcf0374213c37 100644 (file)
@@ -1538,6 +1538,7 @@ class TestDetectEncoding(TestCase):
         self.assertEqual(encoding, 'utf-8')
         self.assertEqual(consumed_lines, [b'print("#coding=fake")'])
 
+    @support.thread_unsafe
     def test_open(self):
         filename = os_helper.TESTFN + '.py'
         self.addCleanup(os_helper.unlink, filename)
diff --git a/Misc/NEWS.d/next/Tests/2024-12-16-19-15-10.gh-issue-128003.GVBrfa.rst b/Misc/NEWS.d/next/Tests/2024-12-16-19-15-10.gh-issue-128003.GVBrfa.rst
new file mode 100644 (file)
index 0000000..05711c7
--- /dev/null
@@ -0,0 +1,4 @@
+Add an option ``--parallel-threads=N`` to the regression test runner that
+runs individual tests in multiple threads in parallel in order to find
+concurrency bugs.  Note that most of the test suite is not yet reviewed for
+thread-safety or annotated with ``@thread_unsafe`` when necessary.