From: Ben Darnell Date: Sun, 22 Apr 2018 01:00:54 +0000 (-0400) Subject: iostream: Deprecate callback arguments to read methods X-Git-Tag: v5.1.0b1~23^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=58067883c40b2894b5bdf26dd79c30b6e49207d7;p=thirdparty%2Ftornado.git iostream: Deprecate callback arguments to read methods This requires a lot of mechanical changes throughout the test suite. tornado.websocket is currently still using the deprecated interfaces and suppressing the warning. --- diff --git a/tornado/iostream.py b/tornado/iostream.py index ad50be552..f7f7e3c83 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -33,6 +33,7 @@ import os import socket import sys import re +import warnings from tornado.concurrent import Future from tornado import ioloop @@ -342,6 +343,12 @@ class BaseIOStream(object): .. versionchanged:: 4.0 Added the ``max_bytes`` argument. The ``callback`` argument is now optional and a `.Future` will be returned if it is omitted. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ future = self._set_read_callback(callback) self._read_regex = re.compile(regex) @@ -375,6 +382,11 @@ class BaseIOStream(object): .. versionchanged:: 4.0 Added the ``max_bytes`` argument. The ``callback`` argument is now optional and a `.Future` will be returned if it is omitted. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. """ future = self._set_read_callback(callback) self._read_delimiter = delimiter @@ -408,6 +420,12 @@ class BaseIOStream(object): .. versionchanged:: 4.0 Added the ``partial`` argument. The callback argument is now optional and a `.Future` will be returned if it is omitted. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ future = self._set_read_callback(callback) assert isinstance(num_bytes, numbers.Integral) @@ -434,6 +452,12 @@ class BaseIOStream(object): entirely filled with read data. .. versionadded:: 5.0 + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ future = self._set_read_callback(callback) @@ -485,6 +509,11 @@ class BaseIOStream(object): The callback argument is now optional and a `.Future` will be returned if it is omitted. + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ future = self._set_read_callback(callback) self._streaming_callback = stack_context.wrap(streaming_callback) @@ -807,6 +836,8 @@ class BaseIOStream(object): assert self._read_callback is None, "Already reading" assert self._read_future is None, "Already reading" if callback is not None: + warnings.warn("callbacks are deprecated, use returned Future instead", + DeprecationWarning) self._read_callback = stack_context.wrap(callback) else: self._read_future = Future() diff --git a/tornado/test/concurrent_test.py b/tornado/test/concurrent_test.py index ea1479103..1df0532cc 100644 --- a/tornado/test/concurrent_test.py +++ b/tornado/test/concurrent_test.py @@ -20,6 +20,7 @@ import re import socket import sys import traceback +import warnings from tornado.concurrent import (Future, return_future, ReturnValueIgnoredError, run_on_executor, future_set_result_unless_cancelled) @@ -249,20 +250,16 @@ class ReturnFutureTest(AsyncTestCase): class CapServer(TCPServer): + @gen.coroutine def handle_stream(self, stream, address): - logging.debug("handle_stream") - self.stream = stream - self.stream.read_until(b"\n", self.handle_read) - - def handle_read(self, data): - logging.debug("handle_read") + data = yield stream.read_until(b"\n") data = to_unicode(data) if data == data.upper(): - self.stream.write(b"error\talready capitalized\n") + stream.write(b"error\talready capitalized\n") else: # data already has \n - self.stream.write(utf8("ok\t%s" % data.upper())) - self.stream.close() + stream.write(utf8("ok\t%s" % data.upper())) + stream.close() class CapError(Exception): @@ -397,10 +394,30 @@ class ClientTestMixin(object): class ManualClientTest(ClientTestMixin, AsyncTestCase): client_class = ManualCapClient + def setUp(self): + self.warning_catcher = warnings.catch_warnings() + self.warning_catcher.__enter__() + warnings.simplefilter('ignore', DeprecationWarning) + super(ManualClientTest, self).setUp() + + def tearDown(self): + super(ManualClientTest, self).tearDown() + self.warning_catcher.__exit__(None, None, None) + class DecoratorClientTest(ClientTestMixin, AsyncTestCase): client_class = DecoratorCapClient + def setUp(self): + self.warning_catcher = warnings.catch_warnings() + self.warning_catcher.__enter__() + warnings.simplefilter('ignore', DeprecationWarning) + super(DecoratorClientTest, self).setUp() + + def tearDown(self): + super(DecoratorClientTest, self).tearDown() + self.warning_catcher.__exit__(None, None, None) + class GeneratorClientTest(ClientTestMixin, AsyncTestCase): client_class = GeneratorCapClient diff --git a/tornado/test/httpclient_test.py b/tornado/test/httpclient_test.py index f980b48cd..db0492de6 100644 --- a/tornado/test/httpclient_test.py +++ b/tornado/test/httpclient_test.py @@ -4,7 +4,6 @@ import base64 import binascii from contextlib import closing import copy -import functools import sys import threading import datetime @@ -190,7 +189,12 @@ class HTTPClientCommonTestCase(AsyncHTTPTestCase): # over several ioloop iterations, but the connection is already closed. sock, port = bind_unused_port() with closing(sock): - def write_response(stream, request_data): + @gen.coroutine + def accept_callback(conn, address): + # fake an HTTP server using chunked encoding where the final chunks + # and connection close all happen at once + stream = IOStream(conn) + request_data = yield stream.read_until(b"\r\n\r\n") if b"HTTP/1." not in request_data: self.skipTest("requires HTTP/1.x") stream.write(b"""\ @@ -204,13 +208,6 @@ Transfer-Encoding: chunked 0 """.replace(b"\n", b"\r\n"), callback=stream.close) - - def accept_callback(conn, address): - # fake an HTTP server using chunked encoding where the final chunks - # and connection close all happen at once - stream = IOStream(conn) - stream.read_until(b"\r\n\r\n", - functools.partial(write_response, stream)) netutil.add_accept_handler(sock, accept_callback) resp = self.fetch("http://127.0.0.1:%d/" % port) resp.rethrow() @@ -383,7 +380,10 @@ Transfer-Encoding: chunked # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2 sock, port = bind_unused_port() with closing(sock): - def write_response(stream, request_data): + @gen.coroutine + def accept_callback(conn, address): + stream = IOStream(conn) + request_data = yield stream.read_until(b"\r\n\r\n") if b"HTTP/1." not in request_data: self.skipTest("requires HTTP/1.x") stream.write(b"""\ @@ -393,10 +393,6 @@ X-XSS-Protection: 1; """.replace(b"\n", b"\r\n"), callback=stream.close) - def accept_callback(conn, address): - stream = IOStream(conn) - stream.read_until(b"\r\n\r\n", - functools.partial(write_response, stream)) netutil.add_accept_handler(sock, accept_callback) resp = self.fetch("http://127.0.0.1:%d/" % port) resp.rethrow() diff --git a/tornado/test/httpserver_test.py b/tornado/test/httpserver_test.py index 155dbebe8..19bfabb39 100644 --- a/tornado/test/httpserver_test.py +++ b/tornado/test/httpserver_test.py @@ -1,6 +1,6 @@ from __future__ import absolute_import, division, print_function -from tornado import netutil +from tornado import gen, netutil from tornado.concurrent import Future from tornado.escape import json_decode, json_encode, utf8, _unicode, recursive_unicode, native_str from tornado.http1connection import HTTP1Connection @@ -8,6 +8,7 @@ from tornado.httpclient import HTTPError from tornado.httpserver import HTTPServer from tornado.httputil import HTTPHeaders, HTTPMessageDelegate, HTTPServerConnectionDelegate, ResponseStartLine # noqa: E501 from tornado.iostream import IOStream +from tornado.locks import Event from tornado.log import gen_log from tornado.netutil import ssl_options_to_context from tornado.simple_httpclient import SimpleAsyncHTTPClient @@ -250,31 +251,27 @@ class HTTPConnectionTest(AsyncHTTPTestCase): newline=newline) self.assertEqual(response, b'Hello world') + @gen_test def test_100_continue(self): # Run through a 100-continue interaction by hand: # When given Expect: 100-continue, we get a 100 response after the # headers, and then the real response after the body. stream = IOStream(socket.socket()) - stream.connect(("127.0.0.1", self.get_http_port()), callback=self.stop) - self.wait() - stream.write(b"\r\n".join([b"POST /hello HTTP/1.1", - b"Content-Length: 1024", - b"Expect: 100-continue", - b"Connection: close", - b"\r\n"]), callback=self.stop) - self.wait() - stream.read_until(b"\r\n\r\n", self.stop) - data = self.wait() + yield stream.connect(("127.0.0.1", self.get_http_port())) + yield stream.write(b"\r\n".join([ + b"POST /hello HTTP/1.1", + b"Content-Length: 1024", + b"Expect: 100-continue", + b"Connection: close", + b"\r\n"])) + data = yield stream.read_until(b"\r\n\r\n") self.assertTrue(data.startswith(b"HTTP/1.1 100 "), data) stream.write(b"a" * 1024) - stream.read_until(b"\r\n", self.stop) - first_line = self.wait() + first_line = yield stream.read_until(b"\r\n") self.assertTrue(first_line.startswith(b"HTTP/1.1 200"), first_line) - stream.read_until(b"\r\n\r\n", self.stop) - header_data = self.wait() + header_data = yield stream.read_until(b"\r\n\r\n") headers = HTTPHeaders.parse(native_str(header_data.decode('latin1'))) - stream.read_bytes(int(headers["Content-Length"]), self.stop) - body = self.wait() + body = yield stream.read_bytes(int(headers["Content-Length"])) self.assertEqual(body, b"Got 1024 bytes in POST") stream.close() @@ -467,6 +464,7 @@ bar start_line, headers, response = self.wait() self.assertEqual(json_decode(response), {u'foo': [u'bar']}) + @gen_test def test_invalid_content_length(self): with ExpectLog(gen_log, '.*Only integer Content-Length is allowed'): self.stream.write(b"""\ @@ -476,8 +474,7 @@ Content-Length: foo bar """.replace(b"\n", b"\r\n")) - self.stream.read_until_close(self.stop) - self.wait() + yield self.stream.read_until_close() class XHeaderTest(HandlerBaseTestCase): @@ -631,24 +628,23 @@ class UnixSocketTest(AsyncTestCase): shutil.rmtree(self.tmpdir) super(UnixSocketTest, self).tearDown() + @gen_test def test_unix_socket(self): self.stream.write(b"GET /hello HTTP/1.0\r\n\r\n") - self.stream.read_until(b"\r\n", self.stop) - response = self.wait() + response = yield self.stream.read_until(b"\r\n") self.assertEqual(response, b"HTTP/1.1 200 OK\r\n") - self.stream.read_until(b"\r\n\r\n", self.stop) - headers = HTTPHeaders.parse(self.wait().decode('latin1')) - self.stream.read_bytes(int(headers["Content-Length"]), self.stop) - body = self.wait() + header_data = yield self.stream.read_until(b"\r\n\r\n") + headers = HTTPHeaders.parse(header_data.decode('latin1')) + body = yield self.stream.read_bytes(int(headers["Content-Length"])) self.assertEqual(body, b"Hello world") + @gen_test def test_unix_socket_bad_request(self): # Unix sockets don't have remote addresses so they just return an # empty string. with ExpectLog(gen_log, "Malformed HTTP message from"): self.stream.write(b"garbage\r\n\r\n") - self.stream.read_until_close(self.stop) - response = self.wait() + response = yield self.stream.read_until_close() self.assertEqual(response, b"HTTP/1.1 400 Bad Request\r\n\r\n") @@ -702,123 +698,129 @@ class KeepAliveTest(AsyncHTTPTestCase): super(KeepAliveTest, self).tearDown() # The next few methods are a crude manual http client + @gen.coroutine def connect(self): self.stream = IOStream(socket.socket()) - self.stream.connect(('127.0.0.1', self.get_http_port()), self.stop) - self.wait() + yield self.stream.connect(('127.0.0.1', self.get_http_port())) + @gen.coroutine def read_headers(self): - self.stream.read_until(b'\r\n', self.stop) - first_line = self.wait() + first_line = yield self.stream.read_until(b'\r\n') self.assertTrue(first_line.startswith(b'HTTP/1.1 200'), first_line) - self.stream.read_until(b'\r\n\r\n', self.stop) - header_bytes = self.wait() + header_bytes = yield self.stream.read_until(b'\r\n\r\n') headers = HTTPHeaders.parse(header_bytes.decode('latin1')) - return headers + raise gen.Return(headers) + @gen.coroutine def read_response(self): - self.headers = self.read_headers() - self.stream.read_bytes(int(self.headers['Content-Length']), self.stop) - body = self.wait() + self.headers = yield self.read_headers() + body = yield self.stream.read_bytes(int(self.headers['Content-Length'])) self.assertEqual(b'Hello world', body) def close(self): self.stream.close() del self.stream + @gen_test def test_two_requests(self): - self.connect() + yield self.connect() self.stream.write(b'GET / HTTP/1.1\r\n\r\n') - self.read_response() + yield self.read_response() self.stream.write(b'GET / HTTP/1.1\r\n\r\n') - self.read_response() + yield self.read_response() self.close() + @gen_test def test_request_close(self): - self.connect() + yield self.connect() self.stream.write(b'GET / HTTP/1.1\r\nConnection: close\r\n\r\n') - self.read_response() - self.stream.read_until_close(callback=self.stop) - data = self.wait() + yield self.read_response() + data = yield self.stream.read_until_close() self.assertTrue(not data) self.assertEqual(self.headers['Connection'], 'close') self.close() # keepalive is supported for http 1.0 too, but it's opt-in + @gen_test def test_http10(self): self.http_version = b'HTTP/1.0' - self.connect() + yield self.connect() self.stream.write(b'GET / HTTP/1.0\r\n\r\n') - self.read_response() - self.stream.read_until_close(callback=self.stop) - data = self.wait() + yield self.read_response() + data = yield self.stream.read_until_close() self.assertTrue(not data) self.assertTrue('Connection' not in self.headers) self.close() + @gen_test def test_http10_keepalive(self): self.http_version = b'HTTP/1.0' - self.connect() + yield self.connect() self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n') - self.read_response() + yield self.read_response() self.assertEqual(self.headers['Connection'], 'Keep-Alive') self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n') - self.read_response() + yield self.read_response() self.assertEqual(self.headers['Connection'], 'Keep-Alive') self.close() + @gen_test def test_http10_keepalive_extra_crlf(self): self.http_version = b'HTTP/1.0' - self.connect() + yield self.connect() self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n\r\n') - self.read_response() + yield self.read_response() self.assertEqual(self.headers['Connection'], 'Keep-Alive') self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n') - self.read_response() + yield self.read_response() self.assertEqual(self.headers['Connection'], 'Keep-Alive') self.close() + @gen_test def test_pipelined_requests(self): - self.connect() + yield self.connect() self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n') - self.read_response() - self.read_response() + yield self.read_response() + yield self.read_response() self.close() + @gen_test def test_pipelined_cancel(self): - self.connect() + yield self.connect() self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n') # only read once - self.read_response() + yield self.read_response() self.close() + @gen_test def test_cancel_during_download(self): - self.connect() + yield self.connect() self.stream.write(b'GET /large HTTP/1.1\r\n\r\n') - self.read_headers() - self.stream.read_bytes(1024, self.stop) - self.wait() + yield self.read_headers() + yield self.stream.read_bytes(1024) self.close() + @gen_test def test_finish_while_closed(self): - self.connect() + yield self.connect() self.stream.write(b'GET /finish_on_close HTTP/1.1\r\n\r\n') - self.read_headers() + yield self.read_headers() self.close() + @gen_test def test_keepalive_chunked(self): self.http_version = b'HTTP/1.0' - self.connect() + yield self.connect() self.stream.write(b'POST / HTTP/1.0\r\n' b'Connection: keep-alive\r\n' b'Transfer-Encoding: chunked\r\n' b'\r\n' b'0\r\n' b'\r\n') - self.read_response() + yield self.read_response() self.assertEqual(self.headers['Connection'], 'Keep-Alive') self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n') - self.read_response() + yield self.read_response() self.assertEqual(self.headers['Connection'], 'Keep-Alive') self.close() @@ -990,34 +992,35 @@ class IdleTimeoutTest(AsyncHTTPTestCase): for stream in self.streams: stream.close() + @gen.coroutine def connect(self): stream = IOStream(socket.socket()) - stream.connect(('127.0.0.1', self.get_http_port()), self.stop) - self.wait() + yield stream.connect(('127.0.0.1', self.get_http_port())) self.streams.append(stream) - return stream + raise gen.Return(stream) + @gen_test def test_unused_connection(self): - stream = self.connect() - stream.set_close_callback(self.stop) - self.wait() + stream = yield self.connect() + event = Event() + stream.set_close_callback(event.set) + yield event.wait() + @gen_test def test_idle_after_use(self): - stream = self.connect() - stream.set_close_callback(lambda: self.stop("closed")) + stream = yield self.connect() + event = Event() + stream.set_close_callback(event.set) # Use the connection twice to make sure keep-alives are working for i in range(2): stream.write(b"GET / HTTP/1.1\r\n\r\n") - stream.read_until(b"\r\n\r\n", self.stop) - self.wait() - stream.read_bytes(11, self.stop) - data = self.wait() + yield stream.read_until(b"\r\n\r\n") + data = yield stream.read_bytes(11) self.assertEqual(data, b"Hello world") # Now let the timeout trigger and close the connection. - data = self.wait() - self.assertEqual(data, "closed") + yield event.wait() class BodyLimitsTest(AsyncHTTPTestCase): diff --git a/tornado/test/iostream_test.py b/tornado/test/iostream_test.py index 45799db2f..200a3a0c3 100644 --- a/tornado/test/iostream_test.py +++ b/tornado/test/iostream_test.py @@ -4,12 +4,14 @@ from tornado import gen from tornado import netutil from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError, _StreamBuffer from tornado.httputil import HTTPHeaders +from tornado.locks import Condition, Event from tornado.log import gen_log, app_log from tornado.netutil import ssl_wrap_socket from tornado.stack_context import NullContext from tornado.tcpserver import TCPServer from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test # noqa: E501 -from tornado.test.util import unittest, skipIfNonUnix, refusing_port, skipPypy3V58 +from tornado.test.util import (unittest, skipIfNonUnix, refusing_port, skipPypy3V58, + ignore_deprecation) from tornado.web import RequestHandler, Application import errno import hashlib @@ -59,48 +61,45 @@ class TestIOStreamWebMixin(object): response = self.fetch("/", headers={"Connection": "close"}) response.rethrow() + @gen_test def test_read_until_close(self): stream = self._make_client_iostream() - stream.connect(('127.0.0.1', self.get_http_port()), callback=self.stop) - self.wait() + yield stream.connect(('127.0.0.1', self.get_http_port())) stream.write(b"GET / HTTP/1.0\r\n\r\n") - stream.read_until_close(self.stop) - data = self.wait() + data = yield stream.read_until_close() self.assertTrue(data.startswith(b"HTTP/1.1 200")) self.assertTrue(data.endswith(b"Hello")) + @gen_test def test_read_zero_bytes(self): self.stream = self._make_client_iostream() - self.stream.connect(("127.0.0.1", self.get_http_port()), - callback=self.stop) - self.wait() + yield self.stream.connect(("127.0.0.1", self.get_http_port())) self.stream.write(b"GET / HTTP/1.0\r\n\r\n") # normal read - self.stream.read_bytes(9, self.stop) - data = self.wait() + data = yield self.stream.read_bytes(9) self.assertEqual(data, b"HTTP/1.1 ") # zero bytes - self.stream.read_bytes(0, self.stop) - data = self.wait() + data = yield self.stream.read_bytes(0) self.assertEqual(data, b"") # another normal read - self.stream.read_bytes(3, self.stop) - data = self.wait() + data = yield self.stream.read_bytes(3) self.assertEqual(data, b"200") self.stream.close() + @gen_test def test_write_while_connecting(self): stream = self._make_client_iostream() connected = [False] + cond = Condition() def connected_callback(): connected[0] = True - self.stop() + cond.notify() stream.connect(("127.0.0.1", self.get_http_port()), callback=connected_callback) # unlike the previous tests, try to write before the connection @@ -109,19 +108,19 @@ class TestIOStreamWebMixin(object): def write_callback(): written[0] = True - self.stop() + cond.notify() stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n", callback=write_callback) self.assertTrue(not connected[0]) # by the time the write has flushed, the connection callback has # also run try: - self.wait(lambda: connected[0] and written[0]) + while not (connected[0] and written[0]): + yield cond.wait() finally: logging.debug((connected, written)) - stream.read_until_close(self.stop) - data = self.wait() + data = yield stream.read_until_close() self.assertTrue(data.endswith(b"Hello")) stream.close() @@ -176,91 +175,124 @@ class TestReadWriteMixin(object): def make_iostream_pair(self, **kwargs): raise NotImplementedError + @gen_test def test_write_zero_bytes(self): # Attempting to write zero bytes should run the callback without # going into an infinite loop. - rs, ws = self.make_iostream_pair() - ws.write(b'', callback=self.stop) - self.wait() + rs, ws = yield self.make_iostream_pair() + yield ws.write(b'') ws.close() rs.close() + @gen_test def test_streaming_callback(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() + try: + chunks = [] + cond = Condition() + + def streaming_callback(data): + chunks.append(data) + cond.notify() + + fut = rs.read_bytes(6, streaming_callback=streaming_callback) + ws.write(b"1234") + while not chunks: + yield cond.wait() + ws.write(b"5678") + final_data = yield(fut) + self.assertFalse(final_data) + self.assertEqual(chunks, [b"1234", b"56"]) + + # the rest of the last chunk is still in the buffer + data = yield rs.read_bytes(2) + self.assertEqual(data, b"78") + finally: + rs.close() + ws.close() + + @gen_test + def test_streaming_callback_with_final_callback(self): + rs, ws = yield self.make_iostream_pair() try: chunks = [] final_called = [] + cond = Condition() def streaming_callback(data): chunks.append(data) - self.stop() + cond.notify() def final_callback(data): self.assertFalse(data) final_called.append(True) - self.stop() - rs.read_bytes(6, callback=final_callback, - streaming_callback=streaming_callback) + cond.notify() + with ignore_deprecation(): + rs.read_bytes(6, callback=final_callback, + streaming_callback=streaming_callback) ws.write(b"1234") - self.wait(condition=lambda: chunks) + while not chunks: + yield cond.wait() ws.write(b"5678") - self.wait(condition=lambda: final_called) + while not final_called: + yield cond.wait() self.assertEqual(chunks, [b"1234", b"56"]) # the rest of the last chunk is still in the buffer - rs.read_bytes(2, callback=self.stop) - data = self.wait() + data = yield rs.read_bytes(2) self.assertEqual(data, b"78") finally: rs.close() ws.close() + @gen_test def test_streaming_callback_with_data_in_buffer(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() ws.write(b"abcd\r\nefgh") - rs.read_until(b"\r\n", self.stop) - data = self.wait() + data = yield rs.read_until(b"\r\n") self.assertEqual(data, b"abcd\r\n") - def closed_callback(chunk): - self.fail() - rs.read_until_close(callback=closed_callback, - streaming_callback=self.stop) - #self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop) - data = self.wait() + streaming_fut = Future() + rs.read_until_close(streaming_callback=streaming_fut.set_result) + data = yield streaming_fut self.assertEqual(data, b"efgh") rs.close() ws.close() + @gen_test def test_streaming_until_close(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: chunks = [] closed = [False] + cond = Condition() def streaming_callback(data): chunks.append(data) - self.stop() + cond.notify() def close_callback(data): assert not data, data closed[0] = True - self.stop() - rs.read_until_close(callback=close_callback, - streaming_callback=streaming_callback) + cond.notify() + with ignore_deprecation(): + rs.read_until_close(callback=close_callback, + streaming_callback=streaming_callback) ws.write(b"1234") - self.wait(condition=lambda: len(chunks) == 1) - ws.write(b"5678", self.stop) - self.wait() + while len(chunks) != 1: + yield cond.wait() + yield ws.write(b"5678") ws.close() - self.wait(condition=lambda: closed[0]) + while not closed[0]: + yield cond.wait() self.assertEqual(chunks, [b"1234", b"5678"]) finally: ws.close() rs.close() + @gen_test def test_streaming_until_close_future(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: chunks = [] @@ -275,60 +307,58 @@ class TestReadWriteMixin(object): yield ws.write(b"5678") ws.close() - @gen.coroutine - def f(): - yield [rs_task(), ws_task()] - self.io_loop.run_sync(f) + yield [rs_task(), ws_task()] self.assertEqual(chunks, [b"1234", b"5678"]) finally: ws.close() rs.close() + @gen_test def test_delayed_close_callback(self): # The scenario: Server closes the connection while there is a pending # read that can be served out of buffered data. The client does not # run the close_callback as soon as it detects the close, but rather # defers it until after the buffered read has finished. - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: - rs.set_close_callback(self.stop) + event = Event() + rs.set_close_callback(event.set) ws.write(b"12") chunks = [] def callback1(data): chunks.append(data) - rs.read_bytes(1, callback2) + with ignore_deprecation(): + rs.read_bytes(1, callback2) ws.close() def callback2(data): chunks.append(data) - rs.read_bytes(1, callback1) - self.wait() # stopped by close_callback + with ignore_deprecation(): + rs.read_bytes(1, callback1) + yield event.wait() # stopped by close_callback self.assertEqual(chunks, [b"1", b"2"]) finally: ws.close() rs.close() + @gen_test def test_future_delayed_close_callback(self): # Same as test_delayed_close_callback, but with the future interface. - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() - # We can't call make_iostream_pair inside a gen_test function - # because the ioloop is not reentrant. - @gen_test - def f(self): + try: ws.write(b"12") chunks = [] chunks.append((yield rs.read_bytes(1))) ws.close() chunks.append((yield rs.read_bytes(1))) self.assertEqual(chunks, [b"1", b"2"]) - try: - f(self) finally: ws.close() rs.close() + @gen_test def test_close_buffered_data(self): # Similar to the previous test, but with data stored in the OS's # socket buffers instead of the IOStream's read buffer. Out-of-band @@ -338,71 +368,70 @@ class TestReadWriteMixin(object): # # This depends on the read_chunk_size being smaller than the # OS socket buffer, so make it small. - rs, ws = self.make_iostream_pair(read_chunk_size=256) + rs, ws = yield self.make_iostream_pair(read_chunk_size=256) try: ws.write(b"A" * 512) - rs.read_bytes(256, self.stop) - data = self.wait() + data = yield rs.read_bytes(256) self.assertEqual(b"A" * 256, data) ws.close() # Allow the close to propagate to the `rs` side of the # connection. Using add_callback instead of add_timeout # doesn't seem to work, even with multiple iterations - self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop) - self.wait() - rs.read_bytes(256, self.stop) - data = self.wait() + yield gen.sleep(0.01) + data = yield rs.read_bytes(256) self.assertEqual(b"A" * 256, data) finally: ws.close() rs.close() + @gen_test def test_read_until_close_after_close(self): # Similar to test_delayed_close_callback, but read_until_close takes # a separate code path so test it separately. - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: ws.write(b"1234") ws.close() # Read one byte to make sure the client has received the data. # It won't run the close callback as long as there is more buffered # data that could satisfy a later read. - rs.read_bytes(1, self.stop) - data = self.wait() + data = yield rs.read_bytes(1) self.assertEqual(data, b"1") - rs.read_until_close(self.stop) - data = self.wait() + data = yield rs.read_until_close() self.assertEqual(data, b"234") finally: ws.close() rs.close() + @gen_test def test_streaming_read_until_close_after_close(self): # Same as the preceding test but with a streaming_callback. # All data should go through the streaming callback, # and the final read callback just gets an empty string. - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: ws.write(b"1234") ws.close() - rs.read_bytes(1, self.stop) - data = self.wait() + data = yield rs.read_bytes(1) self.assertEqual(data, b"1") streaming_data = [] - rs.read_until_close(self.stop, - streaming_callback=streaming_data.append) - data = self.wait() - self.assertEqual(b'', data) + final_future = Future() + with ignore_deprecation(): + rs.read_until_close(final_future.set_result, + streaming_callback=streaming_data.append) + final_data = yield final_future + self.assertEqual(b'', final_data) self.assertEqual(b''.join(streaming_data), b"234") finally: ws.close() rs.close() + @gen_test def test_large_read_until(self): # Performance test: read_until used to have a quadratic component # so a read_until of 4MB would take 8 seconds; now it takes 0.25 # seconds. - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: # This test fails on pypy with ssl. I think it's because # pypy's gc defeats moves objects, breaking the @@ -415,126 +444,131 @@ class TestReadWriteMixin(object): for i in range(NUM_KB): ws.write(b"A" * 1024) ws.write(b"\r\n") - rs.read_until(b"\r\n", self.stop) - data = self.wait() + data = yield rs.read_until(b"\r\n") self.assertEqual(len(data), NUM_KB * 1024 + 2) finally: ws.close() rs.close() + @gen_test def test_close_callback_with_pending_read(self): # Regression test for a bug that was introduced in 2.3 # where the IOStream._close_callback would never be called # if there were pending reads. OK = b"OK\r\n" - rs, ws = self.make_iostream_pair() - rs.set_close_callback(self.stop) + rs, ws = yield self.make_iostream_pair() + event = Event() + rs.set_close_callback(event.set) try: ws.write(OK) - rs.read_until(b"\r\n", self.stop) - res = self.wait() + res = yield rs.read_until(b"\r\n") self.assertEqual(res, OK) ws.close() - rs.read_until(b"\r\n", lambda x: x) + with ignore_deprecation(): + rs.read_until(b"\r\n", lambda x: x) # If _close_callback (self.stop) is not called, # an AssertionError: Async operation timed out after 5 seconds # will be raised. - res = self.wait() - self.assertTrue(res is None) + yield event.wait() finally: ws.close() rs.close() + @gen_test def test_future_close_callback(self): # Regression test for interaction between the Future read interfaces # and IOStream._maybe_add_error_listener. - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() closed = [False] + cond = Condition() def close_callback(): closed[0] = True - self.stop() + cond.notify() rs.set_close_callback(close_callback) try: ws.write(b'a') - future = rs.read_bytes(1) - self.io_loop.add_future(future, self.stop) - self.assertEqual(self.wait().result(), b'a') + res = yield rs.read_bytes(1) + self.assertEqual(res, b'a') self.assertFalse(closed[0]) ws.close() - self.wait() + yield cond.wait() self.assertTrue(closed[0]) finally: rs.close() ws.close() + @gen_test def test_write_memoryview(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: - rs.read_bytes(4, self.stop) + fut = rs.read_bytes(4) ws.write(memoryview(b"hello")) - data = self.wait() + data = yield fut self.assertEqual(data, b"hell") finally: ws.close() rs.close() + @gen_test def test_read_bytes_partial(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: # Ask for more than is available with partial=True - rs.read_bytes(50, self.stop, partial=True) + fut = rs.read_bytes(50, partial=True) ws.write(b"hello") - data = self.wait() + data = yield fut self.assertEqual(data, b"hello") # Ask for less than what is available; num_bytes is still # respected. - rs.read_bytes(3, self.stop, partial=True) + fut = rs.read_bytes(3, partial=True) ws.write(b"world") - data = self.wait() + data = yield fut self.assertEqual(data, b"wor") # Partial reads won't return an empty string, but read_bytes(0) # will. - rs.read_bytes(0, self.stop, partial=True) - data = self.wait() + data = yield rs.read_bytes(0, partial=True) self.assertEqual(data, b'') finally: ws.close() rs.close() + @gen_test def test_read_until_max_bytes(self): - rs, ws = self.make_iostream_pair() - rs.set_close_callback(lambda: self.stop("closed")) + rs, ws = yield self.make_iostream_pair() + closed = Event() + rs.set_close_callback(closed.set) try: # Extra room under the limit - rs.read_until(b"def", self.stop, max_bytes=50) + fut = rs.read_until(b"def", max_bytes=50) ws.write(b"abcdef") - data = self.wait() + data = yield fut self.assertEqual(data, b"abcdef") # Just enough space - rs.read_until(b"def", self.stop, max_bytes=6) + fut = rs.read_until(b"def", max_bytes=6) ws.write(b"abcdef") - data = self.wait() + data = yield fut self.assertEqual(data, b"abcdef") # Not enough space, but we don't know it until all we can do is # log a warning and close the connection. with ExpectLog(gen_log, "Unsatisfiable read"): - rs.read_until(b"def", self.stop, max_bytes=5) + fut = rs.read_until(b"def", max_bytes=5) ws.write(b"123456") - data = self.wait() - self.assertEqual(data, "closed") + yield closed.wait() finally: ws.close() rs.close() + @gen_test def test_read_until_max_bytes_inline(self): - rs, ws = self.make_iostream_pair() - rs.set_close_callback(lambda: self.stop("closed")) + rs, ws = yield self.make_iostream_pair() + closed = Event() + rs.set_close_callback(closed.set) try: # Similar to the error case in the previous test, but the # ws writes first so rs reads are satisfied @@ -542,59 +576,63 @@ class TestReadWriteMixin(object): # do not raise the error synchronously. ws.write(b"123456") with ExpectLog(gen_log, "Unsatisfiable read"): - rs.read_until(b"def", self.stop, max_bytes=5) - data = self.wait() - self.assertEqual(data, "closed") + with ignore_deprecation(): + rs.read_until(b"def", callback=lambda x: self.fail(), max_bytes=5) + yield closed.wait() finally: ws.close() rs.close() + @gen_test def test_read_until_max_bytes_ignores_extra(self): - rs, ws = self.make_iostream_pair() - rs.set_close_callback(lambda: self.stop("closed")) + rs, ws = yield self.make_iostream_pair() + closed = Event() + rs.set_close_callback(closed.set) try: # Even though data that matches arrives the same packet that # puts us over the limit, we fail the request because it was not # found within the limit. ws.write(b"abcdef") with ExpectLog(gen_log, "Unsatisfiable read"): - rs.read_until(b"def", self.stop, max_bytes=5) - data = self.wait() - self.assertEqual(data, "closed") + rs.read_until(b"def", max_bytes=5) + yield closed.wait() finally: ws.close() rs.close() + @gen_test def test_read_until_regex_max_bytes(self): - rs, ws = self.make_iostream_pair() - rs.set_close_callback(lambda: self.stop("closed")) + rs, ws = yield self.make_iostream_pair() + closed = Event() + rs.set_close_callback(closed.set) try: # Extra room under the limit - rs.read_until_regex(b"def", self.stop, max_bytes=50) + fut = rs.read_until_regex(b"def", max_bytes=50) ws.write(b"abcdef") - data = self.wait() + data = yield fut self.assertEqual(data, b"abcdef") # Just enough space - rs.read_until_regex(b"def", self.stop, max_bytes=6) + fut = rs.read_until_regex(b"def", max_bytes=6) ws.write(b"abcdef") - data = self.wait() + data = yield fut self.assertEqual(data, b"abcdef") # Not enough space, but we don't know it until all we can do is # log a warning and close the connection. with ExpectLog(gen_log, "Unsatisfiable read"): - rs.read_until_regex(b"def", self.stop, max_bytes=5) + rs.read_until_regex(b"def", max_bytes=5) ws.write(b"123456") - data = self.wait() - self.assertEqual(data, "closed") + yield closed.wait() finally: ws.close() rs.close() + @gen_test def test_read_until_regex_max_bytes_inline(self): - rs, ws = self.make_iostream_pair() - rs.set_close_callback(lambda: self.stop("closed")) + rs, ws = yield self.make_iostream_pair() + closed = Event() + rs.set_close_callback(closed.set) try: # Similar to the error case in the previous test, but the # ws writes first so rs reads are satisfied @@ -602,102 +640,101 @@ class TestReadWriteMixin(object): # do not raise the error synchronously. ws.write(b"123456") with ExpectLog(gen_log, "Unsatisfiable read"): - rs.read_until_regex(b"def", self.stop, max_bytes=5) - data = self.wait() - self.assertEqual(data, "closed") + rs.read_until_regex(b"def", max_bytes=5) + yield closed.wait() finally: ws.close() rs.close() + @gen_test def test_read_until_regex_max_bytes_ignores_extra(self): - rs, ws = self.make_iostream_pair() - rs.set_close_callback(lambda: self.stop("closed")) + rs, ws = yield self.make_iostream_pair() + closed = Event() + rs.set_close_callback(closed.set) try: # Even though data that matches arrives the same packet that # puts us over the limit, we fail the request because it was not # found within the limit. ws.write(b"abcdef") with ExpectLog(gen_log, "Unsatisfiable read"): - rs.read_until_regex(b"def", self.stop, max_bytes=5) - data = self.wait() - self.assertEqual(data, "closed") + rs.read_until_regex(b"def", max_bytes=5) + yield closed.wait() finally: ws.close() rs.close() + @gen_test def test_small_reads_from_large_buffer(self): # 10KB buffer size, 100KB available to read. # Read 1KB at a time and make sure that the buffer is not eagerly # filled. - rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024) + rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024) try: ws.write(b"a" * 1024 * 100) for i in range(100): - rs.read_bytes(1024, self.stop) - data = self.wait() + data = yield rs.read_bytes(1024) self.assertEqual(data, b"a" * 1024) finally: ws.close() rs.close() + @gen_test def test_small_read_untils_from_large_buffer(self): # 10KB buffer size, 100KB available to read. # Read 1KB at a time and make sure that the buffer is not eagerly # filled. - rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024) + rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024) try: ws.write((b"a" * 1023 + b"\n") * 100) for i in range(100): - rs.read_until(b"\n", self.stop, max_bytes=4096) - data = self.wait() + data = yield rs.read_until(b"\n", max_bytes=4096) self.assertEqual(data, b"a" * 1023 + b"\n") finally: ws.close() rs.close() + @gen_test def test_flow_control(self): MB = 1024 * 1024 - rs, ws = self.make_iostream_pair(max_buffer_size=5 * MB) + rs, ws = yield self.make_iostream_pair(max_buffer_size=5 * MB) try: # Client writes more than the rs will accept. ws.write(b"a" * 10 * MB) # The rs pauses while reading. - rs.read_bytes(MB, self.stop) - self.wait() - self.io_loop.call_later(0.1, self.stop) - self.wait() + yield rs.read_bytes(MB) + yield gen.sleep(0.1) # The ws's writes have been blocked; the rs can # continue to read gradually. for i in range(9): - rs.read_bytes(MB, self.stop) - self.wait() + yield rs.read_bytes(MB) finally: rs.close() ws.close() + @gen_test def test_read_into(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() def sleep_some(): self.io_loop.run_sync(lambda: gen.sleep(0.05)) try: buf = bytearray(10) - rs.read_into(buf, callback=self.stop) + fut = rs.read_into(buf) ws.write(b"hello") - sleep_some() + yield gen.sleep(0.05) self.assertTrue(rs.reading()) ws.write(b"world!!") - data = self.wait() + data = yield fut self.assertFalse(rs.reading()) self.assertEqual(data, 10) self.assertEqual(bytes(buf), b"helloworld") # Existing buffer is fed into user buffer - rs.read_into(buf, callback=self.stop) - sleep_some() + fut = rs.read_into(buf) + yield gen.sleep(0.05) self.assertTrue(rs.reading()) ws.write(b"1234567890") - data = self.wait() + data = yield fut self.assertFalse(rs.reading()) self.assertEqual(data, 10) self.assertEqual(bytes(buf), b"!!12345678") @@ -705,43 +742,38 @@ class TestReadWriteMixin(object): # Existing buffer can satisfy read immediately buf = bytearray(4) ws.write(b"abcdefghi") - rs.read_into(buf, callback=self.stop) - data = self.wait() + data = yield rs.read_into(buf) self.assertEqual(data, 4) self.assertEqual(bytes(buf), b"90ab") - rs.read_bytes(7, self.stop) - data = self.wait() + data = yield rs.read_bytes(7) self.assertEqual(data, b"cdefghi") finally: ws.close() rs.close() + @gen_test def test_read_into_partial(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() - def sleep_some(): - self.io_loop.run_sync(lambda: gen.sleep(0.05)) try: # Partial read buf = bytearray(10) - rs.read_into(buf, callback=self.stop, partial=True) + fut = rs.read_into(buf, partial=True) ws.write(b"hello") - data = self.wait() + data = yield fut self.assertFalse(rs.reading()) self.assertEqual(data, 5) self.assertEqual(bytes(buf), b"hello\0\0\0\0\0") # Full read despite partial=True ws.write(b"world!1234567890") - rs.read_into(buf, callback=self.stop, partial=True) - data = self.wait() + data = yield rs.read_into(buf, partial=True) self.assertEqual(data, 10) self.assertEqual(bytes(buf), b"world!1234") # Existing buffer can satisfy read immediately - rs.read_into(buf, callback=self.stop, partial=True) - data = self.wait() + data = yield rs.read_into(buf, partial=True) self.assertEqual(data, 6) self.assertEqual(bytes(buf), b"5678901234") @@ -749,8 +781,9 @@ class TestReadWriteMixin(object): ws.close() rs.close() + @gen_test def test_read_into_zero_bytes(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() try: buf = bytearray() fut = rs.read_into(buf) @@ -759,13 +792,14 @@ class TestReadWriteMixin(object): ws.close() rs.close() + @gen_test def test_many_mixed_reads(self): # Stress buffer handling when going back and forth between # read_bytes() (using an internal buffer) and read_into() # (using a user-allocated buffer). r = random.Random(42) nbytes = 1000000 - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() produce_hash = hashlib.sha1() consume_hash = hashlib.sha1() @@ -801,13 +835,9 @@ class TestReadWriteMixin(object): remaining -= size assert remaining == 0 - @gen.coroutine - def main(): + try: yield [produce(), consume()] assert produce_hash.hexdigest() == consume_hash.hexdigest() - - try: - self.io_loop.run_sync(main) finally: ws.close() rs.close() @@ -820,25 +850,21 @@ class TestIOStreamMixin(TestReadWriteMixin): def _make_client_iostream(self, connection, **kwargs): raise NotImplementedError() + @gen.coroutine def make_iostream_pair(self, **kwargs): listener, port = bind_unused_port() - streams = [None, None] + server_stream_fut = Future() def accept_callback(connection, address): - streams[0] = self._make_server_iostream(connection, **kwargs) - self.stop() + server_stream_fut.set_result(self._make_server_iostream(connection, **kwargs)) - def connect_callback(): - streams[1] = client_stream - self.stop() netutil.add_accept_handler(listener, accept_callback) client_stream = self._make_client_iostream(socket.socket(), **kwargs) - client_stream.connect(('127.0.0.1', port), - callback=connect_callback) - self.wait(condition=lambda: all(streams)) + connect_fut = client_stream.connect(('127.0.0.1', port)) + server_stream, client_stream = yield [server_stream_fut, connect_fut] self.io_loop.remove_handler(listener.fileno()) listener.close() - return streams + raise gen.Return((server_stream, client_stream)) def test_connection_refused(self): # When a connection is refused, the connect callback should not @@ -883,38 +909,44 @@ class TestIOStreamMixin(TestReadWriteMixin): self.wait() self.assertIsInstance(stream.error, socket.gaierror) + @gen_test def test_read_callback_error(self): # Test that IOStream sets its exc_info when a read callback throws - server, client = self.make_iostream_pair() + server, client = yield self.make_iostream_pair() try: - server.set_close_callback(self.stop) + closed = Event() + server.set_close_callback(closed.set) with ExpectLog( app_log, "(Uncaught exception|Exception in callback)" ): # Clear ExceptionStackContext so IOStream catches error with NullContext(): - server.read_bytes(1, callback=lambda data: 1 / 0) + with ignore_deprecation(): + server.read_bytes(1, callback=lambda data: 1 / 0) client.write(b"1") - self.wait() + yield closed.wait() self.assertTrue(isinstance(server.error, ZeroDivisionError)) finally: server.close() client.close() @unittest.skipIf(mock is None, 'mock package not present') + @gen_test def test_read_until_close_with_error(self): - server, client = self.make_iostream_pair() + server, client = yield self.make_iostream_pair() try: with mock.patch('tornado.iostream.BaseIOStream._try_inline_read', side_effect=IOError('boom')): with self.assertRaisesRegexp(IOError, 'boom'): - client.read_until_close(self.stop) + with ignore_deprecation(): + client.read_until_close(lambda x: None) finally: server.close() client.close() @skipIfNonUnix @skipPypy3V58 + @gen_test def test_inline_read_error(self): # An error on an inline read is raised without logging (on the # assumption that it will eventually be noticed or logged further @@ -924,24 +956,27 @@ class TestIOStreamMixin(TestReadWriteMixin): # on socket FDs, but we can't close the socket object normally # because we won't get the error we want if the socket knows # it's closed. - server, client = self.make_iostream_pair() + server, client = yield self.make_iostream_pair() try: os.close(server.socket.fileno()) with self.assertRaises(socket.error): - server.read_bytes(1, lambda data: None) + server.read_bytes(1) finally: server.close() client.close() @skipPypy3V58 + @gen_test def test_async_read_error_logging(self): # Socket errors on asynchronous reads should be logged (but only # once). - server, client = self.make_iostream_pair() - server.set_close_callback(self.stop) + server, client = yield self.make_iostream_pair() + closed = Event() + server.set_close_callback(closed.set) try: # Start a read that will be fulfilled asynchronously. - server.read_bytes(1, lambda data: None) + with ignore_deprecation(): + server.read_bytes(1, lambda data: None) client.write(b'a') # Stub out read_from_fd to make it fail. @@ -951,11 +986,12 @@ class TestIOStreamMixin(TestReadWriteMixin): server.read_from_fd = fake_read_from_fd # This log message is from _handle_read (not read_from_fd). with ExpectLog(gen_log, "error on read"): - self.wait() + yield closed.wait() finally: server.close() client.close() + @gen_test def test_future_write(self): """ Test that write() Futures are never orphaned. @@ -965,7 +1001,7 @@ class TestIOStreamMixin(TestReadWriteMixin): m, n = 10000, 1000 nproducers = 10 total_bytes = m * n * nproducers - server, client = self.make_iostream_pair(max_buffer_size=total_bytes) + server, client = yield self.make_iostream_pair(max_buffer_size=total_bytes) @gen.coroutine def produce(): @@ -980,12 +1016,8 @@ class TestIOStreamMixin(TestReadWriteMixin): res = yield client.read_bytes(m) nread += len(res) - @gen.coroutine - def main(): - yield [produce() for i in range(nproducers)] + [consume()] - try: - self.io_loop.run_sync(main) + yield [produce() for i in range(nproducers)] + [consume()] finally: server.close() client.close() @@ -1241,43 +1273,42 @@ class WaitForHandshakeTest(AsyncTestCase): @skipIfNonUnix class TestPipeIOStream(TestReadWriteMixin, AsyncTestCase): + @gen.coroutine def make_iostream_pair(self, **kwargs): r, w = os.pipe() return PipeIOStream(r, **kwargs), PipeIOStream(w, **kwargs) + @gen_test def test_pipe_iostream(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() ws.write(b"hel") ws.write(b"lo world") - rs.read_until(b' ', callback=self.stop) - data = self.wait() + data = yield rs.read_until(b' ') self.assertEqual(data, b"hello ") - rs.read_bytes(3, self.stop) - data = self.wait() + data = yield rs.read_bytes(3) self.assertEqual(data, b"wor") ws.close() - rs.read_until_close(self.stop) - data = self.wait() + data = yield rs.read_until_close() self.assertEqual(data, b"ld") rs.close() + @gen_test def test_pipe_iostream_big_write(self): - rs, ws = self.make_iostream_pair() + rs, ws = yield self.make_iostream_pair() NUM_BYTES = 1048576 # Write 1MB of data, which should fill the buffer ws.write(b"1" * NUM_BYTES) - rs.read_bytes(NUM_BYTES, self.stop) - data = self.wait() + data = yield rs.read_bytes(NUM_BYTES) self.assertEqual(data, b"1" * NUM_BYTES) ws.close() diff --git a/tornado/test/process_test.py b/tornado/test/process_test.py index 4589e51bc..1095b1126 100644 --- a/tornado/test/process_test.py +++ b/tornado/test/process_test.py @@ -143,6 +143,7 @@ class ProcessTest(unittest.TestCase): @skipIfNonUnix class SubprocessTest(AsyncTestCase): + @gen_test def test_subprocess(self): if IOLoop.configured_class().__name__.endswith('LayeredTwistedIOLoop'): # This test fails non-deterministically with LayeredTwistedIOLoop. @@ -158,33 +159,29 @@ class SubprocessTest(AsyncTestCase): self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait())) self.addCleanup(subproc.stdout.close) self.addCleanup(subproc.stdin.close) - subproc.stdout.read_until(b'>>> ', self.stop) - self.wait() + yield subproc.stdout.read_until(b'>>> ') subproc.stdin.write(b"print('hello')\n") - subproc.stdout.read_until(b'\n', self.stop) - data = self.wait() + data = yield subproc.stdout.read_until(b'\n') self.assertEqual(data, b"hello\n") - subproc.stdout.read_until(b">>> ", self.stop) - self.wait() + yield subproc.stdout.read_until(b">>> ") subproc.stdin.write(b"raise SystemExit\n") - subproc.stdout.read_until_close(self.stop) - data = self.wait() + data = yield subproc.stdout.read_until_close() self.assertEqual(data, b"") + @gen_test def test_close_stdin(self): # Close the parent's stdin handle and see that the child recognizes it. subproc = Subprocess([sys.executable, '-u', '-i'], stdin=Subprocess.STREAM, stdout=Subprocess.STREAM, stderr=subprocess.STDOUT) self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait())) - subproc.stdout.read_until(b'>>> ', self.stop) - self.wait() + yield subproc.stdout.read_until(b'>>> ') subproc.stdin.close() - subproc.stdout.read_until_close(self.stop) - data = self.wait() + data = yield subproc.stdout.read_until_close() self.assertEqual(data, b"\n") + @gen_test def test_stderr(self): # This test is mysteriously flaky on twisted: it succeeds, but logs # an error of EBADF on closing a file descriptor. @@ -193,8 +190,7 @@ class SubprocessTest(AsyncTestCase): r"import sys; sys.stderr.write('hello\n')"], stderr=Subprocess.STREAM) self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait())) - subproc.stderr.read_until(b'\n', self.stop) - data = self.wait() + data = yield subproc.stderr.read_until(b'\n') self.assertEqual(data, b'hello\n') # More mysterious EBADF: This fails if done with self.addCleanup instead of here. subproc.stderr.close() diff --git a/tornado/websocket.py b/tornado/websocket.py index 738a9ccb8..ced6d657f 100644 --- a/tornado/websocket.py +++ b/tornado/websocket.py @@ -19,11 +19,13 @@ the protocol (known as "draft 76") and are not compatible with this module. from __future__ import absolute_import, division, print_function import base64 +import contextlib import hashlib import os import struct import tornado.escape import tornado.web +import warnings import zlib from tornado.concurrent import Future, future_set_result_unless_cancelled @@ -44,6 +46,14 @@ else: from urlparse import urlparse # py3 +@contextlib.contextmanager +def ignore_deprecation(): + """Context manager to ignore deprecation warnings.""" + with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + yield + + class WebSocketError(Exception): pass @@ -838,7 +848,8 @@ class WebSocketProtocol13(WebSocketProtocol): def _receive_frame(self): try: - self.stream.read_bytes(2, self._on_frame_start) + with ignore_deprecation(): + self.stream.read_bytes(2, self._on_frame_start) except StreamClosedError: self._abort() @@ -863,16 +874,17 @@ class WebSocketProtocol13(WebSocketProtocol): self._abort() return try: - if payloadlen < 126: - self._frame_length = payloadlen - if self._masked_frame: - self.stream.read_bytes(4, self._on_masking_key) - else: - self._read_frame_data(False) - elif payloadlen == 126: - self.stream.read_bytes(2, self._on_frame_length_16) - elif payloadlen == 127: - self.stream.read_bytes(8, self._on_frame_length_64) + with ignore_deprecation(): + if payloadlen < 126: + self._frame_length = payloadlen + if self._masked_frame: + self.stream.read_bytes(4, self._on_masking_key) + else: + self._read_frame_data(False) + elif payloadlen == 126: + self.stream.read_bytes(2, self._on_frame_length_16) + elif payloadlen == 127: + self.stream.read_bytes(8, self._on_frame_length_64) except StreamClosedError: self._abort() @@ -883,16 +895,18 @@ class WebSocketProtocol13(WebSocketProtocol): if new_len > (self.handler.max_message_size or 10 * 1024 * 1024): self.close(1009, "message too big") return - self.stream.read_bytes( - self._frame_length, - self._on_masked_frame_data if masked else self._on_frame_data) + with ignore_deprecation(): + self.stream.read_bytes( + self._frame_length, + self._on_masked_frame_data if masked else self._on_frame_data) def _on_frame_length_16(self, data): self._wire_bytes_in += len(data) self._frame_length = struct.unpack("!H", data)[0] try: if self._masked_frame: - self.stream.read_bytes(4, self._on_masking_key) + with ignore_deprecation(): + self.stream.read_bytes(4, self._on_masking_key) else: self._read_frame_data(False) except StreamClosedError: @@ -903,7 +917,8 @@ class WebSocketProtocol13(WebSocketProtocol): self._frame_length = struct.unpack("!Q", data)[0] try: if self._masked_frame: - self.stream.read_bytes(4, self._on_masking_key) + with ignore_deprecation(): + self.stream.read_bytes(4, self._on_masking_key) else: self._read_frame_data(False) except StreamClosedError: