state |= self.io_loop.READ
if self.writing():
state |= self.io_loop.WRITE
- if state == self.io_loop.ERROR:
+ if state == self.io_loop.ERROR and self._read_buffer_size == 0:
+ # If the connection is idle, listen for reads too so
+ # we can tell if the connection is closed. If there is
+ # data in the read buffer we won't run the close callback
+ # yet anyway, so we don't need to listen in this case.
state |= self.io_loop.READ
if state != self._state:
assert self._state is not None, \
def _read_to_buffer_loop(self):
# This method is called from _handle_read and _try_inline_read.
try:
+ if self._read_bytes is not None:
+ target_bytes = self._read_bytes
+ elif self._read_max_bytes is not None:
+ target_bytes = self._read_max_bytes
+ elif self.reading():
+ # For read_until without max_bytes, or
+ # read_until_close, read as much as we can before
+ # scanning for the delimiter.
+ target_bytes = None
+ else:
+ target_bytes = 0
# Pretend to have a pending callback so that an EOF in
# _read_to_buffer doesn't trigger an immediate close
# callback. At the end of this method we'll either
# try to read it.
if self._read_to_buffer() == 0:
break
+
+ # If we've read all the bytes we can use, break out of
+ # this loop. It would be better to call
+ # read_from_buffer here, but
+ # A) this has subtle interactions with the
+ # pending_callback and error_listener mechanisms
+ # B) Simply calling read_from_buffer on every iteration
+ # is inefficient for delimited reads.
+ # TODO: restructure this so we can call read_from_buffer
+ # for unbounded delimited reads.
+ if (target_bytes is not None and
+ self._read_buffer_size >= target_bytes):
+ break
finally:
self._pending_callbacks -= 1
def _handle_read(self):
try:
self._read_to_buffer_loop()
+ except UnsatisfiableReadError:
+ raise
except Exception:
gen_log.warning("error on read", exc_info=True)
self.close(exc_info=True)
raise
if self._read_from_buffer():
return
- self._maybe_add_error_listener()
+ # We couldn't satisfy the read inline, so either close the stream
+ # or listen for new data.
+ if self.closed():
+ self._maybe_run_close_callback()
+ else:
+ self._add_io_state(ioloop.IOLoop.READ)
def _read_to_buffer(self):
"""Reads from the socket and appends the result to the read buffer.
raise StreamClosedError("Stream is closed")
def _maybe_add_error_listener(self):
- if self._state is None and self._pending_callbacks == 0:
+ # This method is part of an optimization: to detect a connection that
+ # is closed when we're not actively reading or writing, we must listen
+ # for read events. However, it is inefficient to do this when the
+ # connection is first established because we are going to read or write
+ # immediately anyway. Instead, we insert checks at various times to
+ # see if the connection is idle and add the read listener then.
+ if self._pending_callbacks != 0:
+ return
+ if self._state is None or self._state == ioloop.IOLoop.ERROR:
if self.closed():
self._maybe_run_close_callback()
- else:
+ elif self._read_buffer_size == 0:
self._add_io_state(ioloop.IOLoop.READ)
def _add_io_state(self, state):
# Similar to test_delayed_close_callback, but read_until_close takes
# a separate code path so test it separately.
server, client = self.make_iostream_pair()
- client.set_close_callback(self.stop)
try:
server.write(b"1234")
server.close()
- self.wait()
+ # Read one byte to make sure the client has received the data.
+ # It won't run the close callback as long as there is more buffered
+ # data that could satisfy a later read.
+ client.read_bytes(1, self.stop)
+ data = self.wait()
+ self.assertEqual(data, b"1")
client.read_until_close(self.stop)
data = self.wait()
- self.assertEqual(data, b"1234")
+ self.assertEqual(data, b"234")
finally:
server.close()
client.close()
# All data should go through the streaming callback,
# and the final read callback just gets an empty string.
server, client = self.make_iostream_pair()
- client.set_close_callback(self.stop)
try:
server.write(b"1234")
server.close()
- self.wait()
+ client.read_bytes(1, self.stop)
+ data = self.wait()
+ self.assertEqual(data, b"1")
streaming_data = []
client.read_until_close(self.stop,
streaming_callback=streaming_data.append)
data = self.wait()
self.assertEqual(b'', data)
- self.assertEqual(b''.join(streaming_data), b"1234")
+ self.assertEqual(b''.join(streaming_data), b"234")
finally:
server.close()
client.close()
server.close()
client.close()
+ def test_small_reads_from_large_buffer(self):
+ # 10KB buffer size, 100KB available to read.
+ # Read 1KB at a time and make sure that the buffer is not eagerly
+ # filled.
+ server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
+ try:
+ server.write(b"a" * 1024 * 100)
+ for i in range(100):
+ client.read_bytes(1024, self.stop)
+ data = self.wait()
+ self.assertEqual(data, b"a" * 1024)
+ finally:
+ server.close()
+ client.close()
+
+ def test_small_read_untils_from_large_buffer(self):
+ # 10KB buffer size, 100KB available to read.
+ # Read 1KB at a time and make sure that the buffer is not eagerly
+ # filled.
+ server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
+ try:
+ server.write((b"a" * 1023 + b"\n") * 100)
+ for i in range(100):
+ client.read_until(b"\n", self.stop, max_bytes=4096)
+ data = self.wait()
+ self.assertEqual(data, b"a" * 1023 + b"\n")
+ finally:
+ server.close()
+ client.close()
+
class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
def _make_client_iostream(self):
return IOStream(socket.socket(), io_loop=self.io_loop)