# Get tests to re-run
tests = [result.test_name for result in need_rerun]
- match_tests = self.get_rerun_match(need_rerun)
+ match_tests_dict = self.get_rerun_match(need_rerun)
# Clear previously failed tests
self.rerun_bad.extend(self.bad)
# Re-run failed tests
self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
runtests = runtests.copy(tests=tuple(tests),
- match_tests=match_tests,
+ match_tests_dict=match_tests_dict,
rerun=True,
forever=False)
self.set_tests(runtests)
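# Sketch (not part of the diff): one way get_rerun_match() could build the
# per-test filter mapping assigned to match_tests_dict above, assuming each
# TestResult exposes a get_rerun_match_tests() helper returning the IDs of its
# failed sub-tests (that helper is not shown in this excerpt).
def get_rerun_match(self, rerun_list) -> dict[str, FilterTuple]:
    rerun_match_tests = {}
    for result in rerun_list:
        match_tests = result.get_rerun_match_tests()
        # an empty match list means: re-run the whole test file
        if match_tests:
            rerun_match_tests[result.test_name] = match_tests
    return rerun_match_tests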
self.tmp_dir = os.path.abspath(self.tmp_dir)
def is_worker(self):
- return (self.ns.worker_args is not None)
+ return (self.ns.worker_json is not None)
def create_temp_dir(self):
os.makedirs(self.tmp_dir, exist_ok=True)
def _main(self):
if self.is_worker():
- from test.libregrtest.runtest_mp import run_tests_worker
- run_tests_worker(self.ns.worker_args)
+ from test.libregrtest.runtest_mp import worker_process
+ worker_process(self.ns.worker_json)
return
if self.want_wait:
@dataclasses.dataclass(slots=True)
class WorkerJob:
- test_name: str
+ runtests: RunTests
namespace: Namespace
- rerun: bool = False
match_tests: FilterTuple | None = None
def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
if "__worker_job__" in d:
d.pop('__worker_job__')
+ d['runtests'] = RunTests(**d['runtests'])
return WorkerJob(**d)
if "__namespace__" in d:
d.pop('__namespace__')
return d
-def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]:
- return json.loads(worker_json,
- object_hook=_decode_worker_job)
+def _parse_worker_json(worker_json: str) -> WorkerJob:
+    return json.loads(worker_json, object_hook=_decode_worker_job)
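# Sketch (assumption): a JSON encoder mirroring _decode_worker_job() above,
# tagging WorkerJob and Namespace instances with the "__worker_job__" and
# "__namespace__" markers that the object hook looks for. The upstream
# _EncodeWorkerJob may differ in detail.
class _EncodeWorkerJob(json.JSONEncoder):
    def default(self, o: Any) -> dict[str, Any]:
        match o:
            case WorkerJob():
                result = dataclasses.asdict(o)
                result["__worker_job__"] = True
                return result
            case Namespace():
                # vars() flattens the argparse.Namespace into a plain dict
                return vars(o) | {"__namespace__": True}
            case _:
                return super().default(o)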
-def run_test_in_subprocess(worker_job: WorkerJob,
- output_file: TextIO,
- tmp_dir: str | None = None) -> subprocess.Popen:
+def create_worker_process(worker_job: WorkerJob,
+ output_file: TextIO,
+ tmp_dir: str | None = None) -> subprocess.Popen:
ns = worker_job.namespace
python = ns.python
- worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)
+ worker_json = json.dumps(worker_job, cls=_EncodeWorkerJob)
if python is not None:
executable = python
cmd = [*executable, *support.args_from_interpreter_flags(),
'-u', # Unbuffered stdout and stderr
'-m', 'test.regrtest',
- '--worker-args', worker_args]
+ '--worker-json', worker_json]
env = dict(os.environ)
if tmp_dir is not None:
return subprocess.Popen(cmd, **kw)
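# Sketch (assumption): plausible contents of the `kw` dict elided above.
# Redirecting stderr into output_file keeps message ordering stable (see the
# bpo-45410 note further down); the exact fields may differ upstream.
kw = dict(
    env=env,
    stdout=output_file,
    stderr=output_file,   # interleave stderr with stdout
    text=True,
    close_fds=True,
)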
-def run_tests_worker(worker_json: str) -> NoReturn:
- worker_job = _parse_worker_args(worker_json)
+def worker_process(worker_json: str) -> NoReturn:
+ worker_job = _parse_worker_json(worker_json)
+ runtests = worker_job.runtests
ns = worker_job.namespace
- test_name = worker_job.test_name
- rerun = worker_job.rerun
- match_tests = worker_job.match_tests
+ test_name = runtests.tests[0]
+ match_tests: FilterTuple | None = worker_job.match_tests
setup_tests(ns)
- if rerun:
+ if runtests.rerun:
if match_tests:
matching = "matching: " + ", ".join(match_tests)
print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
print(f"Re-running {test_name} in verbose mode", flush=True)
ns.verbose = True
- if match_tests is not None:
- ns.match_tests = match_tests
+ if match_tests is not None:
+ ns.match_tests = match_tests
result = runtest(ns, test_name)
print() # Force a newline (just in case)
# Serialize TestResult as dict in JSON
- print(json.dumps(result, cls=EncodeTestResult), flush=True)
+ json.dump(result, sys.stdout, cls=EncodeTestResult)
+ sys.stdout.flush()
sys.exit(0)
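# Sketch (assumption): the parent-side counterpart of EncodeTestResult. The
# worker prints the serialized TestResult as the last line of its stdout, so
# an object hook along these lines can rebuild it; the names decode_test_result
# and "__test_result__" are illustrative, not taken from this diff.
def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
    if "__test_result__" not in d:
        return d
    d.pop('__test_result__')
    return TestResult(**d)

# e.g. in the parent, after reading the worker's output:
#     result = json.loads(stdout.splitlines()[-1], object_hook=decode_test_result)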
self.tests_iter = None
-class MultiprocessResult(NamedTuple):
+@dataclasses.dataclass(slots=True, frozen=True)
+class MultiprocessResult:
result: TestResult
# bpo-45410: stderr is written into stdout to keep messages order
worker_stdout: str | None = None
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
- self.rerun = runner.rerun
self.current_test_name = None
self.start_time = None
self._popen = None
def _run_process(self, worker_job, output_file: TextIO,
tmp_dir: str | None = None) -> int:
- self.current_test_name = worker_job.test_name
try:
- popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)
+ popen = create_worker_process(worker_job, output_file, tmp_dir)
self._killed = False
self._popen = popen
self.current_test_name = None
def _runtest(self, test_name: str) -> MultiprocessResult:
+ self.current_test_name = test_name
+
if sys.platform == 'win32':
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
# page for the sys.stdout encoding. If the main process runs in a
else:
encoding = sys.stdout.encoding
- match_tests = self.runtests.get_match_tests(test_name)
+ tests = (test_name,)
+ if self.runtests.rerun:
+ match_tests = self.runtests.get_match_tests(test_name)
+ else:
+ match_tests = None
+ worker_runtests = self.runtests.copy(tests=tests)
+ worker_job = WorkerJob(
+ worker_runtests,
+ namespace=self.ns,
+ match_tests=match_tests)
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
- worker_job = WorkerJob(test_name,
- namespace=self.ns,
- rerun=self.rerun,
- match_tests=match_tests)
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
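# Sketch (assumption): one way to implement the leak check described above:
# snapshot the temporary directory before spawning the worker and report any
# entries that survive it. The helper name is illustrative.
def check_leaked_tmp_files(tmp_dir: str, names_before: set[str]) -> None:
    leaked = set(os.listdir(tmp_dir)) - names_before
    if leaked:
        names = ', '.join(sorted(leaked))
        print(f"Warning -- worker left temporary files behind: {names}",
              file=sys.stderr, flush=True)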