]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-109162: Refactor libregrtest.runtest_mp (#109205)
authorVictor Stinner <vstinner@python.org>
Sun, 10 Sep 2023 00:24:38 +0000 (02:24 +0200)
committerGitHub <noreply@github.com>
Sun, 10 Sep 2023 00:24:38 +0000 (00:24 +0000)
* Add attributes to Regrtest and RunTests:

  * fail_env_changed
  * num_workers

* Rename MultiprocessTestRunner to RunWorkers. Add num_workers
  parameters to RunWorkers constructor. Remove RunWorkers.ns
  attribute.
* Rename TestWorkerProcess to WorkerThread.
* get_running() now returns a string like: "running (...): ...".
* Regrtest.action_run_tests() now selects the number of worker
  processes, instead of the command line parser.

Lib/test/libregrtest/cmdline.py
Lib/test/libregrtest/main.py
Lib/test/libregrtest/runtest.py
Lib/test/libregrtest/runtest_mp.py

index 9afb13224975ee490e299d6ef15cef173f953d69..2835546fc713cf7c84724777a9ead218df2ba357 100644 (file)
@@ -1,5 +1,5 @@
 import argparse
-import os
+import os.path
 import shlex
 import sys
 from test.support import os_helper
@@ -410,10 +410,6 @@ def _parse_args(args, **kwargs):
     if ns.timeout is not None:
         if ns.timeout <= 0:
             ns.timeout = None
-    if ns.use_mp is not None:
-        if ns.use_mp <= 0:
-            # Use all cores + extras for tests that like to sleep
-            ns.use_mp = 2 + (os.cpu_count() or 1)
     if ns.use:
         for a in ns.use:
             for r in a:
index 4066d06c98600ecedf23a7e87bc55597d08f6148..1fa7b07a09d7019d860d63cd7099394d31451248 100644 (file)
@@ -83,8 +83,18 @@ class Regrtest:
         self.fromfile: str | None = ns.fromfile
         self.starting_test: str | None = ns.start
 
+        # Run tests
+        if ns.use_mp is None:
+            num_workers = 0  # run sequentially
+        elif ns.use_mp <= 0:
+            num_workers = -1  # use the number of CPUs
+        else:
+            num_workers = ns.use_mp
+        self.num_workers: int = num_workers
+
         # Options to run tests
         self.fail_fast: bool = ns.failfast
+        self.fail_env_changed: bool = ns.fail_env_changed
         self.forever: bool = ns.forever
         self.randomize: bool = ns.randomize
         self.random_seed: int | None = ns.random_seed
@@ -150,7 +160,6 @@ class Regrtest:
                 | set(self.run_no_tests))
 
     def accumulate_result(self, result, rerun=False):
-        fail_env_changed = self.ns.fail_env_changed
         test_name = result.test_name
 
         match result.state:
@@ -167,7 +176,7 @@ class Regrtest:
             case State.DID_NOT_RUN:
                 self.run_no_tests.append(test_name)
             case _:
-                if result.is_failed(fail_env_changed):
+                if result.is_failed(self.fail_env_changed):
                     self.bad.append(test_name)
                     self.need_rerun.append(result)
                 else:
@@ -339,9 +348,8 @@ class Regrtest:
 
     def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
         # Configure the runner to re-run tests
-        ns = self.ns
-        if ns.use_mp is None:
-            ns.use_mp = 1
+        if self.num_workers == 0:
+            self.num_workers = 1
 
         # Get tests to re-run
         tests = [result.test_name for result in need_rerun]
@@ -363,7 +371,7 @@ class Regrtest:
             match_tests_dict=match_tests_dict,
             output_on_failure=False)
         self.set_tests(runtests)
-        self._run_tests_mp(runtests)
+        self._run_tests_mp(runtests, self.num_workers)
         return runtests
 
     def rerun_failed_tests(self, need_rerun, runtests: RunTests):
@@ -471,7 +479,6 @@ class Regrtest:
     def run_tests_sequentially(self, runtests):
         ns = self.ns
         coverage = ns.trace
-        fail_env_changed = ns.fail_env_changed
 
         if coverage:
             import trace
@@ -503,7 +510,7 @@ class Regrtest:
                 if module not in save_modules and module.startswith("test."):
                     support.unload(module)
 
-            if result.must_stop(self.fail_fast, fail_env_changed):
+            if result.must_stop(self.fail_fast, self.fail_env_changed):
                 break
 
             previous_test = str(result)
@@ -564,12 +571,10 @@ class Regrtest:
                         self.environment_changed))
 
     def get_tests_state(self):
-        fail_env_changed = self.ns.fail_env_changed
-
         result = []
         if self.bad:
             result.append("FAILURE")
-        elif fail_env_changed and self.environment_changed:
+        elif self.fail_env_changed and self.environment_changed:
             result.append("ENV CHANGED")
         elif self.no_tests_run():
             result.append("NO TESTS RAN")
@@ -585,8 +590,9 @@ class Regrtest:
             result = '%s then %s' % (self.first_state, result)
         return result
 
-    def _run_tests_mp(self, runtests: RunTests) -> None:
-        from test.libregrtest.runtest_mp import run_tests_multiprocess
+    def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None:
+        from test.libregrtest.runtest_mp import RunWorkers
+
         # If we're on windows and this is the parent runner (not a worker),
         # track the load average.
         if sys.platform == 'win32':
@@ -600,7 +606,7 @@ class Regrtest:
                 print(f'Failed to create WindowsLoadTracker: {error}')
 
         try:
-            run_tests_multiprocess(self, runtests)
+            RunWorkers(self, runtests, num_workers).run()
         finally:
             if self.win_load_tracker is not None:
                 self.win_load_tracker.close()
@@ -618,8 +624,8 @@ class Regrtest:
     def run_tests(self, runtests: RunTests):
         self.first_runtests = runtests
         self.set_tests(runtests)
-        if self.ns.use_mp:
-            self._run_tests_mp(runtests)
+        if self.num_workers:
+            self._run_tests_mp(runtests, self.num_workers)
             tracer = None
         else:
             tracer = self.run_tests_sequentially(runtests)
@@ -843,7 +849,7 @@ class Regrtest:
             exitcode = EXITCODE_BAD_TEST
         elif self.interrupted:
             exitcode = EXITCODE_INTERRUPTED
-        elif self.ns.fail_env_changed and self.environment_changed:
+        elif self.fail_env_changed and self.environment_changed:
             exitcode = EXITCODE_ENV_CHANGED
         elif self.no_tests_run():
             exitcode = EXITCODE_NO_TESTS_RAN
@@ -866,6 +872,10 @@ class Regrtest:
         if self.randomize:
             print("Using random seed", self.random_seed)
 
+        if self.num_workers < 0:
+            # Use all cores + extras for tests that like to sleep
+            self.num_workers = 2 + (os.cpu_count() or 1)
+
         runtests = RunTests(
             tuple(self.selected),
             fail_fast=self.fail_fast,
index dc574eda8b99f5510201d94f246a4c34d4f2633c..667701778d9a793c4450aed37fdffa4d0eff21a0 100644 (file)
@@ -217,6 +217,7 @@ class TestResult:
 class RunTests:
     tests: TestTuple
     fail_fast: bool = False
+    fail_env_changed: bool = False
     match_tests: FilterTuple | None = None
     ignore_tests: FilterTuple | None = None
     match_tests_dict: FilterDict | None = None
index e4a9301656436b6f8d9f41232d3e71543a640824..28c05b5976e1591c40dcad8ba813dbc4c799cb07 100644 (file)
@@ -16,7 +16,6 @@ from test import support
 from test.support import os_helper
 from test.support import TestStats
 
-from test.libregrtest.cmdline import Namespace
 from test.libregrtest.main import Regrtest
 from test.libregrtest.runtest import (
     run_single_test, TestResult, State, PROGRESS_MIN_TIME,
@@ -150,14 +149,13 @@ class ExitThread(Exception):
     pass
 
 
-class TestWorkerProcess(threading.Thread):
-    def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
+class WorkerThread(threading.Thread):
+    def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
         super().__init__()
         self.worker_id = worker_id
         self.runtests = runner.runtests
         self.pending = runner.pending
         self.output = runner.output
-        self.ns = runner.ns
         self.timeout = runner.worker_timeout
         self.regrtest = runner.regrtest
         self.current_test_name = None
@@ -167,7 +165,7 @@ class TestWorkerProcess(threading.Thread):
         self._stopped = False
 
     def __repr__(self) -> str:
-        info = [f'TestWorkerProcess #{self.worker_id}']
+        info = [f'WorkerThread #{self.worker_id}']
         if self.is_alive():
             info.append("running")
         else:
@@ -203,7 +201,7 @@ class TestWorkerProcess(threading.Thread):
             else:
                 popen.kill()
         except ProcessLookupError:
-            # popen.kill(): the process completed, the TestWorkerProcess thread
+            # popen.kill(): the process completed, the WorkerThread thread
             # read its exit status, but Popen.send_signal() read the returncode
             # just before Popen.wait() set returncode.
             pass
@@ -362,7 +360,7 @@ class TestWorkerProcess(threading.Thread):
 
     def run(self) -> None:
         fail_fast = self.runtests.fail_fast
-        fail_env_changed = self.ns.fail_env_changed
+        fail_env_changed = self.runtests.fail_env_changed
         while not self._stopped:
             try:
                 try:
@@ -394,10 +392,10 @@ class TestWorkerProcess(threading.Thread):
                           f"{exc!r}")
 
     def wait_stopped(self, start_time: float) -> None:
-        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
+        # bpo-38207: RunWorkers.stop_workers() called self.stop()
         # which killed the process. Sometimes, killing the process from the
         # main thread does not interrupt popen.communicate() in
-        # TestWorkerProcess thread. This loop with a timeout is a workaround
+        # WorkerThread thread. This loop with a timeout is a workaround
         # for that.
         #
         # Moreover, if this method fails to join the thread, it is likely
@@ -417,7 +415,7 @@ class TestWorkerProcess(threading.Thread):
                 break
 
 
-def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
+def get_running(workers: list[WorkerThread]) -> list[str]:
     running = []
     for worker in workers:
         current_test_name = worker.current_test_name
@@ -427,18 +425,17 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
         if dt >= PROGRESS_MIN_TIME:
             text = '%s (%s)' % (current_test_name, format_duration(dt))
             running.append(text)
-    return running
+    if not running:
+        return None
+    return f"running ({len(running)}): {', '.join(running)}"
 
 
-class MultiprocessTestRunner:
-    def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
-        ns = regrtest.ns
-
+class RunWorkers:
+    def __init__(self, regrtest: Regrtest, runtests: RunTests, num_workers: int) -> None:
         self.regrtest = regrtest
+        self.log = regrtest.log
+        self.num_workers = num_workers
         self.runtests = runtests
-        self.rerun = runtests.rerun
-        self.log = self.regrtest.log
-        self.ns = ns
         self.output: queue.Queue[QueueOutput] = queue.Queue()
         tests_iter = runtests.iter_tests()
         self.pending = MultiprocessIterator(tests_iter)
@@ -453,9 +450,8 @@ class MultiprocessTestRunner:
         self.workers = None
 
     def start_workers(self) -> None:
-        use_mp = self.ns.use_mp
-        self.workers = [TestWorkerProcess(index, self)
-                        for index in range(1, use_mp + 1)]
+        self.workers = [WorkerThread(index, self)
+                        for index in range(1, self.num_workers + 1)]
         msg = f"Run tests in parallel using {len(self.workers)} child processes"
         if self.timeout:
             msg += (" (timeout: %s, worker timeout: %s)"
@@ -489,10 +485,11 @@ class MultiprocessTestRunner:
             except queue.Empty:
                 pass
 
-            # display progress
-            running = get_running(self.workers)
-            if running and not pgo:
-                self.log('running: %s' % ', '.join(running))
+            if not pgo:
+                # display progress
+                running = get_running(self.workers)
+                if running:
+                    self.log(running)
 
         # all worker threads are done: consume pending results
         try:
@@ -510,9 +507,10 @@ class MultiprocessTestRunner:
             text += ' (%s)' % mp_result.err_msg
         elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
             text += ' (%s)' % format_duration(result.duration)
-        running = get_running(self.workers)
-        if running and not pgo:
-            text += ' -- running: %s' % ', '.join(running)
+        if not pgo:
+            running = get_running(self.workers)
+            if running:
+                text += f' -- {running}'
         self.regrtest.display_progress(self.test_index, text)
 
     def _process_result(self, item: QueueOutput) -> bool:
@@ -537,9 +535,9 @@ class MultiprocessTestRunner:
 
         return result
 
-    def run_tests(self) -> None:
+    def run(self) -> None:
         fail_fast = self.runtests.fail_fast
-        fail_env_changed = self.ns.fail_env_changed
+        fail_env_changed = self.runtests.fail_env_changed
 
         self.start_workers()
 
@@ -566,10 +564,6 @@ class MultiprocessTestRunner:
             self.stop_workers()
 
 
-def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
-    MultiprocessTestRunner(regrtest, runtests).run_tests()
-
-
 class EncodeTestResult(json.JSONEncoder):
     """Encode a TestResult (sub)class object into a JSON dict."""