]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-125451: Fix deadlock in ProcessPoolExecutor shutdown (#125492)
authorSam Gross <colesbury@gmail.com>
Wed, 16 Oct 2024 15:39:17 +0000 (11:39 -0400)
committerGitHub <noreply@github.com>
Wed, 16 Oct 2024 15:39:17 +0000 (11:39 -0400)
There was a deadlock when `ProcessPoolExecutor` shuts down at the same
time that a queueing thread handles an error processing a task.

Don't use `_shutdown_lock` to protect the `_ThreadWakeup` pipes -- use
an internal lock instead. This fixes the ordering deadlock where the
`ExecutorManagerThread` holds the `_shutdown_lock` and joins the
queueing thread, while the queueing thread is attempting to acquire the
`_shutdown_lock` while closing the `_ThreadWakeup`.

Lib/concurrent/futures/process.py
Lib/test/test_concurrent_futures/test_shutdown.py
Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst [new file with mode: 0644]

index 7092b4757b5429ff45eb38dd56eaee070ff35929..42eee72bc1457f71b046c571dc04214b1d34e460 100644 (file)
@@ -68,27 +68,31 @@ _global_shutdown = False
 class _ThreadWakeup:
     def __init__(self):
         self._closed = False
+        self._lock = threading.Lock()
         self._reader, self._writer = mp.Pipe(duplex=False)
 
     def close(self):
-        # Please note that we do not take the shutdown lock when
+        # Please note that we do not take the self._lock when
         # calling clear() (to avoid deadlocking) so this method can
         # only be called safely from the same thread as all calls to
-        # clear() even if you hold the shutdown lock. Otherwise we
+        # clear() even if you hold the lock. Otherwise we
         # might try to read from the closed pipe.
-        if not self._closed:
-            self._closed = True
-            self._writer.close()
-            self._reader.close()
+        with self._lock:
+            if not self._closed:
+                self._closed = True
+                self._writer.close()
+                self._reader.close()
 
     def wakeup(self):
-        if not self._closed:
-            self._writer.send_bytes(b"")
+        with self._lock:
+            if not self._closed:
+                self._writer.send_bytes(b"")
 
     def clear(self):
-        if not self._closed:
-            while self._reader.poll():
-                self._reader.recv_bytes()
+        if self._closed:
+            raise RuntimeError('operation on closed _ThreadWakeup')
+        while self._reader.poll():
+            self._reader.recv_bytes()
 
 
 def _python_exit():
@@ -167,10 +171,8 @@ class _CallItem(object):
 
 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
-                 thread_wakeup):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
         self.pending_work_items = pending_work_items
-        self.shutdown_lock = shutdown_lock
         self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)
 
@@ -179,8 +181,7 @@ class _SafeQueue(Queue):
             tb = format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
-            with self.shutdown_lock:
-                self.thread_wakeup.wakeup()
+            self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this
             # case, the executor_manager_thread fails all work_items
             # with BrokenProcessPool
@@ -296,12 +297,10 @@ class _ExecutorManagerThread(threading.Thread):
         # if there is no pending work item.
         def weakref_cb(_,
                        thread_wakeup=self.thread_wakeup,
-                       shutdown_lock=self.shutdown_lock,
                        mp_util_debug=mp.util.debug):
             mp_util_debug('Executor collected: triggering callback for'
                           ' QueueManager wakeup')
-            with shutdown_lock:
-                thread_wakeup.wakeup()
+            thread_wakeup.wakeup()
 
         self.executor_reference = weakref.ref(executor, weakref_cb)
 
@@ -429,11 +428,6 @@ class _ExecutorManagerThread(threading.Thread):
         elif wakeup_reader in ready:
             is_broken = False
 
-        # No need to hold the _shutdown_lock here because:
-        # 1. we're the only thread to use the wakeup reader
-        # 2. we're also the only thread to call thread_wakeup.close()
-        # 3. we want to avoid a possible deadlock when both reader and writer
-        #    would block (gh-105829)
         self.thread_wakeup.clear()
 
         return result_item, is_broken, cause
@@ -721,10 +715,9 @@ class ProcessPoolExecutor(_base.Executor):
         # as it could result in a deadlock if a worker process dies with the
         # _result_queue write lock still acquired.
         #
-        # _shutdown_lock must be locked to access _ThreadWakeup.close() and
-        # .wakeup(). Care must also be taken to not call clear or close from
-        # more than one thread since _ThreadWakeup.clear() is not protected by
-        # the _shutdown_lock
+        # Care must be taken to only call clear and close from the
+        # executor_manager_thread, since _ThreadWakeup.clear() is not protected
+        # by a lock.
         self._executor_manager_thread_wakeup = _ThreadWakeup()
 
         # Create communication channels for the executor
@@ -735,7 +728,6 @@ class ProcessPoolExecutor(_base.Executor):
         self._call_queue = _SafeQueue(
             max_size=queue_size, ctx=self._mp_context,
             pending_work_items=self._pending_work_items,
-            shutdown_lock=self._shutdown_lock,
             thread_wakeup=self._executor_manager_thread_wakeup)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
index ba3618614a9bf9c7958647f6e5b40fd1f4209ba9..7a4065afd46fc840fa62d4450258e0a0c6ee05ce 100644 (file)
@@ -253,9 +253,6 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase
 
 
 class ProcessPoolShutdownTest(ExecutorShutdownTest):
-    # gh-125451: 'lock' cannot be serialized, the test is broken
-    # and hangs randomly
-    @unittest.skipIf(True, "broken test")
     def test_processes_terminate(self):
         def acquire_lock(lock):
             lock.acquire()
diff --git a/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst b/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst
new file mode 100644 (file)
index 0000000..589988d
--- /dev/null
@@ -0,0 +1,2 @@
+Fix deadlock when :class:`concurrent.futures.ProcessPoolExecutor` shuts down
+concurrently with an error when feeding a job to a worker process.