]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Decouple read_from_buffer's search for the endpoint from consuming the data.
authorBen Darnell <ben@bendarnell.com>
Mon, 12 May 2014 03:43:55 +0000 (23:43 -0400)
committerBen Darnell <ben@bendarnell.com>
Mon, 12 May 2014 03:43:55 +0000 (23:43 -0400)
This lets us call find_read_pos from read_to_buffer_loop, avoiding some
unnecessary reads (e.g. it previously took a minimum of two recv calls
to serve an http request, but now we can do it in one).

tornado/iostream.py

index e5423fe6926945ae19c4a6d7675c29c5bfda5e3b..7f2e364999e31fa14c0f45a058b7fdee395495be 100644 (file)
@@ -493,6 +493,7 @@ class BaseIOStream(object):
                 target_bytes = None
             else:
                 target_bytes = 0
+            next_find_pos = 0
             # Pretend to have a pending callback so that an EOF in
             # _read_to_buffer doesn't trigger an immediate close
             # callback.  At the end of this method we'll either
@@ -513,31 +514,42 @@ class BaseIOStream(object):
                 if self._read_to_buffer() == 0:
                     break
 
+                self._run_streaming_callback()
+
                 # If we've read all the bytes we can use, break out of
-                # this loop.  It would be better to call
-                # read_from_buffer here, but
-                # A) this has subtle interactions with the
-                # pending_callback and error_listener mechanisms
-                # B) Simply calling read_from_buffer on every iteration
-                # is inefficient for delimited reads.
-                # TODO: restructure this so we can call read_from_buffer
-                # for unbounded delimited reads.
+                # this loop.  We can't just call read_from_buffer here
+                # because of subtle interactions with the
+                # pending_callback and error_listener mechanisms.
+                #
+                # If we've reached target_bytes, we know we're done.
                 if (target_bytes is not None and
                     self._read_buffer_size >= target_bytes):
                     break
+
+                # Otherwise, we need to call the more expensive find_read_pos.
+                # It's inefficient to do this on every read, so instead
+                # do it on the first read and whenever the read buffer
+                # size has doubled.
+                if self._read_buffer_size >= next_find_pos:
+                    pos = self._find_read_pos()
+                    if pos is not None:
+                        return pos
+                    next_find_pos = self._read_buffer_size * 2
+            return self._find_read_pos()
         finally:
             self._pending_callbacks -= 1
 
     def _handle_read(self):
         try:
-            self._read_to_buffer_loop()
+            pos = self._read_to_buffer_loop()
         except UnsatisfiableReadError:
             raise
         except Exception:
             gen_log.warning("error on read", exc_info=True)
             self.close(exc_info=True)
             return
-        if self._read_from_buffer():
+        if pos is not None:
+            self._read_from_buffer(pos)
             return
         else:
             self._maybe_run_close_callback()
@@ -578,11 +590,14 @@ class BaseIOStream(object):
         listening for reads on the socket.
         """
         # See if we've already got the data from a previous read
-        if self._read_from_buffer():
+        self._run_streaming_callback()
+        pos = self._find_read_pos()
+        if pos is not None:
+            self._read_from_buffer(pos)
             return
         self._check_closed()
         try:
-            self._read_to_buffer_loop()
+            pos = self._read_to_buffer_loop()
         except Exception:
             # If there was an in _read_to_buffer, we called close() already,
             # but couldn't run the close callback because of _pending_callbacks.
@@ -590,7 +605,8 @@ class BaseIOStream(object):
             # applicable.
             self._maybe_run_close_callback()
             raise
-        if self._read_from_buffer():
+        if pos is not None:
+            self._read_from_buffer(pos)
             return
         # We couldn't satisfy the read inline, so either close the stream
         # or listen for new data.
@@ -628,25 +644,36 @@ class BaseIOStream(object):
             raise IOError("Reached maximum read buffer size")
         return len(chunk)
 
-    def _read_from_buffer(self):
-        """Attempts to complete the currently-pending read from the buffer.
-
-        Returns True if the read was completed.
-        """
+    def _run_streaming_callback(self):
         if self._streaming_callback is not None and self._read_buffer_size:
             bytes_to_consume = self._read_buffer_size
             if self._read_bytes is not None:
                 bytes_to_consume = min(self._read_bytes, bytes_to_consume)
                 self._read_bytes -= bytes_to_consume
             self._run_read_callback(bytes_to_consume, True)
+
+    def _read_from_buffer(self, pos):
+        """Attempts to complete the currently-pending read from the buffer.
+
+        The argument is either a position in the read buffer or None,
+        as returned by _find_read_pos.
+        """
+        self._read_bytes = self._read_delimiter = self._read_regex = None
+        self._read_partial = False
+        self._run_read_callback(pos, False)
+
+    def _find_read_pos(self):
+        """Attempts to find a position in the read buffer that satisfies
+        the currently-pending read.
+
+        Returns a position in the buffer if the current read can be satisfied,
+        or None if it cannot.
+        """
         if (self._read_bytes is not None and
             (self._read_buffer_size >= self._read_bytes or
              (self._read_partial and self._read_buffer_size > 0))):
             num_bytes = min(self._read_bytes, self._read_buffer_size)
-            self._read_bytes = None
-            self._read_partial = False
-            self._run_read_callback(num_bytes, False)
-            return True
+            return num_bytes
         elif self._read_delimiter is not None:
             # Multi-byte delimiters (e.g. '\r\n') may straddle two
             # chunks in the read buffer, so we can't easily find them
@@ -663,9 +690,7 @@ class BaseIOStream(object):
                         delimiter_len = len(self._read_delimiter)
                         self._check_max_bytes(self._read_delimiter,
                                               loc + delimiter_len)
-                        self._read_delimiter = None
-                        self._run_read_callback(loc + delimiter_len, False)
-                        return True
+                        return loc + delimiter_len
                     if len(self._read_buffer) == 1:
                         break
                     _double_prefix(self._read_buffer)
@@ -677,15 +702,13 @@ class BaseIOStream(object):
                     m = self._read_regex.search(self._read_buffer[0])
                     if m is not None:
                         self._check_max_bytes(self._read_regex, m.end())
-                        self._read_regex = None
-                        self._run_read_callback(m.end(), False)
-                        return True
+                        return m.end()
                     if len(self._read_buffer) == 1:
                         break
                     _double_prefix(self._read_buffer)
                 self._check_max_bytes(self._read_regex,
                                       len(self._read_buffer[0]))
-        return False
+        return None
 
     def _check_max_bytes(self, delimiter, size):
         if (self._read_max_bytes is not None and