]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-117293: Fix race condition in run_workers.py (#117298)
authorSam Gross <colesbury@gmail.com>
Mon, 8 Apr 2024 14:47:42 +0000 (10:47 -0400)
committerGitHub <noreply@github.com>
Mon, 8 Apr 2024 14:47:42 +0000 (10:47 -0400)
The worker thread may still be alive after it enqueues it's last result,
which can lead to a delay of 30 seconds after the test finishes. This
happens much more frequently in the free-threaded build with the GIL
disabled.

This changes run_workers.py to track of live workers by enqueueing a
`WorkerExited()` instance before the worker exits.

Lib/test/libregrtest/run_workers.py

index 9cfe1b9d6fd07dcf025b534efc586b8c40f2fda2..235047cf2e563caa06492b2b74157575e56355aa 100644 (file)
@@ -79,8 +79,12 @@ class MultiprocessResult:
     err_msg: str | None = None
 
 
+class WorkerThreadExited:
+    """Indicates that a worker thread has exited"""
+
 ExcStr = str
 QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
+QueueContent = QueueOutput | WorkerThreadExited
 
 
 class ExitThread(Exception):
@@ -376,8 +380,8 @@ class WorkerThread(threading.Thread):
     def run(self) -> None:
         fail_fast = self.runtests.fail_fast
         fail_env_changed = self.runtests.fail_env_changed
-        while not self._stopped:
-            try:
+        try:
+            while not self._stopped:
                 try:
                     test_name = next(self.pending)
                 except StopIteration:
@@ -396,11 +400,12 @@ class WorkerThread(threading.Thread):
 
                 if mp_result.result.must_stop(fail_fast, fail_env_changed):
                     break
-            except ExitThread:
-                break
-            except BaseException:
-                self.output.put((True, traceback.format_exc()))
-                break
+        except ExitThread:
+            pass
+        except BaseException:
+            self.output.put((True, traceback.format_exc()))
+        finally:
+            self.output.put(WorkerThreadExited())
 
     def _wait_completed(self) -> None:
         popen = self._popen
@@ -458,8 +463,9 @@ class RunWorkers:
         self.log = logger.log
         self.display_progress = logger.display_progress
         self.results: TestResults = results
+        self.live_worker_count = 0
 
-        self.output: queue.Queue[QueueOutput] = queue.Queue()
+        self.output: queue.Queue[QueueContent] = queue.Queue()
         tests_iter = runtests.iter_tests()
         self.pending = MultiprocessIterator(tests_iter)
         self.timeout = runtests.timeout
@@ -497,6 +503,7 @@ class RunWorkers:
         self.log(msg)
         for worker in self.workers:
             worker.start()
+            self.live_worker_count += 1
 
     def stop_workers(self) -> None:
         start_time = time.monotonic()
@@ -511,14 +518,18 @@ class RunWorkers:
 
         # bpo-46205: check the status of workers every iteration to avoid
         # waiting forever on an empty queue.
-        while any(worker.is_alive() for worker in self.workers):
+        while self.live_worker_count > 0:
             if use_faulthandler:
                 faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                   exit=True)
 
             # wait for a thread
             try:
-                return self.output.get(timeout=PROGRESS_UPDATE)
+                result = self.output.get(timeout=PROGRESS_UPDATE)
+                if isinstance(result, WorkerThreadExited):
+                    self.live_worker_count -= 1
+                    continue
+                return result
             except queue.Empty:
                 pass
 
@@ -528,12 +539,6 @@ class RunWorkers:
                 if running:
                     self.log(running)
 
-        # all worker threads are done: consume pending results
-        try:
-            return self.output.get(timeout=0)
-        except queue.Empty:
-            return None
-
     def display_result(self, mp_result: MultiprocessResult) -> None:
         result = mp_result.result
         pgo = self.runtests.pgo