From: Ben Darnell Date: Sat, 3 Oct 2015 22:44:01 +0000 (-0400) Subject: Deprecate gen.maybe_future and remove all internal uses. X-Git-Tag: v4.3.0b1~8^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=52f94d006750cfa2f823dcde86ba71f79030ca42;p=thirdparty%2Ftornado.git Deprecate gen.maybe_future and remove all internal uses. This allows native coroutines in the HTTPMessageDelegate.data_received methods. --- diff --git a/docs/gen.rst b/docs/gen.rst index c56413fdd..8e867ed0e 100644 --- a/docs/gen.rst +++ b/docs/gen.rst @@ -23,8 +23,6 @@ .. autofunction:: with_timeout .. autoexception:: TimeoutError - .. autofunction:: maybe_future - .. autofunction:: sleep .. autodata:: moment @@ -50,6 +48,8 @@ .. autofunction:: convert_yielded + .. autofunction:: maybe_future + Legacy interface ---------------- diff --git a/tornado/gen.py b/tornado/gen.py index 31521b332..a1e89dcb2 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -804,6 +804,11 @@ def maybe_future(x): it is wrapped in a new `.Future`. This is suitable for use as ``result = yield gen.maybe_future(f())`` when you don't know whether ``f()`` returns a `.Future` or not. + + .. deprecated:: 4.3 + This function only handles ``Futures``, not other yieldable objects. + Instead of `maybe_future`, check for the non-future result types + you expect (often just ``None``), and ``yield`` anything unknown. """ if is_future(x): return x diff --git a/tornado/http1connection.py b/tornado/http1connection.py index 6226ef7af..c9eb2ad4c 100644 --- a/tornado/http1connection.py +++ b/tornado/http1connection.py @@ -558,7 +558,9 @@ class HTTP1Connection(httputil.HTTPConnection): content_length -= len(body) if not self._write_finished or self.is_client: with _ExceptionLoggingContext(app_log): - yield gen.maybe_future(delegate.data_received(body)) + ret = delegate.data_received(body) + if ret is not None: + yield ret @gen.coroutine def _read_chunked_body(self, delegate): @@ -579,7 +581,9 @@ class HTTP1Connection(httputil.HTTPConnection): bytes_to_read -= len(chunk) if not self._write_finished or self.is_client: with _ExceptionLoggingContext(app_log): - yield gen.maybe_future(delegate.data_received(chunk)) + ret = delegate.data_received(chunk) + if ret is not None: + yield ret # chunk ends with \r\n crlf = yield self.stream.read_bytes(2) assert crlf == b"\r\n" @@ -619,11 +623,14 @@ class _GzipMessageDelegate(httputil.HTTPMessageDelegate): decompressed = self._decompressor.decompress( compressed_data, self._chunk_size) if decompressed: - yield gen.maybe_future( - self._delegate.data_received(decompressed)) + ret = self._delegate.data_received(decompressed) + if ret is not None: + yield ret compressed_data = self._decompressor.unconsumed_tail else: - yield gen.maybe_future(self._delegate.data_received(chunk)) + ret = self._delegate.data_received(chunk) + if ret is not None: + yield ret def finish(self): if self._decompressor is not None: diff --git a/tornado/test/web_test.py b/tornado/test/web_test.py index 561bad3e5..9cb64afd5 100644 --- a/tornado/test/web_test.py +++ b/tornado/test/web_test.py @@ -3,13 +3,14 @@ from tornado.concurrent import Future from tornado import gen from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring from tornado.httputil import format_timestamp +from tornado.ioloop import IOLoop from tornado.iostream import IOStream from tornado import locale from tornado.log import app_log, gen_log from tornado.simple_httpclient import SimpleAsyncHTTPClient from tornado.template import DictLoader from tornado.testing import AsyncHTTPTestCase, ExpectLog, gen_test -from tornado.test.util import unittest +from tornado.test.util import unittest, skipBefore35, exec_test from tornado.util import u, ObjectDict, unicode_type, timedelta_to_seconds from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature_v1, create_signed_value, decode_signed_value, ErrorHandler, UIModule, MissingArgumentError, stream_request_body, Finish, removeslash, addslash, RedirectHandler as WebRedirectHandler, get_signature_key_version, GZipContentEncoding @@ -18,6 +19,8 @@ import contextlib import copy import datetime import email.utils +import gzip +from io import BytesIO import itertools import logging import os @@ -2097,59 +2100,55 @@ class StreamingRequestBodyTest(WebTestCase): yield self.close_future -class StreamingRequestFlowControlTest(WebTestCase): - def get_handlers(self): - from tornado.ioloop import IOLoop - - # Each method in this handler returns a Future and yields to the - # IOLoop so the future is not immediately ready. Ensure that the - # Futures are respected and no method is called before the previous - # one has completed. - @stream_request_body - class FlowControlHandler(RequestHandler): - def initialize(self, test): - self.test = test - self.method = None - self.methods = [] - - @contextlib.contextmanager - def in_method(self, method): - if self.method is not None: - self.test.fail("entered method %s while in %s" % - (method, self.method)) - self.method = method - self.methods.append(method) - try: - yield - finally: - self.method = None - - @gen.coroutine - def prepare(self): - # Note that asynchronous prepare() does not block data_received, - # so we don't use in_method here. - self.methods.append('prepare') - yield gen.Task(IOLoop.current().add_callback) +# Each method in this handler returns a yieldable object and yields to the +# IOLoop so the future is not immediately ready. Ensure that the +# yieldables are respected and no method is called before the previous +# one has completed. +@stream_request_body +class BaseFlowControlHandler(RequestHandler): + def initialize(self, test): + self.test = test + self.method = None + self.methods = [] + + @contextlib.contextmanager + def in_method(self, method): + if self.method is not None: + self.test.fail("entered method %s while in %s" % + (method, self.method)) + self.method = method + self.methods.append(method) + try: + yield + finally: + self.method = None - @gen.coroutine - def data_received(self, data): - with self.in_method('data_received'): - yield gen.Task(IOLoop.current().add_callback) + @gen.coroutine + def prepare(self): + # Note that asynchronous prepare() does not block data_received, + # so we don't use in_method here. + self.methods.append('prepare') + yield gen.Task(IOLoop.current().add_callback) - @gen.coroutine - def post(self): - with self.in_method('post'): - yield gen.Task(IOLoop.current().add_callback) - self.write(dict(methods=self.methods)) + @gen.coroutine + def post(self): + with self.in_method('post'): + yield gen.Task(IOLoop.current().add_callback) + self.write(dict(methods=self.methods)) - return [('/', FlowControlHandler, dict(test=self))] +class BaseStreamingRequestFlowControlTest(object): def get_httpserver_options(self): # Use a small chunk size so flow control is relevant even though # all the data arrives at once. - return dict(chunk_size=10) + return dict(chunk_size=10, decompress_request=True) - def test_flow_control(self): + def get_http_client(self): + # simple_httpclient only: curl doesn't support body_producer. + return SimpleAsyncHTTPClient(io_loop=self.io_loop) + + # Test all the slightly different code paths for fixed, chunked, etc bodies. + def test_flow_control_fixed_body(self): response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz', method='POST') response.rethrow() @@ -2158,6 +2157,58 @@ class StreamingRequestFlowControlTest(WebTestCase): 'data_received', 'data_received', 'post'])) + def test_flow_control_chunked_body(self): + chunks = [b'abcd', b'efgh', b'ijkl'] + @gen.coroutine + def body_producer(write): + for i in chunks: + yield write(i) + response = self.fetch('/', body_producer=body_producer, method='POST') + response.rethrow() + self.assertEqual(json_decode(response.body), + dict(methods=['prepare', 'data_received', + 'data_received', 'data_received', + 'post'])) + + def test_flow_control_compressed_body(self): + bytesio = BytesIO() + gzip_file = gzip.GzipFile(mode='w', fileobj=bytesio) + gzip_file.write(b'abcdefghijklmnopqrstuvwxyz') + gzip_file.close() + compressed_body = bytesio.getvalue() + response = self.fetch('/', body=compressed_body, method='POST', + headers={'Content-Encoding': 'gzip'}) + response.rethrow() + self.assertEqual(json_decode(response.body), + dict(methods=['prepare', 'data_received', + 'data_received', 'data_received', + 'post'])) + +class DecoratedStreamingRequestFlowControlTest( + BaseStreamingRequestFlowControlTest, + WebTestCase): + def get_handlers(self): + class DecoratedFlowControlHandler(BaseFlowControlHandler): + @gen.coroutine + def data_received(self, data): + with self.in_method('data_received'): + yield gen.Task(IOLoop.current().add_callback) + return [('/', DecoratedFlowControlHandler, dict(test=self))] + + +@skipBefore35 +class NativeStreamingRequestFlowControlTest( + BaseStreamingRequestFlowControlTest, + WebTestCase): + def get_handlers(self): + class NativeFlowControlHandler(BaseFlowControlHandler): + data_received = exec_test(globals(), locals(), """ + async def data_received(self, data): + with self.in_method('data_received'): + await gen.Task(IOLoop.current().add_callback) + """)["data_received"] + return [('/', NativeFlowControlHandler, dict(test=self))] + @wsgi_safe class IncorrectContentLengthTest(SimpleHandlerTestCase):