]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-117531: Unblock getters after non-immediate queue shutdown (#117532)
authorLaurie O <laurie_opperman@hotmail.com>
Wed, 10 Apr 2024 15:01:42 +0000 (01:01 +1000)
committerGitHub <noreply@github.com>
Wed, 10 Apr 2024 15:01:42 +0000 (08:01 -0700)
(This is a small tweak of the original gh-104750 which added shutdown.)

Doc/library/queue.rst
Lib/queue.py
Lib/test/test_queue.py

index f2a6dbf589fd872388cbef1762fe4ed468de9ea9..fce23313c7de28615340a5ce08cddfa882adf748 100644 (file)
@@ -245,8 +245,10 @@ them down.
    queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise
    immediately instead.
 
-   All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate*
-   is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`.
+   All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get` will be
+   unblocked. If *immediate* is true, a task will be marked as done for each
+   remaining item in the queue, which may unblock callers of
+   :meth:`~Queue.join`.
 
    .. versionadded:: 3.13
 
index 387ce5425879a44b9599467fa459467823400db5..25beb46e30d6bd90611453eabe5b6fb7d9a9805c 100644 (file)
@@ -239,8 +239,9 @@ class Queue:
         By default, gets will only raise once the queue is empty. Set
         'immediate' to True to make gets raise immediately instead.
 
-        All blocked callers of put() will be unblocked, and also get()
-        and join() if 'immediate'.
+        All blocked callers of put() and get() will be unblocked. If
+        'immediate', a task is marked as done for each item remaining in
+        the queue, which may unblock callers of join().
         '''
         with self.mutex:
             self.is_shutdown = True
@@ -249,9 +250,10 @@ class Queue:
                     self._get()
                     if self.unfinished_tasks > 0:
                         self.unfinished_tasks -= 1
-                self.not_empty.notify_all()
                 # release all blocked threads in `join()`
                 self.all_tasks_done.notify_all()
+            # All getters need to re-check queue-empty to raise ShutDown
+            self.not_empty.notify_all()
             self.not_full.notify_all()
 
     # Override these methods to implement other queue organizations
index c4d101101323933e3cf2887e13c75cb6fa389ce0..d5927fbf39142b3095a810ac7828370ff36c4f9f 100644 (file)
@@ -636,6 +636,23 @@ class BaseQueueTestMixin(BlockingTestMixin):
 
         self.assertEqual(results, [True]*len(thrds))
 
+    def test_shutdown_pending_get(self):
+        def get():
+            try:
+                results.append(q.get())
+            except Exception as e:
+                results.append(e)
+
+        q = self.type2test()
+        results = []
+        get_thread = threading.Thread(target=get)
+        get_thread.start()
+        q.shutdown(immediate=False)
+        get_thread.join(timeout=10.0)
+        self.assertFalse(get_thread.is_alive())
+        self.assertEqual(len(results), 1)
+        self.assertIsInstance(results[0], self.queue.ShutDown)
+
 
 class QueueTest(BaseQueueTestMixin):