else:
callback = self._read_callback
self._read_callback = self._streaming_callback = None
- if self._read_future is not None:
- assert callback is None
- future = self._read_future
- self._read_future = None
- future.set_result(self._consume(size))
+ if self._read_future is not None:
+ assert callback is None
+ future = self._read_future
+ self._read_future = None
+ future.set_result(self._consume(size))
if callback is not None:
- assert self._read_future is None
+ assert (self._read_future is None) or streaming
self._run_callback(callback, self._consume(size))
else:
# If we scheduled a callback, we will add the error listener
server.close()
client.close()
+ def test_streaming_until_close_future(self):
+ server, client = self.make_iostream_pair()
+ try:
+ chunks = []
+
+ @gen.coroutine
+ def client_task():
+ yield client.read_until_close(streaming_callback=chunks.append)
+
+ @gen.coroutine
+ def server_task():
+ yield server.write(b"1234")
+ yield gen.moment
+ yield server.write(b"5678")
+ server.close()
+
+ @gen.coroutine
+ def f():
+ yield [client_task(), server_task()]
+ self.io_loop.run_sync(f)
+ self.assertEqual(chunks, [b"1234", b"5678"])
+ finally:
+ server.close()
+ client.close()
+
def test_delayed_close_callback(self):
# The scenario: Server closes the connection while there is a pending
# read that can be served out of buffered data. The client does not