From: Antoine Pitrou
Date: Sun, 22 Oct 2017 17:05:41 +0000 (+0200)
Subject: iostream: Use file objects instead of raw descriptors in PipeIOStream
X-Git-Tag: v5.0.0~15^2~2
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=81dd461da39e7dd9dcd060a958437795bafeb533;p=thirdparty%2Ftornado.git

iostream: Use file objects instead of raw descriptors in PipeIOStream

This makes the implementation of PipeIOStream more compatible with
socket streams, and allows for more of the test suite to be used
with it.

Extracted from PR #2193
---
diff --git a/tornado/iostream.py b/tornado/iostream.py
index d77460320..3bf058749 100644
--- a/tornado/iostream.py
+++ b/tornado/iostream.py
@@ -27,6 +27,7 @@ from __future__ import absolute_import, division, print_function
 
 import collections
 import errno
+import io
 import numbers
 import os
 import socket
@@ -1573,6 +1574,7 @@ class PipeIOStream(BaseIOStream):
     """
     def __init__(self, fd, *args, **kwargs):
         self.fd = fd
+        self._fio = io.FileIO(self.fd, "r+")
         _set_nonblocking(fd)
         super(PipeIOStream, self).__init__(*args, **kwargs)
 
@@ -1580,7 +1582,7 @@
         return self.fd
 
     def close_fd(self):
-        os.close(self.fd)
+        self._fio.close()
 
     def write_to_fd(self, data):
         try:
@@ -1592,17 +1594,18 @@
 
     def read_from_fd(self):
         try:
-            chunk = os.read(self.fd, self.read_chunk_size)
+            chunk = self._fio.read(self.read_chunk_size)
         except (IOError, OSError) as e:
-            if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
-                return None
-            elif errno_from_exception(e) == errno.EBADF:
+            if errno_from_exception(e) == errno.EBADF:
                 # If the writing half of a pipe is closed, select will
                 # report it as readable but reads will fail with EBADF.
                 self.close(exc_info=e)
                 return None
             else:
                 raise
+        if chunk is None:
+            # Read would block
+            return None
         if not chunk:
             self.close()
             return None
diff --git a/tornado/test/iostream_test.py b/tornado/test/iostream_test.py
index bdc5ee14f..1bfb0b34c 100644
--- a/tornado/test/iostream_test.py
+++ b/tornado/test/iostream_test.py
@@ -168,122 +168,24 @@ class TestIOStreamWebMixin(object):
         stream.read_bytes(1)
 
 
-class TestIOStreamMixin(object):
-    def _make_server_iostream(self, connection, **kwargs):
-        raise NotImplementedError()
-
-    def _make_client_iostream(self, connection, **kwargs):
-        raise NotImplementedError()
+class TestReadWriteMixin(object):
+    # Tests where one stream reads and the other writes.
+    # These should work for BaseIOStream implementations.
def make_iostream_pair(self, **kwargs): - listener, port = bind_unused_port() - streams = [None, None] - - def accept_callback(connection, address): - streams[0] = self._make_server_iostream(connection, **kwargs) - self.stop() - - def connect_callback(): - streams[1] = client_stream - self.stop() - netutil.add_accept_handler(listener, accept_callback) - client_stream = self._make_client_iostream(socket.socket(), **kwargs) - client_stream.connect(('127.0.0.1', port), - callback=connect_callback) - self.wait(condition=lambda: all(streams)) - self.io_loop.remove_handler(listener.fileno()) - listener.close() - return streams - - def test_streaming_callback_with_data_in_buffer(self): - server, client = self.make_iostream_pair() - client.write(b"abcd\r\nefgh") - server.read_until(b"\r\n", self.stop) - data = self.wait() - self.assertEqual(data, b"abcd\r\n") - - def closed_callback(chunk): - self.fail() - server.read_until_close(callback=closed_callback, - streaming_callback=self.stop) - # self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop) - data = self.wait() - self.assertEqual(data, b"efgh") - server.close() - client.close() + raise NotImplementedError def test_write_zero_bytes(self): # Attempting to write zero bytes should run the callback without # going into an infinite loop. - server, client = self.make_iostream_pair() - server.write(b'', callback=self.stop) + rs, ws = self.make_iostream_pair() + ws.write(b'', callback=self.stop) self.wait() - server.close() - client.close() - - def test_connection_refused(self): - # When a connection is refused, the connect callback should not - # be run. (The kqueue IOLoop used to behave differently from the - # epoll IOLoop in this respect) - cleanup_func, port = refusing_port() - self.addCleanup(cleanup_func) - stream = IOStream(socket.socket()) - self.connect_called = False - - def connect_callback(): - self.connect_called = True - self.stop() - stream.set_close_callback(self.stop) - # log messages vary by platform and ioloop implementation - with ExpectLog(gen_log, ".*", required=False): - stream.connect(("127.0.0.1", port), connect_callback) - self.wait() - self.assertFalse(self.connect_called) - self.assertTrue(isinstance(stream.error, socket.error), stream.error) - if sys.platform != 'cygwin': - _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,) - if hasattr(errno, "WSAECONNREFUSED"): - _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,) - # cygwin's errnos don't match those used on native windows python - self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED) - - @unittest.skipIf(mock is None, 'mock package not present') - def test_gaierror(self): - # Test that IOStream sets its exc_info on getaddrinfo error. - # It's difficult to reliably trigger a getaddrinfo error; - # some resolvers own't even return errors for malformed names, - # so we mock it instead. If IOStream changes to call a Resolver - # before sock.connect, the mock target will need to change too. 
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - stream = IOStream(s) - stream.set_close_callback(self.stop) - with mock.patch('socket.socket.connect', - side_effect=socket.gaierror(errno.EIO, 'boom')): - with ExpectLog(gen_log, "Connect error"): - stream.connect(('localhost', 80), callback=self.stop) - self.wait() - self.assertIsInstance(stream.error, socket.gaierror) - - def test_read_callback_error(self): - # Test that IOStream sets its exc_info when a read callback throws - server, client = self.make_iostream_pair() - try: - server.set_close_callback(self.stop) - with ExpectLog( - app_log, "(Uncaught exception|Exception in callback)" - ): - # Clear ExceptionStackContext so IOStream catches error - with NullContext(): - server.read_bytes(1, callback=lambda data: 1 / 0) - client.write(b"1") - self.wait() - self.assertTrue(isinstance(server.error, ZeroDivisionError)) - finally: - server.close() - client.close() + ws.close() + rs.close() def test_streaming_callback(self): - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: chunks = [] final_called = [] @@ -296,24 +198,41 @@ class TestIOStreamMixin(object): self.assertFalse(data) final_called.append(True) self.stop() - server.read_bytes(6, callback=final_callback, - streaming_callback=streaming_callback) - client.write(b"1234") + rs.read_bytes(6, callback=final_callback, + streaming_callback=streaming_callback) + ws.write(b"1234") self.wait(condition=lambda: chunks) - client.write(b"5678") + ws.write(b"5678") self.wait(condition=lambda: final_called) self.assertEqual(chunks, [b"1234", b"56"]) # the rest of the last chunk is still in the buffer - server.read_bytes(2, callback=self.stop) + rs.read_bytes(2, callback=self.stop) data = self.wait() self.assertEqual(data, b"78") finally: - server.close() - client.close() + rs.close() + ws.close() + + def test_streaming_callback_with_data_in_buffer(self): + rs, ws = self.make_iostream_pair() + ws.write(b"abcd\r\nefgh") + rs.read_until(b"\r\n", self.stop) + data = self.wait() + self.assertEqual(data, b"abcd\r\n") + + def closed_callback(chunk): + self.fail() + rs.read_until_close(callback=closed_callback, + streaming_callback=self.stop) + #self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop) + data = self.wait() + self.assertEqual(data, b"efgh") + rs.close() + ws.close() def test_streaming_until_close(self): - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: chunks = [] closed = [False] @@ -326,88 +245,88 @@ class TestIOStreamMixin(object): assert not data, data closed[0] = True self.stop() - client.read_until_close(callback=close_callback, - streaming_callback=streaming_callback) - server.write(b"1234") + rs.read_until_close(callback=close_callback, + streaming_callback=streaming_callback) + ws.write(b"1234") self.wait(condition=lambda: len(chunks) == 1) - server.write(b"5678", self.stop) + ws.write(b"5678", self.stop) self.wait() - server.close() + ws.close() self.wait(condition=lambda: closed[0]) self.assertEqual(chunks, [b"1234", b"5678"]) finally: - server.close() - client.close() + ws.close() + rs.close() def test_streaming_until_close_future(self): - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: chunks = [] @gen.coroutine - def client_task(): - yield client.read_until_close(streaming_callback=chunks.append) + def rs_task(): + yield rs.read_until_close(streaming_callback=chunks.append) @gen.coroutine - def server_task(): - yield server.write(b"1234") + def 
ws_task(): + yield ws.write(b"1234") yield gen.sleep(0.01) - yield server.write(b"5678") - server.close() + yield ws.write(b"5678") + ws.close() @gen.coroutine def f(): - yield [client_task(), server_task()] + yield [rs_task(), ws_task()] self.io_loop.run_sync(f) self.assertEqual(chunks, [b"1234", b"5678"]) finally: - server.close() - client.close() + ws.close() + rs.close() def test_delayed_close_callback(self): # The scenario: Server closes the connection while there is a pending # read that can be served out of buffered data. The client does not # run the close_callback as soon as it detects the close, but rather # defers it until after the buffered read has finished. - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: - client.set_close_callback(self.stop) - server.write(b"12") + rs.set_close_callback(self.stop) + ws.write(b"12") chunks = [] def callback1(data): chunks.append(data) - client.read_bytes(1, callback2) - server.close() + rs.read_bytes(1, callback2) + ws.close() def callback2(data): chunks.append(data) - client.read_bytes(1, callback1) + rs.read_bytes(1, callback1) self.wait() # stopped by close_callback self.assertEqual(chunks, [b"1", b"2"]) finally: - server.close() - client.close() + ws.close() + rs.close() def test_future_delayed_close_callback(self): # Same as test_delayed_close_callback, but with the future interface. - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() # We can't call make_iostream_pair inside a gen_test function # because the ioloop is not reentrant. @gen_test def f(self): - server.write(b"12") + ws.write(b"12") chunks = [] - chunks.append((yield client.read_bytes(1))) - server.close() - chunks.append((yield client.read_bytes(1))) + chunks.append((yield rs.read_bytes(1))) + ws.close() + chunks.append((yield rs.read_bytes(1))) self.assertEqual(chunks, [b"1", b"2"]) try: f(self) finally: - server.close() - client.close() + ws.close() + rs.close() def test_close_buffered_data(self): # Similar to the previous test, but with data stored in the OS's @@ -418,393 +337,482 @@ class TestIOStreamMixin(object): # # This depends on the read_chunk_size being smaller than the # OS socket buffer, so make it small. - server, client = self.make_iostream_pair(read_chunk_size=256) + rs, ws = self.make_iostream_pair(read_chunk_size=256) try: - server.write(b"A" * 512) - client.read_bytes(256, self.stop) + ws.write(b"A" * 512) + rs.read_bytes(256, self.stop) data = self.wait() self.assertEqual(b"A" * 256, data) - server.close() - # Allow the close to propagate to the client side of the + ws.close() + # Allow the close to propagate to the `rs` side of the # connection. Using add_callback instead of add_timeout # doesn't seem to work, even with multiple iterations self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop) self.wait() - client.read_bytes(256, self.stop) + rs.read_bytes(256, self.stop) data = self.wait() self.assertEqual(b"A" * 256, data) finally: - server.close() - client.close() + ws.close() + rs.close() def test_read_until_close_after_close(self): # Similar to test_delayed_close_callback, but read_until_close takes # a separate code path so test it separately. - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: - server.write(b"1234") - server.close() + ws.write(b"1234") + ws.close() # Read one byte to make sure the client has received the data. # It won't run the close callback as long as there is more buffered # data that could satisfy a later read. 
- client.read_bytes(1, self.stop) + rs.read_bytes(1, self.stop) data = self.wait() self.assertEqual(data, b"1") - client.read_until_close(self.stop) + rs.read_until_close(self.stop) data = self.wait() self.assertEqual(data, b"234") finally: - server.close() - client.close() - - @unittest.skipIf(mock is None, 'mock package not present') - def test_read_until_close_with_error(self): - server, client = self.make_iostream_pair() - try: - with mock.patch('tornado.iostream.BaseIOStream._try_inline_read', - side_effect=IOError('boom')): - with self.assertRaisesRegexp(IOError, 'boom'): - client.read_until_close(self.stop) - finally: - server.close() - client.close() + ws.close() + rs.close() def test_streaming_read_until_close_after_close(self): # Same as the preceding test but with a streaming_callback. # All data should go through the streaming callback, # and the final read callback just gets an empty string. - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: - server.write(b"1234") - server.close() - client.read_bytes(1, self.stop) + ws.write(b"1234") + ws.close() + rs.read_bytes(1, self.stop) data = self.wait() self.assertEqual(data, b"1") streaming_data = [] - client.read_until_close(self.stop, - streaming_callback=streaming_data.append) + rs.read_until_close(self.stop, + streaming_callback=streaming_data.append) data = self.wait() self.assertEqual(b'', data) self.assertEqual(b''.join(streaming_data), b"234") finally: - server.close() - client.close() + ws.close() + rs.close() def test_large_read_until(self): # Performance test: read_until used to have a quadratic component # so a read_until of 4MB would take 8 seconds; now it takes 0.25 # seconds. - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: # This test fails on pypy with ssl. I think it's because # pypy's gc defeats moves objects, breaking the # "frozen write buffer" assumption. - if (isinstance(server, SSLIOStream) and + if (isinstance(rs, SSLIOStream) and platform.python_implementation() == 'PyPy'): raise unittest.SkipTest( "pypy gc causes problems with openssl") NUM_KB = 4096 for i in range(NUM_KB): - client.write(b"A" * 1024) - client.write(b"\r\n") - server.read_until(b"\r\n", self.stop) + ws.write(b"A" * 1024) + ws.write(b"\r\n") + rs.read_until(b"\r\n", self.stop) data = self.wait() self.assertEqual(len(data), NUM_KB * 1024 + 2) finally: - server.close() - client.close() + ws.close() + rs.close() def test_close_callback_with_pending_read(self): # Regression test for a bug that was introduced in 2.3 # where the IOStream._close_callback would never be called # if there were pending reads. OK = b"OK\r\n" - server, client = self.make_iostream_pair() - client.set_close_callback(self.stop) + rs, ws = self.make_iostream_pair() + rs.set_close_callback(self.stop) try: - server.write(OK) - client.read_until(b"\r\n", self.stop) + ws.write(OK) + rs.read_until(b"\r\n", self.stop) res = self.wait() self.assertEqual(res, OK) - server.close() - client.read_until(b"\r\n", lambda x: x) + ws.close() + rs.read_until(b"\r\n", lambda x: x) # If _close_callback (self.stop) is not called, # an AssertionError: Async operation timed out after 5 seconds # will be raised. res = self.wait() self.assertTrue(res is None) finally: - server.close() - client.close() - - @skipIfNonUnix - @skipPypy3V58 - def test_inline_read_error(self): - # An error on an inline read is raised without logging (on the - # assumption that it will eventually be noticed or logged further - # up the stack). 
- # - # This test is posix-only because windows os.close() doesn't work - # on socket FDs, but we can't close the socket object normally - # because we won't get the error we want if the socket knows - # it's closed. - server, client = self.make_iostream_pair() - try: - os.close(server.socket.fileno()) - with self.assertRaises(socket.error): - server.read_bytes(1, lambda data: None) - finally: - server.close() - client.close() - - @skipPypy3V58 - def test_async_read_error_logging(self): - # Socket errors on asynchronous reads should be logged (but only - # once). - server, client = self.make_iostream_pair() - server.set_close_callback(self.stop) - try: - # Start a read that will be fulfilled asynchronously. - server.read_bytes(1, lambda data: None) - client.write(b'a') - # Stub out read_from_fd to make it fail. - - def fake_read_from_fd(): - os.close(server.socket.fileno()) - server.__class__.read_from_fd(server) - server.read_from_fd = fake_read_from_fd - # This log message is from _handle_read (not read_from_fd). - with ExpectLog(gen_log, "error on read"): - self.wait() - finally: - server.close() - client.close() + ws.close() + rs.close() def test_future_close_callback(self): # Regression test for interaction between the Future read interfaces # and IOStream._maybe_add_error_listener. - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() closed = [False] def close_callback(): closed[0] = True self.stop() - server.set_close_callback(close_callback) + rs.set_close_callback(close_callback) try: - client.write(b'a') - future = server.read_bytes(1) + ws.write(b'a') + future = rs.read_bytes(1) self.io_loop.add_future(future, self.stop) self.assertEqual(self.wait().result(), b'a') self.assertFalse(closed[0]) - client.close() + ws.close() self.wait() self.assertTrue(closed[0]) finally: - server.close() - client.close() + rs.close() + ws.close() def test_write_memoryview(self): - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: - client.read_bytes(4, self.stop) - server.write(memoryview(b"hello")) + rs.read_bytes(4, self.stop) + ws.write(memoryview(b"hello")) data = self.wait() self.assertEqual(data, b"hell") finally: - server.close() - client.close() + ws.close() + rs.close() def test_read_bytes_partial(self): - server, client = self.make_iostream_pair() + rs, ws = self.make_iostream_pair() try: # Ask for more than is available with partial=True - client.read_bytes(50, self.stop, partial=True) - server.write(b"hello") + rs.read_bytes(50, self.stop, partial=True) + ws.write(b"hello") data = self.wait() self.assertEqual(data, b"hello") # Ask for less than what is available; num_bytes is still # respected. - client.read_bytes(3, self.stop, partial=True) - server.write(b"world") + rs.read_bytes(3, self.stop, partial=True) + ws.write(b"world") data = self.wait() self.assertEqual(data, b"wor") # Partial reads won't return an empty string, but read_bytes(0) # will. 
- client.read_bytes(0, self.stop, partial=True) + rs.read_bytes(0, self.stop, partial=True) data = self.wait() self.assertEqual(data, b'') finally: - server.close() - client.close() + ws.close() + rs.close() def test_read_until_max_bytes(self): - server, client = self.make_iostream_pair() - client.set_close_callback(lambda: self.stop("closed")) + rs, ws = self.make_iostream_pair() + rs.set_close_callback(lambda: self.stop("closed")) try: # Extra room under the limit - client.read_until(b"def", self.stop, max_bytes=50) - server.write(b"abcdef") + rs.read_until(b"def", self.stop, max_bytes=50) + ws.write(b"abcdef") data = self.wait() self.assertEqual(data, b"abcdef") # Just enough space - client.read_until(b"def", self.stop, max_bytes=6) - server.write(b"abcdef") + rs.read_until(b"def", self.stop, max_bytes=6) + ws.write(b"abcdef") data = self.wait() self.assertEqual(data, b"abcdef") # Not enough space, but we don't know it until all we can do is # log a warning and close the connection. with ExpectLog(gen_log, "Unsatisfiable read"): - client.read_until(b"def", self.stop, max_bytes=5) - server.write(b"123456") + rs.read_until(b"def", self.stop, max_bytes=5) + ws.write(b"123456") data = self.wait() self.assertEqual(data, "closed") finally: - server.close() - client.close() + ws.close() + rs.close() def test_read_until_max_bytes_inline(self): - server, client = self.make_iostream_pair() - client.set_close_callback(lambda: self.stop("closed")) + rs, ws = self.make_iostream_pair() + rs.set_close_callback(lambda: self.stop("closed")) try: # Similar to the error case in the previous test, but the - # server writes first so client reads are satisfied + # ws writes first so rs reads are satisfied # inline. For consistency with the out-of-line case, we # do not raise the error synchronously. - server.write(b"123456") + ws.write(b"123456") with ExpectLog(gen_log, "Unsatisfiable read"): - client.read_until(b"def", self.stop, max_bytes=5) + rs.read_until(b"def", self.stop, max_bytes=5) data = self.wait() self.assertEqual(data, "closed") finally: - server.close() - client.close() + ws.close() + rs.close() def test_read_until_max_bytes_ignores_extra(self): - server, client = self.make_iostream_pair() - client.set_close_callback(lambda: self.stop("closed")) + rs, ws = self.make_iostream_pair() + rs.set_close_callback(lambda: self.stop("closed")) try: # Even though data that matches arrives the same packet that # puts us over the limit, we fail the request because it was not # found within the limit. 
- server.write(b"abcdef") + ws.write(b"abcdef") with ExpectLog(gen_log, "Unsatisfiable read"): - client.read_until(b"def", self.stop, max_bytes=5) + rs.read_until(b"def", self.stop, max_bytes=5) data = self.wait() self.assertEqual(data, "closed") finally: - server.close() - client.close() + ws.close() + rs.close() def test_read_until_regex_max_bytes(self): - server, client = self.make_iostream_pair() - client.set_close_callback(lambda: self.stop("closed")) + rs, ws = self.make_iostream_pair() + rs.set_close_callback(lambda: self.stop("closed")) try: # Extra room under the limit - client.read_until_regex(b"def", self.stop, max_bytes=50) - server.write(b"abcdef") + rs.read_until_regex(b"def", self.stop, max_bytes=50) + ws.write(b"abcdef") data = self.wait() self.assertEqual(data, b"abcdef") # Just enough space - client.read_until_regex(b"def", self.stop, max_bytes=6) - server.write(b"abcdef") + rs.read_until_regex(b"def", self.stop, max_bytes=6) + ws.write(b"abcdef") data = self.wait() self.assertEqual(data, b"abcdef") # Not enough space, but we don't know it until all we can do is # log a warning and close the connection. with ExpectLog(gen_log, "Unsatisfiable read"): - client.read_until_regex(b"def", self.stop, max_bytes=5) - server.write(b"123456") + rs.read_until_regex(b"def", self.stop, max_bytes=5) + ws.write(b"123456") data = self.wait() self.assertEqual(data, "closed") finally: - server.close() - client.close() + ws.close() + rs.close() def test_read_until_regex_max_bytes_inline(self): - server, client = self.make_iostream_pair() - client.set_close_callback(lambda: self.stop("closed")) + rs, ws = self.make_iostream_pair() + rs.set_close_callback(lambda: self.stop("closed")) try: # Similar to the error case in the previous test, but the - # server writes first so client reads are satisfied + # ws writes first so rs reads are satisfied # inline. For consistency with the out-of-line case, we # do not raise the error synchronously. - server.write(b"123456") + ws.write(b"123456") with ExpectLog(gen_log, "Unsatisfiable read"): - client.read_until_regex(b"def", self.stop, max_bytes=5) + rs.read_until_regex(b"def", self.stop, max_bytes=5) data = self.wait() self.assertEqual(data, "closed") finally: - server.close() - client.close() + ws.close() + rs.close() def test_read_until_regex_max_bytes_ignores_extra(self): - server, client = self.make_iostream_pair() - client.set_close_callback(lambda: self.stop("closed")) + rs, ws = self.make_iostream_pair() + rs.set_close_callback(lambda: self.stop("closed")) try: # Even though data that matches arrives the same packet that # puts us over the limit, we fail the request because it was not # found within the limit. - server.write(b"abcdef") + ws.write(b"abcdef") with ExpectLog(gen_log, "Unsatisfiable read"): - client.read_until_regex(b"def", self.stop, max_bytes=5) + rs.read_until_regex(b"def", self.stop, max_bytes=5) data = self.wait() self.assertEqual(data, "closed") finally: - server.close() - client.close() + ws.close() + rs.close() def test_small_reads_from_large_buffer(self): # 10KB buffer size, 100KB available to read. # Read 1KB at a time and make sure that the buffer is not eagerly # filled. 
- server, client = self.make_iostream_pair(max_buffer_size=10 * 1024) + rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024) try: - server.write(b"a" * 1024 * 100) + ws.write(b"a" * 1024 * 100) for i in range(100): - client.read_bytes(1024, self.stop) + rs.read_bytes(1024, self.stop) data = self.wait() self.assertEqual(data, b"a" * 1024) finally: - server.close() - client.close() + ws.close() + rs.close() def test_small_read_untils_from_large_buffer(self): # 10KB buffer size, 100KB available to read. # Read 1KB at a time and make sure that the buffer is not eagerly # filled. - server, client = self.make_iostream_pair(max_buffer_size=10 * 1024) + rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024) try: - server.write((b"a" * 1023 + b"\n") * 100) + ws.write((b"a" * 1023 + b"\n") * 100) for i in range(100): - client.read_until(b"\n", self.stop, max_bytes=4096) + rs.read_until(b"\n", self.stop, max_bytes=4096) data = self.wait() self.assertEqual(data, b"a" * 1023 + b"\n") finally: - server.close() - client.close() + ws.close() + rs.close() def test_flow_control(self): MB = 1024 * 1024 - server, client = self.make_iostream_pair(max_buffer_size=5 * MB) + rs, ws = self.make_iostream_pair(max_buffer_size=5 * MB) try: - # Client writes more than the server will accept. - client.write(b"a" * 10 * MB) - # The server pauses while reading. - server.read_bytes(MB, self.stop) + # Client writes more than the rs will accept. + ws.write(b"a" * 10 * MB) + # The rs pauses while reading. + rs.read_bytes(MB, self.stop) self.wait() self.io_loop.call_later(0.1, self.stop) self.wait() - # The client's writes have been blocked; the server can + # The ws's writes have been blocked; the rs can # continue to read gradually. for i in range(9): - server.read_bytes(MB, self.stop) + rs.read_bytes(MB, self.stop) + self.wait() + finally: + rs.close() + ws.close() + + +class TestIOStreamMixin(TestReadWriteMixin): + def _make_server_iostream(self, connection, **kwargs): + raise NotImplementedError() + + def _make_client_iostream(self, connection, **kwargs): + raise NotImplementedError() + + def make_iostream_pair(self, **kwargs): + listener, port = bind_unused_port() + streams = [None, None] + + def accept_callback(connection, address): + streams[0] = self._make_server_iostream(connection, **kwargs) + self.stop() + + def connect_callback(): + streams[1] = client_stream + self.stop() + netutil.add_accept_handler(listener, accept_callback) + client_stream = self._make_client_iostream(socket.socket(), **kwargs) + client_stream.connect(('127.0.0.1', port), + callback=connect_callback) + self.wait(condition=lambda: all(streams)) + self.io_loop.remove_handler(listener.fileno()) + listener.close() + return streams + + def test_connection_refused(self): + # When a connection is refused, the connect callback should not + # be run. 
(The kqueue IOLoop used to behave differently from the + # epoll IOLoop in this respect) + cleanup_func, port = refusing_port() + self.addCleanup(cleanup_func) + stream = IOStream(socket.socket()) + self.connect_called = False + + def connect_callback(): + self.connect_called = True + self.stop() + stream.set_close_callback(self.stop) + # log messages vary by platform and ioloop implementation + with ExpectLog(gen_log, ".*", required=False): + stream.connect(("127.0.0.1", port), connect_callback) + self.wait() + self.assertFalse(self.connect_called) + self.assertTrue(isinstance(stream.error, socket.error), stream.error) + if sys.platform != 'cygwin': + _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,) + if hasattr(errno, "WSAECONNREFUSED"): + _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,) + # cygwin's errnos don't match those used on native windows python + self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED) + + @unittest.skipIf(mock is None, 'mock package not present') + def test_gaierror(self): + # Test that IOStream sets its exc_info on getaddrinfo error. + # It's difficult to reliably trigger a getaddrinfo error; + # some resolvers own't even return errors for malformed names, + # so we mock it instead. If IOStream changes to call a Resolver + # before sock.connect, the mock target will need to change too. + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + stream = IOStream(s) + stream.set_close_callback(self.stop) + with mock.patch('socket.socket.connect', + side_effect=socket.gaierror(errno.EIO, 'boom')): + with ExpectLog(gen_log, "Connect error"): + stream.connect(('localhost', 80), callback=self.stop) + self.wait() + self.assertIsInstance(stream.error, socket.gaierror) + + def test_read_callback_error(self): + # Test that IOStream sets its exc_info when a read callback throws + server, client = self.make_iostream_pair() + try: + server.set_close_callback(self.stop) + with ExpectLog( + app_log, "(Uncaught exception|Exception in callback)" + ): + # Clear ExceptionStackContext so IOStream catches error + with NullContext(): + server.read_bytes(1, callback=lambda data: 1 / 0) + client.write(b"1") + self.wait() + self.assertTrue(isinstance(server.error, ZeroDivisionError)) + finally: + server.close() + client.close() + + @unittest.skipIf(mock is None, 'mock package not present') + def test_read_until_close_with_error(self): + server, client = self.make_iostream_pair() + try: + with mock.patch('tornado.iostream.BaseIOStream._try_inline_read', + side_effect=IOError('boom')): + with self.assertRaisesRegexp(IOError, 'boom'): + client.read_until_close(self.stop) + finally: + server.close() + client.close() + + @skipIfNonUnix + @skipPypy3V58 + def test_inline_read_error(self): + # An error on an inline read is raised without logging (on the + # assumption that it will eventually be noticed or logged further + # up the stack). + # + # This test is posix-only because windows os.close() doesn't work + # on socket FDs, but we can't close the socket object normally + # because we won't get the error we want if the socket knows + # it's closed. + server, client = self.make_iostream_pair() + try: + os.close(server.socket.fileno()) + with self.assertRaises(socket.error): + server.read_bytes(1, lambda data: None) + finally: + server.close() + client.close() + + @skipPypy3V58 + def test_async_read_error_logging(self): + # Socket errors on asynchronous reads should be logged (but only + # once). 
+        server, client = self.make_iostream_pair()
+        server.set_close_callback(self.stop)
+        try:
+            # Start a read that will be fulfilled asynchronously.
+            server.read_bytes(1, lambda data: None)
+            client.write(b'a')
+            # Stub out read_from_fd to make it fail.
+
+            def fake_read_from_fd():
+                os.close(server.socket.fileno())
+                server.__class__.read_from_fd(server)
+            server.read_from_fd = fake_read_from_fd
+            # This log message is from _handle_read (not read_from_fd).
+            with ExpectLog(gen_log, "error on read"):
                 self.wait()
         finally:
             server.close()
             client.close()
@@ -1094,12 +1102,14 @@ class WaitForHandshakeTest(AsyncTestCase):
 
 
 @skipIfNonUnix
-class TestPipeIOStream(AsyncTestCase):
-    def test_pipe_iostream(self):
+class TestPipeIOStream(TestReadWriteMixin, AsyncTestCase):
+    def make_iostream_pair(self, **kwargs):
         r, w = os.pipe()
-        rs = PipeIOStream(r)
-        ws = PipeIOStream(w)
+        return PipeIOStream(r, **kwargs), PipeIOStream(w, **kwargs)
+
+    def test_pipe_iostream(self):
+        rs, ws = self.make_iostream_pair()
 
         ws.write(b"hel")
         ws.write(b"lo world")
@@ -1121,10 +1131,7 @@ class TestPipeIOStream(AsyncTestCase):
         rs.close()
 
     def test_pipe_iostream_big_write(self):
-        r, w = os.pipe()
-
-        rs = PipeIOStream(r)
-        ws = PipeIOStream(w)
+        rs, ws = self.make_iostream_pair()
 
         NUM_BYTES = 1048576
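
The behavioural point the new read_from_fd relies on is that a non-blocking
io.FileIO.read() returns None when no data is available, instead of raising an
EWOULDBLOCK/EAGAIN error the way os.read() does. The standalone sketch below is
not part of the patch; it is a minimal illustration using only the standard
library, setting O_NONBLOCK by hand (roughly what Tornado's _set_nonblocking
helper does), and it is POSIX-only, like PipeIOStream itself. It walks through
the three cases the new code distinguishes: would-block, data, and EOF.

import fcntl
import io
import os

r, w = os.pipe()

# Put the read end in non-blocking mode (roughly what _set_nonblocking does).
flags = fcntl.fcntl(r, fcntl.F_GETFL)
fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)

# Wrap the descriptor the same way the patch does; closing the FileIO
# object later also closes the underlying fd.
rf = io.FileIO(r, "r+")

print(rf.read(1024))   # None: nothing buffered yet, the read would block

os.write(w, b"hello")
print(rf.read(1024))   # b'hello': normal data

os.close(w)            # close the writing half of the pipe
print(rf.read(1024))   # b'': EOF, the stream should now be closed

rf.close()

Because the FileIO object owns the descriptor, close_fd() can simply call
self._fio.close() instead of os.close(self.fd).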