[3.14] gh-101267: ProcessPoolExecutor no longer shares 1 BrokenProcessPool exception among all failed futures (GH-101268) (GH-151431)
* gh-101267: ProcessPoolExecutor no longer shares 1 BrokenProcessPool exception among all failed futures (GH-101268)
(cherry picked from commit 3c00ebc2bbd902495b163def850bc931420209fc)
(cherry picked from commit 4e8c9c6a5742659ca9f77719178802cce06a3507)
* Drop the abrupt-exit-code reporting from the 3.14 backport
Reporting the exit codes of processes that died without a known cause
is a new feature, not part of the gh-101267 bugfix. Keep only the
bugfix on 3.14: each failed future gets its own BrokenProcessPool
exception instead of one shared instance.
---------
(cherry picked from commit 27ff2c8c53e29e5075cba7d393f593a3a682a2f3)
Co-authored-by: Gregory P. Smith <68491+gpshead@users.noreply.github.com>
Co-authored-by: Daniel Shields <daniel.shields@twosigma.com>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
             executor._shutdown_thread = True
             executor = None
-        # All pending tasks are to be marked failed with the following
-        # BrokenProcessPool error
-        bpe = BrokenProcessPool("A process in the process pool was "
-                                "terminated abruptly while the future was "
-                                "running or pending.")
+        # All pending tasks are to be marked failed with a
+        # BrokenProcessPool error, as separate instances to avoid sharing
+        # a traceback (gh-101267).
+        cause_tb = None
         if cause is not None:
-            bpe.__cause__ = _RemoteTraceback(
-                f"\n'''\n{''.join(cause)}'''")
+            cause_tb = f"\n'''\n{''.join(cause)}'''"
         # Mark pending tasks as failed.
         for work_id, work_item in self.pending_work_items.items():
+            bpe = BrokenProcessPool("A process in the process pool was "
+                                    "terminated abruptly while the future was "
+                                    "running or pending.")
+            if cause_tb is not None:
+                bpe.__cause__ = _RemoteTraceback(cause_tb)
             try:
                 work_item.future.set_exception(bpe)
             except _base.InvalidStateError:
import sys
import threading
import time
+import traceback
import unittest
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool
         # Submitting other jobs fails as well.
         self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
+    def test_broken_process_pool_traceback(self):
+        # When a child process is abruptly terminated, the whole pool gets
+        # "broken", and a BrokenProcessPool exception should be created
+        # for each future instead of sharing one exception among all futures.
+        event = self.create_event()
+        futures = [self.executor.submit(event.wait) for _ in range(3)]
+        p = next(iter(self.executor._processes.values()))
+        p.terminate()
+        for fut in futures:
+            # Don't use assertRaises(): it clears the traceback off exc.
+            try:
+                fut.result()
+            except BrokenProcessPool as exc:
+                tb = exc.__traceback__
+            else:
+                self.fail("BrokenProcessPool not raised")
+            count = sum(
+                1
+                for frame_summary in traceback.extract_tb(tb)
+                if frame_summary.filename == __file__
+            )
+            # This code file should appear exactly once in the traceback.
+            # A shared exception would accumulate a frame per result() call.
+            self.assertEqual(count, 1)
+
     def test_map_chunksize(self):
         def bad_map():
             list(self.executor.map(pow, range(40), range(40), chunksize=-1))
Bruce Sherwood
Gregory Shevchenko
Hai Shi
+Daniel Shields
Alexander Shigin
Pete Shinners
Michael Shiplett
--- /dev/null
+When a worker process terminates unexpectedly,
+:class:`concurrent.futures.ProcessPoolExecutor` now sets a separate
+:exc:`~concurrent.futures.process.BrokenProcessPool` exception on each pending
+future instead of sharing a single instance among them all. Sharing one
+exception produced malformed tracebacks: each
+:meth:`Future.result() <concurrent.futures.Future.result>` call re-raised the
+same object, appending another copy of the traceback to it.
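
The traceback growth described above can be reproduced without a process pool. The following is only an illustrative sketch, not code from this change: ``raise_it`` and ``frame_counts`` are hypothetical helpers, and ``RuntimeError`` stands in for ``BrokenProcessPool``. It compares re-raising one shared exception object with raising a fresh instance each time.

```python
import traceback


def raise_it(exc):
    # Hypothetical stand-in for Future.result() re-raising a stored exception.
    raise exc


def frame_counts(make_exc, n):
    # Raise the exception returned by make_exc() n times and record how many
    # traceback entries it carries after each raise.
    counts = []
    for _ in range(n):
        try:
            raise_it(make_exc())
        except RuntimeError as exc:
            counts.append(len(traceback.extract_tb(exc.__traceback__)))
    return counts


shared = RuntimeError("shared instance")

# The same object is raised every time, so it keeps its earlier traceback
# and each raise adds more entries on top of it.
shared_counts = frame_counts(lambda: shared, 3)

# A new object is created for every raise, so each traceback starts empty.
fresh_counts = frame_counts(lambda: RuntimeError("fresh instance"), 3)

print("shared:", shared_counts)
print("fresh: ", fresh_counts)
```

The shared instance reports a longer traceback after every raise, while the fresh instances all report the same length. This mirrors why the fix constructs ``BrokenProcessPool`` inside the loop over pending work items.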