]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-109047: concurrent.futures catches PythonFinalizationError (#109810)
authorVictor Stinner <vstinner@python.org>
Fri, 29 Sep 2023 19:31:19 +0000 (21:31 +0200)
committerGitHub <noreply@github.com>
Fri, 29 Sep 2023 19:31:19 +0000 (19:31 +0000)
concurrent.futures: The *executor manager thread* now catches
exceptions when adding an item to the *call queue*. During Python
finalization, creating a new thread can now raise RuntimeError. Catch
the exception and call terminate_broken() in this case.

Add test_python_finalization_error() to test_concurrent_futures.

concurrent.futures._ExecutorManagerThread changes:

* terminate_broken() no longer calls shutdown_workers() since the
  call queue is no longer working anymore (read and write ends of
  the queue pipe are closed).
* terminate_broken() now terminates child processes, not only
  wait until they complete.
* _ExecutorManagerThread.terminate_broken() now holds shutdown_lock
  to prevent race conditons with ProcessPoolExecutor.submit().

multiprocessing.Queue changes:

* Add _terminate_broken() method.
* _start_thread() sets _thread to None on exception to prevent
  leaking "dangling threads" even if the thread was not started
  yet.

Lib/concurrent/futures/process.py
Lib/multiprocessing/queues.py
Lib/test/test_concurrent_futures/test_process_pool.py
Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst [new file with mode: 0644]

index 73bdcbe869399188e480eb47aa692fe9e6cef41d..3990e6b1833d7860888cea13ea27cd98b5150d62 100644 (file)
@@ -341,7 +341,14 @@ class _ExecutorManagerThread(threading.Thread):
         # Main loop for the executor manager thread.
 
         while True:
-            self.add_call_item_to_queue()
+            # gh-109047: During Python finalization, self.call_queue.put()
+            # creation of a thread can fail with RuntimeError.
+            try:
+                self.add_call_item_to_queue()
+            except BaseException as exc:
+                cause = format_exception(exc)
+                self.terminate_broken(cause)
+                return
 
             result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
 
@@ -425,8 +432,8 @@ class _ExecutorManagerThread(threading.Thread):
             try:
                 result_item = result_reader.recv()
                 is_broken = False
-            except BaseException as e:
-                cause = format_exception(type(e), e, e.__traceback__)
+            except BaseException as exc:
+                cause = format_exception(exc)
 
         elif wakeup_reader in ready:
             is_broken = False
@@ -463,7 +470,7 @@ class _ExecutorManagerThread(threading.Thread):
         return (_global_shutdown or executor is None
                 or executor._shutdown_thread)
 
-    def terminate_broken(self, cause):
+    def _terminate_broken(self, cause):
         # Terminate the executor because it is in a broken state. The cause
         # argument can be used to display more information on the error that
         # lead the executor into becoming broken.
@@ -490,7 +497,7 @@ class _ExecutorManagerThread(threading.Thread):
         for work_id, work_item in self.pending_work_items.items():
             try:
                 work_item.future.set_exception(bpe)
-            except _base.InvalidStateError as exc:
+            except _base.InvalidStateError:
                 # set_exception() fails if the future is cancelled: ignore it.
                 # Trying to check if the future is cancelled before calling
                 # set_exception() would leave a race condition if the future is
@@ -505,17 +512,14 @@ class _ExecutorManagerThread(threading.Thread):
         for p in self.processes.values():
             p.terminate()
 
-        # Prevent queue writing to a pipe which is no longer read.
-        # https://github.com/python/cpython/issues/94777
-        self.call_queue._reader.close()
-
-        # gh-107219: Close the connection writer which can unblock
-        # Queue._feed() if it was stuck in send_bytes().
-        if sys.platform == 'win32':
-            self.call_queue._writer.close()
+        self.call_queue._terminate_broken()
 
         # clean up resources
-        self.join_executor_internals()
+        self._join_executor_internals(broken=True)
+
+    def terminate_broken(self, cause):
+        with self.shutdown_lock:
+            self._terminate_broken(cause)
 
     def flag_executor_shutting_down(self):
         # Flag the executor as shutting down and cancel remaining tasks if
@@ -558,15 +562,24 @@ class _ExecutorManagerThread(threading.Thread):
                     break
 
     def join_executor_internals(self):
-        self.shutdown_workers()
+        with self.shutdown_lock:
+            self._join_executor_internals()
+
+    def _join_executor_internals(self, broken=False):
+        # If broken, call_queue was closed and so can no longer be used.
+        if not broken:
+            self.shutdown_workers()
+
         # Release the queue's resources as soon as possible.
         self.call_queue.close()
         self.call_queue.join_thread()
-        with self.shutdown_lock:
-            self.thread_wakeup.close()
+        self.thread_wakeup.close()
+
         # If .join() is not called on the created processes then
         # some ctx.Queue methods may deadlock on Mac OS X.
         for p in self.processes.values():
+            if broken:
+                p.terminate()
             p.join()
 
     def get_n_children_alive(self):
index daf9ee94a194318555e439aea1349e42b45f7cb2..852ae87b27686122a096dfa691c5364d87ebe8aa 100644 (file)
@@ -158,6 +158,20 @@ class Queue(object):
         except AttributeError:
             pass
 
+    def _terminate_broken(self):
+        # Close a Queue on error.
+
+        # gh-94777: Prevent queue writing to a pipe which is no longer read.
+        self._reader.close()
+
+        # gh-107219: Close the connection writer which can unblock
+        # Queue._feed() if it was stuck in send_bytes().
+        if sys.platform == 'win32':
+            self._writer.close()
+
+        self.close()
+        self.join_thread()
+
     def _start_thread(self):
         debug('Queue._start_thread()')
 
@@ -169,13 +183,19 @@ class Queue(object):
                   self._wlock, self._reader.close, self._writer.close,
                   self._ignore_epipe, self._on_queue_feeder_error,
                   self._sem),
-            name='QueueFeederThread'
+            name='QueueFeederThread',
+            daemon=True,
         )
-        self._thread.daemon = True
 
-        debug('doing self._thread.start()')
-        self._thread.start()
-        debug('... done self._thread.start()')
+        try:
+            debug('doing self._thread.start()')
+            self._thread.start()
+            debug('... done self._thread.start()')
+        except:
+            # gh-109047: During Python finalization, creating a thread
+            # can fail with RuntimeError.
+            self._thread = None
+            raise
 
         if not self._joincancelled:
             self._jointhread = Finalize(
index 7763a4946f110cbe56fba5dfe75afb137c61f399..c73c2da1a010882a40de4a13489e5c1950f28a8b 100644 (file)
@@ -1,5 +1,6 @@
 import os
 import sys
+import threading
 import time
 import unittest
 from concurrent import futures
@@ -187,6 +188,34 @@ class ProcessPoolExecutorTest(ExecutorTest):
         for i, future in enumerate(futures):
             self.assertEqual(future.result(), mul(i, i))
 
+    def test_python_finalization_error(self):
+        # gh-109047: Catch RuntimeError on thread creation
+        # during Python finalization.
+
+        context = self.get_context()
+
+        # gh-109047: Mock the threading.start_new_thread() function to inject
+        # RuntimeError: simulate the error raised during Python finalization.
+        # Block the second creation: create _ExecutorManagerThread, but block
+        # QueueFeederThread.
+        orig_start_new_thread = threading._start_new_thread
+        nthread = 0
+        def mock_start_new_thread(func, *args):
+            nonlocal nthread
+            if nthread >= 1:
+                raise RuntimeError("can't create new thread at "
+                                   "interpreter shutdown")
+            nthread += 1
+            return orig_start_new_thread(func, *args)
+
+        with support.swap_attr(threading, '_start_new_thread',
+                               mock_start_new_thread):
+            executor = self.executor_type(max_workers=2, mp_context=context)
+            with executor:
+                with self.assertRaises(BrokenProcessPool):
+                    list(executor.map(mul, [(2, 3)] * 10))
+            executor.shutdown()
+
 
 create_executor_tests(globals(), ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst b/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst
new file mode 100644 (file)
index 0000000..71cb5a8
--- /dev/null
@@ -0,0 +1,4 @@
+:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
+when adding an item to the *call queue*. During Python finalization, creating a
+new thread can now raise :exc:`RuntimeError`. Catch the exception and call
+``terminate_broken()`` in this case. Patch by Victor Stinner.