]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
bpo-41273: asyncio's proactor read transport's better performance by using recv_into...
authorTony Solomonik <tony.solomonik@gmail.com>
Tue, 14 Jul 2020 19:41:24 +0000 (22:41 +0300)
committerGitHub <noreply@github.com>
Tue, 14 Jul 2020 19:41:24 +0000 (12:41 -0700)
* bpo-41273: Proactor transport read loop to use recv_into

By using recv_into instead of recv we do not allocate a new buffer each
time _loop_reading calls recv.

This betters performance for any stream using proactor (basically any
asyncio stream on windows).

* bpo-41273: Double proactor read transport buffer size

By doubling the read buffer size we get better performance.

Lib/asyncio/proactor_events.py
Lib/test/test_asyncio/test_proactor_events.py
Misc/NEWS.d/next/Library/2020-07-11-00-15-01.bpo-41273.SVrsJh.rst [new file with mode: 0644]

index 8338449aaa0a3ef843f18411081b6bb836b92542..d0b7100f5e05631216da4f87a1b36066b315eba7 100644 (file)
@@ -179,11 +179,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
     """Transport for read pipes."""
 
     def __init__(self, loop, sock, protocol, waiter=None,
-                 extra=None, server=None):
-        self._pending_data = None
+                 extra=None, server=None, buffer_size=65536):
+        self._pending_data_length = -1
         self._paused = True
         super().__init__(loop, sock, protocol, waiter, extra, server)
 
+        self._data = bytearray(buffer_size)
         self._loop.call_soon(self._loop_reading)
         self._paused = False
 
@@ -217,12 +218,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
         if self._read_fut is None:
             self._loop.call_soon(self._loop_reading, None)
 
-        data = self._pending_data
-        self._pending_data = None
-        if data is not None:
+        length = self._pending_data_length
+        self._pending_data_length = -1
+        if length > -1:
             # Call the protocol methode after calling _loop_reading(),
             # since the protocol can decide to pause reading again.
-            self._loop.call_soon(self._data_received, data)
+            self._loop.call_soon(self._data_received, self._data[:length], length)
 
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
@@ -243,15 +244,15 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
         if not keep_open:
             self.close()
 
-    def _data_received(self, data):
+    def _data_received(self, data, length):
         if self._paused:
             # Don't call any protocol method while reading is paused.
             # The protocol will be called on resume_reading().
-            assert self._pending_data is None
-            self._pending_data = data
+            assert self._pending_data_length == -1
+            self._pending_data_length = length
             return
 
-        if not data:
+        if length == 0:
             self._eof_received()
             return
 
@@ -269,6 +270,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
             self._protocol.data_received(data)
 
     def _loop_reading(self, fut=None):
+        length = -1
         data = None
         try:
             if fut is not None:
@@ -277,18 +279,18 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
                 self._read_fut = None
                 if fut.done():
                     # deliver data later in "finally" clause
-                    data = fut.result()
+                    length = fut.result()
+                    if length == 0:
+                        # we got end-of-file so no need to reschedule a new read
+                        return
+
+                    data = self._data[:length]
                 else:
                     # the future will be replaced by next proactor.recv call
                     fut.cancel()
 
             if self._closing:
                 # since close() has been called we ignore any read data
-                data = None
-                return
-
-            if data == b'':
-                # we got end-of-file so no need to reschedule a new read
                 return
 
             # bpo-33694: buffer_updated() has currently no fast path because of
@@ -296,7 +298,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
 
             if not self._paused:
                 # reschedule a new read
-                self._read_fut = self._loop._proactor.recv(self._sock, 32768)
+                self._read_fut = self._loop._proactor.recv_into(self._sock, self._data)
         except ConnectionAbortedError as exc:
             if not self._closing:
                 self._fatal_error(exc, 'Fatal read error on pipe transport')
@@ -314,8 +316,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
             if not self._paused:
                 self._read_fut.add_done_callback(self._loop_reading)
         finally:
-            if data is not None:
-                self._data_received(data)
+            if length > -1:
+                self._data_received(data, length)
 
 
 class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
index b5d1df93efd650325d50ea6fefe0353c45b55d8c..50ba4c19d425cacce08a88d7ade7508cf296ca82 100644 (file)
@@ -40,6 +40,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         self.loop._proactor = self.proactor
         self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
         self.sock = mock.Mock(socket.socket)
+        self.buffer_size = 65536
 
     def socket_transport(self, waiter=None):
         transport = _ProactorSocketTransport(self.loop, self.sock,
@@ -53,28 +54,32 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         test_utils.run_briefly(self.loop)
         self.assertIsNone(fut.result())
         self.protocol.connection_made(tr)
-        self.proactor.recv.assert_called_with(self.sock, 32768)
+        self.proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size))
 
     def test_loop_reading(self):
         tr = self.socket_transport()
         tr._loop_reading()
-        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
+        self.loop._proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size))
         self.assertFalse(self.protocol.data_received.called)
         self.assertFalse(self.protocol.eof_received.called)
 
     def test_loop_reading_data(self):
+        buf = b'data'
         res = self.loop.create_future()
-        res.set_result(b'data')
+        res.set_result(len(buf))
 
         tr = self.socket_transport()
         tr._read_fut = res
+        tr._data[:len(buf)] = buf
         tr._loop_reading(res)
-        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
-        self.protocol.data_received.assert_called_with(b'data')
+        called_buf = bytearray(self.buffer_size)
+        called_buf[:len(buf)] = buf
+        self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf)
+        self.protocol.data_received.assert_called_with(bytearray(buf))
 
     def test_loop_reading_no_data(self):
         res = self.loop.create_future()
-        res.set_result(b'')
+        res.set_result(0)
 
         tr = self.socket_transport()
         self.assertRaises(AssertionError, tr._loop_reading, res)
@@ -82,12 +87,12 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         tr.close = mock.Mock()
         tr._read_fut = res
         tr._loop_reading(res)
-        self.assertFalse(self.loop._proactor.recv.called)
+        self.assertFalse(self.loop._proactor.recv_into.called)
         self.assertTrue(self.protocol.eof_received.called)
         self.assertTrue(tr.close.called)
 
     def test_loop_reading_aborted(self):
-        err = self.loop._proactor.recv.side_effect = ConnectionAbortedError()
+        err = self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
 
         tr = self.socket_transport()
         tr._fatal_error = mock.Mock()
@@ -97,7 +102,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
                             'Fatal read error on pipe transport')
 
     def test_loop_reading_aborted_closing(self):
-        self.loop._proactor.recv.side_effect = ConnectionAbortedError()
+        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
 
         tr = self.socket_transport()
         tr._closing = True
@@ -106,7 +111,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         self.assertFalse(tr._fatal_error.called)
 
     def test_loop_reading_aborted_is_fatal(self):
-        self.loop._proactor.recv.side_effect = ConnectionAbortedError()
+        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
         tr = self.socket_transport()
         tr._closing = False
         tr._fatal_error = mock.Mock()
@@ -114,7 +119,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         self.assertTrue(tr._fatal_error.called)
 
     def test_loop_reading_conn_reset_lost(self):
-        err = self.loop._proactor.recv.side_effect = ConnectionResetError()
+        err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()
 
         tr = self.socket_transport()
         tr._closing = False
@@ -125,7 +130,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         tr._force_close.assert_called_with(err)
 
     def test_loop_reading_exception(self):
-        err = self.loop._proactor.recv.side_effect = (OSError())
+        err = self.loop._proactor.recv_into.side_effect = (OSError())
 
         tr = self.socket_transport()
         tr._fatal_error = mock.Mock()
@@ -351,20 +356,31 @@ class ProactorSocketTransportTests(test_utils.TestCase):
 
     def test_pause_resume_reading(self):
         tr = self.socket_transport()
-        futures = []
-        for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']:
+        index = 0
+        msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
+        reversed_msgs = list(reversed(msgs))
+
+        def recv_into(sock, data):
             f = self.loop.create_future()
-            f.set_result(msg)
-            futures.append(f)
+            msg = reversed_msgs.pop()
+
+            result = f.result
+            def monkey():
+                data[:len(msg)] = msg
+                return result()
+            f.result = monkey
+
+            f.set_result(len(msg))
+            return f
 
-        self.loop._proactor.recv.side_effect = futures
+        self.loop._proactor.recv_into.side_effect = recv_into
         self.loop._run_once()
         self.assertFalse(tr._paused)
         self.assertTrue(tr.is_reading())
-        self.loop._run_once()
-        self.protocol.data_received.assert_called_with(b'data1')
-        self.loop._run_once()
-        self.protocol.data_received.assert_called_with(b'data2')
+
+        for msg in msgs[:2]:
+            self.loop._run_once()
+            self.protocol.data_received.assert_called_with(bytearray(msg))
 
         tr.pause_reading()
         tr.pause_reading()
@@ -372,23 +388,23 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         self.assertFalse(tr.is_reading())
         for i in range(10):
             self.loop._run_once()
-        self.protocol.data_received.assert_called_with(b'data2')
+        self.protocol.data_received.assert_called_with(bytearray(msgs[1]))
 
         tr.resume_reading()
         tr.resume_reading()
         self.assertFalse(tr._paused)
         self.assertTrue(tr.is_reading())
-        self.loop._run_once()
-        self.protocol.data_received.assert_called_with(b'data3')
-        self.loop._run_once()
-        self.protocol.data_received.assert_called_with(b'data4')
+
+        for msg in msgs[2:4]:
+            self.loop._run_once()
+            self.protocol.data_received.assert_called_with(bytearray(msg))
 
         tr.pause_reading()
         tr.resume_reading()
         self.loop.call_exception_handler = mock.Mock()
         self.loop._run_once()
         self.loop.call_exception_handler.assert_not_called()
-        self.protocol.data_received.assert_called_with(b'data5')
+        self.protocol.data_received.assert_called_with(bytearray(msgs[4]))
         tr.close()
 
         self.assertFalse(tr.is_reading())
diff --git a/Misc/NEWS.d/next/Library/2020-07-11-00-15-01.bpo-41273.SVrsJh.rst b/Misc/NEWS.d/next/Library/2020-07-11-00-15-01.bpo-41273.SVrsJh.rst
new file mode 100644 (file)
index 0000000..c08204b
--- /dev/null
@@ -0,0 +1,3 @@
+Speed up any transport using ``_ProactorReadPipeTransport`` by calling
+``recv_into`` instead of ``recv``, thus not creating a new buffer for each
+``recv`` call in the transport's read loop.