]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
iostream: Use recv_into and friends in read_from_fd
authorBen Darnell <ben@bendarnell.com>
Sun, 21 Jan 2018 00:00:30 +0000 (19:00 -0500)
committerBen Darnell <ben@bendarnell.com>
Sun, 21 Jan 2018 03:15:30 +0000 (22:15 -0500)
This has the same memory-allocation behavior as before, but it moves
the buffer out of the recv() call to python code.

tornado/iostream.py

index 3bf058749429ed49e2187e112660c72b090e6325..56b4002cba877ece34e4262b84e7b092f0018a21 100644 (file)
@@ -300,13 +300,18 @@ class BaseIOStream(object):
         """
         raise NotImplementedError()
 
-    def read_from_fd(self):
+    def read_from_fd(self, buf):
         """Attempts to read from the underlying file.
 
-        Returns ``None`` if there was nothing to read (the socket
-        returned `~errno.EWOULDBLOCK` or equivalent), otherwise
-        returns the data.  When possible, should return no more than
-        ``self.read_chunk_size`` bytes at a time.
+        Reads up to ``len(buf)`` bytes, storing them in the buffer.
+        Returns the number of bytes read. Returns None if there was
+        nothing to read (the socket returned `~errno.EWOULDBLOCK` or
+        equivalent), and zero on EOF.
+
+        .. versionchanged:: 5.0
+
+           Interface redesigned to take a buffer and return a number
+           of bytes instead of a freshly-allocated object.
         """
         raise NotImplementedError()
 
@@ -820,31 +825,40 @@ class BaseIOStream(object):
         to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
         error closes the socket and raises an exception.
         """
-        while True:
-            try:
-                chunk = self.read_from_fd()
-            except (socket.error, IOError, OSError) as e:
-                if errno_from_exception(e) == errno.EINTR:
-                    continue
-                # ssl.SSLError is a subclass of socket.error
-                if self._is_connreset(e):
-                    # Treat ECONNRESET as a connection close rather than
-                    # an error to minimize log spam  (the exception will
-                    # be available on self.error for apps that care).
+        try:
+            while True:
+                try:
+                    buf = bytearray(self.read_chunk_size)
+                    bytes_read = self.read_from_fd(buf)
+                except (socket.error, IOError, OSError) as e:
+                    if errno_from_exception(e) == errno.EINTR:
+                        continue
+                    # ssl.SSLError is a subclass of socket.error
+                    if self._is_connreset(e):
+                        # Treat ECONNRESET as a connection close rather than
+                        # an error to minimize log spam  (the exception will
+                        # be available on self.error for apps that care).
+                        self.close(exc_info=e)
+                        return
                     self.close(exc_info=e)
-                    return
-                self.close(exc_info=e)
-                raise
-            break
-        if chunk is None:
-            return 0
-        self._read_buffer += chunk
-        self._read_buffer_size += len(chunk)
+                    raise
+                break
+            if bytes_read is None:
+                return 0
+            elif bytes_read == 0:
+                self.close()
+                return 0
+            self._read_buffer += memoryview(buf)[:bytes_read]
+            self._read_buffer_size += bytes_read
+        finally:
+            # Break the reference to buf so we don't waste a chunk's worth of
+            # memory in case an exception hangs on to our stack frame.
+            buf = None
         if self._read_buffer_size > self.max_buffer_size:
             gen_log.error("Reached maximum read buffer size")
             self.close()
             raise StreamBufferFullError("Reached maximum read buffer size")
-        return len(chunk)
+        return bytes_read
 
     def _run_streaming_callback(self):
         if self._streaming_callback is not None and self._read_buffer_size:
@@ -1106,18 +1120,16 @@ class IOStream(BaseIOStream):
                                        socket.SO_ERROR)
         return socket.error(errno, os.strerror(errno))
 
-    def read_from_fd(self):
+    def read_from_fd(self, buf):
         try:
-            chunk = self.socket.recv(self.read_chunk_size)
+            return self.socket.recv_into(buf)
         except socket.error as e:
             if e.args[0] in _ERRNO_WOULDBLOCK:
                 return None
             else:
                 raise
-        if not chunk:
-            self.close()
-            return None
-        return chunk
+        finally:
+            buf = None
 
     def write_to_fd(self, data):
         try:
@@ -1528,35 +1540,29 @@ class SSLIOStream(IOStream):
             # See https://github.com/tornadoweb/tornado/pull/2008
             del data
 
-    def read_from_fd(self):
-        if self._ssl_accepting:
-            # If the handshake hasn't finished yet, there can't be anything
-            # to read (attempting to read may or may not raise an exception
-            # depending on the SSL version)
-            return None
+    def read_from_fd(self, buf):
         try:
-            # SSLSocket objects have both a read() and recv() method,
-            # while regular sockets only have recv().
-            # The recv() method blocks (at least in python 2.6) if it is
-            # called when there is nothing to read, so we have to use
-            # read() instead.
-            chunk = self.socket.read(self.read_chunk_size)
-        except ssl.SSLError as e:
-            # SSLError is a subclass of socket.error, so this except
-            # block must come first.
-            if e.args[0] == ssl.SSL_ERROR_WANT_READ:
-                return None
-            else:
-                raise
-        except socket.error as e:
-            if e.args[0] in _ERRNO_WOULDBLOCK:
+            if self._ssl_accepting:
+                # If the handshake hasn't finished yet, there can't be anything
+                # to read (attempting to read may or may not raise an exception
+                # depending on the SSL version)
                 return None
-            else:
-                raise
-        if not chunk:
-            self.close()
-            return None
-        return chunk
+            try:
+                return self.socket.recv_into(buf)
+            except ssl.SSLError as e:
+                # SSLError is a subclass of socket.error, so this except
+                # block must come first.
+                if e.args[0] == ssl.SSL_ERROR_WANT_READ:
+                    return None
+                else:
+                    raise
+            except socket.error as e:
+                if e.args[0] in _ERRNO_WOULDBLOCK:
+                    return None
+                else:
+                    raise
+        finally:
+            buf = None
 
     def _is_connreset(self, e):
         if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
@@ -1592,9 +1598,9 @@ class PipeIOStream(BaseIOStream):
             # See https://github.com/tornadoweb/tornado/pull/2008
             del data
 
-    def read_from_fd(self):
+    def read_from_fd(self, buf):
         try:
-            chunk = self._fio.read(self.read_chunk_size)
+            return self._fio.readinto(buf)
         except (IOError, OSError) as e:
             if errno_from_exception(e) == errno.EBADF:
                 # If the writing half of a pipe is closed, select will
@@ -1603,13 +1609,8 @@ class PipeIOStream(BaseIOStream):
                 return None
             else:
                 raise
-        if chunk is None:
-            # Read would block
-            return None
-        if not chunk:
-            self.close()
-            return None
-        return chunk
+        finally:
+            buf = None
 
 
 def doctests():