# Main loop for the executor manager thread.
while True:
- self.add_call_item_to_queue()
+ # gh-109047: During Python finalization, self.call_queue.put()
+ # creation of a thread can fail with RuntimeError.
+ try:
+ self.add_call_item_to_queue()
+ except BaseException as exc:
+ cause = format_exception(exc)
+ self.terminate_broken(cause)
+ return
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
try:
result_item = result_reader.recv()
is_broken = False
- except BaseException as e:
- cause = format_exception(type(e), e, e.__traceback__)
+ except BaseException as exc:
+ cause = format_exception(exc)
elif wakeup_reader in ready:
is_broken = False
return (_global_shutdown or executor is None
or executor._shutdown_thread)
- def terminate_broken(self, cause):
+ def _terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
for work_id, work_item in self.pending_work_items.items():
try:
work_item.future.set_exception(bpe)
- except _base.InvalidStateError as exc:
+ except _base.InvalidStateError:
# set_exception() fails if the future is cancelled: ignore it.
# Trying to check if the future is cancelled before calling
# set_exception() would leave a race condition if the future is
for p in self.processes.values():
p.terminate()
- # Prevent queue writing to a pipe which is no longer read.
- # https://github.com/python/cpython/issues/94777
- self.call_queue._reader.close()
-
- # gh-107219: Close the connection writer which can unblock
- # Queue._feed() if it was stuck in send_bytes().
- if sys.platform == 'win32':
- self.call_queue._writer.close()
+ self.call_queue._terminate_broken()
# clean up resources
- self.join_executor_internals()
+ self._join_executor_internals(broken=True)
+
+ def terminate_broken(self, cause):
+ with self.shutdown_lock:
+ self._terminate_broken(cause)
def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
break
def join_executor_internals(self):
- self.shutdown_workers()
+ with self.shutdown_lock:
+ self._join_executor_internals()
+
+ def _join_executor_internals(self, broken=False):
+ # If broken, call_queue was closed and so can no longer be used.
+ if not broken:
+ self.shutdown_workers()
+
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
- with self.shutdown_lock:
- self.thread_wakeup.close()
+ self.thread_wakeup.close()
+
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
+ if broken:
+ p.terminate()
p.join()
def get_n_children_alive(self):
except AttributeError:
pass
+ def _terminate_broken(self):
+ # Close a Queue on error.
+
+ # gh-94777: Prevent queue writing to a pipe which is no longer read.
+ self._reader.close()
+
+ # gh-107219: Close the connection writer which can unblock
+ # Queue._feed() if it was stuck in send_bytes().
+ if sys.platform == 'win32':
+ self._writer.close()
+
+ self.close()
+ self.join_thread()
+
def _start_thread(self):
debug('Queue._start_thread()')
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
- name='QueueFeederThread'
+ name='QueueFeederThread',
+ daemon=True,
)
- self._thread.daemon = True
- debug('doing self._thread.start()')
- self._thread.start()
- debug('... done self._thread.start()')
+ try:
+ debug('doing self._thread.start()')
+ self._thread.start()
+ debug('... done self._thread.start()')
+ except:
+ # gh-109047: During Python finalization, creating a thread
+ # can fail with RuntimeError.
+ self._thread = None
+ raise
if not self._joincancelled:
self._jointhread = Finalize(
import os
import sys
+import threading
import time
import unittest
from concurrent import futures
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))
+ def test_python_finalization_error(self):
+ # gh-109047: Catch RuntimeError on thread creation
+ # during Python finalization.
+
+ context = self.get_context()
+
+ # gh-109047: Mock the threading.start_new_thread() function to inject
+ # RuntimeError: simulate the error raised during Python finalization.
+ # Block the second creation: create _ExecutorManagerThread, but block
+ # QueueFeederThread.
+ orig_start_new_thread = threading._start_new_thread
+ nthread = 0
+ def mock_start_new_thread(func, *args):
+ nonlocal nthread
+ if nthread >= 1:
+ raise RuntimeError("can't create new thread at "
+ "interpreter shutdown")
+ nthread += 1
+ return orig_start_new_thread(func, *args)
+
+ with support.swap_attr(threading, '_start_new_thread',
+ mock_start_new_thread):
+ executor = self.executor_type(max_workers=2, mp_context=context)
+ with executor:
+ with self.assertRaises(BrokenProcessPool):
+ list(executor.map(mul, [(2, 3)] * 10))
+ executor.shutdown()
+
create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,