]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure...
authorThomas Moreau <thomas.moreau.2010@gmail.com>
Sun, 16 Feb 2020 18:09:26 +0000 (19:09 +0100)
committerGitHub <noreply@github.com>
Sun, 16 Feb 2020 18:09:26 +0000 (10:09 -0800)
As reported initially by @rad-pat in #6084, the following script causes a deadlock.

```
from concurrent.futures import ProcessPoolExecutor

class ObjectWithPickleError():
    """Triggers a RuntimeError when sending job to the workers"""

    def __reduce__(self):
        raise RuntimeError()

if __name__ == "__main__":
    e = ProcessPoolExecutor()
    f = e.submit(id, ObjectWithPickleError())
    e.shutdown(wait=False)
    f.result()  # Deadlock on get
```

This is caused by the fact that the main process is closing communication channels that might be necessary to the `queue_management_thread` later. To avoid this, this PR lets the `queue_management_thread` manage all the closing.

https://bugs.python.org/issue39104

Automerge-Triggered-By: @pitrou
Lib/concurrent/futures/process.py
Lib/test/test_concurrent_futures.py
Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst [new file with mode: 0644]

index fd9f572b6c71160e22cde46c4b20b80c17c3cb53..d77322831a6c6a58846a2624e9ed89ff2923f5a9 100644 (file)
@@ -80,18 +80,23 @@ _global_shutdown = False
 
 class _ThreadWakeup:
     def __init__(self):
+        self._closed = False
         self._reader, self._writer = mp.Pipe(duplex=False)
 
     def close(self):
-        self._writer.close()
-        self._reader.close()
+        if not self._closed:
+            self._closed = True
+            self._writer.close()
+            self._reader.close()
 
     def wakeup(self):
-        self._writer.send_bytes(b"")
+        if not self._closed:
+            self._writer.send_bytes(b"")
 
     def clear(self):
-        while self._reader.poll():
-            self._reader.recv_bytes()
+        if not self._closed:
+            while self._reader.poll():
+                self._reader.recv_bytes()
 
 
 def _python_exit():
@@ -160,8 +165,9 @@ class _CallItem(object):
 
 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
         self.pending_work_items = pending_work_items
+        self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)
 
     def _on_queue_feeder_error(self, e, obj):
@@ -169,6 +175,7 @@ class _SafeQueue(Queue):
             tb = traceback.format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
+            self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this case,
             # the queue_manager_thread fails all work_items with BrokenProcessPool
             if work_item is not None:
@@ -339,6 +346,8 @@ def _queue_management_worker(executor_reference,
 
         # Release the queue's resources as soon as possible.
         call_queue.close()
+        call_queue.join_thread()
+        thread_wakeup.close()
         # If .join() is not called on the created processes then
         # some ctx.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
@@ -566,6 +575,14 @@ class ProcessPoolExecutor(_base.Executor):
         self._pending_work_items = {}
         self._cancel_pending_futures = False
 
+        # _ThreadWakeup is a communication channel used to interrupt the wait
+        # of the main loop of queue_manager_thread from another thread (e.g.
+        # when calling executor.submit or executor.shutdown). We do not use the
+        # _result_queue to send the wakeup signal to the queue_manager_thread
+        # as it could result in a deadlock if a worker process dies with the
+        # _result_queue write lock still acquired.
+        self._queue_management_thread_wakeup = _ThreadWakeup()
+
         # Create communication channels for the executor
         # Make the call queue slightly larger than the number of processes to
         # prevent the worker processes from idling. But don't make it too big
@@ -573,7 +590,8 @@ class ProcessPoolExecutor(_base.Executor):
         queue_size = self._max_workers + EXTRA_QUEUED_CALLS
         self._call_queue = _SafeQueue(
             max_size=queue_size, ctx=self._mp_context,
-            pending_work_items=self._pending_work_items)
+            pending_work_items=self._pending_work_items,
+            thread_wakeup=self._queue_management_thread_wakeup)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
         # processes anyway, so silence the tracebacks.
@@ -581,14 +599,6 @@ class ProcessPoolExecutor(_base.Executor):
         self._result_queue = mp_context.SimpleQueue()
         self._work_ids = queue.Queue()
 
-        # _ThreadWakeup is a communication channel used to interrupt the wait
-        # of the main loop of queue_manager_thread from another thread (e.g.
-        # when calling executor.submit or executor.shutdown). We do not use the
-        # _result_queue to send the wakeup signal to the queue_manager_thread
-        # as it could result in a deadlock if a worker process dies with the
-        # _result_queue write lock still acquired.
-        self._queue_management_thread_wakeup = _ThreadWakeup()
-
     def _start_queue_management_thread(self):
         if self._queue_management_thread is None:
             # When the executor gets garbage collected, the weakref callback
@@ -692,16 +702,11 @@ class ProcessPoolExecutor(_base.Executor):
         # To reduce the risk of opening too many files, remove references to
         # objects that use file descriptors.
         self._queue_management_thread = None
-        if self._call_queue is not None:
-            self._call_queue.close()
-            if wait:
-                self._call_queue.join_thread()
-            self._call_queue = None
+        self._call_queue = None
         self._result_queue = None
         self._processes = None
 
         if self._queue_management_thread_wakeup:
-            self._queue_management_thread_wakeup.close()
             self._queue_management_thread_wakeup = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
index af77f813419104b0520cdd8c9e3cd4e53ae7aadd..a7381f9d13eb1e6b5c35ead3621a3b6d31a10f6f 100644 (file)
@@ -415,13 +415,32 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase
 
     def test_del_shutdown(self):
         executor = futures.ThreadPoolExecutor(max_workers=5)
-        executor.map(abs, range(-5, 5))
+        res = executor.map(abs, range(-5, 5))
         threads = executor._threads
         del executor
 
         for t in threads:
             t.join()
 
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the threads when calling
+        # shutdown with wait=False
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        executor.shutdown(wait=False)
+        for t in threads:
+            t.join()
+
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
     def test_thread_names_assigned(self):
         executor = futures.ThreadPoolExecutor(
             max_workers=5, thread_name_prefix='SpecialPool')
@@ -488,7 +507,7 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
 
     def test_del_shutdown(self):
         executor = futures.ProcessPoolExecutor(max_workers=5)
-        list(executor.map(abs, range(-5, 5)))
+        res = executor.map(abs, range(-5, 5))
         queue_management_thread = executor._queue_management_thread
         processes = executor._processes
         call_queue = executor._call_queue
@@ -502,6 +521,31 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
             p.join()
         call_queue.join_thread()
 
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the processes when calling
+        # shutdown with wait=False
+        executor = futures.ProcessPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        processes = executor._processes
+        call_queue = executor._call_queue
+        queue_management_thread = executor._queue_management_thread
+        executor.shutdown(wait=False)
+
+        # Make sure that all the executor resources were properly cleaned by
+        # the shutdown process
+        queue_management_thread.join()
+        for p in processes.values():
+            p.join()
+        call_queue.join_thread()
+
+        # Make sure the results were all computed before the executor got
+        # shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
 
 create_executor_tests(ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,
@@ -1086,6 +1130,32 @@ class ExecutorDeadlockTest:
             with self.assertRaises(BrokenProcessPool):
                 f.result()
 
+    def test_shutdown_deadlock_pickle(self):
+        # Test that the pool calling shutdown with wait=False does not cause
+        # a deadlock if a task fails at pickle after the shutdown call.
+        # Reported in bpo-39104.
+        self.executor.shutdown(wait=True)
+        with self.executor_type(max_workers=2,
+                                mp_context=get_context(self.ctx)) as executor:
+            self.executor = executor  # Allow clean up in fail_on_deadlock
+
+            # Start the executor and get the queue_management_thread to collect
+            # the threads and avoid dangling thread that should be cleaned up
+            # asynchronously.
+            executor.submit(id, 42).result()
+            queue_manager = executor._queue_management_thread
+
+            # Submit a task that fails at pickle and shutdown the executor
+            # without waiting
+            f = executor.submit(id, ErrorAtPickle())
+            executor.shutdown(wait=False)
+            with self.assertRaises(PicklingError):
+                f.result()
+
+        # Make sure the executor is eventually shutdown and do not leave
+        # dangling threads
+        queue_manager.join()
+
 
 create_executor_tests(ExecutorDeadlockTest,
                       executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst b/Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst
new file mode 100644 (file)
index 0000000..52779bf
--- /dev/null
@@ -0,0 +1,2 @@
+Fix hanging ProcessPoolExecutor on ``shutdown(wait=False)`` when a task has
+failed pickling.