]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and...
authorGéry Ogam <gery.ogam@gmail.com>
Tue, 3 May 2022 23:49:57 +0000 (01:49 +0200)
committerGitHub <noreply@github.com>
Tue, 3 May 2022 23:49:57 +0000 (17:49 -0600)
Lib/multiprocessing/queues.py
Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst [new file with mode: 0644]

index a2901814876d6c0a64c50903e778812b5eace037..f37f114a968871ba367ac33d5f0b109da8869834 100644 (file)
@@ -139,13 +139,10 @@ class Queue(object):
 
     def close(self):
         self._closed = True
-        try:
-            self._reader.close()
-        finally:
-            close = self._close
-            if close:
-                self._close = None
-                close()
+        close = self._close
+        if close:
+            self._close = None
+            close()
 
     def join_thread(self):
         debug('Queue.join_thread()')
@@ -169,8 +166,9 @@ class Queue(object):
         self._thread = threading.Thread(
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send_bytes,
-                  self._wlock, self._writer.close, self._ignore_epipe,
-                  self._on_queue_feeder_error, self._sem),
+                  self._wlock, self._reader.close, self._writer.close,
+                  self._ignore_epipe, self._on_queue_feeder_error,
+                  self._sem),
             name='QueueFeederThread'
         )
         self._thread.daemon = True
@@ -211,8 +209,8 @@ class Queue(object):
             notempty.notify()
 
     @staticmethod
-    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
-              onerror, queue_sem):
+    def _feed(buffer, notempty, send_bytes, writelock, reader_close,
+              writer_close, ignore_epipe, onerror, queue_sem):
         debug('starting thread to feed data to pipe')
         nacquire = notempty.acquire
         nrelease = notempty.release
@@ -238,7 +236,8 @@ class Queue(object):
                         obj = bpopleft()
                         if obj is sentinel:
                             debug('feeder thread got sentinel -- exiting')
-                            close()
+                            reader_close()
+                            writer_close()
                             return
 
                         # serialize the data before acquiring the lock
diff --git a/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
new file mode 100644 (file)
index 0000000..cc05467
--- /dev/null
@@ -0,0 +1,4 @@
+Always close the read end of the pipe used by :class:`multiprocessing.Queue`
+*after* the last write of buffered data to the write end of the pipe to avoid
+:exc:`BrokenPipeError` at garbage collection and at
+:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam.