self.tempdir = None
self._add_python_opts = True
self.xmlpath = None
+ self.single_process = False
super().__init__(**kwargs)
group.add_argument('-j', '--multiprocess', metavar='PROCESSES',
dest='use_mp', type=int,
help='run PROCESSES processes at once')
+ group.add_argument('--single-process', action='store_true',
+ dest='single_process',
+ help='always run all tests sequentially in '
+ 'a single process, ignore -jN option, '
+ 'and failed tests are also rerun sequentially '
+ 'in the same process')
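# Illustrative command lines for the new option (editor's sketch, not part of
# the patch; the test names are placeholders):
#
#   ./python -m test --single-process test_os test_sys
#   ./python -m test -j4 --single-process test_os    # -j4 is ignored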
group.add_argument('-T', '--coverage', action='store_true',
dest='trace',
help='turn on code coverage tracing using the trace '
else:
    ns._add_python_opts = False
+ # --single-process overrides -jN option
+ if ns.single_process:
+     ns.use_mp = None
+
# When both --slow-ci and --fast-ci options are present,
# --slow-ci has the priority
if ns.slow_ci:
self.cmdline_args: TestList = ns.args
# Workers
- if ns.use_mp is None:
-     num_workers = 0  # run sequentially
+ self.single_process: bool = ns.single_process
+ if self.single_process or ns.use_mp is None:
+     num_workers = 0  # run sequentially in a single process
elif ns.use_mp <= 0:
-     num_workers = -1  # use the number of CPUs
+     num_workers = -1  # run in parallel, use the number of CPUs
else:
-     num_workers = ns.use_mp
+     num_workers = ns.use_mp  # run in parallel
self.num_workers: int = num_workers
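# Summary of the resulting num_workers values, derived from the code above
# (editor's note, not part of the patch):
#   --single-process  -> num_workers = 0   (sequential, single process)
#   no -j option      -> num_workers = 0   (sequential)
#   -j0               -> num_workers = -1  (parallel, based on the CPU count)
#   -jN with N > 0    -> num_workers = N   (parallel, N worker processes)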
self.worker_json: StrJSON | None = ns.worker_json
def _rerun_failed_tests(self, runtests: RunTests):
# Configure the runner to re-run tests
- if self.num_workers == 0:
+ if self.num_workers == 0 and not self.single_process:
    # Always run tests in fresh processes to have more deterministic
    # initial state. Don't re-run tests in parallel but limit to a
    # single worker process to have side effects (on the system load
tests, match_tests_dict = self.results.prepare_rerun()
# Re-run failed tests
- self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
runtests = runtests.copy(
tests=tests,
rerun=True,
match_tests_dict=match_tests_dict,
output_on_failure=False)
self.logger.set_tests(runtests)
- self._run_tests_mp(runtests, self.num_workers)
+
+ msg = f"Re-running {len(tests)} failed tests in verbose mode"
+ if not self.single_process:
+ msg = f"{msg} in subprocesses"
+ self.log(msg)
+ self._run_tests_mp(runtests, self.num_workers)
+ else:
+ self.log(msg)
+ self.run_tests_sequentially(runtests)
return runtests
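# Resulting rerun log lines (illustrative; the count is an example):
#   default:          "Re-running 3 failed tests in verbose mode in subprocesses"
#   --single-process: "Re-running 3 failed tests in verbose mode"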
def rerun_failed_tests(self, runtests: RunTests):
    tests = count(jobs, 'test')
else:
    tests = 'tests'
- msg = f"Run {tests} sequentially"
+ msg = f"Run {tests} sequentially in a single process"
if runtests.timeout:
msg += " (timeout: %s)" % format_duration(runtests.timeout)
self.log(msg)
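# Example of the updated log line (illustrative count; a "(timeout: ...)"
# suffix is appended when a timeout is set):
#   "Run 5 tests sequentially in a single process"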
keep_environ = True
if cross_compile and hostrunner:
-     if self.num_workers == 0:
+     if self.num_workers == 0 and not self.single_process:
        # For now use only two cores for cross-compiled builds;
        # hostrunner can be expensive.
        regrtest_opts.extend(['-j', '2'])
self.assertEqual(regrtest.hunt_refleak.runs, 10)
self.assertFalse(regrtest.output_on_failure)
+ def test_single_process(self):
+ args = ['-j2', '--single-process']
+ with support.captured_stderr():
+     regrtest = self.create_regrtest(args)
+ self.assertEqual(regrtest.num_workers, 0)
+ self.assertTrue(regrtest.single_process)
+
+ args = ['--fast-ci', '--single-process']
+ with support.captured_stderr():
+     regrtest = self.create_regrtest(args)
+ self.assertEqual(regrtest.num_workers, 0)
+ self.assertTrue(regrtest.single_process)
+
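# To exercise only this new test against a built checkout (illustrative
# command; -m is regrtest's --match option):
#   ./python -m test test_regrtest -m test_single_process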
@dataclasses.dataclass(slots=True)
class Rerun: