From: Ben Darnell Date: Sat, 19 Apr 2014 02:16:55 +0000 (-0400) Subject: Read less aggressively in IOStream. X-Git-Tag: v4.0.0b1~91^2~25 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7335cc245b9363a454ecaf75f217f5611aa60efe;p=thirdparty%2Ftornado.git Read less aggressively in IOStream. Previously, we would drain the socket buffer into the IOStream read buffer before trying to satisfy application requests. This would result in unnecessary memory use, and on a fast network reads could fail because the buffer would fill up. Now we stop reading when we can satisfy the current request (at least for read_bytes and read_until with max_bytes; unbounded read_until still reads aggressively). This commit includes a change to IOStream close_callback semantics. Since the only way to reliably detect a closed connection (across all IOLoop implementations) is to read past the end of the stream, the IOStream will not detect a closed connection while there is some buffered data that could satisfy a future read. Closes #772. --- diff --git a/tornado/iostream.py b/tornado/iostream.py index 5d7fb018b..386c55e9b 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -386,7 +386,11 @@ class BaseIOStream(object): state |= self.io_loop.READ if self.writing(): state |= self.io_loop.WRITE - if state == self.io_loop.ERROR: + if state == self.io_loop.ERROR and self._read_buffer_size == 0: + # If the connection is idle, listen for reads too so + # we can tell if the connection is closed. If there is + # data in the read buffer we won't run the close callback + # yet anyway, so we don't need to listen in this case. state |= self.io_loop.READ if state != self._state: assert self._state is not None, \ @@ -439,6 +443,17 @@ class BaseIOStream(object): def _read_to_buffer_loop(self): # This method is called from _handle_read and _try_inline_read. try: + if self._read_bytes is not None: + target_bytes = self._read_bytes + elif self._read_max_bytes is not None: + target_bytes = self._read_max_bytes + elif self.reading(): + # For read_until without max_bytes, or + # read_until_close, read as much as we can before + # scanning for the delimiter. + target_bytes = None + else: + target_bytes = 0 # Pretend to have a pending callback so that an EOF in # _read_to_buffer doesn't trigger an immediate close # callback. At the end of this method we'll either @@ -458,12 +473,27 @@ class BaseIOStream(object): # try to read it. if self._read_to_buffer() == 0: break + + # If we've read all the bytes we can use, break out of + # this loop. It would be better to call + # read_from_buffer here, but + # A) this has subtle interactions with the + # pending_callback and error_listener mechanisms + # B) Simply calling read_from_buffer on every iteration + # is inefficient for delimited reads. + # TODO: restructure this so we can call read_from_buffer + # for unbounded delimited reads. + if (target_bytes is not None and + self._read_buffer_size >= target_bytes): + break finally: self._pending_callbacks -= 1 def _handle_read(self): try: self._read_to_buffer_loop() + except UnsatisfiableReadError: + raise except Exception: gen_log.warning("error on read", exc_info=True) self.close(exc_info=True) @@ -523,7 +553,12 @@ class BaseIOStream(object): raise if self._read_from_buffer(): return - self._maybe_add_error_listener() + # We couldn't satisfy the read inline, so either close the stream + # or listen for new data. + if self.closed(): + self._maybe_run_close_callback() + else: + self._add_io_state(ioloop.IOLoop.READ) def _read_to_buffer(self): """Reads from the socket and appends the result to the read buffer. @@ -680,10 +715,18 @@ class BaseIOStream(object): raise StreamClosedError("Stream is closed") def _maybe_add_error_listener(self): - if self._state is None and self._pending_callbacks == 0: + # This method is part of an optimization: to detect a connection that + # is closed when we're not actively reading or writing, we must listen + # for read events. However, it is inefficient to do this when the + # connection is first established because we are going to read or write + # immediately anyway. Instead, we insert checks at various times to + # see if the connection is idle and add the read listener then. + if self._pending_callbacks != 0: + return + if self._state is None or self._state == ioloop.IOLoop.ERROR: if self.closed(): self._maybe_run_close_callback() - else: + elif self._read_buffer_size == 0: self._add_io_state(ioloop.IOLoop.READ) def _add_io_state(self, state): diff --git a/tornado/test/iostream_test.py b/tornado/test/iostream_test.py index 893c3214d..efe357807 100644 --- a/tornado/test/iostream_test.py +++ b/tornado/test/iostream_test.py @@ -390,14 +390,18 @@ class TestIOStreamMixin(object): # Similar to test_delayed_close_callback, but read_until_close takes # a separate code path so test it separately. server, client = self.make_iostream_pair() - client.set_close_callback(self.stop) try: server.write(b"1234") server.close() - self.wait() + # Read one byte to make sure the client has received the data. + # It won't run the close callback as long as there is more buffered + # data that could satisfy a later read. + client.read_bytes(1, self.stop) + data = self.wait() + self.assertEqual(data, b"1") client.read_until_close(self.stop) data = self.wait() - self.assertEqual(data, b"1234") + self.assertEqual(data, b"234") finally: server.close() client.close() @@ -407,17 +411,18 @@ class TestIOStreamMixin(object): # All data should go through the streaming callback, # and the final read callback just gets an empty string. server, client = self.make_iostream_pair() - client.set_close_callback(self.stop) try: server.write(b"1234") server.close() - self.wait() + client.read_bytes(1, self.stop) + data = self.wait() + self.assertEqual(data, b"1") streaming_data = [] client.read_until_close(self.stop, streaming_callback=streaming_data.append) data = self.wait() self.assertEqual(b'', data) - self.assertEqual(b''.join(streaming_data), b"1234") + self.assertEqual(b''.join(streaming_data), b"234") finally: server.close() client.close() @@ -678,6 +683,36 @@ class TestIOStreamMixin(object): server.close() client.close() + def test_small_reads_from_large_buffer(self): + # 10KB buffer size, 100KB available to read. + # Read 1KB at a time and make sure that the buffer is not eagerly + # filled. + server, client = self.make_iostream_pair(max_buffer_size=10 * 1024) + try: + server.write(b"a" * 1024 * 100) + for i in range(100): + client.read_bytes(1024, self.stop) + data = self.wait() + self.assertEqual(data, b"a" * 1024) + finally: + server.close() + client.close() + + def test_small_read_untils_from_large_buffer(self): + # 10KB buffer size, 100KB available to read. + # Read 1KB at a time and make sure that the buffer is not eagerly + # filled. + server, client = self.make_iostream_pair(max_buffer_size=10 * 1024) + try: + server.write((b"a" * 1023 + b"\n") * 100) + for i in range(100): + client.read_until(b"\n", self.stop, max_bytes=4096) + data = self.wait() + self.assertEqual(data, b"a" * 1023 + b"\n") + finally: + server.close() + client.close() + class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase): def _make_client_iostream(self): return IOStream(socket.socket(), io_loop=self.io_loop)