]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057)
authorKyle Stanley <aeros167@gmail.com>
Sun, 2 Feb 2020 12:49:00 +0000 (07:49 -0500)
committerGitHub <noreply@github.com>
Sun, 2 Feb 2020 12:49:00 +0000 (13:49 +0100)
Doc/library/concurrent.futures.rst
Doc/whatsnew/3.9.rst
Lib/concurrent/futures/process.py
Lib/concurrent/futures/thread.py
Lib/test/test_concurrent_futures.py
Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst [new file with mode: 0644]

index d71f2d80c9e2d0ba38abde8300c4e40ed3cda5d8..b21d5594c84fa026a23a060f4b714c46e89ca2d5 100644 (file)
@@ -67,7 +67,7 @@ Executor Objects
        .. versionchanged:: 3.5
           Added the *chunksize* argument.
 
-    .. method:: shutdown(wait=True)
+    .. method:: shutdown(wait=True, \*, cancel_futures=False)
 
        Signal the executor that it should free any resources that it is using
        when the currently pending futures are done executing.  Calls to
@@ -82,6 +82,15 @@ Executor Objects
        value of *wait*, the entire Python program will not exit until all
        pending futures are done executing.
 
+       If *cancel_futures* is ``True``, this method will cancel all pending
+       futures that the executor has not started running. Any futures that
+       are completed or running won't be cancelled, regardless of the value
+       of *cancel_futures*.
+
+       If both *cancel_futures* and *wait* are ``True``, all futures that the
+       executor has started running will be completed prior to this method
+       returning. The remaining futures are cancelled.
+
        You can avoid having to call this method explicitly if you use the
        :keyword:`with` statement, which will shutdown the :class:`Executor`
        (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
@@ -94,6 +103,9 @@ Executor Objects
               e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
               e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
 
+       .. versionchanged:: 3.9
+          Added *cancel_futures*.
+
 
 ThreadPoolExecutor
 ------------------
index ee37b5a5b72653ec40fc70a6f723b189ee005270..931f8bf13fbde6a9bc00578ace4fccecc64151ab 100644 (file)
@@ -146,6 +146,15 @@ that schedules a shutdown for the default executor that waits on the
 Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
 implementation that polls process file descriptors. (:issue:`38692`)
 
+concurrent.futures
+------------------
+
+Added a new *cancel_futures* parameter to
+:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures
+which have not started running, instead of waiting for them to complete before
+shutting down the executor.
+(Contributed by Kyle Stanley in :issue:`39349`.)
+
 curses
 ------
 
index 9e2ab9db64f6648b2f6d684e047ba52ad24f5051..fd9f572b6c71160e22cde46c4b20b80c17c3cb53 100644 (file)
@@ -435,6 +435,24 @@ def _queue_management_worker(executor_reference,
                 # is not gc-ed yet.
                 if executor is not None:
                     executor._shutdown_thread = True
+                    # Unless there are pending work items, we have nothing to cancel.
+                    if pending_work_items and executor._cancel_pending_futures:
+                        # Cancel all pending futures and update pending_work_items
+                        # to only have futures that are currently running.
+                        new_pending_work_items = {}
+                        for work_id, work_item in pending_work_items.items():
+                            if not work_item.future.cancel():
+                                new_pending_work_items[work_id] = work_item
+
+                        pending_work_items = new_pending_work_items
+                        # Drain work_ids_queue since we no longer need to
+                        # add items to the call queue.
+                        while True:
+                            try:
+                                work_ids_queue.get_nowait()
+                            except queue.Empty:
+                                break
+
                 # Since no new work items can be added, it is safe to shutdown
                 # this thread if there are no pending work items.
                 if not pending_work_items:
@@ -546,6 +564,7 @@ class ProcessPoolExecutor(_base.Executor):
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
+        self._cancel_pending_futures = False
 
         # Create communication channels for the executor
         # Make the call queue slightly larger than the number of processes to
@@ -660,9 +679,11 @@ class ProcessPoolExecutor(_base.Executor):
                               timeout=timeout)
         return _chain_from_iterable_of_lists(results)
 
-    def shutdown(self, wait=True):
+    def shutdown(self, wait=True, *, cancel_futures=False):
         with self._shutdown_lock:
+            self._cancel_pending_futures = cancel_futures
             self._shutdown_thread = True
+
         if self._queue_management_thread:
             # Wake up queue management thread
             self._queue_management_thread_wakeup.wakeup()
index b89f8f24d4d6738fb0e7b81229ba3de6a914ab34..be79161bf8561dc620b955d9b4e5b088d286dac7 100644 (file)
@@ -215,9 +215,22 @@ class ThreadPoolExecutor(_base.Executor):
                 if work_item is not None:
                     work_item.future.set_exception(BrokenThreadPool(self._broken))
 
-    def shutdown(self, wait=True):
+    def shutdown(self, wait=True, *, cancel_futures=False):
         with self._shutdown_lock:
             self._shutdown = True
+            if cancel_futures:
+                # Drain all work items from the queue, and then cancel their
+                # associated futures.
+                while True:
+                    try:
+                        work_item = self._work_queue.get_nowait()
+                    except queue.Empty:
+                        break
+                    if work_item is not None:
+                        work_item.future.cancel()
+
+            # Send a wake-up to prevent threads calling
+            # _work_queue.get(block=True) from permanently blocking.
             self._work_queue.put(None)
         if wait:
             for t in self._threads:
index c8fa35e9eeafa752c7a5852b59f47d2fd1b5304b..af77f813419104b0520cdd8c9e3cd4e53ae7aadd 100644 (file)
@@ -342,6 +342,29 @@ class ExecutorShutdownTest:
         for f in fs:
             f.result()
 
+    def test_cancel_futures(self):
+        executor = self.executor_type(max_workers=3)
+        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
+        executor.shutdown(cancel_futures=True)
+        # We can't guarantee the exact number of cancellations, but we can
+        # guarantee that *some* were cancelled. With setting max_workers to 3,
+        # most of the submitted futures should have been cancelled.
+        cancelled = [fut for fut in fs if fut.cancelled()]
+        self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
+
+        # Ensure the other futures were able to finish.
+        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
+        # that may have been left in a pending state.
+        others = [fut for fut in fs if not fut.cancelled()]
+        for fut in others:
+            self.assertTrue(fut.done(), msg=f"{fut._state=}")
+            self.assertIsNone(fut.exception())
+
+        # Similar to the number of cancelled futures, we can't guarantee the
+        # exact number that completed. But, we can guarantee that at least
+        # one finished.
+        self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
+
     def test_hang_issue39205(self):
         """shutdown(wait=False) doesn't hang at exit with running futures.
 
@@ -422,6 +445,22 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase
             self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
             t.join()
 
+    def test_cancel_futures_wait_false(self):
+        # Can only be reliably tested for TPE, since PPE often hangs with
+        # `wait=False` (even without *cancel_futures*).
+        rc, out, err = assert_python_ok('-c', """if True:
+            from concurrent.futures import ThreadPoolExecutor
+            from test.test_concurrent_futures import sleep_and_print
+            if __name__ == "__main__":
+                t = ThreadPoolExecutor()
+                t.submit(sleep_and_print, .1, "apple")
+                t.shutdown(wait=False, cancel_futures=True)
+            """.format(executor_type=self.executor_type.__name__))
+        # Errors in atexit hooks don't change the process exit code, check
+        # stderr manually.
+        self.assertFalse(err)
+        self.assertEqual(out.strip(), b"apple")
+
 
 class ProcessPoolShutdownTest(ExecutorShutdownTest):
     def _prime_executor(self):
diff --git a/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst b/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst
new file mode 100644 (file)
index 0000000..cc52700
--- /dev/null
@@ -0,0 +1,4 @@
+Added a new *cancel_futures* parameter to\r
+:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures\r
+which have not started running, instead of waiting for them to complete before\r
+shutting down the executor.
\ No newline at end of file