From: Ben Darnell Date: Sat, 29 Mar 2014 17:23:32 +0000 (+0000) Subject: Implement flow control for streaming RequestHandlers. X-Git-Tag: v4.0.0b1~91^2~32 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f95b4d7a34f161a56ee3ffabf8cb5ff252e05973;p=thirdparty%2Ftornado.git Implement flow control for streaming RequestHandlers. --- diff --git a/tornado/http1connection.py b/tornado/http1connection.py index 571ed678d..078d967b8 100644 --- a/tornado/http1connection.py +++ b/tornado/http1connection.py @@ -118,7 +118,10 @@ class HTTP1Connection(object): self._disconnect_on_finish = not self._can_keep_alive( start_line, headers) - self.message_delegate.headers_received(start_line, headers) + header_future = self.message_delegate.headers_received( + start_line, headers) + if header_future is not None: + yield header_future if self.stream is None: # We've been detached. # TODO: where else do we need to check for detach? diff --git a/tornado/httpserver.py b/tornado/httpserver.py index 1515150b4..1a0aff297 100644 --- a/tornado/httpserver.py +++ b/tornado/httpserver.py @@ -211,13 +211,13 @@ class _ServerRequestAdapter(httputil.HTTPMessageDelegate): connection=self.connection, start_line=start_line, headers=headers) else: - self.delegate.headers_received(start_line, headers) + return self.delegate.headers_received(start_line, headers) def data_received(self, chunk): if self.delegate is None: self._chunks.append(chunk) else: - self.delegate.data_received(chunk) + return self.delegate.data_received(chunk) def finish(self): if self.delegate is None: diff --git a/tornado/test/web_test.py b/tornado/test/web_test.py index b080ddf56..53d034a35 100644 --- a/tornado/test/web_test.py +++ b/tornado/test/web_test.py @@ -13,6 +13,7 @@ from tornado.util import u, bytes_type, ObjectDict, unicode_type from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value, ErrorHandler, UIModule, MissingArgumentError, stream_request_body import binascii +import contextlib import datetime import email.utils import logging @@ -1876,3 +1877,63 @@ class StreamingRequestBodyTest(WebTestCase): stream.write(b"4\r\nasdf\r\n") data = yield gen.Task(stream.read_until_close) self.assertTrue(data.startswith(b"HTTP/1.1 401")) + + +class StreamingRequestFlowControlTest(WebTestCase): + def get_handlers(self): + from tornado.ioloop import IOLoop + + # Each method in this handler returns a Future and yields to the + # IOLoop so the future is not immediately ready. Ensure that the + # Futures are respected and no method is called before the previous + # one has completed. + @stream_request_body + class FlowControlHandler(RequestHandler): + def initialize(self, test): + self.test = test + self.method = None + self.methods = [] + + @contextlib.contextmanager + def in_method(self, method): + if self.method is not None: + self.test.fail("entered method %s while in %s" % + (method, self.method)) + self.method = method + self.methods.append(method) + try: + yield + finally: + self.method = None + + @gen.coroutine + def prepare(self): + with self.in_method('prepare'): + yield gen.Task(IOLoop.current().add_callback) + + @gen.coroutine + def data_received(self, data): + with self.in_method('data_received'): + yield gen.Task(IOLoop.current().add_callback) + + @gen.coroutine + def post(self): + with self.in_method('post'): + yield gen.Task(IOLoop.current().add_callback) + self.write(dict(methods=self.methods)) + + return [('/', FlowControlHandler, dict(test=self))] + + def get_httpserver_options(self): + # Use a small chunk size so flow control is relevant even though + # all the data arrives at once. + return dict(chunk_size=10) + + def test_flow_control(self): + response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz', + method='POST') + response.rethrow() + self.assertEqual(json_decode(response.body), + dict(methods=['prepare', 'data_received', + 'data_received', 'data_received', + 'post'])) diff --git a/tornado/web.py b/tornado/web.py index 60b854f5f..c318f33c0 100644 --- a/tornado/web.py +++ b/tornado/web.py @@ -129,6 +129,7 @@ class RequestHandler(object): self._finished = False self._auto_finish = True self._transforms = None # will be set in _execute + self._prepared_future = None self.path_args = None self.path_kwargs = None self.ui = ObjectDict((n, self._ui_method(m)) for n, m in @@ -1216,6 +1217,10 @@ class RequestHandler(object): result = yield result if result is not None: raise TypeError("Expected None, got %r" % result) + if self._prepared_future is not None: + # Tell the Application we've finished with prepare() + # and are ready for the body to arrive. + self._prepared_future.set_result(None) if self._finished: return @@ -1236,6 +1241,12 @@ class RequestHandler(object): self.finish() except Exception as e: self._handle_request_exception(e) + if (self._prepared_future is not None and + not self._prepared_future.done()): + # In case we failed before setting _prepared_future, do it + # now (to unblock the HTTP server). Note that this is not + # in a finally block to avoid GC issues prior to Python 3.4. + self._prepared_future.set_result(None) def _log(self): """Logs the current request. @@ -1386,6 +1397,9 @@ def stream_request_body(cls): * The subclass must define a method ``data_received(self, data):``, which will be called zero or more times as data is available. Note that if the request has an empty body, ``data_received`` may not be called. + * ``prepare`` and ``data_received`` may return Futures (such as via + ``@gen.coroutine``, in which case the next method will not be called + until those futures have completed. * The regular HTTP method (``post``, ``put``, etc) will be called after the entire body has been read. @@ -1703,7 +1717,7 @@ class _RequestDispatcher(httputil.HTTPMessageDelegate): connection=self.connection, start_line=start_line, headers=headers)) if self.stream_request_body: self.request.body = Future() - self.execute() + return self.execute() def set_request(self, request): self.request = request @@ -1747,7 +1761,7 @@ class _RequestDispatcher(httputil.HTTPMessageDelegate): def data_received(self, data): if self.stream_request_body: - self.handler.data_received(data) + return self.handler.data_received(data) else: self.chunks.append(data) @@ -1773,12 +1787,20 @@ class _RequestDispatcher(httputil.HTTPMessageDelegate): self.handler = self.handler_class(self.application, self.request, **self.handler_kwargs) transforms = [t(self.request) for t in self.application.transforms] + + if self.stream_request_body: + self.handler._prepared_future = Future() # Note that if an exception escapes handler._execute it will be # trapped in the Future it returns (which we are ignoring here). # However, that shouldn't happen because _execute has a blanket # except handler, and we cannot easily access the IOLoop here to # call add_future. self.handler._execute(transforms, *self.path_args, **self.path_kwargs) + # If we are streaming the request body, then execute() is finished + # when the handler has prepared to receive the body. If not, + # it doesn't matter when execute() finishes (so we return None) + return self.handler._prepared_future + class HTTPError(Exception):