require the *fork* start method for :class:`ProcessPoolExecutor` you must
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
+ .. method:: terminate_workers()
+
+ Attempt to terminate all living worker processes immediately by calling
+ :meth:`Process.terminate <multiprocessing.Process.terminate>` on each of them.
+ Internally, it will also call :meth:`Executor.shutdown` to ensure that all
+ other resources associated with the executor are freed.
+
+ After calling this method the caller should no longer submit tasks to the
+ executor.
+
+ .. versionadded:: next
+
+ .. method:: kill_workers()
+
+ Attempt to kill all living worker processes immediately by calling
+ :meth:`Process.kill <multiprocessing.Process.kill>` on each of them.
+ Internally, it will also call :meth:`Executor.shutdown` to ensure that all
+ other resources associated with the executor are freed.
+
+ After calling this method the caller should no longer submit tasks to the
+ executor.
+
+ .. versionadded:: next
+
.. _processpoolexecutor-example:
ProcessPoolExecutor Example
while a future was in the running state.
"""
+_TERMINATE = "terminate"
+_KILL = "kill"
+
+_SHUTDOWN_CALLBACK_OPERATION = {
+ _TERMINATE,
+ _KILL
+}
+
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
+
+ def _force_shutdown(self, operation):
+ """Attempts to terminate or kill the executor's workers based off the
+ given operation. Iterates through all of the current processes and
+ performs the relevant task if the process is still alive.
+
+ After terminating workers, the pool will be in a broken state
+ and no longer usable (for instance, new tasks should not be
+ submitted).
+ """
+ if operation not in _SHUTDOWN_CALLBACK_OPERATION:
+ raise ValueError(f"Unsupported operation: {operation!r}")
+
+ processes = {}
+ if self._processes:
+ processes = self._processes.copy()
+
+ # shutdown will invalidate ._processes, so we copy it right before
+ # calling. If we waited here, we would deadlock if a process decides not
+ # to exit.
+ self.shutdown(wait=False, cancel_futures=True)
+
+ if not processes:
+ return
+
+ for proc in processes.values():
+ try:
+ if not proc.is_alive():
+ continue
+ except ValueError:
+ # The process is already exited/closed out.
+ continue
+
+ try:
+ if operation == _TERMINATE:
+ proc.terminate()
+ elif operation == _KILL:
+ proc.kill()
+ except ProcessLookupError:
+ # The process just ended before our signal
+ continue
+
+ def terminate_workers(self):
+ """Attempts to terminate the executor's workers.
+ Iterates through all of the current worker processes and terminates
+ each one that is still alive.
+
+ After terminating workers, the pool will be in a broken state
+ and no longer usable (for instance, new tasks should not be
+ submitted).
+ """
+ return self._force_shutdown(operation=_TERMINATE)
+
+ def kill_workers(self):
+ """Attempts to kill the executor's workers.
+ Iterates through all of the current worker processes and kills
+ each one that is still alive.
+
+ After killing workers, the pool will be in a broken state
+ and no longer usable (for instance, new tasks should not be
+ submitted).
+ """
+ return self._force_shutdown(operation=_KILL)
import os
+import queue
+import signal
import sys
import threading
import time
import unittest
+import unittest.mock
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool
from test import support
from test.support import hashlib_helper
+from test.test_importlib.metadata.fixtures import parameterize
from .executor import ExecutorTest, mul
from .util import (
def __del__(self):
self.event.set()
+TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__
+KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__
+FORCE_SHUTDOWN_PARAMS = [
+ dict(function_name=TERMINATE_WORKERS),
+ dict(function_name=KILL_WORKERS),
+]
+
+def _put_sleep_put(queue):
+ """ Used as part of test_terminate_workers """
+ queue.put('started')
+ time.sleep(2)
+ queue.put('finished')
+
class ProcessPoolExecutorTest(ExecutorTest):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()
+ def test_terminate_workers(self):
+ mock_fn = unittest.mock.Mock()
+ with self.executor_type(max_workers=1) as executor:
+ executor._force_shutdown = mock_fn
+ executor.terminate_workers()
+
+ mock_fn.assert_called_once_with(operation=futures.process._TERMINATE)
+
+ def test_kill_workers(self):
+ mock_fn = unittest.mock.Mock()
+ with self.executor_type(max_workers=1) as executor:
+ executor._force_shutdown = mock_fn
+ executor.kill_workers()
+
+ mock_fn.assert_called_once_with(operation=futures.process._KILL)
+
+ def test_force_shutdown_workers_invalid_op(self):
+ with self.executor_type(max_workers=1) as executor:
+ self.assertRaises(ValueError,
+ executor._force_shutdown,
+ operation='invalid operation'),
+
+ @parameterize(*FORCE_SHUTDOWN_PARAMS)
+ def test_force_shutdown_workers(self, function_name):
+ manager = self.get_context().Manager()
+ q = manager.Queue()
+
+ with self.executor_type(max_workers=1) as executor:
+ executor.submit(_put_sleep_put, q)
+
+ # We should get started, but not finished since we'll terminate the
+ # workers just after
+ self.assertEqual(q.get(timeout=5), 'started')
+
+ worker_process = list(executor._processes.values())[0]
+ getattr(executor, function_name)()
+ worker_process.join()
+
+ if function_name == TERMINATE_WORKERS or \
+ sys.platform == 'win32':
+ # On windows, kill and terminate both send SIGTERM
+ self.assertEqual(worker_process.exitcode, -signal.SIGTERM)
+ elif function_name == KILL_WORKERS:
+ self.assertEqual(worker_process.exitcode, -signal.SIGKILL)
+ else:
+ self.fail(f"Unknown operation: {function_name}")
+
+ self.assertRaises(queue.Empty, q.get, timeout=1)
+
+ @parameterize(*FORCE_SHUTDOWN_PARAMS)
+ def test_force_shutdown_workers_dead_workers(self, function_name):
+ with self.executor_type(max_workers=1) as executor:
+ future = executor.submit(os._exit, 1)
+ self.assertRaises(BrokenProcessPool, future.result)
+
+ # even though the pool is broken, this shouldn't raise
+ getattr(executor, function_name)()
+
+ @parameterize(*FORCE_SHUTDOWN_PARAMS)
+ def test_force_shutdown_workers_not_started_yet(self, function_name):
+ ctx = self.get_context()
+ with unittest.mock.patch.object(ctx, 'Process') as mock_process:
+ with self.executor_type(max_workers=1, mp_context=ctx) as executor:
+ # The worker has not been started yet, terminate/kill_workers
+ # should basically no-op
+ getattr(executor, function_name)()
+
+ mock_process.return_value.kill.assert_not_called()
+ mock_process.return_value.terminate.assert_not_called()
+
+ @parameterize(*FORCE_SHUTDOWN_PARAMS)
+ def test_force_shutdown_workers_stops_pool(self, function_name):
+ with self.executor_type(max_workers=1) as executor:
+ task = executor.submit(time.sleep, 0)
+ self.assertIsNone(task.result())
+
+ getattr(executor, function_name)()
+
+ self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
+
create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,