stream.read_bytes(1)
-class TestIOStreamMixin(object):
- def _make_server_iostream(self, connection, **kwargs):
- raise NotImplementedError()
-
- def _make_client_iostream(self, connection, **kwargs):
- raise NotImplementedError()
+class TestReadWriteMixin(object):
+ # Tests where one stream reads and the other writes.
+ # These should work for BaseIOStream implementations.
def make_iostream_pair(self, **kwargs):
- listener, port = bind_unused_port()
- streams = [None, None]
-
- def accept_callback(connection, address):
- streams[0] = self._make_server_iostream(connection, **kwargs)
- self.stop()
-
- def connect_callback():
- streams[1] = client_stream
- self.stop()
- netutil.add_accept_handler(listener, accept_callback)
- client_stream = self._make_client_iostream(socket.socket(), **kwargs)
- client_stream.connect(('127.0.0.1', port),
- callback=connect_callback)
- self.wait(condition=lambda: all(streams))
- self.io_loop.remove_handler(listener.fileno())
- listener.close()
- return streams
-
- def test_streaming_callback_with_data_in_buffer(self):
- server, client = self.make_iostream_pair()
- client.write(b"abcd\r\nefgh")
- server.read_until(b"\r\n", self.stop)
- data = self.wait()
- self.assertEqual(data, b"abcd\r\n")
-
- def closed_callback(chunk):
- self.fail()
- server.read_until_close(callback=closed_callback,
- streaming_callback=self.stop)
- # self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
- data = self.wait()
- self.assertEqual(data, b"efgh")
- server.close()
- client.close()
+ raise NotImplementedError
def test_write_zero_bytes(self):
# Attempting to write zero bytes should run the callback without
# going into an infinite loop.
- server, client = self.make_iostream_pair()
- server.write(b'', callback=self.stop)
+ rs, ws = self.make_iostream_pair()
+ ws.write(b'', callback=self.stop)
self.wait()
- server.close()
- client.close()
-
- def test_connection_refused(self):
- # When a connection is refused, the connect callback should not
- # be run. (The kqueue IOLoop used to behave differently from the
- # epoll IOLoop in this respect)
- cleanup_func, port = refusing_port()
- self.addCleanup(cleanup_func)
- stream = IOStream(socket.socket())
- self.connect_called = False
-
- def connect_callback():
- self.connect_called = True
- self.stop()
- stream.set_close_callback(self.stop)
- # log messages vary by platform and ioloop implementation
- with ExpectLog(gen_log, ".*", required=False):
- stream.connect(("127.0.0.1", port), connect_callback)
- self.wait()
- self.assertFalse(self.connect_called)
- self.assertTrue(isinstance(stream.error, socket.error), stream.error)
- if sys.platform != 'cygwin':
- _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
- if hasattr(errno, "WSAECONNREFUSED"):
- _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
- # cygwin's errnos don't match those used on native windows python
- self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
-
- @unittest.skipIf(mock is None, 'mock package not present')
- def test_gaierror(self):
- # Test that IOStream sets its exc_info on getaddrinfo error.
- # It's difficult to reliably trigger a getaddrinfo error;
- # some resolvers own't even return errors for malformed names,
- # so we mock it instead. If IOStream changes to call a Resolver
- # before sock.connect, the mock target will need to change too.
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- stream = IOStream(s)
- stream.set_close_callback(self.stop)
- with mock.patch('socket.socket.connect',
- side_effect=socket.gaierror(errno.EIO, 'boom')):
- with ExpectLog(gen_log, "Connect error"):
- stream.connect(('localhost', 80), callback=self.stop)
- self.wait()
- self.assertIsInstance(stream.error, socket.gaierror)
-
- def test_read_callback_error(self):
- # Test that IOStream sets its exc_info when a read callback throws
- server, client = self.make_iostream_pair()
- try:
- server.set_close_callback(self.stop)
- with ExpectLog(
- app_log, "(Uncaught exception|Exception in callback)"
- ):
- # Clear ExceptionStackContext so IOStream catches error
- with NullContext():
- server.read_bytes(1, callback=lambda data: 1 / 0)
- client.write(b"1")
- self.wait()
- self.assertTrue(isinstance(server.error, ZeroDivisionError))
- finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_streaming_callback(self):
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
chunks = []
final_called = []
self.assertFalse(data)
final_called.append(True)
self.stop()
- server.read_bytes(6, callback=final_callback,
- streaming_callback=streaming_callback)
- client.write(b"1234")
+ rs.read_bytes(6, callback=final_callback,
+ streaming_callback=streaming_callback)
+ ws.write(b"1234")
self.wait(condition=lambda: chunks)
- client.write(b"5678")
+ ws.write(b"5678")
self.wait(condition=lambda: final_called)
self.assertEqual(chunks, [b"1234", b"56"])
# the rest of the last chunk is still in the buffer
- server.read_bytes(2, callback=self.stop)
+ rs.read_bytes(2, callback=self.stop)
data = self.wait()
self.assertEqual(data, b"78")
finally:
- server.close()
- client.close()
+ rs.close()
+ ws.close()
+
+ def test_streaming_callback_with_data_in_buffer(self):
+ rs, ws = self.make_iostream_pair()
+ ws.write(b"abcd\r\nefgh")
+ rs.read_until(b"\r\n", self.stop)
+ data = self.wait()
+ self.assertEqual(data, b"abcd\r\n")
+
+ def closed_callback(chunk):
+ self.fail()
+ rs.read_until_close(callback=closed_callback,
+ streaming_callback=self.stop)
+ #self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
+ data = self.wait()
+ self.assertEqual(data, b"efgh")
+ rs.close()
+ ws.close()
def test_streaming_until_close(self):
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
chunks = []
closed = [False]
assert not data, data
closed[0] = True
self.stop()
- client.read_until_close(callback=close_callback,
- streaming_callback=streaming_callback)
- server.write(b"1234")
+ rs.read_until_close(callback=close_callback,
+ streaming_callback=streaming_callback)
+ ws.write(b"1234")
self.wait(condition=lambda: len(chunks) == 1)
- server.write(b"5678", self.stop)
+ ws.write(b"5678", self.stop)
self.wait()
- server.close()
+ ws.close()
self.wait(condition=lambda: closed[0])
self.assertEqual(chunks, [b"1234", b"5678"])
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_streaming_until_close_future(self):
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
chunks = []
@gen.coroutine
- def client_task():
- yield client.read_until_close(streaming_callback=chunks.append)
+ def rs_task():
+ yield rs.read_until_close(streaming_callback=chunks.append)
@gen.coroutine
- def server_task():
- yield server.write(b"1234")
+ def ws_task():
+ yield ws.write(b"1234")
yield gen.sleep(0.01)
- yield server.write(b"5678")
- server.close()
+ yield ws.write(b"5678")
+ ws.close()
@gen.coroutine
def f():
- yield [client_task(), server_task()]
+ yield [rs_task(), ws_task()]
self.io_loop.run_sync(f)
self.assertEqual(chunks, [b"1234", b"5678"])
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_delayed_close_callback(self):
# The scenario: Server closes the connection while there is a pending
# read that can be served out of buffered data. The client does not
# run the close_callback as soon as it detects the close, but rather
# defers it until after the buffered read has finished.
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
- client.set_close_callback(self.stop)
- server.write(b"12")
+ rs.set_close_callback(self.stop)
+ ws.write(b"12")
chunks = []
def callback1(data):
chunks.append(data)
- client.read_bytes(1, callback2)
- server.close()
+ rs.read_bytes(1, callback2)
+ ws.close()
def callback2(data):
chunks.append(data)
- client.read_bytes(1, callback1)
+ rs.read_bytes(1, callback1)
self.wait() # stopped by close_callback
self.assertEqual(chunks, [b"1", b"2"])
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_future_delayed_close_callback(self):
# Same as test_delayed_close_callback, but with the future interface.
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
# We can't call make_iostream_pair inside a gen_test function
# because the ioloop is not reentrant.
@gen_test
def f(self):
- server.write(b"12")
+ ws.write(b"12")
chunks = []
- chunks.append((yield client.read_bytes(1)))
- server.close()
- chunks.append((yield client.read_bytes(1)))
+ chunks.append((yield rs.read_bytes(1)))
+ ws.close()
+ chunks.append((yield rs.read_bytes(1)))
self.assertEqual(chunks, [b"1", b"2"])
try:
f(self)
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_close_buffered_data(self):
# Similar to the previous test, but with data stored in the OS's
#
# This depends on the read_chunk_size being smaller than the
# OS socket buffer, so make it small.
- server, client = self.make_iostream_pair(read_chunk_size=256)
+ rs, ws = self.make_iostream_pair(read_chunk_size=256)
try:
- server.write(b"A" * 512)
- client.read_bytes(256, self.stop)
+ ws.write(b"A" * 512)
+ rs.read_bytes(256, self.stop)
data = self.wait()
self.assertEqual(b"A" * 256, data)
- server.close()
- # Allow the close to propagate to the client side of the
+ ws.close()
+ # Allow the close to propagate to the `rs` side of the
# connection. Using add_callback instead of add_timeout
# doesn't seem to work, even with multiple iterations
self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
self.wait()
- client.read_bytes(256, self.stop)
+ rs.read_bytes(256, self.stop)
data = self.wait()
self.assertEqual(b"A" * 256, data)
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_read_until_close_after_close(self):
# Similar to test_delayed_close_callback, but read_until_close takes
# a separate code path so test it separately.
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
- server.write(b"1234")
- server.close()
+ ws.write(b"1234")
+ ws.close()
# Read one byte to make sure the client has received the data.
# It won't run the close callback as long as there is more buffered
# data that could satisfy a later read.
- client.read_bytes(1, self.stop)
+ rs.read_bytes(1, self.stop)
data = self.wait()
self.assertEqual(data, b"1")
- client.read_until_close(self.stop)
+ rs.read_until_close(self.stop)
data = self.wait()
self.assertEqual(data, b"234")
finally:
- server.close()
- client.close()
-
- @unittest.skipIf(mock is None, 'mock package not present')
- def test_read_until_close_with_error(self):
- server, client = self.make_iostream_pair()
- try:
- with mock.patch('tornado.iostream.BaseIOStream._try_inline_read',
- side_effect=IOError('boom')):
- with self.assertRaisesRegexp(IOError, 'boom'):
- client.read_until_close(self.stop)
- finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_streaming_read_until_close_after_close(self):
# Same as the preceding test but with a streaming_callback.
# All data should go through the streaming callback,
# and the final read callback just gets an empty string.
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
- server.write(b"1234")
- server.close()
- client.read_bytes(1, self.stop)
+ ws.write(b"1234")
+ ws.close()
+ rs.read_bytes(1, self.stop)
data = self.wait()
self.assertEqual(data, b"1")
streaming_data = []
- client.read_until_close(self.stop,
- streaming_callback=streaming_data.append)
+ rs.read_until_close(self.stop,
+ streaming_callback=streaming_data.append)
data = self.wait()
self.assertEqual(b'', data)
self.assertEqual(b''.join(streaming_data), b"234")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_large_read_until(self):
# Performance test: read_until used to have a quadratic component
# so a read_until of 4MB would take 8 seconds; now it takes 0.25
# seconds.
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
# This test fails on pypy with ssl. I think it's because
# pypy's gc defeats moves objects, breaking the
# "frozen write buffer" assumption.
- if (isinstance(server, SSLIOStream) and
+ if (isinstance(rs, SSLIOStream) and
platform.python_implementation() == 'PyPy'):
raise unittest.SkipTest(
"pypy gc causes problems with openssl")
NUM_KB = 4096
for i in range(NUM_KB):
- client.write(b"A" * 1024)
- client.write(b"\r\n")
- server.read_until(b"\r\n", self.stop)
+ ws.write(b"A" * 1024)
+ ws.write(b"\r\n")
+ rs.read_until(b"\r\n", self.stop)
data = self.wait()
self.assertEqual(len(data), NUM_KB * 1024 + 2)
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_close_callback_with_pending_read(self):
# Regression test for a bug that was introduced in 2.3
# where the IOStream._close_callback would never be called
# if there were pending reads.
OK = b"OK\r\n"
- server, client = self.make_iostream_pair()
- client.set_close_callback(self.stop)
+ rs, ws = self.make_iostream_pair()
+ rs.set_close_callback(self.stop)
try:
- server.write(OK)
- client.read_until(b"\r\n", self.stop)
+ ws.write(OK)
+ rs.read_until(b"\r\n", self.stop)
res = self.wait()
self.assertEqual(res, OK)
- server.close()
- client.read_until(b"\r\n", lambda x: x)
+ ws.close()
+ rs.read_until(b"\r\n", lambda x: x)
# If _close_callback (self.stop) is not called,
# an AssertionError: Async operation timed out after 5 seconds
# will be raised.
res = self.wait()
self.assertTrue(res is None)
finally:
- server.close()
- client.close()
-
- @skipIfNonUnix
- @skipPypy3V58
- def test_inline_read_error(self):
- # An error on an inline read is raised without logging (on the
- # assumption that it will eventually be noticed or logged further
- # up the stack).
- #
- # This test is posix-only because windows os.close() doesn't work
- # on socket FDs, but we can't close the socket object normally
- # because we won't get the error we want if the socket knows
- # it's closed.
- server, client = self.make_iostream_pair()
- try:
- os.close(server.socket.fileno())
- with self.assertRaises(socket.error):
- server.read_bytes(1, lambda data: None)
- finally:
- server.close()
- client.close()
-
- @skipPypy3V58
- def test_async_read_error_logging(self):
- # Socket errors on asynchronous reads should be logged (but only
- # once).
- server, client = self.make_iostream_pair()
- server.set_close_callback(self.stop)
- try:
- # Start a read that will be fulfilled asynchronously.
- server.read_bytes(1, lambda data: None)
- client.write(b'a')
- # Stub out read_from_fd to make it fail.
-
- def fake_read_from_fd():
- os.close(server.socket.fileno())
- server.__class__.read_from_fd(server)
- server.read_from_fd = fake_read_from_fd
- # This log message is from _handle_read (not read_from_fd).
- with ExpectLog(gen_log, "error on read"):
- self.wait()
- finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_future_close_callback(self):
# Regression test for interaction between the Future read interfaces
# and IOStream._maybe_add_error_listener.
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
closed = [False]
def close_callback():
closed[0] = True
self.stop()
- server.set_close_callback(close_callback)
+ rs.set_close_callback(close_callback)
try:
- client.write(b'a')
- future = server.read_bytes(1)
+ ws.write(b'a')
+ future = rs.read_bytes(1)
self.io_loop.add_future(future, self.stop)
self.assertEqual(self.wait().result(), b'a')
self.assertFalse(closed[0])
- client.close()
+ ws.close()
self.wait()
self.assertTrue(closed[0])
finally:
- server.close()
- client.close()
+ rs.close()
+ ws.close()
def test_write_memoryview(self):
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
- client.read_bytes(4, self.stop)
- server.write(memoryview(b"hello"))
+ rs.read_bytes(4, self.stop)
+ ws.write(memoryview(b"hello"))
data = self.wait()
self.assertEqual(data, b"hell")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_read_bytes_partial(self):
- server, client = self.make_iostream_pair()
+ rs, ws = self.make_iostream_pair()
try:
# Ask for more than is available with partial=True
- client.read_bytes(50, self.stop, partial=True)
- server.write(b"hello")
+ rs.read_bytes(50, self.stop, partial=True)
+ ws.write(b"hello")
data = self.wait()
self.assertEqual(data, b"hello")
# Ask for less than what is available; num_bytes is still
# respected.
- client.read_bytes(3, self.stop, partial=True)
- server.write(b"world")
+ rs.read_bytes(3, self.stop, partial=True)
+ ws.write(b"world")
data = self.wait()
self.assertEqual(data, b"wor")
# Partial reads won't return an empty string, but read_bytes(0)
# will.
- client.read_bytes(0, self.stop, partial=True)
+ rs.read_bytes(0, self.stop, partial=True)
data = self.wait()
self.assertEqual(data, b'')
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_read_until_max_bytes(self):
- server, client = self.make_iostream_pair()
- client.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = self.make_iostream_pair()
+ rs.set_close_callback(lambda: self.stop("closed"))
try:
# Extra room under the limit
- client.read_until(b"def", self.stop, max_bytes=50)
- server.write(b"abcdef")
+ rs.read_until(b"def", self.stop, max_bytes=50)
+ ws.write(b"abcdef")
data = self.wait()
self.assertEqual(data, b"abcdef")
# Just enough space
- client.read_until(b"def", self.stop, max_bytes=6)
- server.write(b"abcdef")
+ rs.read_until(b"def", self.stop, max_bytes=6)
+ ws.write(b"abcdef")
data = self.wait()
self.assertEqual(data, b"abcdef")
# Not enough space, but we don't know it until all we can do is
# log a warning and close the connection.
with ExpectLog(gen_log, "Unsatisfiable read"):
- client.read_until(b"def", self.stop, max_bytes=5)
- server.write(b"123456")
+ rs.read_until(b"def", self.stop, max_bytes=5)
+ ws.write(b"123456")
data = self.wait()
self.assertEqual(data, "closed")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_read_until_max_bytes_inline(self):
- server, client = self.make_iostream_pair()
- client.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = self.make_iostream_pair()
+ rs.set_close_callback(lambda: self.stop("closed"))
try:
# Similar to the error case in the previous test, but the
- # server writes first so client reads are satisfied
+ # ws writes first so rs reads are satisfied
# inline. For consistency with the out-of-line case, we
# do not raise the error synchronously.
- server.write(b"123456")
+ ws.write(b"123456")
with ExpectLog(gen_log, "Unsatisfiable read"):
- client.read_until(b"def", self.stop, max_bytes=5)
+ rs.read_until(b"def", self.stop, max_bytes=5)
data = self.wait()
self.assertEqual(data, "closed")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_read_until_max_bytes_ignores_extra(self):
- server, client = self.make_iostream_pair()
- client.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = self.make_iostream_pair()
+ rs.set_close_callback(lambda: self.stop("closed"))
try:
# Even though data that matches arrives the same packet that
# puts us over the limit, we fail the request because it was not
# found within the limit.
- server.write(b"abcdef")
+ ws.write(b"abcdef")
with ExpectLog(gen_log, "Unsatisfiable read"):
- client.read_until(b"def", self.stop, max_bytes=5)
+ rs.read_until(b"def", self.stop, max_bytes=5)
data = self.wait()
self.assertEqual(data, "closed")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_read_until_regex_max_bytes(self):
- server, client = self.make_iostream_pair()
- client.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = self.make_iostream_pair()
+ rs.set_close_callback(lambda: self.stop("closed"))
try:
# Extra room under the limit
- client.read_until_regex(b"def", self.stop, max_bytes=50)
- server.write(b"abcdef")
+ rs.read_until_regex(b"def", self.stop, max_bytes=50)
+ ws.write(b"abcdef")
data = self.wait()
self.assertEqual(data, b"abcdef")
# Just enough space
- client.read_until_regex(b"def", self.stop, max_bytes=6)
- server.write(b"abcdef")
+ rs.read_until_regex(b"def", self.stop, max_bytes=6)
+ ws.write(b"abcdef")
data = self.wait()
self.assertEqual(data, b"abcdef")
# Not enough space, but we don't know it until all we can do is
# log a warning and close the connection.
with ExpectLog(gen_log, "Unsatisfiable read"):
- client.read_until_regex(b"def", self.stop, max_bytes=5)
- server.write(b"123456")
+ rs.read_until_regex(b"def", self.stop, max_bytes=5)
+ ws.write(b"123456")
data = self.wait()
self.assertEqual(data, "closed")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_read_until_regex_max_bytes_inline(self):
- server, client = self.make_iostream_pair()
- client.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = self.make_iostream_pair()
+ rs.set_close_callback(lambda: self.stop("closed"))
try:
# Similar to the error case in the previous test, but the
- # server writes first so client reads are satisfied
+ # ws writes first so rs reads are satisfied
# inline. For consistency with the out-of-line case, we
# do not raise the error synchronously.
- server.write(b"123456")
+ ws.write(b"123456")
with ExpectLog(gen_log, "Unsatisfiable read"):
- client.read_until_regex(b"def", self.stop, max_bytes=5)
+ rs.read_until_regex(b"def", self.stop, max_bytes=5)
data = self.wait()
self.assertEqual(data, "closed")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_read_until_regex_max_bytes_ignores_extra(self):
- server, client = self.make_iostream_pair()
- client.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = self.make_iostream_pair()
+ rs.set_close_callback(lambda: self.stop("closed"))
try:
# Even though data that matches arrives the same packet that
# puts us over the limit, we fail the request because it was not
# found within the limit.
- server.write(b"abcdef")
+ ws.write(b"abcdef")
with ExpectLog(gen_log, "Unsatisfiable read"):
- client.read_until_regex(b"def", self.stop, max_bytes=5)
+ rs.read_until_regex(b"def", self.stop, max_bytes=5)
data = self.wait()
self.assertEqual(data, "closed")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_small_reads_from_large_buffer(self):
# 10KB buffer size, 100KB available to read.
# Read 1KB at a time and make sure that the buffer is not eagerly
# filled.
- server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
+ rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024)
try:
- server.write(b"a" * 1024 * 100)
+ ws.write(b"a" * 1024 * 100)
for i in range(100):
- client.read_bytes(1024, self.stop)
+ rs.read_bytes(1024, self.stop)
data = self.wait()
self.assertEqual(data, b"a" * 1024)
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_small_read_untils_from_large_buffer(self):
# 10KB buffer size, 100KB available to read.
# Read 1KB at a time and make sure that the buffer is not eagerly
# filled.
- server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
+ rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024)
try:
- server.write((b"a" * 1023 + b"\n") * 100)
+ ws.write((b"a" * 1023 + b"\n") * 100)
for i in range(100):
- client.read_until(b"\n", self.stop, max_bytes=4096)
+ rs.read_until(b"\n", self.stop, max_bytes=4096)
data = self.wait()
self.assertEqual(data, b"a" * 1023 + b"\n")
finally:
- server.close()
- client.close()
+ ws.close()
+ rs.close()
def test_flow_control(self):
MB = 1024 * 1024
- server, client = self.make_iostream_pair(max_buffer_size=5 * MB)
+ rs, ws = self.make_iostream_pair(max_buffer_size=5 * MB)
try:
- # Client writes more than the server will accept.
- client.write(b"a" * 10 * MB)
- # The server pauses while reading.
- server.read_bytes(MB, self.stop)
+ # Client writes more than the rs will accept.
+ ws.write(b"a" * 10 * MB)
+ # The rs pauses while reading.
+ rs.read_bytes(MB, self.stop)
self.wait()
self.io_loop.call_later(0.1, self.stop)
self.wait()
- # The client's writes have been blocked; the server can
+ # The ws's writes have been blocked; the rs can
# continue to read gradually.
for i in range(9):
- server.read_bytes(MB, self.stop)
+ rs.read_bytes(MB, self.stop)
+ self.wait()
+ finally:
+ rs.close()
+ ws.close()
+
+
+class TestIOStreamMixin(TestReadWriteMixin):
+ def _make_server_iostream(self, connection, **kwargs):
+ raise NotImplementedError()
+
+ def _make_client_iostream(self, connection, **kwargs):
+ raise NotImplementedError()
+
+ def make_iostream_pair(self, **kwargs):
+ listener, port = bind_unused_port()
+ streams = [None, None]
+
+ def accept_callback(connection, address):
+ streams[0] = self._make_server_iostream(connection, **kwargs)
+ self.stop()
+
+ def connect_callback():
+ streams[1] = client_stream
+ self.stop()
+ netutil.add_accept_handler(listener, accept_callback)
+ client_stream = self._make_client_iostream(socket.socket(), **kwargs)
+ client_stream.connect(('127.0.0.1', port),
+ callback=connect_callback)
+ self.wait(condition=lambda: all(streams))
+ self.io_loop.remove_handler(listener.fileno())
+ listener.close()
+ return streams
+
+ def test_connection_refused(self):
+ # When a connection is refused, the connect callback should not
+ # be run. (The kqueue IOLoop used to behave differently from the
+ # epoll IOLoop in this respect)
+ cleanup_func, port = refusing_port()
+ self.addCleanup(cleanup_func)
+ stream = IOStream(socket.socket())
+ self.connect_called = False
+
+ def connect_callback():
+ self.connect_called = True
+ self.stop()
+ stream.set_close_callback(self.stop)
+ # log messages vary by platform and ioloop implementation
+ with ExpectLog(gen_log, ".*", required=False):
+ stream.connect(("127.0.0.1", port), connect_callback)
+ self.wait()
+ self.assertFalse(self.connect_called)
+ self.assertTrue(isinstance(stream.error, socket.error), stream.error)
+ if sys.platform != 'cygwin':
+ _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
+ if hasattr(errno, "WSAECONNREFUSED"):
+ _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
+ # cygwin's errnos don't match those used on native windows python
+ self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
+
+ @unittest.skipIf(mock is None, 'mock package not present')
+ def test_gaierror(self):
+ # Test that IOStream sets its exc_info on getaddrinfo error.
+ # It's difficult to reliably trigger a getaddrinfo error;
+ # some resolvers own't even return errors for malformed names,
+ # so we mock it instead. If IOStream changes to call a Resolver
+ # before sock.connect, the mock target will need to change too.
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ stream = IOStream(s)
+ stream.set_close_callback(self.stop)
+ with mock.patch('socket.socket.connect',
+ side_effect=socket.gaierror(errno.EIO, 'boom')):
+ with ExpectLog(gen_log, "Connect error"):
+ stream.connect(('localhost', 80), callback=self.stop)
+ self.wait()
+ self.assertIsInstance(stream.error, socket.gaierror)
+
+ def test_read_callback_error(self):
+ # Test that IOStream sets its exc_info when a read callback throws
+ server, client = self.make_iostream_pair()
+ try:
+ server.set_close_callback(self.stop)
+ with ExpectLog(
+ app_log, "(Uncaught exception|Exception in callback)"
+ ):
+ # Clear ExceptionStackContext so IOStream catches error
+ with NullContext():
+ server.read_bytes(1, callback=lambda data: 1 / 0)
+ client.write(b"1")
+ self.wait()
+ self.assertTrue(isinstance(server.error, ZeroDivisionError))
+ finally:
+ server.close()
+ client.close()
+
+ @unittest.skipIf(mock is None, 'mock package not present')
+ def test_read_until_close_with_error(self):
+ server, client = self.make_iostream_pair()
+ try:
+ with mock.patch('tornado.iostream.BaseIOStream._try_inline_read',
+ side_effect=IOError('boom')):
+ with self.assertRaisesRegexp(IOError, 'boom'):
+ client.read_until_close(self.stop)
+ finally:
+ server.close()
+ client.close()
+
+ @skipIfNonUnix
+ @skipPypy3V58
+ def test_inline_read_error(self):
+ # An error on an inline read is raised without logging (on the
+ # assumption that it will eventually be noticed or logged further
+ # up the stack).
+ #
+ # This test is posix-only because windows os.close() doesn't work
+ # on socket FDs, but we can't close the socket object normally
+ # because we won't get the error we want if the socket knows
+ # it's closed.
+ server, client = self.make_iostream_pair()
+ try:
+ os.close(server.socket.fileno())
+ with self.assertRaises(socket.error):
+ server.read_bytes(1, lambda data: None)
+ finally:
+ server.close()
+ client.close()
+
+ @skipPypy3V58
+ def test_async_read_error_logging(self):
+ # Socket errors on asynchronous reads should be logged (but only
+ # once).
+ server, client = self.make_iostream_pair()
+ server.set_close_callback(self.stop)
+ try:
+ # Start a read that will be fulfilled asynchronously.
+ server.read_bytes(1, lambda data: None)
+ client.write(b'a')
+ # Stub out read_from_fd to make it fail.
+
+ def fake_read_from_fd():
+ os.close(server.socket.fileno())
+ server.__class__.read_from_fd(server)
+ server.read_from_fd = fake_read_from_fd
+ # This log message is from _handle_read (not read_from_fd).
+ with ExpectLog(gen_log, "error on read"):
self.wait()
finally:
server.close()
@skipIfNonUnix
-class TestPipeIOStream(AsyncTestCase):
- def test_pipe_iostream(self):
+class TestPipeIOStream(TestReadWriteMixin, AsyncTestCase):
+ def make_iostream_pair(self, **kwargs):
r, w = os.pipe()
- rs = PipeIOStream(r)
- ws = PipeIOStream(w)
+ return PipeIOStream(r, **kwargs), PipeIOStream(w, **kwargs)
+
+ def test_pipe_iostream(self):
+ rs, ws = self.make_iostream_pair()
ws.write(b"hel")
ws.write(b"lo world")
rs.close()
def test_pipe_iostream_big_write(self):
- r, w = os.pipe()
-
- rs = PipeIOStream(r)
- ws = PipeIOStream(w)
+ rs, ws = self.make_iostream_pair()
NUM_BYTES = 1048576