]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Increase performance of IOStream.read_until and read_until_regex.
authorBen Darnell <ben@bendarnell.com>
Mon, 26 Mar 2012 00:12:00 +0000 (17:12 -0700)
committerBen Darnell <ben@bendarnell.com>
Mon, 26 Mar 2012 00:12:00 +0000 (17:12 -0700)
_handle_read and _try_inline_read now only call _read_from_buffer
once, after calling _read_to_buffer as many times as they can.  This
allows the progressive _double_prefix calls in _read_from_buffer to
work as efficiently as possible.

In testing with a 4MB read, performance improved by a factor of 32 (8
seconds to 0.25 seconds)

tornado/iostream.py
tornado/test/iostream_test.py

index d435c1cb241e9ac114adff011c13d1afe70eaaf5..4e8ce0505ea8c81c55ad12db8d704bd3ed7ba705 100644 (file)
@@ -217,12 +217,16 @@ class IOStream(object):
                 self._state = None
             self.socket.close()
             self.socket = None
-            if self._close_callback and self._pending_callbacks == 0:
-                # if there are pending callbacks, don't run the close callback
-                # until they're done (see _maybe_add_error_handler)
-                cb = self._close_callback
-                self._close_callback = None
-                self._run_callback(cb)
+        self._maybe_run_close_callback()
+
+    def _maybe_run_close_callback(self):
+        if (self.socket is None and self._close_callback and
+            self._pending_callbacks == 0):
+            # if there are pending callbacks, don't run the close callback
+            # until they're done (see _maybe_add_error_handler)
+            cb = self._close_callback
+            self._close_callback = None
+            self._run_callback(cb)
 
     def reading(self):
         """Returns true if we are currently reading from the stream."""
@@ -310,22 +314,38 @@ class IOStream(object):
             self.io_loop.add_callback(wrapper)
 
     def _handle_read(self):
-        while True:
+        try:
             try:
-                # Read from the socket until we get EWOULDBLOCK or equivalent.
-                # SSL sockets do some internal buffering, and if the data is
-                # sitting in the SSL object's buffer select() and friends
-                # can't see it; the only way to find out if it's there is to
-                # try to read it.
-                result = self._read_to_buffer()
-            except Exception:
-                self.close()
-                return
-            if result == 0:
-                break
-            else:
-                if self._read_from_buffer():
-                    return
+                # Pretend to have a pending callback so that an EOF in
+                # _read_to_buffer doesn't trigger an immediate close
+                # callback.  At the end of this method we'll either
+                # estabilsh a real pending callback via
+                # _read_from_buffer or run the close callback.
+                #
+                # We need two try statements here so that
+                # pending_callbacks is decremented before the `except`
+                # clause below (which calls `close` and does need to
+                # trigger the callback)
+                self._pending_callbacks += 1
+                while True:
+                    # Read from the socket until we get EWOULDBLOCK or equivalent.
+                    # SSL sockets do some internal buffering, and if the data is
+                    # sitting in the SSL object's buffer select() and friends
+                    # can't see it; the only way to find out if it's there is to
+                    # try to read it.
+                    if self._read_to_buffer() == 0:
+                        break
+            finally:
+                self._pending_callbacks -= 1
+        except Exception:
+            logging.warning("error on read", exc_info=True)
+            self.close()
+            return
+        if self._read_from_buffer():
+            return
+        else:
+            self._maybe_run_close_callback()
+
 
     def _set_read_callback(self, callback):
         assert not self._read_callback, "Already reading"
@@ -338,13 +358,16 @@ class IOStream(object):
         read callback on the next IOLoop iteration; otherwise starts
         listening for reads on the socket.
         """
+        # See if we've already got the data from a previous read
+        if self._read_from_buffer():
+            return
+        self._check_closed()
         while True:
-            # See if we've already got the data from a previous read
-            if self._read_from_buffer():
-                return
-            self._check_closed()
             if self._read_to_buffer() == 0:
                 break
+            self._check_closed()
+        if self._read_from_buffer():
+            return
         self._add_io_state(self.io_loop.READ)
 
     def _read_from_socket(self):
@@ -520,10 +543,7 @@ class IOStream(object):
     def _maybe_add_error_listener(self):
         if self._state is None and self._pending_callbacks == 0:
             if self.socket is None:
-                cb = self._close_callback
-                if cb is not None:
-                    self._close_callback = None
-                    self._run_callback(cb)
+                self._maybe_run_close_callback()
             else:
                 self._add_io_state(ioloop.IOLoop.READ)
 
index 01f9a984f523e349c77cf789d954a29906ffa360..ac8c207aa1c09592f9e7d8c3bdfbca9230278086 100644 (file)
@@ -216,3 +216,20 @@ class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase):
         finally:
             server.close()
             client.close()
+
+    def test_large_read_until(self):
+        # Performance test: read_until used to have a quadratic component
+        # so a read_until of 4MB would take 8 seconds; now it takes 0.25
+        # seconds.
+        server, client = self.make_iostream_pair()
+        try:
+            NUM_KB = 4096
+            for i in xrange(NUM_KB):
+                client.write(b("A") * 1024)
+            client.write(b("\r\n"))
+            server.read_until(b("\r\n"), self.stop)
+            data = self.wait()
+            self.assertEqual(len(data), NUM_KB * 1024 + 2)
+        finally:
+            server.close()
+            client.close()