git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.11] gh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock (GH-108513...
author: elfstrom <elfstrom@users.noreply.github.com>
Sun, 24 Sep 2023 19:28:03 +0000 (21:28 +0200)
committer: GitHub <noreply@github.com>
Sun, 24 Sep 2023 19:28:03 +0000 (20:28 +0100)
This fixes issue GH-105829, https://github.com/python/cpython/issues/105829

(cherry picked from commit 405b06375a8a4cdb08ff53afade09a8b66ec23d5)

Lib/concurrent/futures/process.py
Lib/test/test_concurrent_futures/test_deadlock.py
Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst [new file with mode: 0644]

index 7a069895d479eeb3c6c89b76ef150750b67f29c5..30a7ac8e6fe886235bf8aa3d83d526f7b3c4149a 100644 (file)
@@ -69,6 +69,11 @@ class _ThreadWakeup:
         self._reader, self._writer = mp.Pipe(duplex=False)
 
     def close(self):
+        # Please note that we do not take the shutdown lock when
+        # calling clear() (to avoid deadlocking) so this method can
+        # only be called safely from the same thread as all calls to
+        # clear() even if you hold the shutdown lock. Otherwise we
+        # might try to read from the closed pipe.
         if not self._closed:
             self._closed = True
             self._writer.close()
@@ -424,8 +429,12 @@ class _ExecutorManagerThread(threading.Thread):
         elif wakeup_reader in ready:
             is_broken = False
 
-        with self.shutdown_lock:
-            self.thread_wakeup.clear()
+        # No need to hold the _shutdown_lock here because:
+        # 1. we're the only thread to use the wakeup reader
+        # 2. we're also the only thread to call thread_wakeup.close()
+        # 3. we want to avoid a possible deadlock when both reader and writer
+        #    would block (gh-105829)
+        self.thread_wakeup.clear()
 
         return result_item, is_broken, cause
 
@@ -708,7 +717,10 @@ class ProcessPoolExecutor(_base.Executor):
         # as it could result in a deadlock if a worker process dies with the
         # _result_queue write lock still acquired.
         #
-        # _shutdown_lock must be locked to access _ThreadWakeup.
+        # _shutdown_lock must be locked to access _ThreadWakeup.close() and
+        # .wakeup(). Care must also be taken to not call clear or close from
+        # more than one thread since _ThreadWakeup.clear() is not protected by
+        # the _shutdown_lock
         self._executor_manager_thread_wakeup = _ThreadWakeup()
 
         # Create communication channels for the executor
index d128cf2374fc2122e41f9086837bed6fed3feae5..2f08bf846988856abae678b28cc137736453a21c 100644 (file)
@@ -1,10 +1,13 @@
 import contextlib
+import queue
+import signal
 import sys
 import time
 import unittest
+import unittest.mock
 from pickle import PicklingError
 from concurrent import futures
-from concurrent.futures.process import BrokenProcessPool
+from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
 
 from test import support
 
@@ -239,6 +242,73 @@ class ExecutorDeadlockTest:
             with self.assertRaises(BrokenProcessPool):
                 list(executor.map(_crash_with_data, [data] * 10))
 
+    def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
+        # Issue #105829: The _ExecutorManagerThread wakeup pipe could
+        # fill up and block. See: https://github.com/python/cpython/issues/105829
+
+        # Lots of cargo culting while writing this test, apologies if
+        # something is really stupid...
+
+        self.executor.shutdown(wait=True)
+
+        if not hasattr(signal, 'alarm'):
+            raise unittest.SkipTest(
+                "Tested platform does not support the alarm signal")
+
+        def timeout(_signum, _frame):
+            import faulthandler
+            faulthandler.dump_traceback()
+
+            raise RuntimeError("timed out while submitting jobs?")
+
+        thread_run = futures.process._ExecutorManagerThread.run
+        def mock_run(self):
+            # Delay thread startup so the wakeup pipe can fill up and block
+            time.sleep(3)
+            thread_run(self)
+
+        class MockWakeup(_ThreadWakeup):
+            """Mock wakeup object to force the wakeup to block"""
+            def __init__(self):
+                super().__init__()
+                self._dummy_queue = queue.Queue(maxsize=1)
+
+            def wakeup(self):
+                self._dummy_queue.put(None, block=True)
+                super().wakeup()
+
+            def clear(self):
+                try:
+                    while True:
+                        self._dummy_queue.get_nowait()
+                except queue.Empty:
+                    super().clear()
+
+        with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
+                                         'run', mock_run),
+              unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
+                                  MockWakeup)):
+            with self.executor_type(max_workers=2,
+                                    mp_context=self.get_context()) as executor:
+                self.executor = executor  # Allow clean up in fail_on_deadlock
+
+                job_num = 100
+                job_data = range(job_num)
+
+                # Need to use sigalarm for timeout detection because
+                # Executor.submit is not guarded by any timeout (both
+                # self._work_ids.put(self._queue_count) and
+                # self._executor_manager_thread_wakeup.wakeup() might
+                # timeout, maybe more?). In this specific case it was
+                # the wakeup call that deadlocked on a blocking pipe.
+                old_handler = signal.signal(signal.SIGALRM, timeout)
+                try:
+                    signal.alarm(int(self.TIMEOUT))
+                    self.assertEqual(job_num, len(list(executor.map(int, job_data))))
+                finally:
+                    signal.alarm(0)
+                    signal.signal(signal.SIGALRM, old_handler)
+
 
 create_executor_tests(globals(), ExecutorDeadlockTest,
                       executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst b/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst
new file mode 100644 (file)
index 0000000..eaa2a5a
--- /dev/null
@@ -0,0 +1 @@
+Fix concurrent.futures.ProcessPoolExecutor deadlock