]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
iostream: Deprecate callback arguments to read methods
authorBen Darnell <ben@bendarnell.com>
Sun, 22 Apr 2018 01:00:54 +0000 (21:00 -0400)
committerBen Darnell <ben@bendarnell.com>
Mon, 23 Apr 2018 01:21:00 +0000 (21:21 -0400)
This requires a lot of mechanical changes throughout the test suite.
tornado.websocket is currently still using the deprecated interfaces
and suppressing the warning.

tornado/iostream.py
tornado/test/concurrent_test.py
tornado/test/httpclient_test.py
tornado/test/httpserver_test.py
tornado/test/iostream_test.py
tornado/test/process_test.py
tornado/websocket.py

index ad50be552f8c8eec8cb4e43df85e091d4ba5e3aa..f7f7e3c83ad8b445e10f2ed9dcb29544219310c9 100644 (file)
@@ -33,6 +33,7 @@ import os
 import socket
 import sys
 import re
+import warnings
 
 from tornado.concurrent import Future
 from tornado import ioloop
@@ -342,6 +343,12 @@ class BaseIOStream(object):
         .. versionchanged:: 4.0
             Added the ``max_bytes`` argument.  The ``callback`` argument is
             now optional and a `.Future` will be returned if it is omitted.
+
+        .. deprecated:: 5.1
+
+           The ``callback`` argument is deprecated and will be removed
+           in Tornado 6.0. Use the returned `.Future` instead.
+
         """
         future = self._set_read_callback(callback)
         self._read_regex = re.compile(regex)
@@ -375,6 +382,11 @@ class BaseIOStream(object):
         .. versionchanged:: 4.0
             Added the ``max_bytes`` argument.  The ``callback`` argument is
             now optional and a `.Future` will be returned if it is omitted.
+
+        .. deprecated:: 5.1
+
+           The ``callback`` argument is deprecated and will be removed
+           in Tornado 6.0. Use the returned `.Future` instead.
         """
         future = self._set_read_callback(callback)
         self._read_delimiter = delimiter
@@ -408,6 +420,12 @@ class BaseIOStream(object):
         .. versionchanged:: 4.0
             Added the ``partial`` argument.  The callback argument is now
             optional and a `.Future` will be returned if it is omitted.
+
+        .. deprecated:: 5.1
+
+           The ``callback`` argument is deprecated and will be removed
+           in Tornado 6.0. Use the returned `.Future` instead.
+
         """
         future = self._set_read_callback(callback)
         assert isinstance(num_bytes, numbers.Integral)
@@ -434,6 +452,12 @@ class BaseIOStream(object):
         entirely filled with read data.
 
         .. versionadded:: 5.0
+
+        .. deprecated:: 5.1
+
+           The ``callback`` argument is deprecated and will be removed
+           in Tornado 6.0. Use the returned `.Future` instead.
+
         """
         future = self._set_read_callback(callback)
 
@@ -485,6 +509,11 @@ class BaseIOStream(object):
             The callback argument is now optional and a `.Future` will
             be returned if it is omitted.
 
+        .. deprecated:: 5.1
+
+           The ``callback`` argument is deprecated and will be removed
+           in Tornado 6.0. Use the returned `.Future` instead.
+
         """
         future = self._set_read_callback(callback)
         self._streaming_callback = stack_context.wrap(streaming_callback)
@@ -807,6 +836,8 @@ class BaseIOStream(object):
         assert self._read_callback is None, "Already reading"
         assert self._read_future is None, "Already reading"
         if callback is not None:
+            warnings.warn("callbacks are deprecated, use returned Future instead",
+                          DeprecationWarning)
             self._read_callback = stack_context.wrap(callback)
         else:
             self._read_future = Future()
index ea147910380ac941bd193f3785695d6ac3ee9982..1df0532cc5eedc75c13f037f03598de9872a1d18 100644 (file)
@@ -20,6 +20,7 @@ import re
 import socket
 import sys
 import traceback
+import warnings
 
 from tornado.concurrent import (Future, return_future, ReturnValueIgnoredError,
                                 run_on_executor, future_set_result_unless_cancelled)
@@ -249,20 +250,16 @@ class ReturnFutureTest(AsyncTestCase):
 
 
 class CapServer(TCPServer):
+    @gen.coroutine
     def handle_stream(self, stream, address):
-        logging.debug("handle_stream")
-        self.stream = stream
-        self.stream.read_until(b"\n", self.handle_read)
-
-    def handle_read(self, data):
-        logging.debug("handle_read")
+        data = yield stream.read_until(b"\n")
         data = to_unicode(data)
         if data == data.upper():
-            self.stream.write(b"error\talready capitalized\n")
+            stream.write(b"error\talready capitalized\n")
         else:
             # data already has \n
-            self.stream.write(utf8("ok\t%s" % data.upper()))
-        self.stream.close()
+            stream.write(utf8("ok\t%s" % data.upper()))
+        stream.close()
 
 
 class CapError(Exception):
@@ -397,10 +394,30 @@ class ClientTestMixin(object):
 class ManualClientTest(ClientTestMixin, AsyncTestCase):
     client_class = ManualCapClient
 
+    def setUp(self):
+        self.warning_catcher = warnings.catch_warnings()
+        self.warning_catcher.__enter__()
+        warnings.simplefilter('ignore', DeprecationWarning)
+        super(ManualClientTest, self).setUp()
+
+    def tearDown(self):
+        super(ManualClientTest, self).tearDown()
+        self.warning_catcher.__exit__(None, None, None)
+
 
 class DecoratorClientTest(ClientTestMixin, AsyncTestCase):
     client_class = DecoratorCapClient
 
+    def setUp(self):
+        self.warning_catcher = warnings.catch_warnings()
+        self.warning_catcher.__enter__()
+        warnings.simplefilter('ignore', DeprecationWarning)
+        super(DecoratorClientTest, self).setUp()
+
+    def tearDown(self):
+        super(DecoratorClientTest, self).tearDown()
+        self.warning_catcher.__exit__(None, None, None)
+
 
 class GeneratorClientTest(ClientTestMixin, AsyncTestCase):
     client_class = GeneratorCapClient
index f980b48cdbaa4d0fc9d3d2757f9c188bae86ebfe..db0492de6dceb8163229859c36160827a15272de 100644 (file)
@@ -4,7 +4,6 @@ import base64
 import binascii
 from contextlib import closing
 import copy
-import functools
 import sys
 import threading
 import datetime
@@ -190,7 +189,12 @@ class HTTPClientCommonTestCase(AsyncHTTPTestCase):
         # over several ioloop iterations, but the connection is already closed.
         sock, port = bind_unused_port()
         with closing(sock):
-            def write_response(stream, request_data):
+            @gen.coroutine
+            def accept_callback(conn, address):
+                # fake an HTTP server using chunked encoding where the final chunks
+                # and connection close all happen at once
+                stream = IOStream(conn)
+                request_data = yield stream.read_until(b"\r\n\r\n")
                 if b"HTTP/1." not in request_data:
                     self.skipTest("requires HTTP/1.x")
                 stream.write(b"""\
@@ -204,13 +208,6 @@ Transfer-Encoding: chunked
 0
 
 """.replace(b"\n", b"\r\n"), callback=stream.close)
-
-            def accept_callback(conn, address):
-                # fake an HTTP server using chunked encoding where the final chunks
-                # and connection close all happen at once
-                stream = IOStream(conn)
-                stream.read_until(b"\r\n\r\n",
-                                  functools.partial(write_response, stream))
             netutil.add_accept_handler(sock, accept_callback)
             resp = self.fetch("http://127.0.0.1:%d/" % port)
             resp.rethrow()
@@ -383,7 +380,10 @@ Transfer-Encoding: chunked
         # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
         sock, port = bind_unused_port()
         with closing(sock):
-            def write_response(stream, request_data):
+            @gen.coroutine
+            def accept_callback(conn, address):
+                stream = IOStream(conn)
+                request_data = yield stream.read_until(b"\r\n\r\n")
                 if b"HTTP/1." not in request_data:
                     self.skipTest("requires HTTP/1.x")
                 stream.write(b"""\
@@ -393,10 +393,6 @@ X-XSS-Protection: 1;
 
 """.replace(b"\n", b"\r\n"), callback=stream.close)
 
-            def accept_callback(conn, address):
-                stream = IOStream(conn)
-                stream.read_until(b"\r\n\r\n",
-                                  functools.partial(write_response, stream))
             netutil.add_accept_handler(sock, accept_callback)
             resp = self.fetch("http://127.0.0.1:%d/" % port)
             resp.rethrow()
index 155dbebe8ecf15a318fbe9f0f02d7bf6ff1819c8..19bfabb39faffa28bc5d8a814048ad171936c8bf 100644 (file)
@@ -1,6 +1,6 @@
 from __future__ import absolute_import, division, print_function
 
-from tornado import netutil
+from tornado import gen, netutil
 from tornado.concurrent import Future
 from tornado.escape import json_decode, json_encode, utf8, _unicode, recursive_unicode, native_str
 from tornado.http1connection import HTTP1Connection
@@ -8,6 +8,7 @@ from tornado.httpclient import HTTPError
 from tornado.httpserver import HTTPServer
 from tornado.httputil import HTTPHeaders, HTTPMessageDelegate, HTTPServerConnectionDelegate, ResponseStartLine  # noqa: E501
 from tornado.iostream import IOStream
+from tornado.locks import Event
 from tornado.log import gen_log
 from tornado.netutil import ssl_options_to_context
 from tornado.simple_httpclient import SimpleAsyncHTTPClient
@@ -250,31 +251,27 @@ class HTTPConnectionTest(AsyncHTTPTestCase):
                                       newline=newline)
             self.assertEqual(response, b'Hello world')
 
+    @gen_test
     def test_100_continue(self):
         # Run through a 100-continue interaction by hand:
         # When given Expect: 100-continue, we get a 100 response after the
         # headers, and then the real response after the body.
         stream = IOStream(socket.socket())
-        stream.connect(("127.0.0.1", self.get_http_port()), callback=self.stop)
-        self.wait()
-        stream.write(b"\r\n".join([b"POST /hello HTTP/1.1",
-                                   b"Content-Length: 1024",
-                                   b"Expect: 100-continue",
-                                   b"Connection: close",
-                                   b"\r\n"]), callback=self.stop)
-        self.wait()
-        stream.read_until(b"\r\n\r\n", self.stop)
-        data = self.wait()
+        yield stream.connect(("127.0.0.1", self.get_http_port()))
+        yield stream.write(b"\r\n".join([
+            b"POST /hello HTTP/1.1",
+            b"Content-Length: 1024",
+            b"Expect: 100-continue",
+            b"Connection: close",
+            b"\r\n"]))
+        data = yield stream.read_until(b"\r\n\r\n")
         self.assertTrue(data.startswith(b"HTTP/1.1 100 "), data)
         stream.write(b"a" * 1024)
-        stream.read_until(b"\r\n", self.stop)
-        first_line = self.wait()
+        first_line = yield stream.read_until(b"\r\n")
         self.assertTrue(first_line.startswith(b"HTTP/1.1 200"), first_line)
-        stream.read_until(b"\r\n\r\n", self.stop)
-        header_data = self.wait()
+        header_data = yield stream.read_until(b"\r\n\r\n")
         headers = HTTPHeaders.parse(native_str(header_data.decode('latin1')))
-        stream.read_bytes(int(headers["Content-Length"]), self.stop)
-        body = self.wait()
+        body = yield stream.read_bytes(int(headers["Content-Length"]))
         self.assertEqual(body, b"Got 1024 bytes in POST")
         stream.close()
 
@@ -467,6 +464,7 @@ bar
         start_line, headers, response = self.wait()
         self.assertEqual(json_decode(response), {u'foo': [u'bar']})
 
+    @gen_test
     def test_invalid_content_length(self):
         with ExpectLog(gen_log, '.*Only integer Content-Length is allowed'):
             self.stream.write(b"""\
@@ -476,8 +474,7 @@ Content-Length: foo
 bar
 
 """.replace(b"\n", b"\r\n"))
-            self.stream.read_until_close(self.stop)
-            self.wait()
+            yield self.stream.read_until_close()
 
 
 class XHeaderTest(HandlerBaseTestCase):
@@ -631,24 +628,23 @@ class UnixSocketTest(AsyncTestCase):
         shutil.rmtree(self.tmpdir)
         super(UnixSocketTest, self).tearDown()
 
+    @gen_test
     def test_unix_socket(self):
         self.stream.write(b"GET /hello HTTP/1.0\r\n\r\n")
-        self.stream.read_until(b"\r\n", self.stop)
-        response = self.wait()
+        response = yield self.stream.read_until(b"\r\n")
         self.assertEqual(response, b"HTTP/1.1 200 OK\r\n")
-        self.stream.read_until(b"\r\n\r\n", self.stop)
-        headers = HTTPHeaders.parse(self.wait().decode('latin1'))
-        self.stream.read_bytes(int(headers["Content-Length"]), self.stop)
-        body = self.wait()
+        header_data = yield self.stream.read_until(b"\r\n\r\n")
+        headers = HTTPHeaders.parse(header_data.decode('latin1'))
+        body = yield self.stream.read_bytes(int(headers["Content-Length"]))
         self.assertEqual(body, b"Hello world")
 
+    @gen_test
     def test_unix_socket_bad_request(self):
         # Unix sockets don't have remote addresses so they just return an
         # empty string.
         with ExpectLog(gen_log, "Malformed HTTP message from"):
             self.stream.write(b"garbage\r\n\r\n")
-            self.stream.read_until_close(self.stop)
-            response = self.wait()
+            response = yield self.stream.read_until_close()
         self.assertEqual(response, b"HTTP/1.1 400 Bad Request\r\n\r\n")
 
 
@@ -702,123 +698,129 @@ class KeepAliveTest(AsyncHTTPTestCase):
         super(KeepAliveTest, self).tearDown()
 
     # The next few methods are a crude manual http client
+    @gen.coroutine
     def connect(self):
         self.stream = IOStream(socket.socket())
-        self.stream.connect(('127.0.0.1', self.get_http_port()), self.stop)
-        self.wait()
+        yield self.stream.connect(('127.0.0.1', self.get_http_port()))
 
+    @gen.coroutine
     def read_headers(self):
-        self.stream.read_until(b'\r\n', self.stop)
-        first_line = self.wait()
+        first_line = yield self.stream.read_until(b'\r\n')
         self.assertTrue(first_line.startswith(b'HTTP/1.1 200'), first_line)
-        self.stream.read_until(b'\r\n\r\n', self.stop)
-        header_bytes = self.wait()
+        header_bytes = yield self.stream.read_until(b'\r\n\r\n')
         headers = HTTPHeaders.parse(header_bytes.decode('latin1'))
-        return headers
+        raise gen.Return(headers)
 
+    @gen.coroutine
     def read_response(self):
-        self.headers = self.read_headers()
-        self.stream.read_bytes(int(self.headers['Content-Length']), self.stop)
-        body = self.wait()
+        self.headers = yield self.read_headers()
+        body = yield self.stream.read_bytes(int(self.headers['Content-Length']))
         self.assertEqual(b'Hello world', body)
 
     def close(self):
         self.stream.close()
         del self.stream
 
+    @gen_test
     def test_two_requests(self):
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET / HTTP/1.1\r\n\r\n')
-        self.read_response()
+        yield self.read_response()
         self.stream.write(b'GET / HTTP/1.1\r\n\r\n')
-        self.read_response()
+        yield self.read_response()
         self.close()
 
+    @gen_test
     def test_request_close(self):
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET / HTTP/1.1\r\nConnection: close\r\n\r\n')
-        self.read_response()
-        self.stream.read_until_close(callback=self.stop)
-        data = self.wait()
+        yield self.read_response()
+        data = yield self.stream.read_until_close()
         self.assertTrue(not data)
         self.assertEqual(self.headers['Connection'], 'close')
         self.close()
 
     # keepalive is supported for http 1.0 too, but it's opt-in
+    @gen_test
     def test_http10(self):
         self.http_version = b'HTTP/1.0'
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET / HTTP/1.0\r\n\r\n')
-        self.read_response()
-        self.stream.read_until_close(callback=self.stop)
-        data = self.wait()
+        yield self.read_response()
+        data = yield self.stream.read_until_close()
         self.assertTrue(not data)
         self.assertTrue('Connection' not in self.headers)
         self.close()
 
+    @gen_test
     def test_http10_keepalive(self):
         self.http_version = b'HTTP/1.0'
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
-        self.read_response()
+        yield self.read_response()
         self.assertEqual(self.headers['Connection'], 'Keep-Alive')
         self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
-        self.read_response()
+        yield self.read_response()
         self.assertEqual(self.headers['Connection'], 'Keep-Alive')
         self.close()
 
+    @gen_test
     def test_http10_keepalive_extra_crlf(self):
         self.http_version = b'HTTP/1.0'
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n\r\n')
-        self.read_response()
+        yield self.read_response()
         self.assertEqual(self.headers['Connection'], 'Keep-Alive')
         self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
-        self.read_response()
+        yield self.read_response()
         self.assertEqual(self.headers['Connection'], 'Keep-Alive')
         self.close()
 
+    @gen_test
     def test_pipelined_requests(self):
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')
-        self.read_response()
-        self.read_response()
+        yield self.read_response()
+        yield self.read_response()
         self.close()
 
+    @gen_test
     def test_pipelined_cancel(self):
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')
         # only read once
-        self.read_response()
+        yield self.read_response()
         self.close()
 
+    @gen_test
     def test_cancel_during_download(self):
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET /large HTTP/1.1\r\n\r\n')
-        self.read_headers()
-        self.stream.read_bytes(1024, self.stop)
-        self.wait()
+        yield self.read_headers()
+        yield self.stream.read_bytes(1024)
         self.close()
 
+    @gen_test
     def test_finish_while_closed(self):
-        self.connect()
+        yield self.connect()
         self.stream.write(b'GET /finish_on_close HTTP/1.1\r\n\r\n')
-        self.read_headers()
+        yield self.read_headers()
         self.close()
 
+    @gen_test
     def test_keepalive_chunked(self):
         self.http_version = b'HTTP/1.0'
-        self.connect()
+        yield self.connect()
         self.stream.write(b'POST / HTTP/1.0\r\n'
                           b'Connection: keep-alive\r\n'
                           b'Transfer-Encoding: chunked\r\n'
                           b'\r\n'
                           b'0\r\n'
                           b'\r\n')
-        self.read_response()
+        yield self.read_response()
         self.assertEqual(self.headers['Connection'], 'Keep-Alive')
         self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
-        self.read_response()
+        yield self.read_response()
         self.assertEqual(self.headers['Connection'], 'Keep-Alive')
         self.close()
 
@@ -990,34 +992,35 @@ class IdleTimeoutTest(AsyncHTTPTestCase):
         for stream in self.streams:
             stream.close()
 
+    @gen.coroutine
     def connect(self):
         stream = IOStream(socket.socket())
-        stream.connect(('127.0.0.1', self.get_http_port()), self.stop)
-        self.wait()
+        yield stream.connect(('127.0.0.1', self.get_http_port()))
         self.streams.append(stream)
-        return stream
+        raise gen.Return(stream)
 
+    @gen_test
     def test_unused_connection(self):
-        stream = self.connect()
-        stream.set_close_callback(self.stop)
-        self.wait()
+        stream = yield self.connect()
+        event = Event()
+        stream.set_close_callback(event.set)
+        yield event.wait()
 
+    @gen_test
     def test_idle_after_use(self):
-        stream = self.connect()
-        stream.set_close_callback(lambda: self.stop("closed"))
+        stream = yield self.connect()
+        event = Event()
+        stream.set_close_callback(event.set)
 
         # Use the connection twice to make sure keep-alives are working
         for i in range(2):
             stream.write(b"GET / HTTP/1.1\r\n\r\n")
-            stream.read_until(b"\r\n\r\n", self.stop)
-            self.wait()
-            stream.read_bytes(11, self.stop)
-            data = self.wait()
+            yield stream.read_until(b"\r\n\r\n")
+            data = yield stream.read_bytes(11)
             self.assertEqual(data, b"Hello world")
 
         # Now let the timeout trigger and close the connection.
-        data = self.wait()
-        self.assertEqual(data, "closed")
+        yield event.wait()
 
 
 class BodyLimitsTest(AsyncHTTPTestCase):
index 45799db2ff7e1ddfbe9511c888b4d60c82f1d024..200a3a0c3c0843d14741b45d1ae8ee53897021a2 100644 (file)
@@ -4,12 +4,14 @@ from tornado import gen
 from tornado import netutil
 from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError, _StreamBuffer
 from tornado.httputil import HTTPHeaders
+from tornado.locks import Condition, Event
 from tornado.log import gen_log, app_log
 from tornado.netutil import ssl_wrap_socket
 from tornado.stack_context import NullContext
 from tornado.tcpserver import TCPServer
 from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test  # noqa: E501
-from tornado.test.util import unittest, skipIfNonUnix, refusing_port, skipPypy3V58
+from tornado.test.util import (unittest, skipIfNonUnix, refusing_port, skipPypy3V58,
+                               ignore_deprecation)
 from tornado.web import RequestHandler, Application
 import errno
 import hashlib
@@ -59,48 +61,45 @@ class TestIOStreamWebMixin(object):
         response = self.fetch("/", headers={"Connection": "close"})
         response.rethrow()
 
+    @gen_test
     def test_read_until_close(self):
         stream = self._make_client_iostream()
-        stream.connect(('127.0.0.1', self.get_http_port()), callback=self.stop)
-        self.wait()
+        yield stream.connect(('127.0.0.1', self.get_http_port()))
         stream.write(b"GET / HTTP/1.0\r\n\r\n")
 
-        stream.read_until_close(self.stop)
-        data = self.wait()
+        data = yield stream.read_until_close()
         self.assertTrue(data.startswith(b"HTTP/1.1 200"))
         self.assertTrue(data.endswith(b"Hello"))
 
+    @gen_test
     def test_read_zero_bytes(self):
         self.stream = self._make_client_iostream()
-        self.stream.connect(("127.0.0.1", self.get_http_port()),
-                            callback=self.stop)
-        self.wait()
+        yield self.stream.connect(("127.0.0.1", self.get_http_port()))
         self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
 
         # normal read
-        self.stream.read_bytes(9, self.stop)
-        data = self.wait()
+        data = yield self.stream.read_bytes(9)
         self.assertEqual(data, b"HTTP/1.1 ")
 
         # zero bytes
-        self.stream.read_bytes(0, self.stop)
-        data = self.wait()
+        data = yield self.stream.read_bytes(0)
         self.assertEqual(data, b"")
 
         # another normal read
-        self.stream.read_bytes(3, self.stop)
-        data = self.wait()
+        data = yield self.stream.read_bytes(3)
         self.assertEqual(data, b"200")
 
         self.stream.close()
 
+    @gen_test
     def test_write_while_connecting(self):
         stream = self._make_client_iostream()
         connected = [False]
+        cond = Condition()
 
         def connected_callback():
             connected[0] = True
-            self.stop()
+            cond.notify()
         stream.connect(("127.0.0.1", self.get_http_port()),
                        callback=connected_callback)
         # unlike the previous tests, try to write before the connection
@@ -109,19 +108,19 @@ class TestIOStreamWebMixin(object):
 
         def write_callback():
             written[0] = True
-            self.stop()
+            cond.notify()
         stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n",
                      callback=write_callback)
         self.assertTrue(not connected[0])
         # by the time the write has flushed, the connection callback has
         # also run
         try:
-            self.wait(lambda: connected[0] and written[0])
+            while not (connected[0] and written[0]):
+                yield cond.wait()
         finally:
             logging.debug((connected, written))
 
-        stream.read_until_close(self.stop)
-        data = self.wait()
+        data = yield stream.read_until_close()
         self.assertTrue(data.endswith(b"Hello"))
 
         stream.close()
@@ -176,91 +175,124 @@ class TestReadWriteMixin(object):
     def make_iostream_pair(self, **kwargs):
         raise NotImplementedError
 
+    @gen_test
     def test_write_zero_bytes(self):
         # Attempting to write zero bytes should run the callback without
         # going into an infinite loop.
-        rs, ws = self.make_iostream_pair()
-        ws.write(b'', callback=self.stop)
-        self.wait()
+        rs, ws = yield self.make_iostream_pair()
+        yield ws.write(b'')
         ws.close()
         rs.close()
 
+    @gen_test
     def test_streaming_callback(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
+        try:
+            chunks = []
+            cond = Condition()
+
+            def streaming_callback(data):
+                chunks.append(data)
+                cond.notify()
+
+            fut = rs.read_bytes(6, streaming_callback=streaming_callback)
+            ws.write(b"1234")
+            while not chunks:
+                yield cond.wait()
+            ws.write(b"5678")
+            final_data = yield(fut)
+            self.assertFalse(final_data)
+            self.assertEqual(chunks, [b"1234", b"56"])
+
+            # the rest of the last chunk is still in the buffer
+            data = yield rs.read_bytes(2)
+            self.assertEqual(data, b"78")
+        finally:
+            rs.close()
+            ws.close()
+
+    @gen_test
+    def test_streaming_callback_with_final_callback(self):
+        rs, ws = yield self.make_iostream_pair()
         try:
             chunks = []
             final_called = []
+            cond = Condition()
 
             def streaming_callback(data):
                 chunks.append(data)
-                self.stop()
+                cond.notify()
 
             def final_callback(data):
                 self.assertFalse(data)
                 final_called.append(True)
-                self.stop()
-            rs.read_bytes(6, callback=final_callback,
-                          streaming_callback=streaming_callback)
+                cond.notify()
+            with ignore_deprecation():
+                rs.read_bytes(6, callback=final_callback,
+                              streaming_callback=streaming_callback)
             ws.write(b"1234")
-            self.wait(condition=lambda: chunks)
+            while not chunks:
+                yield cond.wait()
             ws.write(b"5678")
-            self.wait(condition=lambda: final_called)
+            while not final_called:
+                yield cond.wait()
             self.assertEqual(chunks, [b"1234", b"56"])
 
             # the rest of the last chunk is still in the buffer
-            rs.read_bytes(2, callback=self.stop)
-            data = self.wait()
+            data = yield rs.read_bytes(2)
             self.assertEqual(data, b"78")
         finally:
             rs.close()
             ws.close()
 
+    @gen_test
     def test_streaming_callback_with_data_in_buffer(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         ws.write(b"abcd\r\nefgh")
-        rs.read_until(b"\r\n", self.stop)
-        data = self.wait()
+        data = yield rs.read_until(b"\r\n")
         self.assertEqual(data, b"abcd\r\n")
 
-        def closed_callback(chunk):
-            self.fail()
-        rs.read_until_close(callback=closed_callback,
-                            streaming_callback=self.stop)
-        #self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
-        data = self.wait()
+        streaming_fut = Future()
+        rs.read_until_close(streaming_callback=streaming_fut.set_result)
+        data = yield streaming_fut
         self.assertEqual(data, b"efgh")
         rs.close()
         ws.close()
 
+    @gen_test
     def test_streaming_until_close(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
             chunks = []
             closed = [False]
+            cond = Condition()
 
             def streaming_callback(data):
                 chunks.append(data)
-                self.stop()
+                cond.notify()
 
             def close_callback(data):
                 assert not data, data
                 closed[0] = True
-                self.stop()
-            rs.read_until_close(callback=close_callback,
-                                streaming_callback=streaming_callback)
+                cond.notify()
+            with ignore_deprecation():
+                rs.read_until_close(callback=close_callback,
+                                    streaming_callback=streaming_callback)
             ws.write(b"1234")
-            self.wait(condition=lambda: len(chunks) == 1)
-            ws.write(b"5678", self.stop)
-            self.wait()
+            while len(chunks) != 1:
+                yield cond.wait()
+            yield ws.write(b"5678")
             ws.close()
-            self.wait(condition=lambda: closed[0])
+            while not closed[0]:
+                yield cond.wait()
             self.assertEqual(chunks, [b"1234", b"5678"])
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_streaming_until_close_future(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
             chunks = []
 
@@ -275,60 +307,58 @@ class TestReadWriteMixin(object):
                 yield ws.write(b"5678")
                 ws.close()
 
-            @gen.coroutine
-            def f():
-                yield [rs_task(), ws_task()]
-            self.io_loop.run_sync(f)
+            yield [rs_task(), ws_task()]
             self.assertEqual(chunks, [b"1234", b"5678"])
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_delayed_close_callback(self):
         # The scenario:  Server closes the connection while there is a pending
         # read that can be served out of buffered data.  The client does not
         # run the close_callback as soon as it detects the close, but rather
         # defers it until after the buffered read has finished.
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
-            rs.set_close_callback(self.stop)
+            event = Event()
+            rs.set_close_callback(event.set)
             ws.write(b"12")
             chunks = []
 
             def callback1(data):
                 chunks.append(data)
-                rs.read_bytes(1, callback2)
+                with ignore_deprecation():
+                    rs.read_bytes(1, callback2)
                 ws.close()
 
             def callback2(data):
                 chunks.append(data)
-            rs.read_bytes(1, callback1)
-            self.wait()  # stopped by close_callback
+            with ignore_deprecation():
+                rs.read_bytes(1, callback1)
+            yield event.wait()  # stopped by close_callback
             self.assertEqual(chunks, [b"1", b"2"])
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_future_delayed_close_callback(self):
         # Same as test_delayed_close_callback, but with the future interface.
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
 
-        # We can't call make_iostream_pair inside a gen_test function
-        # because the ioloop is not reentrant.
-        @gen_test
-        def f(self):
+        try:
             ws.write(b"12")
             chunks = []
             chunks.append((yield rs.read_bytes(1)))
             ws.close()
             chunks.append((yield rs.read_bytes(1)))
             self.assertEqual(chunks, [b"1", b"2"])
-        try:
-            f(self)
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_close_buffered_data(self):
         # Similar to the previous test, but with data stored in the OS's
         # socket buffers instead of the IOStream's read buffer.  Out-of-band
@@ -338,71 +368,70 @@ class TestReadWriteMixin(object):
         #
         # This depends on the read_chunk_size being smaller than the
         # OS socket buffer, so make it small.
-        rs, ws = self.make_iostream_pair(read_chunk_size=256)
+        rs, ws = yield self.make_iostream_pair(read_chunk_size=256)
         try:
             ws.write(b"A" * 512)
-            rs.read_bytes(256, self.stop)
-            data = self.wait()
+            data = yield rs.read_bytes(256)
             self.assertEqual(b"A" * 256, data)
             ws.close()
             # Allow the close to propagate to the `rs` side of the
             # connection.  Using add_callback instead of add_timeout
             # doesn't seem to work, even with multiple iterations
-            self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
-            self.wait()
-            rs.read_bytes(256, self.stop)
-            data = self.wait()
+            yield gen.sleep(0.01)
+            data = yield rs.read_bytes(256)
             self.assertEqual(b"A" * 256, data)
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_until_close_after_close(self):
         # Similar to test_delayed_close_callback, but read_until_close takes
         # a separate code path so test it separately.
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
             ws.write(b"1234")
             ws.close()
             # Read one byte to make sure the client has received the data.
             # It won't run the close callback as long as there is more buffered
             # data that could satisfy a later read.
-            rs.read_bytes(1, self.stop)
-            data = self.wait()
+            data = yield rs.read_bytes(1)
             self.assertEqual(data, b"1")
-            rs.read_until_close(self.stop)
-            data = self.wait()
+            data = yield rs.read_until_close()
             self.assertEqual(data, b"234")
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_streaming_read_until_close_after_close(self):
         # Same as the preceding test but with a streaming_callback.
         # All data should go through the streaming callback,
         # and the final read callback just gets an empty string.
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
             ws.write(b"1234")
             ws.close()
-            rs.read_bytes(1, self.stop)
-            data = self.wait()
+            data = yield rs.read_bytes(1)
             self.assertEqual(data, b"1")
             streaming_data = []
-            rs.read_until_close(self.stop,
-                                streaming_callback=streaming_data.append)
-            data = self.wait()
-            self.assertEqual(b'', data)
+            final_future = Future()
+            with ignore_deprecation():
+                rs.read_until_close(final_future.set_result,
+                                    streaming_callback=streaming_data.append)
+            final_data = yield final_future
+            self.assertEqual(b'', final_data)
             self.assertEqual(b''.join(streaming_data), b"234")
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_large_read_until(self):
         # Performance test: read_until used to have a quadratic component
         # so a read_until of 4MB would take 8 seconds; now it takes 0.25
         # seconds.
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
             # This test fails on pypy with ssl.  I think it's because
             # pypy's gc defeats moves objects, breaking the
@@ -415,126 +444,131 @@ class TestReadWriteMixin(object):
             for i in range(NUM_KB):
                 ws.write(b"A" * 1024)
             ws.write(b"\r\n")
-            rs.read_until(b"\r\n", self.stop)
-            data = self.wait()
+            data = yield rs.read_until(b"\r\n")
             self.assertEqual(len(data), NUM_KB * 1024 + 2)
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_close_callback_with_pending_read(self):
         # Regression test for a bug that was introduced in 2.3
         # where the IOStream._close_callback would never be called
         # if there were pending reads.
         OK = b"OK\r\n"
-        rs, ws = self.make_iostream_pair()
-        rs.set_close_callback(self.stop)
+        rs, ws = yield self.make_iostream_pair()
+        event = Event()
+        rs.set_close_callback(event.set)
         try:
             ws.write(OK)
-            rs.read_until(b"\r\n", self.stop)
-            res = self.wait()
+            res = yield rs.read_until(b"\r\n")
             self.assertEqual(res, OK)
 
             ws.close()
-            rs.read_until(b"\r\n", lambda x: x)
+            with ignore_deprecation():
+                rs.read_until(b"\r\n", lambda x: x)
             # If _close_callback (self.stop) is not called,
             # an AssertionError: Async operation timed out after 5 seconds
             # will be raised.
-            res = self.wait()
-            self.assertTrue(res is None)
+            yield event.wait()
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_future_close_callback(self):
         # Regression test for interaction between the Future read interfaces
         # and IOStream._maybe_add_error_listener.
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         closed = [False]
+        cond = Condition()
 
         def close_callback():
             closed[0] = True
-            self.stop()
+            cond.notify()
         rs.set_close_callback(close_callback)
         try:
             ws.write(b'a')
-            future = rs.read_bytes(1)
-            self.io_loop.add_future(future, self.stop)
-            self.assertEqual(self.wait().result(), b'a')
+            res = yield rs.read_bytes(1)
+            self.assertEqual(res, b'a')
             self.assertFalse(closed[0])
             ws.close()
-            self.wait()
+            yield cond.wait()
             self.assertTrue(closed[0])
         finally:
             rs.close()
             ws.close()
 
+    @gen_test
     def test_write_memoryview(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
-            rs.read_bytes(4, self.stop)
+            fut = rs.read_bytes(4)
             ws.write(memoryview(b"hello"))
-            data = self.wait()
+            data = yield fut
             self.assertEqual(data, b"hell")
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_bytes_partial(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
             # Ask for more than is available with partial=True
-            rs.read_bytes(50, self.stop, partial=True)
+            fut = rs.read_bytes(50, partial=True)
             ws.write(b"hello")
-            data = self.wait()
+            data = yield fut
             self.assertEqual(data, b"hello")
 
             # Ask for less than what is available; num_bytes is still
             # respected.
-            rs.read_bytes(3, self.stop, partial=True)
+            fut = rs.read_bytes(3, partial=True)
             ws.write(b"world")
-            data = self.wait()
+            data = yield fut
             self.assertEqual(data, b"wor")
 
             # Partial reads won't return an empty string, but read_bytes(0)
             # will.
-            rs.read_bytes(0, self.stop, partial=True)
-            data = self.wait()
+            data = yield rs.read_bytes(0, partial=True)
             self.assertEqual(data, b'')
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_until_max_bytes(self):
-        rs, ws = self.make_iostream_pair()
-        rs.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = yield self.make_iostream_pair()
+        closed = Event()
+        rs.set_close_callback(closed.set)
         try:
             # Extra room under the limit
-            rs.read_until(b"def", self.stop, max_bytes=50)
+            fut = rs.read_until(b"def", max_bytes=50)
             ws.write(b"abcdef")
-            data = self.wait()
+            data = yield fut
             self.assertEqual(data, b"abcdef")
 
             # Just enough space
-            rs.read_until(b"def", self.stop, max_bytes=6)
+            fut = rs.read_until(b"def", max_bytes=6)
             ws.write(b"abcdef")
-            data = self.wait()
+            data = yield fut
             self.assertEqual(data, b"abcdef")
 
             # Not enough space, but we don't know it until all we can do is
             # log a warning and close the connection.
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                rs.read_until(b"def", self.stop, max_bytes=5)
+                fut = rs.read_until(b"def", max_bytes=5)
                 ws.write(b"123456")
-                data = self.wait()
-            self.assertEqual(data, "closed")
+                yield closed.wait()
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_until_max_bytes_inline(self):
-        rs, ws = self.make_iostream_pair()
-        rs.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = yield self.make_iostream_pair()
+        closed = Event()
+        rs.set_close_callback(closed.set)
         try:
             # Similar to the error case in the previous test, but the
             # ws writes first so rs reads are satisfied
@@ -542,59 +576,63 @@ class TestReadWriteMixin(object):
             # do not raise the error synchronously.
             ws.write(b"123456")
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                rs.read_until(b"def", self.stop, max_bytes=5)
-                data = self.wait()
-            self.assertEqual(data, "closed")
+                with ignore_deprecation():
+                    rs.read_until(b"def", callback=lambda x: self.fail(), max_bytes=5)
+                yield closed.wait()
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_until_max_bytes_ignores_extra(self):
-        rs, ws = self.make_iostream_pair()
-        rs.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = yield self.make_iostream_pair()
+        closed = Event()
+        rs.set_close_callback(closed.set)
         try:
             # Even though data that matches arrives the same packet that
             # puts us over the limit, we fail the request because it was not
             # found within the limit.
             ws.write(b"abcdef")
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                rs.read_until(b"def", self.stop, max_bytes=5)
-                data = self.wait()
-            self.assertEqual(data, "closed")
+                rs.read_until(b"def", max_bytes=5)
+                yield closed.wait()
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_until_regex_max_bytes(self):
-        rs, ws = self.make_iostream_pair()
-        rs.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = yield self.make_iostream_pair()
+        closed = Event()
+        rs.set_close_callback(closed.set)
         try:
             # Extra room under the limit
-            rs.read_until_regex(b"def", self.stop, max_bytes=50)
+            fut = rs.read_until_regex(b"def", max_bytes=50)
             ws.write(b"abcdef")
-            data = self.wait()
+            data = yield fut
             self.assertEqual(data, b"abcdef")
 
             # Just enough space
-            rs.read_until_regex(b"def", self.stop, max_bytes=6)
+            fut = rs.read_until_regex(b"def", max_bytes=6)
             ws.write(b"abcdef")
-            data = self.wait()
+            data = yield fut
             self.assertEqual(data, b"abcdef")
 
             # Not enough space, but we don't know it until all we can do is
             # log a warning and close the connection.
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                rs.read_until_regex(b"def", self.stop, max_bytes=5)
+                rs.read_until_regex(b"def", max_bytes=5)
                 ws.write(b"123456")
-                data = self.wait()
-            self.assertEqual(data, "closed")
+                yield closed.wait()
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_until_regex_max_bytes_inline(self):
-        rs, ws = self.make_iostream_pair()
-        rs.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = yield self.make_iostream_pair()
+        closed = Event()
+        rs.set_close_callback(closed.set)
         try:
             # Similar to the error case in the previous test, but the
             # ws writes first so rs reads are satisfied
@@ -602,102 +640,101 @@ class TestReadWriteMixin(object):
             # do not raise the error synchronously.
             ws.write(b"123456")
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                rs.read_until_regex(b"def", self.stop, max_bytes=5)
-                data = self.wait()
-            self.assertEqual(data, "closed")
+                rs.read_until_regex(b"def", max_bytes=5)
+                yield closed.wait()
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_until_regex_max_bytes_ignores_extra(self):
-        rs, ws = self.make_iostream_pair()
-        rs.set_close_callback(lambda: self.stop("closed"))
+        rs, ws = yield self.make_iostream_pair()
+        closed = Event()
+        rs.set_close_callback(closed.set)
         try:
             # Even though data that matches arrives the same packet that
             # puts us over the limit, we fail the request because it was not
             # found within the limit.
             ws.write(b"abcdef")
             with ExpectLog(gen_log, "Unsatisfiable read"):
-                rs.read_until_regex(b"def", self.stop, max_bytes=5)
-                data = self.wait()
-            self.assertEqual(data, "closed")
+                rs.read_until_regex(b"def", max_bytes=5)
+                yield closed.wait()
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_small_reads_from_large_buffer(self):
         # 10KB buffer size, 100KB available to read.
         # Read 1KB at a time and make sure that the buffer is not eagerly
         # filled.
-        rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024)
+        rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024)
         try:
             ws.write(b"a" * 1024 * 100)
             for i in range(100):
-                rs.read_bytes(1024, self.stop)
-                data = self.wait()
+                data = yield rs.read_bytes(1024)
                 self.assertEqual(data, b"a" * 1024)
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_small_read_untils_from_large_buffer(self):
         # 10KB buffer size, 100KB available to read.
         # Read 1KB at a time and make sure that the buffer is not eagerly
         # filled.
-        rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024)
+        rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024)
         try:
             ws.write((b"a" * 1023 + b"\n") * 100)
             for i in range(100):
-                rs.read_until(b"\n", self.stop, max_bytes=4096)
-                data = self.wait()
+                data = yield rs.read_until(b"\n", max_bytes=4096)
                 self.assertEqual(data, b"a" * 1023 + b"\n")
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_flow_control(self):
         MB = 1024 * 1024
-        rs, ws = self.make_iostream_pair(max_buffer_size=5 * MB)
+        rs, ws = yield self.make_iostream_pair(max_buffer_size=5 * MB)
         try:
             # Client writes more than the rs will accept.
             ws.write(b"a" * 10 * MB)
             # The rs pauses while reading.
-            rs.read_bytes(MB, self.stop)
-            self.wait()
-            self.io_loop.call_later(0.1, self.stop)
-            self.wait()
+            yield rs.read_bytes(MB)
+            yield gen.sleep(0.1)
             # The ws's writes have been blocked; the rs can
             # continue to read gradually.
             for i in range(9):
-                rs.read_bytes(MB, self.stop)
-                self.wait()
+                yield rs.read_bytes(MB)
         finally:
             rs.close()
             ws.close()
 
+    @gen_test
     def test_read_into(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
 
         def sleep_some():
             self.io_loop.run_sync(lambda: gen.sleep(0.05))
         try:
             buf = bytearray(10)
-            rs.read_into(buf, callback=self.stop)
+            fut = rs.read_into(buf)
             ws.write(b"hello")
-            sleep_some()
+            yield gen.sleep(0.05)
             self.assertTrue(rs.reading())
             ws.write(b"world!!")
-            data = self.wait()
+            data = yield fut
             self.assertFalse(rs.reading())
             self.assertEqual(data, 10)
             self.assertEqual(bytes(buf), b"helloworld")
 
             # Existing buffer is fed into user buffer
-            rs.read_into(buf, callback=self.stop)
-            sleep_some()
+            fut = rs.read_into(buf)
+            yield gen.sleep(0.05)
             self.assertTrue(rs.reading())
             ws.write(b"1234567890")
-            data = self.wait()
+            data = yield fut
             self.assertFalse(rs.reading())
             self.assertEqual(data, 10)
             self.assertEqual(bytes(buf), b"!!12345678")
@@ -705,43 +742,38 @@ class TestReadWriteMixin(object):
             # Existing buffer can satisfy read immediately
             buf = bytearray(4)
             ws.write(b"abcdefghi")
-            rs.read_into(buf, callback=self.stop)
-            data = self.wait()
+            data = yield rs.read_into(buf)
             self.assertEqual(data, 4)
             self.assertEqual(bytes(buf), b"90ab")
 
-            rs.read_bytes(7, self.stop)
-            data = self.wait()
+            data = yield rs.read_bytes(7)
             self.assertEqual(data, b"cdefghi")
         finally:
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_into_partial(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
 
-        def sleep_some():
-            self.io_loop.run_sync(lambda: gen.sleep(0.05))
         try:
             # Partial read
             buf = bytearray(10)
-            rs.read_into(buf, callback=self.stop, partial=True)
+            fut = rs.read_into(buf, partial=True)
             ws.write(b"hello")
-            data = self.wait()
+            data = yield fut
             self.assertFalse(rs.reading())
             self.assertEqual(data, 5)
             self.assertEqual(bytes(buf), b"hello\0\0\0\0\0")
 
             # Full read despite partial=True
             ws.write(b"world!1234567890")
-            rs.read_into(buf, callback=self.stop, partial=True)
-            data = self.wait()
+            data = yield rs.read_into(buf, partial=True)
             self.assertEqual(data, 10)
             self.assertEqual(bytes(buf), b"world!1234")
 
             # Existing buffer can satisfy read immediately
-            rs.read_into(buf, callback=self.stop, partial=True)
-            data = self.wait()
+            data = yield rs.read_into(buf, partial=True)
             self.assertEqual(data, 6)
             self.assertEqual(bytes(buf), b"5678901234")
 
@@ -749,8 +781,9 @@ class TestReadWriteMixin(object):
             ws.close()
             rs.close()
 
+    @gen_test
     def test_read_into_zero_bytes(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
         try:
             buf = bytearray()
             fut = rs.read_into(buf)
@@ -759,13 +792,14 @@ class TestReadWriteMixin(object):
             ws.close()
             rs.close()
 
+    @gen_test
     def test_many_mixed_reads(self):
         # Stress buffer handling when going back and forth between
         # read_bytes() (using an internal buffer) and read_into()
         # (using a user-allocated buffer).
         r = random.Random(42)
         nbytes = 1000000
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
 
         produce_hash = hashlib.sha1()
         consume_hash = hashlib.sha1()
@@ -801,13 +835,9 @@ class TestReadWriteMixin(object):
                     remaining -= size
             assert remaining == 0
 
-        @gen.coroutine
-        def main():
+        try:
             yield [produce(), consume()]
             assert produce_hash.hexdigest() == consume_hash.hexdigest()
-
-        try:
-            self.io_loop.run_sync(main)
         finally:
             ws.close()
             rs.close()
@@ -820,25 +850,21 @@ class TestIOStreamMixin(TestReadWriteMixin):
     def _make_client_iostream(self, connection, **kwargs):
         raise NotImplementedError()
 
+    @gen.coroutine
     def make_iostream_pair(self, **kwargs):
         listener, port = bind_unused_port()
-        streams = [None, None]
+        server_stream_fut = Future()
 
         def accept_callback(connection, address):
-            streams[0] = self._make_server_iostream(connection, **kwargs)
-            self.stop()
+            server_stream_fut.set_result(self._make_server_iostream(connection, **kwargs))
 
-        def connect_callback():
-            streams[1] = client_stream
-            self.stop()
         netutil.add_accept_handler(listener, accept_callback)
         client_stream = self._make_client_iostream(socket.socket(), **kwargs)
-        client_stream.connect(('127.0.0.1', port),
-                              callback=connect_callback)
-        self.wait(condition=lambda: all(streams))
+        connect_fut = client_stream.connect(('127.0.0.1', port))
+        server_stream, client_stream = yield [server_stream_fut, connect_fut]
         self.io_loop.remove_handler(listener.fileno())
         listener.close()
-        return streams
+        raise gen.Return((server_stream, client_stream))
 
     def test_connection_refused(self):
         # When a connection is refused, the connect callback should not
@@ -883,38 +909,44 @@ class TestIOStreamMixin(TestReadWriteMixin):
                 self.wait()
                 self.assertIsInstance(stream.error, socket.gaierror)
 
+    @gen_test
     def test_read_callback_error(self):
         # Test that IOStream sets its exc_info when a read callback throws
-        server, client = self.make_iostream_pair()
+        server, client = yield self.make_iostream_pair()
         try:
-            server.set_close_callback(self.stop)
+            closed = Event()
+            server.set_close_callback(closed.set)
             with ExpectLog(
                 app_log, "(Uncaught exception|Exception in callback)"
             ):
                 # Clear ExceptionStackContext so IOStream catches error
                 with NullContext():
-                    server.read_bytes(1, callback=lambda data: 1 / 0)
+                    with ignore_deprecation():
+                        server.read_bytes(1, callback=lambda data: 1 / 0)
                 client.write(b"1")
-                self.wait()
+                yield closed.wait()
             self.assertTrue(isinstance(server.error, ZeroDivisionError))
         finally:
             server.close()
             client.close()
 
     @unittest.skipIf(mock is None, 'mock package not present')
+    @gen_test
     def test_read_until_close_with_error(self):
-        server, client = self.make_iostream_pair()
+        server, client = yield self.make_iostream_pair()
         try:
             with mock.patch('tornado.iostream.BaseIOStream._try_inline_read',
                             side_effect=IOError('boom')):
                 with self.assertRaisesRegexp(IOError, 'boom'):
-                    client.read_until_close(self.stop)
+                    with ignore_deprecation():
+                        client.read_until_close(lambda x: None)
         finally:
             server.close()
             client.close()
 
     @skipIfNonUnix
     @skipPypy3V58
+    @gen_test
     def test_inline_read_error(self):
         # An error on an inline read is raised without logging (on the
         # assumption that it will eventually be noticed or logged further
@@ -924,24 +956,27 @@ class TestIOStreamMixin(TestReadWriteMixin):
         # on socket FDs, but we can't close the socket object normally
         # because we won't get the error we want if the socket knows
         # it's closed.
-        server, client = self.make_iostream_pair()
+        server, client = yield self.make_iostream_pair()
         try:
             os.close(server.socket.fileno())
             with self.assertRaises(socket.error):
-                server.read_bytes(1, lambda data: None)
+                server.read_bytes(1)
         finally:
             server.close()
             client.close()
 
     @skipPypy3V58
+    @gen_test
     def test_async_read_error_logging(self):
         # Socket errors on asynchronous reads should be logged (but only
         # once).
-        server, client = self.make_iostream_pair()
-        server.set_close_callback(self.stop)
+        server, client = yield self.make_iostream_pair()
+        closed = Event()
+        server.set_close_callback(closed.set)
         try:
             # Start a read that will be fulfilled asynchronously.
-            server.read_bytes(1, lambda data: None)
+            with ignore_deprecation():
+                server.read_bytes(1, lambda data: None)
             client.write(b'a')
             # Stub out read_from_fd to make it fail.
 
@@ -951,11 +986,12 @@ class TestIOStreamMixin(TestReadWriteMixin):
             server.read_from_fd = fake_read_from_fd
             # This log message is from _handle_read (not read_from_fd).
             with ExpectLog(gen_log, "error on read"):
-                self.wait()
+                yield closed.wait()
         finally:
             server.close()
             client.close()
 
+    @gen_test
     def test_future_write(self):
         """
         Test that write() Futures are never orphaned.
@@ -965,7 +1001,7 @@ class TestIOStreamMixin(TestReadWriteMixin):
         m, n = 10000, 1000
         nproducers = 10
         total_bytes = m * n * nproducers
-        server, client = self.make_iostream_pair(max_buffer_size=total_bytes)
+        server, client = yield self.make_iostream_pair(max_buffer_size=total_bytes)
 
         @gen.coroutine
         def produce():
@@ -980,12 +1016,8 @@ class TestIOStreamMixin(TestReadWriteMixin):
                 res = yield client.read_bytes(m)
                 nread += len(res)
 
-        @gen.coroutine
-        def main():
-            yield [produce() for i in range(nproducers)] + [consume()]
-
         try:
-            self.io_loop.run_sync(main)
+            yield [produce() for i in range(nproducers)] + [consume()]
         finally:
             server.close()
             client.close()
@@ -1241,43 +1273,42 @@ class WaitForHandshakeTest(AsyncTestCase):
 
 @skipIfNonUnix
 class TestPipeIOStream(TestReadWriteMixin, AsyncTestCase):
+    @gen.coroutine
     def make_iostream_pair(self, **kwargs):
         r, w = os.pipe()
 
         return PipeIOStream(r, **kwargs), PipeIOStream(w, **kwargs)
 
+    @gen_test
     def test_pipe_iostream(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
 
         ws.write(b"hel")
         ws.write(b"lo world")
 
-        rs.read_until(b' ', callback=self.stop)
-        data = self.wait()
+        data = yield rs.read_until(b' ')
         self.assertEqual(data, b"hello ")
 
-        rs.read_bytes(3, self.stop)
-        data = self.wait()
+        data = yield rs.read_bytes(3)
         self.assertEqual(data, b"wor")
 
         ws.close()
 
-        rs.read_until_close(self.stop)
-        data = self.wait()
+        data = yield rs.read_until_close()
         self.assertEqual(data, b"ld")
 
         rs.close()
 
+    @gen_test
     def test_pipe_iostream_big_write(self):
-        rs, ws = self.make_iostream_pair()
+        rs, ws = yield self.make_iostream_pair()
 
         NUM_BYTES = 1048576
 
         # Write 1MB of data, which should fill the buffer
         ws.write(b"1" * NUM_BYTES)
 
-        rs.read_bytes(NUM_BYTES, self.stop)
-        data = self.wait()
+        data = yield rs.read_bytes(NUM_BYTES)
         self.assertEqual(data, b"1" * NUM_BYTES)
 
         ws.close()
index 4589e51bcc07741ffa1a8b5bf05e4c71eb4bca47..1095b1126a1459e449e51997e68b1c4f3f634343 100644 (file)
@@ -143,6 +143,7 @@ class ProcessTest(unittest.TestCase):
 
 @skipIfNonUnix
 class SubprocessTest(AsyncTestCase):
+    @gen_test
     def test_subprocess(self):
         if IOLoop.configured_class().__name__.endswith('LayeredTwistedIOLoop'):
             # This test fails non-deterministically with LayeredTwistedIOLoop.
@@ -158,33 +159,29 @@ class SubprocessTest(AsyncTestCase):
         self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
         self.addCleanup(subproc.stdout.close)
         self.addCleanup(subproc.stdin.close)
-        subproc.stdout.read_until(b'>>> ', self.stop)
-        self.wait()
+        yield subproc.stdout.read_until(b'>>> ')
         subproc.stdin.write(b"print('hello')\n")
-        subproc.stdout.read_until(b'\n', self.stop)
-        data = self.wait()
+        data = yield subproc.stdout.read_until(b'\n')
         self.assertEqual(data, b"hello\n")
 
-        subproc.stdout.read_until(b">>> ", self.stop)
-        self.wait()
+        yield subproc.stdout.read_until(b">>> ")
         subproc.stdin.write(b"raise SystemExit\n")
-        subproc.stdout.read_until_close(self.stop)
-        data = self.wait()
+        data = yield subproc.stdout.read_until_close()
         self.assertEqual(data, b"")
 
+    @gen_test
     def test_close_stdin(self):
         # Close the parent's stdin handle and see that the child recognizes it.
         subproc = Subprocess([sys.executable, '-u', '-i'],
                              stdin=Subprocess.STREAM,
                              stdout=Subprocess.STREAM, stderr=subprocess.STDOUT)
         self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
-        subproc.stdout.read_until(b'>>> ', self.stop)
-        self.wait()
+        yield subproc.stdout.read_until(b'>>> ')
         subproc.stdin.close()
-        subproc.stdout.read_until_close(self.stop)
-        data = self.wait()
+        data = yield subproc.stdout.read_until_close()
         self.assertEqual(data, b"\n")
 
+    @gen_test
     def test_stderr(self):
         # This test is mysteriously flaky on twisted: it succeeds, but logs
         # an error of EBADF on closing a file descriptor.
@@ -193,8 +190,7 @@ class SubprocessTest(AsyncTestCase):
                               r"import sys; sys.stderr.write('hello\n')"],
                              stderr=Subprocess.STREAM)
         self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
-        subproc.stderr.read_until(b'\n', self.stop)
-        data = self.wait()
+        data = yield subproc.stderr.read_until(b'\n')
         self.assertEqual(data, b'hello\n')
         # More mysterious EBADF: This fails if done with self.addCleanup instead of here.
         subproc.stderr.close()
index 738a9ccb80013296de53107df1ef35e092532482..ced6d657f6adbcbd923ea9822fe768df48b7b826 100644 (file)
@@ -19,11 +19,13 @@ the protocol (known as "draft 76") and are not compatible with this module.
 from __future__ import absolute_import, division, print_function
 
 import base64
+import contextlib
 import hashlib
 import os
 import struct
 import tornado.escape
 import tornado.web
+import warnings
 import zlib
 
 from tornado.concurrent import Future, future_set_result_unless_cancelled
@@ -44,6 +46,14 @@ else:
     from urlparse import urlparse  # py3
 
 
+@contextlib.contextmanager
+def ignore_deprecation():
+    """Context manager to ignore deprecation warnings."""
+    with warnings.catch_warnings():
+        warnings.simplefilter('ignore', DeprecationWarning)
+        yield
+
+
 class WebSocketError(Exception):
     pass
 
@@ -838,7 +848,8 @@ class WebSocketProtocol13(WebSocketProtocol):
 
     def _receive_frame(self):
         try:
-            self.stream.read_bytes(2, self._on_frame_start)
+            with ignore_deprecation():
+                self.stream.read_bytes(2, self._on_frame_start)
         except StreamClosedError:
             self._abort()
 
@@ -863,16 +874,17 @@ class WebSocketProtocol13(WebSocketProtocol):
             self._abort()
             return
         try:
-            if payloadlen < 126:
-                self._frame_length = payloadlen
-                if self._masked_frame:
-                    self.stream.read_bytes(4, self._on_masking_key)
-                else:
-                    self._read_frame_data(False)
-            elif payloadlen == 126:
-                self.stream.read_bytes(2, self._on_frame_length_16)
-            elif payloadlen == 127:
-                self.stream.read_bytes(8, self._on_frame_length_64)
+            with ignore_deprecation():
+                if payloadlen < 126:
+                    self._frame_length = payloadlen
+                    if self._masked_frame:
+                        self.stream.read_bytes(4, self._on_masking_key)
+                    else:
+                        self._read_frame_data(False)
+                elif payloadlen == 126:
+                    self.stream.read_bytes(2, self._on_frame_length_16)
+                elif payloadlen == 127:
+                    self.stream.read_bytes(8, self._on_frame_length_64)
         except StreamClosedError:
             self._abort()
 
@@ -883,16 +895,18 @@ class WebSocketProtocol13(WebSocketProtocol):
         if new_len > (self.handler.max_message_size or 10 * 1024 * 1024):
             self.close(1009, "message too big")
             return
-        self.stream.read_bytes(
-            self._frame_length,
-            self._on_masked_frame_data if masked else self._on_frame_data)
+        with ignore_deprecation():
+            self.stream.read_bytes(
+                self._frame_length,
+                self._on_masked_frame_data if masked else self._on_frame_data)
 
     def _on_frame_length_16(self, data):
         self._wire_bytes_in += len(data)
         self._frame_length = struct.unpack("!H", data)[0]
         try:
             if self._masked_frame:
-                self.stream.read_bytes(4, self._on_masking_key)
+                with ignore_deprecation():
+                    self.stream.read_bytes(4, self._on_masking_key)
             else:
                 self._read_frame_data(False)
         except StreamClosedError:
@@ -903,7 +917,8 @@ class WebSocketProtocol13(WebSocketProtocol):
         self._frame_length = struct.unpack("!Q", data)[0]
         try:
             if self._masked_frame:
-                self.stream.read_bytes(4, self._on_masking_key)
+                with ignore_deprecation():
+                    self.stream.read_bytes(4, self._on_masking_key)
             else:
                 self._read_frame_data(False)
         except StreamClosedError: