]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
iostream: Use file objects instead of raw descriptors in PipeIOStream
authorAntoine Pitrou <antoine@python.org>
Sun, 22 Oct 2017 17:05:41 +0000 (19:05 +0200)
committerBen Darnell <ben@bendarnell.com>
Sun, 14 Jan 2018 19:49:35 +0000 (14:49 -0500)
This makes the implementation of PipeIOStream more compatible with
socket streams, and allows for more of the test suite to be used with
it.

Extracted from PR #2193

tornado/iostream.py
tornado/test/iostream_test.py

index d77460320545c4f692d8f850010f8ea6284a9111..3bf058749429ed49e2187e112660c72b090e6325 100644 (file)
@@ -27,6 +27,7 @@ from __future__ import absolute_import, division, print_function
 
 import collections
 import errno
+import io
 import numbers
 import os
 import socket
@@ -1573,6 +1574,7 @@ class PipeIOStream(BaseIOStream):
     """
     def __init__(self, fd, *args, **kwargs):
         self.fd = fd
+        self._fio = io.FileIO(self.fd, "r+")
         _set_nonblocking(fd)
         super(PipeIOStream, self).__init__(*args, **kwargs)
 
@@ -1580,7 +1582,7 @@ class PipeIOStream(BaseIOStream):
         return self.fd
 
     def close_fd(self):
-        os.close(self.fd)
+        self._fio.close()
 
     def write_to_fd(self, data):
         try:
@@ -1592,17 +1594,18 @@ class PipeIOStream(BaseIOStream):
 
     def read_from_fd(self):
         try:
-            chunk = os.read(self.fd, self.read_chunk_size)
+            chunk = self._fio.read(self.read_chunk_size)
         except (IOError, OSError) as e:
-            if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
-                return None
-            elif errno_from_exception(e) == errno.EBADF:
+            if errno_from_exception(e) == errno.EBADF:
                 # If the writing half of a pipe is closed, select will
                 # report it as readable but reads will fail with EBADF.
                 self.close(exc_info=e)
                 return None
             else:
                 raise
+        if chunk is None:
+            # Read would block
+            return None
         if not chunk:
             self.close()
             return None
index bdc5ee14f957dcd89f7838a21bf76260f0b327aa..1bfb0b34cb689fa194303ddf7214dd2d58c8a8c5 100644 (file)
@@ -168,122 +168,24 @@ class TestIOStreamWebMixin(object):
             stream.read_bytes(1)
 
 
-class TestIOStreamMixin(object):
-    def _make_server_iostream(self, connection, **kwargs):
-        raise NotImplementedError()
-
-    def _make_client_iostream(self, connection, **kwargs):
-        raise NotImplementedError()
+class TestReadWriteMixin(object):
+    # Tests where one stream reads and the other writes.
+    # These should work for BaseIOStream implementations.
 
     def make_iostream_pair(self, **kwargs):
-        listener, port = bind_unused_port()
-        streams = [None, None]
-
-        def accept_callback(connection, address):
-            streams[0] = self._make_server_iostream(connection, **kwargs)
-            self.stop()
-
-        def connect_callback():
-            streams[1] = client_stream
-            self.stop()
-        netutil.add_accept_handler(listener, accept_callback)
-        client_stream = self._make_client_iostream(socket.socket(), **kwargs)
-        client_stream.connect(('127.0.0.1', port),
-                              callback=connect_callback)
-        self.wait(condition=lambda: all(streams))
-        self.io_loop.remove_handler(listener.fileno())
-        listener.close()
-        return streams
-
-    def test_streaming_callback_with_data_in_buffer(self):
-        server, client = self.make_iostream_pair()
-        client.write(b"abcd\r\nefgh")
-        server.read_until(b"\r\n", self.stop)
-        data = self.wait()
-        self.assertEqual(data, b"abcd\r\n")
-
-        def closed_callback(chunk):
-            self.fail()
-        server.read_until_close(callback=closed_callback,
-                                streaming_callback=self.stop)
-        # self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
-        data = self.wait()
-        self.assertEqual(data, b"efgh")
-        server.close()
-        client.close()
+        raise NotImplementedError
 
     def test_write_zero_bytes(self):
         # Attempting to write zero bytes should run the callback without
         # going into an infinite loop.
-        server, client = self.make_iostream_pair()
-        server.write(b'', callback=self.stop)
+        rs, ws = self.make_iostream_pair()
+        ws.write(b'', callback=self.stop)
         self.wait()
-        server.close()
-        client.close()
-
-    def test_connection_refused(self):
-        # When a connection is refused, the connect callback should not
-        # be run.  (The kqueue IOLoop used to behave differently from the
-        # epoll IOLoop in this respect)
-        cleanup_func, port = refusing_port()
-        self.addCleanup(cleanup_func)
-        stream = IOStream(socket.socket())
-        self.connect_called = False
-
-        def connect_callback():
-            self.connect_called = True
-            self.stop()
-        stream.set_close_callback(self.stop)
-        # log messages vary by platform and ioloop implementation
-        with ExpectLog(gen_log, ".*", required=False):
-            stream.connect(("127.0.0.1", port), connect_callback)
-            self.wait()
-        self.assertFalse(self.connect_called)
-        self.assertTrue(isinstance(stream.error, socket.error), stream.error)
-        if sys.platform != 'cygwin':
-            _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
-            if hasattr(errno, "WSAECONNREFUSED"):
-                _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
-            # cygwin's errnos don't match those used on native windows python
-            self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
-
-    @unittest.skipIf(mock is None, 'mock package not present')
-    def test_gaierror(self):
-        # Test that IOStream sets its exc_info on getaddrinfo error.
-        # It's difficult to reliably trigger a getaddrinfo error;
-        # some resolvers own't even return errors for malformed names,
-        # so we mock it instead. If IOStream changes to call a Resolver
-        # before sock.connect, the mock target will need to change too.
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-        stream = IOStream(s)
-        stream.set_close_callback(self.stop)
-        with mock.patch('socket.socket.connect',
-                        side_effect=socket.gaierror(errno.EIO, 'boom')):
-            with ExpectLog(gen_log, "Connect error"):
-                stream.connect(('localhost', 80), callback=self.stop)
-                self.wait()
-                self.assertIsInstance(stream.error, socket.gaierror)
-
-    def test_read_callback_error(self):
-        # Test that IOStream sets its exc_info when a read callback throws
-        server, client = self.make_iostream_pair()
-        try:
-            server.set_close_callback(self.stop)
-            with ExpectLog(
-                app_log, "(Uncaught exception|Exception in callback)"
-            ):
-                # Clear ExceptionStackContext so IOStream catches error
-                with NullContext():
-                    server.read_bytes(1, callback=lambda data: 1 / 0)
-                client.write(b"1")
-                self.wait()
-            self.assertTrue(isinstance(server.error, ZeroDivisionError))
-        finally:
-            server.close()
-            client.close()
+        ws.close()
+        rs.close()
 
     def test_streaming_callback(self):
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
             chunks = []
             final_called = []
@@ -296,24 +198,41 @@ class TestIOStreamMixin(object):
                 self.assertFalse(data)
                 final_called.append(True)
                 self.stop()
-            server.read_bytes(6, callback=final_callback,
-                              streaming_callback=streaming_callback)
-            client.write(b"1234")
+            rs.read_bytes(6, callback=final_callback,
+                          streaming_callback=streaming_callback)
+            ws.write(b"1234")
             self.wait(condition=lambda: chunks)
-            client.write(b"5678")
+            ws.write(b"5678")
             self.wait(condition=lambda: final_called)
             self.assertEqual(chunks, [b"1234", b"56"])
 
             # the rest of the last chunk is still in the buffer
-            server.read_bytes(2, callback=self.stop)
+            rs.read_bytes(2, callback=self.stop)
             data = self.wait()
             self.assertEqual(data, b"78")
         finally:
-            server.close()
-            client.close()
+            rs.close()
+            ws.close()
+
+    def test_streaming_callback_with_data_in_buffer(self):
+        rs, ws = self.make_iostream_pair()
+        ws.write(b"abcd\r\nefgh")
+        rs.read_until(b"\r\n", self.stop)
+        data = self.wait()
+        self.assertEqual(data, b"abcd\r\n")
+
+        def closed_callback(chunk):
+            self.fail()
+        rs.read_until_close(callback=closed_callback,
+                            streaming_callback=self.stop)
+        #self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
+        data = self.wait()
+        self.assertEqual(data, b"efgh")
+        rs.close()
+        ws.close()
 
     def test_streaming_until_close(self):
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
             chunks = []
             closed = [False]
@@ -326,88 +245,88 @@ class TestIOStreamMixin(object):
                 assert not data, data
                 closed[0] = True
                 self.stop()
-            client.read_until_close(callback=close_callback,
-                                    streaming_callback=streaming_callback)
-            server.write(b"1234")
+            rs.read_until_close(callback=close_callback,
+                                streaming_callback=streaming_callback)
+            ws.write(b"1234")
             self.wait(condition=lambda: len(chunks) == 1)
-            server.write(b"5678", self.stop)
+            ws.write(b"5678", self.stop)
             self.wait()
-            server.close()
+            ws.close()
             self.wait(condition=lambda: closed[0])
             self.assertEqual(chunks, [b"1234", b"5678"])
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_streaming_until_close_future(self):
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
             chunks = []
 
             @gen.coroutine
-            def client_task():
-                yield client.read_until_close(streaming_callback=chunks.append)
+            def rs_task():
+                yield rs.read_until_close(streaming_callback=chunks.append)
 
             @gen.coroutine
-            def server_task():
-                yield server.write(b"1234")
+            def ws_task():
+                yield ws.write(b"1234")
                 yield gen.sleep(0.01)
-                yield server.write(b"5678")
-                server.close()
+                yield ws.write(b"5678")
+                ws.close()
 
             @gen.coroutine
             def f():
-                yield [client_task(), server_task()]
+                yield [rs_task(), ws_task()]
             self.io_loop.run_sync(f)
             self.assertEqual(chunks, [b"1234", b"5678"])
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_delayed_close_callback(self):
         # The scenario:  Server closes the connection while there is a pending
         # read that can be served out of buffered data.  The client does not
         # run the close_callback as soon as it detects the close, but rather
         # defers it until after the buffered read has finished.
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
-            client.set_close_callback(self.stop)
-            server.write(b"12")
+            rs.set_close_callback(self.stop)
+            ws.write(b"12")
             chunks = []
 
             def callback1(data):
                 chunks.append(data)
-                client.read_bytes(1, callback2)
-                server.close()
+                rs.read_bytes(1, callback2)
+                ws.close()
 
             def callback2(data):
                 chunks.append(data)
-            client.read_bytes(1, callback1)
+            rs.read_bytes(1, callback1)
             self.wait()  # stopped by close_callback
             self.assertEqual(chunks, [b"1", b"2"])
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_future_delayed_close_callback(self):
         # Same as test_delayed_close_callback, but with the future interface.
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
 
         # We can't call make_iostream_pair inside a gen_test function
         # because the ioloop is not reentrant.
         @gen_test
         def f(self):
-            server.write(b"12")
+            ws.write(b"12")
             chunks = []
-            chunks.append((yield client.read_bytes(1)))
-            server.close()
-            chunks.append((yield client.read_bytes(1)))
+            chunks.append((yield rs.read_bytes(1)))
+            ws.close()
+            chunks.append((yield rs.read_bytes(1)))
             self.assertEqual(chunks, [b"1", b"2"])
         try:
             f(self)
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_close_buffered_data(self):
         # Similar to the previous test, but with data stored in the OS's
@@ -418,393 +337,482 @@ class TestIOStreamMixin(object):
         #
         # This depends on the read_chunk_size being smaller than the
         # OS socket buffer, so make it small.
-        server, client = self.make_iostream_pair(read_chunk_size=256)
+        rs, ws = self.make_iostream_pair(read_chunk_size=256)
         try:
-            server.write(b"A" * 512)
-            client.read_bytes(256, self.stop)
+            ws.write(b"A" * 512)
+            rs.read_bytes(256, self.stop)
             data = self.wait()
             self.assertEqual(b"A" * 256, data)
-            server.close()
-            # Allow the close to propagate to the client side of the
+            ws.close()
+            # Allow the close to propagate to the `rs` side of the
             # connection.  Using add_callback instead of add_timeout
             # doesn't seem to work, even with multiple iterations
             self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
             self.wait()
-            client.read_bytes(256, self.stop)
+            rs.read_bytes(256, self.stop)
             data = self.wait()
             self.assertEqual(b"A" * 256, data)
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_read_until_close_after_close(self):
         # Similar to test_delayed_close_callback, but read_until_close takes
         # a separate code path so test it separately.
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
-            server.write(b"1234")
-            server.close()
+            ws.write(b"1234")
+            ws.close()
             # Read one byte to make sure the client has received the data.
             # It won't run the close callback as long as there is more buffered
             # data that could satisfy a later read.
-            client.read_bytes(1, self.stop)
+            rs.read_bytes(1, self.stop)
             data = self.wait()
             self.assertEqual(data, b"1")
-            client.read_until_close(self.stop)
+            rs.read_until_close(self.stop)
             data = self.wait()
             self.assertEqual(data, b"234")
         finally:
-            server.close()
-            client.close()
-
-    @unittest.skipIf(mock is None, 'mock package not present')
-    def test_read_until_close_with_error(self):
-        server, client = self.make_iostream_pair()
-        try:
-            with mock.patch('tornado.iostream.BaseIOStream._try_inline_read',
-                            side_effect=IOError('boom')):
-                with self.assertRaisesRegexp(IOError, 'boom'):
-                    client.read_until_close(self.stop)
-        finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_streaming_read_until_close_after_close(self):
         # Same as the preceding test but with a streaming_callback.
         # All data should go through the streaming callback,
         # and the final read callback just gets an empty string.
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
-            server.write(b"1234")
-            server.close()
-            client.read_bytes(1, self.stop)
+            ws.write(b"1234")
+            ws.close()
+            rs.read_bytes(1, self.stop)
             data = self.wait()
             self.assertEqual(data, b"1")
             streaming_data = []
-            client.read_until_close(self.stop,
-                                    streaming_callback=streaming_data.append)
+            rs.read_until_close(self.stop,
+                                streaming_callback=streaming_data.append)
             data = self.wait()
             self.assertEqual(b'', data)
             self.assertEqual(b''.join(streaming_data), b"234")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_large_read_until(self):
         # Performance test: read_until used to have a quadratic component
         # so a read_until of 4MB would take 8 seconds; now it takes 0.25
         # seconds.
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
             # This test fails on pypy with ssl.  I think it's because
             # pypy's gc defeats moves objects, breaking the
             # "frozen write buffer" assumption.
-            if (isinstance(server, SSLIOStream) and
+            if (isinstance(rs, SSLIOStream) and
                     platform.python_implementation() == 'PyPy'):
                 raise unittest.SkipTest(
                     "pypy gc causes problems with openssl")
             NUM_KB = 4096
             for i in range(NUM_KB):
-                client.write(b"A" * 1024)
-            client.write(b"\r\n")
-            server.read_until(b"\r\n", self.stop)
+                ws.write(b"A" * 1024)
+            ws.write(b"\r\n")
+            rs.read_until(b"\r\n", self.stop)
             data = self.wait()
             self.assertEqual(len(data), NUM_KB * 1024 + 2)
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_close_callback_with_pending_read(self):
         # Regression test for a bug that was introduced in 2.3
         # where the IOStream._close_callback would never be called
         # if there were pending reads.
         OK = b"OK\r\n"
-        server, client = self.make_iostream_pair()
-        client.set_close_callback(self.stop)
+        rs, ws = self.make_iostream_pair()
+        rs.set_close_callback(self.stop)
         try:
-            server.write(OK)
-            client.read_until(b"\r\n", self.stop)
+            ws.write(OK)
+            rs.read_until(b"\r\n", self.stop)
             res = self.wait()
             self.assertEqual(res, OK)
 
-            server.close()
-            client.read_until(b"\r\n", lambda x: x)
+            ws.close()
+            rs.read_until(b"\r\n", lambda x: x)
             # If _close_callback (self.stop) is not called,
             # an AssertionError: Async operation timed out after 5 seconds
             # will be raised.
             res = self.wait()
             self.assertTrue(res is None)
         finally:
-            server.close()
-            client.close()
-
-    @skipIfNonUnix
-    @skipPypy3V58
-    def test_inline_read_error(self):
-        # An error on an inline read is raised without logging (on the
-        # assumption that it will eventually be noticed or logged further
-        # up the stack).
-        #
-        # This test is posix-only because windows os.close() doesn't work
-        # on socket FDs, but we can't close the socket object normally
-        # because we won't get the error we want if the socket knows
-        # it's closed.
-        server, client = self.make_iostream_pair()
-        try:
-            os.close(server.socket.fileno())
-            with self.assertRaises(socket.error):
-                server.read_bytes(1, lambda data: None)
-        finally:
-            server.close()
-            client.close()
-
-    @skipPypy3V58
-    def test_async_read_error_logging(self):
-        # Socket errors on asynchronous reads should be logged (but only
-        # once).
-        server, client = self.make_iostream_pair()
-        server.set_close_callback(self.stop)
-        try:
-            # Start a read that will be fulfilled asynchronously.
-            server.read_bytes(1, lambda data: None)
-            client.write(b'a')
-            # Stub out read_from_fd to make it fail.
-
-            def fake_read_from_fd():
-                os.close(server.socket.fileno())
-                server.__class__.read_from_fd(server)
-            server.read_from_fd = fake_read_from_fd
-            # This log message is from _handle_read (not read_from_fd).
-            with ExpectLog(gen_log, "error on read"):
-                self.wait()
-        finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_future_close_callback(self):
         # Regression test for interaction between the Future read interfaces
         # and IOStream._maybe_add_error_listener.
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         closed = [False]
 
         def close_callback():
             closed[0] = True
             self.stop()
-        server.set_close_callback(close_callback)
+        rs.set_close_callback(close_callback)
         try:
-            client.write(b'a')
-            future = server.read_bytes(1)
+            ws.write(b'a')
+            future = rs.read_bytes(1)
             self.io_loop.add_future(future, self.stop)
             self.assertEqual(self.wait().result(), b'a')
             self.assertFalse(closed[0])
-            client.close()
+            ws.close()
             self.wait()
             self.assertTrue(closed[0])
         finally:
-            server.close()
-            client.close()
+            rs.close()
+            ws.close()
 
     def test_write_memoryview(self):
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
-            client.read_bytes(4, self.stop)
-            server.write(memoryview(b"hello"))
+            rs.read_bytes(4, self.stop)
+            ws.write(memoryview(b"hello"))
             data = self.wait()
             self.assertEqual(data, b"hell")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_read_bytes_partial(self):
-        server, client = self.make_iostream_pair()
+        rs, ws = self.make_iostream_pair()
         try:
             # Ask for more than is available with partial=True
-            client.read_bytes(50, self.stop, partial=True)
-            server.write(b"hello")
+            rs.read_bytes(50, self.stop, partial=True)
+            ws.write(b"hello")
             data = self.wait()
             self.assertEqual(data, b"hello")
 
             # Ask for less than what is available; num_bytes is still
             # respected.
-            client.read_bytes(3, self.stop, partial=True)
-            server.write(b"world")
+            rs.read_bytes(3, self.stop, partial=True)
+            ws.write(b"world")
             data = self.wait()
             self.assertEqual(data, b"wor")
 
             # Partial reads won't return an empty string, but read_bytes(0)
             # will.
-            client.read_bytes(0, self.stop, partial=True)
+            rs.read_bytes(0, self.stop, partial=True)
             data = self.wait()
             self.assertEqual(data, b'')
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_read_until_max_bytes(self):
-        server, client = self.make_iostream_pair()
-        client.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = self.make_iostream_pair()
+        rs.set_close_callback(lambda: self.stop("closed"))
         try:
             # Extra room under the limit
-            client.read_until(b"def", self.stop, max_bytes=50)
-            server.write(b"abcdef")
+            rs.read_until(b"def", self.stop, max_bytes=50)
+            ws.write(b"abcdef")
             data = self.wait()
             self.assertEqual(data, b"abcdef")
 
             # Just enough space
-            client.read_until(b"def", self.stop, max_bytes=6)
-            server.write(b"abcdef")
+            rs.read_until(b"def", self.stop, max_bytes=6)
+            ws.write(b"abcdef")
             data = self.wait()
             self.assertEqual(data, b"abcdef")
 
             # Not enough space, but we don't know it until all we can do is
             # log a warning and close the connection.
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                client.read_until(b"def", self.stop, max_bytes=5)
-                server.write(b"123456")
+                rs.read_until(b"def", self.stop, max_bytes=5)
+                ws.write(b"123456")
                 data = self.wait()
             self.assertEqual(data, "closed")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_read_until_max_bytes_inline(self):
-        server, client = self.make_iostream_pair()
-        client.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = self.make_iostream_pair()
+        rs.set_close_callback(lambda: self.stop("closed"))
         try:
             # Similar to the error case in the previous test, but the
-            # server writes first so client reads are satisfied
+            # ws writes first so rs reads are satisfied
             # inline.  For consistency with the out-of-line case, we
             # do not raise the error synchronously.
-            server.write(b"123456")
+            ws.write(b"123456")
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                client.read_until(b"def", self.stop, max_bytes=5)
+                rs.read_until(b"def", self.stop, max_bytes=5)
                 data = self.wait()
             self.assertEqual(data, "closed")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_read_until_max_bytes_ignores_extra(self):
-        server, client = self.make_iostream_pair()
-        client.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = self.make_iostream_pair()
+        rs.set_close_callback(lambda: self.stop("closed"))
         try:
             # Even though data that matches arrives the same packet that
             # puts us over the limit, we fail the request because it was not
             # found within the limit.
-            server.write(b"abcdef")
+            ws.write(b"abcdef")
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                client.read_until(b"def", self.stop, max_bytes=5)
+                rs.read_until(b"def", self.stop, max_bytes=5)
                 data = self.wait()
             self.assertEqual(data, "closed")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_read_until_regex_max_bytes(self):
-        server, client = self.make_iostream_pair()
-        client.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = self.make_iostream_pair()
+        rs.set_close_callback(lambda: self.stop("closed"))
         try:
             # Extra room under the limit
-            client.read_until_regex(b"def", self.stop, max_bytes=50)
-            server.write(b"abcdef")
+            rs.read_until_regex(b"def", self.stop, max_bytes=50)
+            ws.write(b"abcdef")
             data = self.wait()
             self.assertEqual(data, b"abcdef")
 
             # Just enough space
-            client.read_until_regex(b"def", self.stop, max_bytes=6)
-            server.write(b"abcdef")
+            rs.read_until_regex(b"def", self.stop, max_bytes=6)
+            ws.write(b"abcdef")
             data = self.wait()
             self.assertEqual(data, b"abcdef")
 
             # Not enough space, but we don't know it until all we can do is
             # log a warning and close the connection.
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                client.read_until_regex(b"def", self.stop, max_bytes=5)
-                server.write(b"123456")
+                rs.read_until_regex(b"def", self.stop, max_bytes=5)
+                ws.write(b"123456")
                 data = self.wait()
             self.assertEqual(data, "closed")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_read_until_regex_max_bytes_inline(self):
-        server, client = self.make_iostream_pair()
-        client.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = self.make_iostream_pair()
+        rs.set_close_callback(lambda: self.stop("closed"))
         try:
             # Similar to the error case in the previous test, but the
-            # server writes first so client reads are satisfied
+            # ws writes first so rs reads are satisfied
             # inline.  For consistency with the out-of-line case, we
             # do not raise the error synchronously.
-            server.write(b"123456")
+            ws.write(b"123456")
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                client.read_until_regex(b"def", self.stop, max_bytes=5)
+                rs.read_until_regex(b"def", self.stop, max_bytes=5)
                 data = self.wait()
             self.assertEqual(data, "closed")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_read_until_regex_max_bytes_ignores_extra(self):
-        server, client = self.make_iostream_pair()
-        client.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = self.make_iostream_pair()
+        rs.set_close_callback(lambda: self.stop("closed"))
         try:
             # Even though data that matches arrives the same packet that
             # puts us over the limit, we fail the request because it was not
             # found within the limit.
-            server.write(b"abcdef")
+            ws.write(b"abcdef")
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                client.read_until_regex(b"def", self.stop, max_bytes=5)
+                rs.read_until_regex(b"def", self.stop, max_bytes=5)
                 data = self.wait()
             self.assertEqual(data, "closed")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_small_reads_from_large_buffer(self):
         # 10KB buffer size, 100KB available to read.
         # Read 1KB at a time and make sure that the buffer is not eagerly
         # filled.
-        server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
+        rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024)
         try:
-            server.write(b"a" * 1024 * 100)
+            ws.write(b"a" * 1024 * 100)
             for i in range(100):
-                client.read_bytes(1024, self.stop)
+                rs.read_bytes(1024, self.stop)
                 data = self.wait()
                 self.assertEqual(data, b"a" * 1024)
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_small_read_untils_from_large_buffer(self):
         # 10KB buffer size, 100KB available to read.
         # Read 1KB at a time and make sure that the buffer is not eagerly
         # filled.
-        server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
+        rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024)
         try:
-            server.write((b"a" * 1023 + b"\n") * 100)
+            ws.write((b"a" * 1023 + b"\n") * 100)
             for i in range(100):
-                client.read_until(b"\n", self.stop, max_bytes=4096)
+                rs.read_until(b"\n", self.stop, max_bytes=4096)
                 data = self.wait()
                 self.assertEqual(data, b"a" * 1023 + b"\n")
         finally:
-            server.close()
-            client.close()
+            ws.close()
+            rs.close()
 
     def test_flow_control(self):
         MB = 1024 * 1024
-        server, client = self.make_iostream_pair(max_buffer_size=5 * MB)
+        rs, ws = self.make_iostream_pair(max_buffer_size=5 * MB)
         try:
-            # Client writes more than the server will accept.
-            client.write(b"a" * 10 * MB)
-            # The server pauses while reading.
-            server.read_bytes(MB, self.stop)
+            # Client writes more than the rs will accept.
+            ws.write(b"a" * 10 * MB)
+            # The rs pauses while reading.
+            rs.read_bytes(MB, self.stop)
             self.wait()
             self.io_loop.call_later(0.1, self.stop)
             self.wait()
-            # The client's writes have been blocked; the server can
+            # The ws's writes have been blocked; the rs can
             # continue to read gradually.
             for i in range(9):
-                server.read_bytes(MB, self.stop)
+                rs.read_bytes(MB, self.stop)
+                self.wait()
+        finally:
+            rs.close()
+            ws.close()
+
+
+class TestIOStreamMixin(TestReadWriteMixin):
+    def _make_server_iostream(self, connection, **kwargs):
+        raise NotImplementedError()
+
+    def _make_client_iostream(self, connection, **kwargs):
+        raise NotImplementedError()
+
+    def make_iostream_pair(self, **kwargs):
+        listener, port = bind_unused_port()
+        streams = [None, None]
+
+        def accept_callback(connection, address):
+            streams[0] = self._make_server_iostream(connection, **kwargs)
+            self.stop()
+
+        def connect_callback():
+            streams[1] = client_stream
+            self.stop()
+        netutil.add_accept_handler(listener, accept_callback)
+        client_stream = self._make_client_iostream(socket.socket(), **kwargs)
+        client_stream.connect(('127.0.0.1', port),
+                              callback=connect_callback)
+        self.wait(condition=lambda: all(streams))
+        self.io_loop.remove_handler(listener.fileno())
+        listener.close()
+        return streams
+
+    def test_connection_refused(self):
+        # When a connection is refused, the connect callback should not
+        # be run.  (The kqueue IOLoop used to behave differently from the
+        # epoll IOLoop in this respect)
+        cleanup_func, port = refusing_port()
+        self.addCleanup(cleanup_func)
+        stream = IOStream(socket.socket())
+        self.connect_called = False
+
+        def connect_callback():
+            self.connect_called = True
+            self.stop()
+        stream.set_close_callback(self.stop)
+        # log messages vary by platform and ioloop implementation
+        with ExpectLog(gen_log, ".*", required=False):
+            stream.connect(("127.0.0.1", port), connect_callback)
+            self.wait()
+        self.assertFalse(self.connect_called)
+        self.assertTrue(isinstance(stream.error, socket.error), stream.error)
+        if sys.platform != 'cygwin':
+            _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
+            if hasattr(errno, "WSAECONNREFUSED"):
+                _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
+            # cygwin's errnos don't match those used on native windows python
+            self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
+
+    @unittest.skipIf(mock is None, 'mock package not present')
+    def test_gaierror(self):
+        # Test that IOStream sets its exc_info on getaddrinfo error.
+        # It's difficult to reliably trigger a getaddrinfo error;
+        # some resolvers own't even return errors for malformed names,
+        # so we mock it instead. If IOStream changes to call a Resolver
+        # before sock.connect, the mock target will need to change too.
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+        stream = IOStream(s)
+        stream.set_close_callback(self.stop)
+        with mock.patch('socket.socket.connect',
+                        side_effect=socket.gaierror(errno.EIO, 'boom')):
+            with ExpectLog(gen_log, "Connect error"):
+                stream.connect(('localhost', 80), callback=self.stop)
+                self.wait()
+                self.assertIsInstance(stream.error, socket.gaierror)
+
+    def test_read_callback_error(self):
+        # Test that IOStream sets its exc_info when a read callback throws
+        server, client = self.make_iostream_pair()
+        try:
+            server.set_close_callback(self.stop)
+            with ExpectLog(
+                app_log, "(Uncaught exception|Exception in callback)"
+            ):
+                # Clear ExceptionStackContext so IOStream catches error
+                with NullContext():
+                    server.read_bytes(1, callback=lambda data: 1 / 0)
+                client.write(b"1")
+                self.wait()
+            self.assertTrue(isinstance(server.error, ZeroDivisionError))
+        finally:
+            server.close()
+            client.close()
+
+    @unittest.skipIf(mock is None, 'mock package not present')
+    def test_read_until_close_with_error(self):
+        server, client = self.make_iostream_pair()
+        try:
+            with mock.patch('tornado.iostream.BaseIOStream._try_inline_read',
+                            side_effect=IOError('boom')):
+                with self.assertRaisesRegexp(IOError, 'boom'):
+                    client.read_until_close(self.stop)
+        finally:
+            server.close()
+            client.close()
+
+    @skipIfNonUnix
+    @skipPypy3V58
+    def test_inline_read_error(self):
+        # An error on an inline read is raised without logging (on the
+        # assumption that it will eventually be noticed or logged further
+        # up the stack).
+        #
+        # This test is posix-only because windows os.close() doesn't work
+        # on socket FDs, but we can't close the socket object normally
+        # because we won't get the error we want if the socket knows
+        # it's closed.
+        server, client = self.make_iostream_pair()
+        try:
+            os.close(server.socket.fileno())
+            with self.assertRaises(socket.error):
+                server.read_bytes(1, lambda data: None)
+        finally:
+            server.close()
+            client.close()
+
+    @skipPypy3V58
+    def test_async_read_error_logging(self):
+        # Socket errors on asynchronous reads should be logged (but only
+        # once).
+        server, client = self.make_iostream_pair()
+        server.set_close_callback(self.stop)
+        try:
+            # Start a read that will be fulfilled asynchronously.
+            server.read_bytes(1, lambda data: None)
+            client.write(b'a')
+            # Stub out read_from_fd to make it fail.
+
+            def fake_read_from_fd():
+                os.close(server.socket.fileno())
+                server.__class__.read_from_fd(server)
+            server.read_from_fd = fake_read_from_fd
+            # This log message is from _handle_read (not read_from_fd).
+            with ExpectLog(gen_log, "error on read"):
                 self.wait()
         finally:
             server.close()
@@ -1094,12 +1102,14 @@ class WaitForHandshakeTest(AsyncTestCase):
 
 
 @skipIfNonUnix
-class TestPipeIOStream(AsyncTestCase):
-    def test_pipe_iostream(self):
+class TestPipeIOStream(TestReadWriteMixin, AsyncTestCase):
+    def make_iostream_pair(self, **kwargs):
         r, w = os.pipe()
 
-        rs = PipeIOStream(r)
-        ws = PipeIOStream(w)
+        return PipeIOStream(r, **kwargs), PipeIOStream(w, **kwargs)
+
+    def test_pipe_iostream(self):
+        rs, ws = self.make_iostream_pair()
 
         ws.write(b"hel")
         ws.write(b"lo world")
@@ -1121,10 +1131,7 @@ class TestPipeIOStream(AsyncTestCase):
         rs.close()
 
     def test_pipe_iostream_big_write(self):
-        r, w = os.pipe()
-
-        rs = PipeIOStream(r)
-        ws = PipeIOStream(w)
+        rs, ws = self.make_iostream_pair()
 
         NUM_BYTES = 1048576