]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-90622: Prevent max_tasks_per_child use with a fork mp_context. (#91587)
authorGregory P. Smith <greg@krypto.org>
Fri, 6 May 2022 07:04:53 +0000 (00:04 -0700)
committerGitHub <noreply@github.com>
Fri, 6 May 2022 07:04:53 +0000 (00:04 -0700)
Prevent `max_tasks_per_child` use with a "fork" mp_context to avoid deadlocks.

Also defaults to "spawn" when no mp_context is supplied for safe convenience.

Doc/library/concurrent.futures.rst
Lib/concurrent/futures/process.py
Lib/test/test_concurrent_futures.py
Misc/NEWS.d/next/Library/2022-04-15-18-32-38.gh-issue-90622.WQjFDe.rst [new file with mode: 0644]

index 959280833997e5df80ee7a02dfa342212f4fddbe..99703ff3051d4798e8b5f9aa4d3a4e39798039f2 100644 (file)
@@ -254,8 +254,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
 
    *max_tasks_per_child* is an optional argument that specifies the maximum
    number of tasks a single process can execute before it will exit and be
-   replaced with a fresh worker process. The default *max_tasks_per_child* is
-   ``None`` which means worker processes will live as long as the pool.
+   replaced with a fresh worker process. By default *max_tasks_per_child* is
+   ``None`` which means worker processes will live as long as the pool. When
+   a max is specified, the "spawn" multiprocessing start method will be used by
+   default in absense of a *mp_context* parameter. This feature is incompatible
+   with the "fork" start method.
 
    .. versionchanged:: 3.3
       When one of the worker processes terminates abruptly, a
index 0d49379c9bf7ce84392fbbbedfb1d292d8f06957..821034da21adcf556db7879f0ffffcdd98b3b241 100644 (file)
@@ -617,14 +617,16 @@ class ProcessPoolExecutor(_base.Executor):
                 execute the given calls. If None or not given then as many
                 worker processes will be created as the machine has processors.
             mp_context: A multiprocessing context to launch the workers. This
-                object should provide SimpleQueue, Queue and Process.
+                object should provide SimpleQueue, Queue and Process. Useful
+                to allow specific multiprocessing start methods.
             initializer: A callable used to initialize worker processes.
             initargs: A tuple of arguments to pass to the initializer.
-            max_tasks_per_child: The maximum number of tasks a worker process can
-                complete before it will exit and be replaced with a fresh
-                worker process, to enable unused resources to be freed. The
-                default value is None, which means worker process will live
-                as long as the executor will live.
+            max_tasks_per_child: The maximum number of tasks a worker process
+                can complete before it will exit and be replaced with a fresh
+                worker process. The default of None means worker process will
+                live as long as the executor. Requires a non-'fork' mp_context
+                start method. When given, we default to using 'spawn' if no
+                mp_context is supplied.
         """
         _check_system_limits()
 
@@ -644,7 +646,10 @@ class ProcessPoolExecutor(_base.Executor):
             self._max_workers = max_workers
 
         if mp_context is None:
-            mp_context = mp.get_context()
+            if max_tasks_per_child is not None:
+                mp_context = mp.get_context("spawn")
+            else:
+                mp_context = mp.get_context()
         self._mp_context = mp_context
 
         if initializer is not None and not callable(initializer):
@@ -657,6 +662,11 @@ class ProcessPoolExecutor(_base.Executor):
                 raise TypeError("max_tasks_per_child must be an integer")
             elif max_tasks_per_child <= 0:
                 raise ValueError("max_tasks_per_child must be >= 1")
+            if self._mp_context.get_start_method(allow_none=False) == "fork":
+                # https://github.com/python/cpython/issues/90622
+                raise ValueError("max_tasks_per_child is incompatible with"
+                                 " the 'fork' multiprocessing start method;"
+                                 " supply a different mp_context.")
         self._max_tasks_per_child = max_tasks_per_child
 
         # Management thread
index 978a748df7fa3c08fff484615044ae40363de97e..4363e90b8bbabb75c53933182240a80fc52a28d3 100644 (file)
@@ -1039,10 +1039,15 @@ class ProcessPoolExecutorTest(ExecutorTest):
         executor.shutdown()
 
     def test_max_tasks_per_child(self):
+        context = self.get_context()
+        if context.get_start_method(allow_none=False) == "fork":
+            with self.assertRaises(ValueError):
+                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
+            return
         # not using self.executor as we need to control construction.
         # arguably this could go in another class w/o that mixin.
         executor = self.executor_type(
-                1, mp_context=self.get_context(), max_tasks_per_child=3)
+                1, mp_context=context, max_tasks_per_child=3)
         f1 = executor.submit(os.getpid)
         original_pid = f1.result()
         # The worker pid remains the same as the worker could be reused
@@ -1061,11 +1066,20 @@ class ProcessPoolExecutorTest(ExecutorTest):
 
         executor.shutdown()
 
+    def test_max_tasks_per_child_defaults_to_spawn_context(self):
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(1, max_tasks_per_child=3)
+        self.assertEqual(executor._mp_context.get_start_method(), "spawn")
+
     def test_max_tasks_early_shutdown(self):
+        context = self.get_context()
+        if context.get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
         # not using self.executor as we need to control construction.
         # arguably this could go in another class w/o that mixin.
         executor = self.executor_type(
-                3, mp_context=self.get_context(), max_tasks_per_child=1)
+                3, mp_context=context, max_tasks_per_child=1)
         futures = []
         for i in range(6):
             futures.append(executor.submit(mul, i, i))
diff --git a/Misc/NEWS.d/next/Library/2022-04-15-18-32-38.gh-issue-90622.WQjFDe.rst b/Misc/NEWS.d/next/Library/2022-04-15-18-32-38.gh-issue-90622.WQjFDe.rst
new file mode 100644 (file)
index 0000000..4144e4c
--- /dev/null
@@ -0,0 +1,5 @@
+In ``concurrent.futures.process.ProcessPoolExecutor`` disallow the "fork"
+multiprocessing start method when the new ``max_tasks_per_child`` feature is
+used as the mix of threads+fork can hang the child processes. Default to
+using the safe "spawn" start method in that circumstance if no
+``mp_context`` was supplied.