os.register_at_fork(before=_global_shutdown_lock.acquire,
after_in_child=_global_shutdown_lock._at_fork_reinit,
after_in_parent=_global_shutdown_lock.release)
+ os.register_at_fork(after_in_child=_threads_queues.clear)
class _WorkItem:
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
workers.submit(tuple)
+ @support.requires_fork()
+ @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
+ def test_process_fork_from_a_threadpool(self):
+ # bpo-43944: clear concurrent.futures.thread._threads_queues after fork,
+ # otherwise child process will try to join parent thread
+ def fork_process_and_return_exitcode():
+ # Ignore the warning about fork with threads.
+ with self.assertWarnsRegex(DeprecationWarning,
+ r"use of fork\(\) may lead to deadlocks in the child"):
+ p = mp.get_context('fork').Process(target=lambda: 1)
+ p.start()
+ p.join()
+ return p.exitcode
+
+ with futures.ThreadPoolExecutor(1) as pool:
+ process_exitcode = pool.submit(fork_process_and_return_exitcode).result()
+
+ self.assertEqual(process_exitcode, 0)
+
def test_executor_map_current_future_cancel(self):
stop_event = threading.Event()
log = []