]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.11] gh-103607: Fix `pause_reading` to work when called from `connection_made`...
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Thu, 27 Apr 2023 10:29:54 +0000 (03:29 -0700)
committerGitHub <noreply@github.com>
Thu, 27 Apr 2023 10:29:54 +0000 (10:29 +0000)
gh-103607: Fix `pause_reading` to work when called from `connection_made` in `asyncio`. (GH-17425)
(cherry picked from commit 78942ecd9b1dbbd95e99cc298b0154fe126dac12)

Co-authored-by: Itayazolay <itayazolay@gmail.com>
Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
Lib/asyncio/selector_events.py
Lib/asyncio/unix_events.py
Lib/test/test_asyncio/test_proactor_events.py
Lib/test/test_asyncio/test_selector_events.py
Misc/NEWS.d/next/Core and Builtins/2019-12-01-12-58-31.bpo-31821.1FNmwk.rst [new file with mode: 0644]

index 8ab420d5bd719ddd4749a1405de243166e3614f0..96e61f7c3a4e9fd1dc8c392de524bb09e2650e3e 100644 (file)
@@ -783,6 +783,8 @@ class _SelectorTransport(transports._FlowControlMixin,
         self._buffer = self._buffer_factory()
         self._conn_lost = 0  # Set when call to connection_lost scheduled.
         self._closing = False  # Set when close() called.
+        self._paused = False  # Set when pause_reading() called
+
         if self._server is not None:
             self._server._attach()
         loop._transports[self._sock_fd] = self
@@ -828,6 +830,25 @@ class _SelectorTransport(transports._FlowControlMixin,
     def is_closing(self):
         return self._closing
 
+    def is_reading(self):
+        return not self.is_closing() and not self._paused
+
+    def pause_reading(self):
+        if not self.is_reading():
+            return
+        self._paused = True
+        self._loop._remove_reader(self._sock_fd)
+        if self._loop.get_debug():
+            logger.debug("%r pauses reading", self)
+
+    def resume_reading(self):
+        if self._closing or not self._paused:
+            return
+        self._paused = False
+        self._add_reader(self._sock_fd, self._read_ready)
+        if self._loop.get_debug():
+            logger.debug("%r resumes reading", self)
+
     def close(self):
         if self._closing:
             return
@@ -887,9 +908,8 @@ class _SelectorTransport(transports._FlowControlMixin,
         return len(self._buffer)
 
     def _add_reader(self, fd, callback, *args):
-        if self._closing:
+        if not self.is_reading():
             return
-
         self._loop._add_reader(fd, callback, *args)
 
 
@@ -904,7 +924,6 @@ class _SelectorSocketTransport(_SelectorTransport):
         self._read_ready_cb = None
         super().__init__(loop, sock, protocol, extra, server)
         self._eof = False
-        self._paused = False
         self._empty_waiter = None
 
         # Disable the Nagle algorithm -- small writes will be
@@ -929,25 +948,6 @@ class _SelectorSocketTransport(_SelectorTransport):
 
         super().set_protocol(protocol)
 
-    def is_reading(self):
-        return not self._paused and not self._closing
-
-    def pause_reading(self):
-        if self._closing or self._paused:
-            return
-        self._paused = True
-        self._loop._remove_reader(self._sock_fd)
-        if self._loop.get_debug():
-            logger.debug("%r pauses reading", self)
-
-    def resume_reading(self):
-        if self._closing or not self._paused:
-            return
-        self._paused = False
-        self._add_reader(self._sock_fd, self._read_ready)
-        if self._loop.get_debug():
-            logger.debug("%r resumes reading", self)
-
     def _read_ready(self):
         self._read_ready_cb()
 
index 0495f332f3127ae7a9b5ee8bca9cc68cbd8685e9..ac4519acc4307bee85c4d02773d28df11c4f6edb 100644 (file)
@@ -482,13 +482,21 @@ class _UnixReadPipeTransport(transports.ReadTransport):
 
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop._add_reader,
+        self._loop.call_soon(self._add_reader,
                              self._fileno, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
             self._loop.call_soon(futures._set_result_unless_cancelled,
                                  waiter, None)
 
+    def _add_reader(self, fd, callback):
+        if not self.is_reading():
+            return
+        self._loop._add_reader(fd, callback)
+
+    def is_reading(self):
+        return not self._paused and not self._closing
+
     def __repr__(self):
         info = [self.__class__.__name__]
         if self._pipe is None:
@@ -529,7 +537,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
                 self._loop.call_soon(self._call_connection_lost, None)
 
     def pause_reading(self):
-        if self._closing or self._paused:
+        if not self.is_reading():
             return
         self._paused = True
         self._loop._remove_reader(self._fileno)
index ae30185cef776a046a5d9c44877fdfebd9703c9d..44fc7706abee1d44dac7d27c5f3d52ce4d142d2f 100644 (file)
@@ -444,6 +444,19 @@ class ProactorSocketTransportTests(test_utils.TestCase):
 
         self.assertFalse(tr.is_reading())
 
+    def test_pause_reading_connection_made(self):
+        tr = self.socket_transport()
+        self.protocol.connection_made.side_effect = lambda _: tr.pause_reading()
+        test_utils.run_briefly(self.loop)
+        self.assertFalse(tr.is_reading())
+        self.loop.assert_no_reader(7)
+
+        tr.resume_reading()
+        self.assertTrue(tr.is_reading())
+
+        tr.close()
+        self.assertFalse(tr.is_reading())
+
 
     def pause_writing_transport(self, high):
         tr = self.socket_transport()
index 22dcfb23083522cbc99efdb2431bc0a014140829..fcab26278235e09544381fd2ff7d19bb18568218 100644 (file)
@@ -534,6 +534,22 @@ class SelectorSocketTransportTests(test_utils.TestCase):
         self.assertFalse(tr.is_reading())
         self.loop.assert_no_reader(7)
 
+    def test_pause_reading_connection_made(self):
+        tr = self.socket_transport()
+        self.protocol.connection_made.side_effect = lambda _: tr.pause_reading()
+        test_utils.run_briefly(self.loop)
+        self.assertFalse(tr.is_reading())
+        self.loop.assert_no_reader(7)
+
+        tr.resume_reading()
+        self.assertTrue(tr.is_reading())
+        self.loop.assert_reader(7, tr._read_ready)
+
+        tr.close()
+        self.assertFalse(tr.is_reading())
+        self.loop.assert_no_reader(7)
+
+
     def test_read_eof_received_error(self):
         transport = self.socket_transport()
         transport.close = mock.Mock()
diff --git a/Misc/NEWS.d/next/Core and Builtins/2019-12-01-12-58-31.bpo-31821.1FNmwk.rst b/Misc/NEWS.d/next/Core and Builtins/2019-12-01-12-58-31.bpo-31821.1FNmwk.rst
new file mode 100644 (file)
index 0000000..13c054f
--- /dev/null
@@ -0,0 +1 @@
+Fix :func:`!pause_reading` to work when called from :func:`!connection_made` in :mod:`asyncio`.