Decorator for invoking :func:`check_impl_detail` on *guards*. If that
returns ``False``, then uses *msg* as the reason for skipping the test.
+.. decorator:: thread_unsafe(reason=None)
+
+ Decorator for marking tests as thread-unsafe. This test always runs in one
+ thread even when invoked with ``--parallel-threads``.
+
.. decorator:: no_tracing
self.print_slow = False
self.random_seed = None
self.use_mp = None
+ self.parallel_threads = None
self.forever = False
self.header = False
self.failfast = False
'a single process, ignore -jN option, '
'and failed tests are also rerun sequentially '
'in the same process')
+ group.add_argument('--parallel-threads', metavar='PARALLEL_THREADS',
+ type=int,
+ help='run copies of each test in PARALLEL_THREADS at '
+ 'once')
group.add_argument('-T', '--coverage', action='store_true',
dest='trace',
help='turn on code coverage tracing using the trace '
else:
self.random_seed = ns.random_seed
+ self.parallel_threads = ns.parallel_threads
+
# tests
self.first_runtests: RunTests | None = None
python_cmd=self.python_cmd,
randomize=self.randomize,
random_seed=self.random_seed,
+ parallel_threads=self.parallel_threads,
)
def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
--- /dev/null
+"""Run a test case multiple times in parallel threads."""
+
+import copy
+import functools
+import threading
+import unittest
+
+from unittest import TestCase
+
+
+class ParallelTestCase(TestCase):
+ def __init__(self, test_case: TestCase, num_threads: int):
+ self.test_case = test_case
+ self.num_threads = num_threads
+ self._testMethodName = test_case._testMethodName
+ self._testMethodDoc = test_case._testMethodDoc
+
+ def __str__(self):
+ return f"{str(self.test_case)} [threads={self.num_threads}]"
+
+ def run_worker(self, test_case: TestCase, result: unittest.TestResult,
+ barrier: threading.Barrier):
+ barrier.wait()
+ test_case.run(result)
+
+ def run(self, result=None):
+ if result is None:
+ result = test_case.defaultTestResult()
+ startTestRun = getattr(result, 'startTestRun', None)
+ stopTestRun = getattr(result, 'stopTestRun', None)
+ if startTestRun is not None:
+ startTestRun()
+ else:
+ stopTestRun = None
+
+ # Called at the beginning of each test. See TestCase.run.
+ result.startTest(self)
+
+ cases = [copy.copy(self.test_case) for _ in range(self.num_threads)]
+ results = [unittest.TestResult() for _ in range(self.num_threads)]
+
+ barrier = threading.Barrier(self.num_threads)
+ threads = []
+ for i, (case, r) in enumerate(zip(cases, results)):
+ thread = threading.Thread(target=self.run_worker,
+ args=(case, r, barrier),
+ name=f"{str(self.test_case)}-{i}",
+ daemon=True)
+ threads.append(thread)
+
+ for thread in threads:
+ thread.start()
+
+ for threads in threads:
+ threads.join()
+
+ # Aggregate test results
+ if all(r.wasSuccessful() for r in results):
+ result.addSuccess(self)
+
+ # Note: We can't call result.addError, result.addFailure, etc. because
+ # we no longer have the original exception, just the string format.
+ for r in results:
+ if len(r.errors) > 0 or len(r.failures) > 0:
+ result._mirrorOutput = True
+ result.errors.extend(r.errors)
+ result.failures.extend(r.failures)
+ result.skipped.extend(r.skipped)
+ result.expectedFailures.extend(r.expectedFailures)
+ result.unexpectedSuccesses.extend(r.unexpectedSuccesses)
+ result.collectedDurations.extend(r.collectedDurations)
+
+ if any(r.shouldStop for r in results):
+ result.stop()
+
+ # Test has finished running
+ result.stopTest(self)
+ if stopTestRun is not None:
+ stopTestRun()
python_cmd: tuple[str, ...] | None
randomize: bool
random_seed: int | str
+ parallel_threads: int | None
def copy(self, **override) -> 'RunTests':
state = dataclasses.asdict(self)
args.extend(("--python", cmd))
if self.randomize:
args.append(f"--randomize")
+ if self.parallel_threads:
+ args.append(f"--parallel-threads={self.parallel_threads}")
args.append(f"--randseed={self.random_seed}")
return args
from .save_env import saved_test_environment
from .setup import setup_tests
from .testresult import get_test_runner
+from .parallel_case import ParallelTestCase
from .utils import (
TestName,
clear_caches, remove_testfn, abs_module_name, print_warning)
PROGRESS_MIN_TIME = 30.0 # seconds
-def run_unittest(test_mod):
+def run_unittest(test_mod, runtests: RunTests):
loader = unittest.TestLoader()
tests = loader.loadTestsFromModule(test_mod)
+
for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
_filter_suite(tests, match_test)
+ if runtests.parallel_threads:
+ _parallelize_tests(tests, runtests.parallel_threads)
return _run_suite(tests)
def _filter_suite(suite, pred):
newtests.append(test)
suite._tests = newtests
+def _parallelize_tests(suite, parallel_threads: int):
+ def is_thread_unsafe(test):
+ test_method = getattr(test, test._testMethodName)
+ instance = test_method.__self__
+ return (getattr(test_method, "__unittest_thread_unsafe__", False) or
+ getattr(instance, "__unittest_thread_unsafe__", False))
+
+ newtests: list[object] = []
+ for test in suite._tests:
+ if isinstance(test, unittest.TestSuite):
+ _parallelize_tests(test, parallel_threads)
+ newtests.append(test)
+ continue
+
+ if is_thread_unsafe(test):
+ # Don't parallelize thread-unsafe tests
+ newtests.append(test)
+ continue
+
+ newtests.append(ParallelTestCase(test, parallel_threads))
+ suite._tests = newtests
+
def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
runner = get_test_runner(sys.stdout,
raise Exception(f"Module {test_name} defines test_main() which "
f"is no longer supported by regrtest")
def test_func():
- return run_unittest(test_mod)
+ return run_unittest(test_mod, runtests)
try:
regrtest_runner(result, test_func, runtests)
"anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_if_buggy_ucrt_strfptime",
"check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
- "requires_limited_api", "requires_specialization",
+ "requires_limited_api", "requires_specialization", "thread_unsafe",
# sys
"MS_WINDOWS", "is_jython", "is_android", "is_emscripten", "is_wasi",
"is_apple_mobile", "check_impl_detail", "unix_shell", "setswitchinterval",
return decorator
+def thread_unsafe(reason):
+ """Mark a test as not thread safe. When the test runner is run with
+ --parallel-threads=N, the test will be run in a single thread."""
+ def decorator(test_item):
+ test_item.__unittest_thread_unsafe__ = True
+ # the reason is not currently used
+ test_item.__unittest_thread_unsafe__why__ = reason
+ return test_item
+ if isinstance(reason, types.FunctionType):
+ test_item = reason
+ reason = ''
+ return decorator(test_item)
+ return decorator
+
+
def skip_if_buildbot(reason=None):
"""Decorator raising SkipTest if running on a buildbot."""
import getpass
"Test the functionality of Python classes implementing operators."
import unittest
+from test import support
from test.support import cpython_only, import_helper, script_helper, skip_emscripten_stack_overflow
testmeths = [
AllTests = type("AllTests", (object,), d)
del d, statictests, method, method_template
+@support.thread_unsafe("callLst is shared between threads")
class ClassTests(unittest.TestCase):
def setUp(self):
callLst[:] = []
with self.assertRaises(TypeError):
frozenset().__class__ = MyFrozenSet
+ @support.thread_unsafe
def test_slots(self):
# Testing __slots__...
class C0(object):
{pickle.dumps, pickle._dumps},
{pickle.loads, pickle._loads}))
+ @support.thread_unsafe
def test_pickle_slots(self):
# Tests pickling of classes with __slots__.
y = pickle_copier.copy(x)
self._assert_is_copy(x, y)
+ @support.thread_unsafe
def test_reduce_copying(self):
# Tests pickling and copying new-style classes and objects.
global C1
module = c_operator
+@support.thread_unsafe("swaps global operator module")
class OperatorPickleTestCase:
def copy(self, obj, proto):
with support.swap_item(sys.modules, 'operator', self.module):
self.assertEqual(encoding, 'utf-8')
self.assertEqual(consumed_lines, [b'print("#coding=fake")'])
+ @support.thread_unsafe
def test_open(self):
filename = os_helper.TESTFN + '.py'
self.addCleanup(os_helper.unlink, filename)
--- /dev/null
+Add an option ``--parallel-threads=N`` to the regression test runner that
+runs individual tests in multiple threads in parallel in order to find
+concurrency bugs. Note that most of the test suite is not yet reviewed for
+thread-safety or annotated with ``@thread_unsafe`` when necessary.