]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-107219: Fix concurrent.futures terminate_broken() (#109244)
authorVictor Stinner <vstinner@python.org>
Mon, 11 Sep 2023 08:11:31 +0000 (10:11 +0200)
committerGitHub <noreply@github.com>
Mon, 11 Sep 2023 08:11:31 +0000 (08:11 +0000)
Fix a race condition in concurrent.futures. When a process in the
process pool was terminated abruptly (while the future was running or
pending), close the connection write end. If the call queue is
blocked on sending bytes to a worker process, closing the connection
write end interrupts the send, so the queue can be closed.

Changes:

* _ExecutorManagerThread.terminate_broken() now closes
  call_queue._writer.
* multiprocessing PipeConnection.close() now interrupts
  WaitForMultipleObjects() in _send_bytes() by cancelling the
  overlapped operation.

Lib/concurrent/futures/process.py
Lib/multiprocessing/connection.py
Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst [new file with mode: 0644]

index 9933d3d0e040ea0ec76e4274f374e0e263794014..f4b5cd1d869067dd5cafbef7c32d415790204fe2 100644 (file)
@@ -510,6 +510,10 @@ class _ExecutorManagerThread(threading.Thread):
         # https://github.com/python/cpython/issues/94777
         self.call_queue._reader.close()
 
+        # gh-107219: Close the connection writer which can unblock
+        # Queue._feed() if it was stuck in send_bytes().
+        self.call_queue._writer.close()
+
         # clean up resources
         self.join_executor_internals()
 
index 04eaea811cfbbe65c37bd37e1023d866fc5a8acc..7c425a2d8e70344a19ffc5415f19690bbc3a91c9 100644 (file)
@@ -9,6 +9,7 @@
 
 __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
 
+import errno
 import io
 import os
 import sys
@@ -41,6 +42,7 @@ except ImportError:
 BUFSIZE = 8192
 # A very generous timeout when it comes to local connections...
 CONNECTION_TIMEOUT = 20.
+WSA_OPERATION_ABORTED = 995
 
 _mmap_counter = itertools.count()
 
@@ -271,12 +273,22 @@ if _winapi:
         with FILE_FLAG_OVERLAPPED.
         """
         _got_empty_message = False
+        _send_ov = None
 
         def _close(self, _CloseHandle=_winapi.CloseHandle):
+            ov = self._send_ov
+            if ov is not None:
+                # Interrupt WaitForMultipleObjects() in _send_bytes()
+                ov.cancel()
             _CloseHandle(self._handle)
 
         def _send_bytes(self, buf):
+            if self._send_ov is not None:
+                # A connection should only be used by a single thread
+                raise ValueError("concurrent send_bytes() calls "
+                                 "are not supported")
             ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
+            self._send_ov = ov
             try:
                 if err == _winapi.ERROR_IO_PENDING:
                     waitres = _winapi.WaitForMultipleObjects(
@@ -286,7 +298,13 @@ if _winapi:
                 ov.cancel()
                 raise
             finally:
+                self._send_ov = None
                 nwritten, err = ov.GetOverlappedResult(True)
+            if err == WSA_OPERATION_ABORTED:
+                # close() was called by another thread while
+                # WaitForMultipleObjects() was waiting for the overlapped
+                # operation.
+                raise OSError(errno.EPIPE, "handle is closed")
             assert err == 0
             assert nwritten == len(buf)
 
diff --git a/Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst b/Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst
new file mode 100644 (file)
index 0000000..10afbcf
--- /dev/null
@@ -0,0 +1,5 @@
+Fix a race condition in ``concurrent.futures``. When a process in the
+process pool was terminated abruptly (while the future was running or
+pending), close the connection write end. If the call queue is blocked on
+sending bytes to a worker process, closing the connection write end interrupts
+the send, so the queue can be closed. Patch by Victor Stinner.