]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-132969: Fix error/hang when shutdown(wait=False) and task exited abnormally ...
authorAjay Kamdar <140011370+ogbiggles@users.noreply.github.com>
Tue, 10 Jun 2025 11:28:31 +0000 (07:28 -0400)
committerGitHub <noreply@github.com>
Tue, 10 Jun 2025 11:28:31 +0000 (13:28 +0200)
When shutdown is called with wait=False, the executor thread keeps running
even after the ProcessPoolExecutor's state is reset. The executor then tries
to replenish the worker processes pool resulting in an error and a potential hang
when it comes across a worker that has died. Fixed the issue by having
_adjust_process_count() return without doing anything if the ProcessPoolExecutor's
state has been reset.

Added unit tests to validate two scenarios:
max_workers < num_tasks (exception)
max_workers > num_tasks (exception + hang)

Lib/concurrent/futures/process.py
Lib/test/test_concurrent_futures/test_shutdown.py
Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst [new file with mode: 0644]

index 76b7b2abe836d8265481d8258a84afb9cffc622d..a14650bf5fa47cd65037b078a4e6572ace4480c5 100644 (file)
@@ -755,6 +755,11 @@ class ProcessPoolExecutor(_base.Executor):
                 self._executor_manager_thread_wakeup
 
     def _adjust_process_count(self):
+        # gh-132969: avoid error when state is reset and executor is still running,
+        # which will happen when shutdown(wait=False) is called.
+        if self._processes is None:
+            return
+
         # if there's an idle process, we don't need to spawn a new one.
         if self._idle_worker_semaphore.acquire(blocking=False):
             return
index 7a4065afd46fc840fa62d4450258e0a0c6ee05ce..99b315b47e253004aca382590aac426b29e9927f 100644 (file)
@@ -330,6 +330,64 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
         # shutdown.
         assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
 
+    @classmethod
+    def _failing_task_gh_132969(cls, n):
+        raise ValueError("failing task")
+
+    @classmethod
+    def _good_task_gh_132969(cls, n):
+        time.sleep(0.1 * n)
+        return n
+
+    def _run_test_issue_gh_132969(self, max_workers):
+        # max_workers=2 will repro exception
+        # max_workers=4 will repro exception and then hang
+
+        # Repro conditions
+        #   max_tasks_per_child=1
+        #   a task ends abnormally
+        #   shutdown(wait=False) is called
+        start_method = self.get_context().get_start_method()
+        if (start_method == "fork" or
+           (start_method == "forkserver" and sys.platform.startswith("win"))):
+                self.skipTest(f"Skipping test for {start_method = }")
+        executor = futures.ProcessPoolExecutor(
+                max_workers=max_workers,
+                max_tasks_per_child=1,
+                mp_context=self.get_context())
+        f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1)
+        f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2)
+        f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3)
+        result = 0
+        try:
+            result += f1.result()
+            result += f2.result()
+            result += f3.result()
+        except ValueError:
+            # stop processing results upon first exception
+            pass
+
+        # Ensure that the executor cleans up after shutdown is called
+        # with wait=False
+        executor_manager_thread = executor._executor_manager_thread
+        executor.shutdown(wait=False)
+        time.sleep(0.2)
+        executor_manager_thread.join()
+        return result
+
+    def test_shutdown_gh_132969_case_1(self):
+        # gh-132969: test that exception "object of type 'NoneType' has no len()"
+        # is not raised when shutdown(wait=False) is called.
+        result = self._run_test_issue_gh_132969(2)
+        self.assertEqual(result, 1)
+
+    def test_shutdown_gh_132969_case_2(self):
+        # gh-132969: test that process does not hang and
+        # exception "object of type 'NoneType' has no len()" is not raised
+        # when shutdown(wait=False) is called.
+        result = self._run_test_issue_gh_132969(4)
+        self.assertEqual(result, 1)
+
 
 create_executor_tests(globals(), ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst
new file mode 100644 (file)
index 0000000..7364c42
--- /dev/null
@@ -0,0 +1,7 @@
+Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread,
+which remains running when :meth:`shutdown(wait=False)
+<concurrent.futures.Executor.shutdown>` is called, from
+attempting to adjust the pool's worker processes after the object state has already been reset during shutdown.
+A combination of conditions, including a worker process having terminated abnormally,
+resulted in an exception and a potential hang when the still-running executor thread
+attempted to replace dead workers within the pool.