Read less aggressively in IOStream.
Author:     Ben Darnell <ben@bendarnell.com>
AuthorDate: Sat, 19 Apr 2014 02:16:55 +0000 (22:16 -0400)
Commit:     Ben Darnell <ben@bendarnell.com>
CommitDate: Sat, 19 Apr 2014 18:35:25 +0000 (14:35 -0400)
Previously, we would drain the socket buffer into the IOStream read buffer
before trying to satisfy application requests.  This would result in
unnecessary memory use, and on a fast network reads could fail because the
buffer would fill up.  Now we stop reading when we can satisfy the current
request (at least for read_bytes and read_until with max_bytes; unbounded
read_until still reads aggressively).
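
As a rough illustration of the effect (not part of the commit: the host, port,
10KB buffer and 1KB chunk size below are made up, and a Future-returning read
API, i.e. Tornado 4.x, is assumed), an application can now make many small
read_bytes calls against a peer that has sent far more data than fits in
max_buffer_size, which previously could fail once the buffer filled:

    import socket

    from tornado import gen, ioloop, iostream

    @gen.coroutine
    def consume(host, port, total=100 * 1024):
        # A read buffer much smaller than the payload: with the old eager
        # reads this could overflow; now each 1KB request pulls only
        # roughly what it needs from the socket.
        stream = iostream.IOStream(socket.socket(),
                                   max_buffer_size=10 * 1024)
        yield stream.connect((host, port))
        received = 0
        while received < total:
            chunk = yield stream.read_bytes(1024)
            received += len(chunk)
        stream.close()

    ioloop.IOLoop.current().run_sync(lambda: consume("127.0.0.1", 8888))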

This commit includes a change to IOStream close_callback semantics.  Since
the only way to reliably detect a closed connection (across all IOLoop
implementations) is to read past the end of the stream, the IOStream will
not detect a closed connection while there is some buffered data that could
satisfy a future read.
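
A minimal sketch of the new semantics over a local socket pair
(socket.socketpair and Tornado 4.x's Future-returning reads are assumed;
none of this is part of the commit):

    import socket

    from tornado import gen, ioloop, iostream

    @gen.coroutine
    def demo():
        a, b = socket.socketpair()
        server, client = iostream.IOStream(a), iostream.IOStream(b)
        closed = []
        client.set_close_callback(lambda: closed.append(True))

        yield server.write(b"1234")
        server.close()

        first = yield client.read_bytes(1)
        assert first == b"1"
        # b"234" is still buffered, so the peer's close is not yet
        # reported even though EOF may already have been seen.
        assert not closed

        rest = yield client.read_until_close()
        assert rest == b"234"
        # Only once no buffered data could satisfy a future read does
        # the stream treat the connection as closed and schedule the
        # close callback.

    ioloop.IOLoop.current().run_sync(demo)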

Closes #772.

tornado/iostream.py
tornado/test/iostream_test.py

diff --git a/tornado/iostream.py b/tornado/iostream.py
index 5d7fb018b3c8a6b548fc381fa845d95190849765..386c55e9bdea1de1f1f7acce788841473613069e 100644
--- a/tornado/iostream.py
+++ b/tornado/iostream.py
@@ -386,7 +386,11 @@ class BaseIOStream(object):
                 state |= self.io_loop.READ
             if self.writing():
                 state |= self.io_loop.WRITE
-            if state == self.io_loop.ERROR:
+            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
+                # If the connection is idle, listen for reads too so
+                # we can tell if the connection is closed.  If there is
+                # data in the read buffer we won't run the close callback
+                # yet anyway, so we don't need to listen in this case.
                 state |= self.io_loop.READ
             if state != self._state:
                 assert self._state is not None, \
@@ -439,6 +443,17 @@ class BaseIOStream(object):
     def _read_to_buffer_loop(self):
         # This method is called from _handle_read and _try_inline_read.
         try:
+            if self._read_bytes is not None:
+                target_bytes = self._read_bytes
+            elif self._read_max_bytes is not None:
+                target_bytes = self._read_max_bytes
+            elif self.reading():
+                # For read_until without max_bytes, or
+                # read_until_close, read as much as we can before
+                # scanning for the delimiter.
+                target_bytes = None
+            else:
+                target_bytes = 0
             # Pretend to have a pending callback so that an EOF in
             # _read_to_buffer doesn't trigger an immediate close
             # callback.  At the end of this method we'll either
@@ -458,12 +473,27 @@ class BaseIOStream(object):
                 # try to read it.
                 if self._read_to_buffer() == 0:
                     break
+
+                # If we've read all the bytes we can use, break out of
+                # this loop.  It would be better to call
+                # read_from_buffer here, but
+                # A) this has subtle interactions with the
+                # pending_callback and error_listener mechanisms
+                # B) Simply calling read_from_buffer on every iteration
+                # is inefficient for delimited reads.
+                # TODO: restructure this so we can call read_from_buffer
+                # for unbounded delimited reads.
+                if (target_bytes is not None and
+                    self._read_buffer_size >= target_bytes):
+                    break
         finally:
             self._pending_callbacks -= 1
 
     def _handle_read(self):
         try:
             self._read_to_buffer_loop()
+        except UnsatisfiableReadError:
+            raise
         except Exception:
             gen_log.warning("error on read", exc_info=True)
             self.close(exc_info=True)
@@ -523,7 +553,12 @@ class BaseIOStream(object):
             raise
         if self._read_from_buffer():
             return
-        self._maybe_add_error_listener()
+        # We couldn't satisfy the read inline, so either close the stream
+        # or listen for new data.
+        if self.closed():
+            self._maybe_run_close_callback()
+        else:
+            self._add_io_state(ioloop.IOLoop.READ)
 
     def _read_to_buffer(self):
         """Reads from the socket and appends the result to the read buffer.
@@ -680,10 +715,18 @@ class BaseIOStream(object):
             raise StreamClosedError("Stream is closed")
 
     def _maybe_add_error_listener(self):
-        if self._state is None and self._pending_callbacks == 0:
+        # This method is part of an optimization: to detect a connection that
+        # is closed when we're not actively reading or writing, we must listen
+        # for read events.  However, it is inefficient to do this when the
+        # connection is first established because we are going to read or write
+        # immediately anyway.  Instead, we insert checks at various times to
+        # see if the connection is idle and add the read listener then.
+        if self._pending_callbacks != 0:
+            return
+        if self._state is None or self._state == ioloop.IOLoop.ERROR:
             if self.closed():
                 self._maybe_run_close_callback()
-            else:
+            elif self._read_buffer_size == 0:
                 self._add_io_state(ioloop.IOLoop.READ)
 
     def _add_io_state(self, state):
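
Taken together, the hunks above mean that an idle stream (no read or write in
progress, empty read buffer) keeps a read listener solely so that a dropped
peer is still noticed promptly. A rough sketch of the observable behavior,
under the same socket-pair and Tornado 4.x assumptions as above:

    import socket

    from tornado import gen, ioloop, iostream
    from tornado.concurrent import Future

    @gen.coroutine
    def demo_idle_close():
        a, b = socket.socketpair()
        server, client = iostream.IOStream(a), iostream.IOStream(b)
        closed = Future()
        client.set_close_callback(lambda: closed.set_result(None))

        yield server.write(b"ping")
        data = yield client.read_bytes(4)   # drains the buffer; the
        assert data == b"ping"              # client is now idle
        server.close()                      # peer drops while idle
        yield closed                        # the idle read listener
                                            # notices EOF promptly

    ioloop.IOLoop.current().run_sync(demo_idle_close)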
diff --git a/tornado/test/iostream_test.py b/tornado/test/iostream_test.py
index 893c3214d32e6c20d51403bbb785b03dae122e48..efe35780788a2d4dceedbb6dea44b9e594055137 100644
--- a/tornado/test/iostream_test.py
+++ b/tornado/test/iostream_test.py
@@ -390,14 +390,18 @@ class TestIOStreamMixin(object):
         # Similar to test_delayed_close_callback, but read_until_close takes
         # a separate code path so test it separately.
         server, client = self.make_iostream_pair()
-        client.set_close_callback(self.stop)
         try:
             server.write(b"1234")
             server.close()
-            self.wait()
+            # Read one byte to make sure the client has received the data.
+            # It won't run the close callback as long as there is more buffered
+            # data that could satisfy a later read.
+            client.read_bytes(1, self.stop)
+            data = self.wait()
+            self.assertEqual(data, b"1")
             client.read_until_close(self.stop)
             data = self.wait()
-            self.assertEqual(data, b"1234")
+            self.assertEqual(data, b"234")
         finally:
             server.close()
             client.close()
@@ -407,17 +411,18 @@ class TestIOStreamMixin(object):
         # All data should go through the streaming callback,
         # and the final read callback just gets an empty string.
         server, client = self.make_iostream_pair()
-        client.set_close_callback(self.stop)
         try:
             server.write(b"1234")
             server.close()
-            self.wait()
+            client.read_bytes(1, self.stop)
+            data = self.wait()
+            self.assertEqual(data, b"1")
             streaming_data = []
             client.read_until_close(self.stop,
                                     streaming_callback=streaming_data.append)
             data = self.wait()
             self.assertEqual(b'', data)
-            self.assertEqual(b''.join(streaming_data), b"1234")
+            self.assertEqual(b''.join(streaming_data), b"234")
         finally:
             server.close()
             client.close()
@@ -678,6 +683,36 @@ class TestIOStreamMixin(object):
             server.close()
             client.close()
 
+    def test_small_reads_from_large_buffer(self):
+        # 10KB buffer size, 100KB available to read.
+        # Read 1KB at a time and make sure that the buffer is not eagerly
+        # filled.
+        server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
+        try:
+            server.write(b"a" * 1024 * 100)
+            for i in range(100):
+                client.read_bytes(1024, self.stop)
+                data = self.wait()
+                self.assertEqual(data, b"a" * 1024)
+        finally:
+            server.close()
+            client.close()
+
+    def test_small_read_untils_from_large_buffer(self):
+        # 10KB buffer size, 100KB available to read.
+        # Read 1KB at a time and make sure that the buffer is not eagerly
+        # filled.
+        server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
+        try:
+            server.write((b"a" * 1023 + b"\n") * 100)
+            for i in range(100):
+                client.read_until(b"\n", self.stop, max_bytes=4096)
+                data = self.wait()
+                self.assertEqual(data, b"a" * 1023 + b"\n")
+        finally:
+            server.close()
+            client.close()
+
 class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
     def _make_client_iostream(self):
         return IOStream(socket.socket(), io_loop=self.io_loop)