]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-74116: Allow multiple drain waiters for asyncio.StreamWriter (GH-94705)
authorKumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
Mon, 29 Aug 2022 18:31:11 +0000 (00:01 +0530)
committerGitHub <noreply@github.com>
Mon, 29 Aug 2022 18:31:11 +0000 (11:31 -0700)
Lib/asyncio/streams.py
Lib/test/test_asyncio/test_streams.py
Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst [new file with mode: 0644]

index 614b2cda60682fffcbb44724f6f678ed7fc6a259..c4d837a1170819de48cf034f5394cdf8a3b62048 100644 (file)
@@ -2,6 +2,7 @@ __all__ = (
     'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
     'open_connection', 'start_server')
 
+import collections
 import socket
 import sys
 import weakref
@@ -128,7 +129,7 @@ class FlowControlMixin(protocols.Protocol):
         else:
             self._loop = loop
         self._paused = False
-        self._drain_waiter = None
+        self._drain_waiters = collections.deque()
         self._connection_lost = False
 
     def pause_writing(self):
@@ -143,38 +144,34 @@ class FlowControlMixin(protocols.Protocol):
         if self._loop.get_debug():
             logger.debug("%r resumes writing", self)
 
-        waiter = self._drain_waiter
-        if waiter is not None:
-            self._drain_waiter = None
+        for waiter in self._drain_waiters:
             if not waiter.done():
                 waiter.set_result(None)
 
     def connection_lost(self, exc):
         self._connection_lost = True
-        # Wake up the writer if currently paused.
+        # Wake up the writer(s) if currently paused.
         if not self._paused:
             return
-        waiter = self._drain_waiter
-        if waiter is None:
-            return
-        self._drain_waiter = None
-        if waiter.done():
-            return
-        if exc is None:
-            waiter.set_result(None)
-        else:
-            waiter.set_exception(exc)
+
+        for waiter in self._drain_waiters:
+            if not waiter.done():
+                if exc is None:
+                    waiter.set_result(None)
+                else:
+                    waiter.set_exception(exc)
 
     async def _drain_helper(self):
         if self._connection_lost:
             raise ConnectionResetError('Connection lost')
         if not self._paused:
             return
-        waiter = self._drain_waiter
-        assert waiter is None or waiter.cancelled()
         waiter = self._loop.create_future()
-        self._drain_waiter = waiter
-        await waiter
+        self._drain_waiters.append(waiter)
+        try:
+            await waiter
+        finally:
+            self._drain_waiters.remove(waiter)
 
     def _get_close_waiter(self, stream):
         raise NotImplementedError
index 098a0da344d0fb0adb92710d27ff80b8f640d965..0c49099bc499a58298e5121134e364f68c271971 100644 (file)
@@ -864,6 +864,25 @@ os.close(fd)
         self.assertEqual(cm.filename, __file__)
         self.assertIs(protocol._loop, self.loop)
 
+    def test_multiple_drain(self):
+        # See https://github.com/python/cpython/issues/74116
+        drained = 0
+
+        async def drainer(stream):
+            nonlocal drained
+            await stream._drain_helper()
+            drained += 1
+
+        async def main():
+            loop = asyncio.get_running_loop()
+            stream = asyncio.streams.FlowControlMixin(loop)
+            stream.pause_writing()
+            loop.call_later(0.1, stream.resume_writing)
+            await asyncio.gather(*[drainer(stream) for _ in range(10)])
+            self.assertEqual(drained, 10)
+
+        self.loop.run_until_complete(main())
+
     def test_drain_raises(self):
         # See http://bugs.python.org/issue25441
 
diff --git a/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst b/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst
new file mode 100644 (file)
index 0000000..3378259
--- /dev/null
@@ -0,0 +1 @@
+Allow :meth:`asyncio.StreamWriter.drain` to be awaited concurrently by multiple tasks. Patch by Kumar Aditya.