]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Implement flow control for streaming RequestHandlers.
authorBen Darnell <ben@bendarnell.com>
Sat, 29 Mar 2014 17:23:32 +0000 (17:23 +0000)
committerBen Darnell <ben@bendarnell.com>
Sat, 29 Mar 2014 17:23:32 +0000 (17:23 +0000)
tornado/http1connection.py
tornado/httpserver.py
tornado/test/web_test.py
tornado/web.py

index 571ed678d8bfe867f232d7e4e528e7ee5f8b7a15..078d967b866b92a70f1b1d8004782f37217739f5 100644 (file)
@@ -118,7 +118,10 @@ class HTTP1Connection(object):
 
             self._disconnect_on_finish = not self._can_keep_alive(
                 start_line, headers)
-            self.message_delegate.headers_received(start_line, headers)
+            header_future = self.message_delegate.headers_received(
+                start_line, headers)
+            if header_future is not None:
+                yield header_future
             if self.stream is None:
                 # We've been detached.
                 # TODO: where else do we need to check for detach?
index 1515150b4df3d461f2e338b9a0cc835331676cdc..1a0aff297bbb61b05e4360ce98c3ac130d6a19de 100644 (file)
@@ -211,13 +211,13 @@ class _ServerRequestAdapter(httputil.HTTPMessageDelegate):
                 connection=self.connection, start_line=start_line,
                 headers=headers)
         else:
-            self.delegate.headers_received(start_line, headers)
+            return self.delegate.headers_received(start_line, headers)
 
     def data_received(self, chunk):
         if self.delegate is None:
             self._chunks.append(chunk)
         else:
-            self.delegate.data_received(chunk)
+            return self.delegate.data_received(chunk)
 
     def finish(self):
         if self.delegate is None:
index b080ddf566e4662f93940ad1ac4eee0fa01ab61d..53d034a35245c993ddc4b7cd90b49cc37d920e90 100644 (file)
@@ -13,6 +13,7 @@ from tornado.util import u, bytes_type, ObjectDict, unicode_type
 from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value, ErrorHandler, UIModule, MissingArgumentError, stream_request_body
 
 import binascii
+import contextlib
 import datetime
 import email.utils
 import logging
@@ -1876,3 +1877,63 @@ class StreamingRequestBodyTest(WebTestCase):
         stream.write(b"4\r\nasdf\r\n")
         data = yield gen.Task(stream.read_until_close)
         self.assertTrue(data.startswith(b"HTTP/1.1 401"))
+
+
+class StreamingRequestFlowControlTest(WebTestCase):
+    def get_handlers(self):
+        from tornado.ioloop import IOLoop
+
+        # Each method in this handler returns a Future and yields to the
+        # IOLoop so the future is not immediately ready.  Ensure that the
+        # Futures are respected and no method is called before the previous
+        # one has completed.
+        @stream_request_body
+        class FlowControlHandler(RequestHandler):
+            def initialize(self, test):
+                self.test = test
+                self.method = None
+                self.methods = []
+
+            @contextlib.contextmanager
+            def in_method(self, method):
+                if self.method is not None:
+                    self.test.fail("entered method %s while in %s" %
+                                   (method, self.method))
+                self.method = method
+                self.methods.append(method)
+                try:
+                    yield
+                finally:
+                    self.method = None
+
+            @gen.coroutine
+            def prepare(self):
+                with self.in_method('prepare'):
+                    yield gen.Task(IOLoop.current().add_callback)
+
+            @gen.coroutine
+            def data_received(self, data):
+                with self.in_method('data_received'):
+                    yield gen.Task(IOLoop.current().add_callback)
+
+            @gen.coroutine
+            def post(self):
+                with self.in_method('post'):
+                    yield gen.Task(IOLoop.current().add_callback)
+                self.write(dict(methods=self.methods))
+
+        return [('/', FlowControlHandler, dict(test=self))]
+
+    def get_httpserver_options(self):
+        # Use a small chunk size so flow control is relevant even though
+        # all the data arrives at once.
+        return dict(chunk_size=10)
+
+    def test_flow_control(self):
+        response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz',
+                              method='POST')
+        response.rethrow()
+        self.assertEqual(json_decode(response.body),
+                         dict(methods=['prepare', 'data_received',
+                                       'data_received', 'data_received',
+                                       'post']))
index 60b854f5fe9ac9473bf79e0a54ca24cbc672d1b1..c318f33c0c70cfede40f05778fa43eb5ddc9e73b 100644 (file)
@@ -129,6 +129,7 @@ class RequestHandler(object):
         self._finished = False
         self._auto_finish = True
         self._transforms = None  # will be set in _execute
+        self._prepared_future = None
         self.path_args = None
         self.path_kwargs = None
         self.ui = ObjectDict((n, self._ui_method(m)) for n, m in
@@ -1216,6 +1217,10 @@ class RequestHandler(object):
                 result = yield result
             if result is not None:
                 raise TypeError("Expected None, got %r" % result)
+            if self._prepared_future is not None:
+                # Tell the Application we've finished with prepare()
+                # and are ready for the body to arrive.
+                self._prepared_future.set_result(None)
             if self._finished:
                 return
 
@@ -1236,6 +1241,12 @@ class RequestHandler(object):
                 self.finish()
         except Exception as e:
             self._handle_request_exception(e)
+            if (self._prepared_future is not None and
+                not self._prepared_future.done()):
+                # In case we failed before setting _prepared_future, do it
+                # now (to unblock the HTTP server).  Note that this is not
+                # in a finally block to avoid GC issues prior to Python 3.4.
+                self._prepared_future.set_result(None)
 
     def _log(self):
         """Logs the current request.
@@ -1386,6 +1397,9 @@ def stream_request_body(cls):
     * The subclass must define a method ``data_received(self, data):``, which
       will be called zero or more times as data is available.  Note that
       if the request has an empty body, ``data_received`` may not be called.
+    * ``prepare`` and ``data_received`` may return Futures (such as via
+      ``@gen.coroutine``, in which case the next method will not be called
+      until those futures have completed.
     * The regular HTTP method (``post``, ``put``, etc) will be called after
       the entire body has been read.
 
@@ -1703,7 +1717,7 @@ class _RequestDispatcher(httputil.HTTPMessageDelegate):
             connection=self.connection, start_line=start_line, headers=headers))
         if self.stream_request_body:
             self.request.body = Future()
-            self.execute()
+            return self.execute()
 
     def set_request(self, request):
         self.request = request
@@ -1747,7 +1761,7 @@ class _RequestDispatcher(httputil.HTTPMessageDelegate):
 
     def data_received(self, data):
         if self.stream_request_body:
-            self.handler.data_received(data)
+            return self.handler.data_received(data)
         else:
             self.chunks.append(data)
 
@@ -1773,12 +1787,20 @@ class _RequestDispatcher(httputil.HTTPMessageDelegate):
         self.handler = self.handler_class(self.application, self.request,
                                      **self.handler_kwargs)
         transforms = [t(self.request) for t in self.application.transforms]
+
+        if self.stream_request_body:
+            self.handler._prepared_future = Future()
         # Note that if an exception escapes handler._execute it will be
         # trapped in the Future it returns (which we are ignoring here).
         # However, that shouldn't happen because _execute has a blanket
         # except handler, and we cannot easily access the IOLoop here to
         # call add_future.
         self.handler._execute(transforms, *self.path_args, **self.path_kwargs)
+        # If we are streaming the request body, then execute() is finished
+        # when the handler has prepared to receive the body.  If not,
+        # it doesn't matter when execute() finishes (so we return None)
+        return self.handler._prepared_future
+
 
 
 class HTTPError(Exception):