self.junit_filename: StrPath | None = ns.xmlpath
self.memory_limit: str | None = ns.memlimit
self.gc_threshold: int | None = ns.threshold
- self.use_resources: list[str] = ns.use_resources
- self.python_cmd: list[str] | None = ns.python
+ self.use_resources: tuple[str, ...] = tuple(ns.use_resources)
+ if ns.python:
+ self.python_cmd: tuple[str, ...] | None = tuple(ns.python)
+ else:
+ self.python_cmd = None
self.coverage: bool = ns.trace
self.coverage_dir: StrPath | None = ns.coverdir
self.tmp_dir: StrPath | None = ns.tempdir
return RunTests(
tests,
fail_fast=self.fail_fast,
+ fail_env_changed=self.fail_env_changed,
match_tests=self.match_tests,
ignore_tests=self.ignore_tests,
+ match_tests_dict=None,
+ rerun=None,
forever=self.forever,
pgo=self.pgo,
pgo_extended=self.pgo_extended,
gc_threshold=self.gc_threshold,
use_resources=self.use_resources,
python_cmd=self.python_cmd,
+ randomize=self.randomize,
+ random_seed=self.random_seed,
+ json_fd=None,
)
def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
@dataclasses.dataclass(slots=True, frozen=True)
class RunTests:
tests: TestTuple
- fail_fast: bool = False
- fail_env_changed: bool = False
- match_tests: FilterTuple | None = None
- ignore_tests: FilterTuple | None = None
- match_tests_dict: FilterDict | None = None
- rerun: bool = False
- forever: bool = False
- pgo: bool = False
- pgo_extended: bool = False
- output_on_failure: bool = False
- timeout: float | None = None
- verbose: int = 0
- quiet: bool = False
- hunt_refleak: HuntRefleak | None = None
- test_dir: StrPath | None = None
- use_junit: bool = False
- memory_limit: str | None = None
- gc_threshold: int | None = None
- use_resources: list[str] = dataclasses.field(default_factory=list)
- python_cmd: list[str] | None = None
+ fail_fast: bool
+ fail_env_changed: bool
+ match_tests: FilterTuple | None
+ ignore_tests: FilterTuple | None
+ match_tests_dict: FilterDict | None
+ rerun: bool
+ forever: bool
+ pgo: bool
+ pgo_extended: bool
+ output_on_failure: bool
+ timeout: float | None
+ verbose: int
+ quiet: bool
+ hunt_refleak: HuntRefleak | None
+ test_dir: StrPath | None
+ use_junit: bool
+ memory_limit: str | None
+ gc_threshold: int | None
+ use_resources: tuple[str, ...]
+ python_cmd: tuple[str, ...] | None
+ randomize: bool
+ random_seed: int | None
# On Unix, it's a file descriptor.
# On Windows, it's a handle.
- json_fd: int | None = None
+ json_fd: int | None
def copy(self, **override):
state = dataclasses.asdict(self)
import locale
import os.path
import platform
+import random
import re
import subprocess
import sys
if rerun is not None:
regex = list_regex('%s re-run test%s', [rerun.name])
self.check_line(output, regex)
- regex = LOG_PREFIX + fr"Re-running 1 failed tests in verbose mode"
+ regex = LOG_PREFIX + r"Re-running 1 failed tests in verbose mode"
self.check_line(output, regex)
regex = fr"Re-running {rerun.name} in verbose mode"
if rerun.match:
forever=True)
@without_optimizer
- def check_leak(self, code, what, *, multiprocessing=False):
+ def check_leak(self, code, what, *, run_workers=False):
test = self.create_test('huntrleaks', code=code)
filename = 'reflog.txt'
self.addCleanup(os_helper.unlink, filename)
cmd = ['--huntrleaks', '3:3:']
- if multiprocessing:
+ if run_workers:
cmd.append('-j1')
cmd.append(test)
output = self.run_tests(*cmd,
self.assertIn(line2, reflog)
@unittest.skipUnless(support.Py_DEBUG, 'need a debug build')
- def check_huntrleaks(self, *, multiprocessing: bool):
+ def check_huntrleaks(self, *, run_workers: bool):
# test --huntrleaks
code = textwrap.dedent("""
import unittest
def test_leak(self):
GLOBAL_LIST.append(object())
""")
- self.check_leak(code, 'references', multiprocessing=multiprocessing)
+ self.check_leak(code, 'references', run_workers=run_workers)
def test_huntrleaks(self):
- self.check_huntrleaks(multiprocessing=False)
+ self.check_huntrleaks(run_workers=False)
def test_huntrleaks_mp(self):
- self.check_huntrleaks(multiprocessing=True)
+ self.check_huntrleaks(run_workers=True)
@unittest.skipUnless(support.Py_DEBUG, 'need a debug build')
def test_huntrleaks_fd_leak(self):
def test_method4(self):
pass
""")
- all_methods = ['test_method1', 'test_method2',
- 'test_method3', 'test_method4']
testname = self.create_test(code=code)
# only run a subset
if encoding is None:
encoding = sys.__stdout__.encoding
if encoding is None:
- self.skipTest(f"cannot get regrtest worker encoding")
+ self.skipTest("cannot get regrtest worker encoding")
nonascii = b"byte:\xa0\xa9\xff\n"
try:
stats=0)
def test_doctest(self):
- code = textwrap.dedent(fr'''
+ code = textwrap.dedent(r'''
import doctest
import sys
from test import support
randomize=True,
stats=TestStats(1, 1, 0))
+ def _check_random_seed(self, run_workers: bool):
+ # gh-109276: When -r/--randomize is used, random.seed() is called
+ # with the same random seed before running each test file.
+ code = textwrap.dedent(r'''
+ import random
+ import unittest
+
+ class RandomSeedTest(unittest.TestCase):
+ def test_randint(self):
+ numbers = [random.randint(0, 1000) for _ in range(10)]
+ print(f"Random numbers: {numbers}")
+ ''')
+ tests = [self.create_test(name=f'test_random{i}', code=code)
+ for i in range(1, 3+1)]
+
+ random_seed = 856_656_202
+ cmd = ["--randomize", f"--randseed={random_seed}"]
+ if run_workers:
+ # run as many worker processes as there are tests
+ cmd.append(f'-j{len(tests)}')
+ cmd.extend(tests)
+ output = self.run_tests(*cmd)
+
+ random.seed(random_seed)
+ # Assume that nothing consumes entropy between libregrtest's
+ # setup_tests(), which calls random.seed(), and RandomSeedTest
+ # calling random.randint().
+ numbers = [random.randint(0, 1000) for _ in range(10)]
+ expected = f"Random numbers: {numbers}"
+
+ regex = r'^Random numbers: .*$'
+ matches = re.findall(regex, output, flags=re.MULTILINE)
+ self.assertEqual(matches, [expected] * len(tests))
+
+ def test_random_seed(self):
+ self._check_random_seed(run_workers=False)
+
+ def test_random_seed_workers(self):
+ self._check_random_seed(run_workers=True)
+
class TestUtils(unittest.TestCase):
def test_format_duration(self):