self._disconnect_on_finish = not self._can_keep_alive(
start_line, headers)
- self.message_delegate.headers_received(start_line, headers)
+ header_future = self.message_delegate.headers_received(
+ start_line, headers)
+ if header_future is not None:
+ yield header_future
if self.stream is None:
# We've been detached.
# TODO: where else do we need to check for detach?
connection=self.connection, start_line=start_line,
headers=headers)
else:
- self.delegate.headers_received(start_line, headers)
+ return self.delegate.headers_received(start_line, headers)
def data_received(self, chunk):
if self.delegate is None:
self._chunks.append(chunk)
else:
- self.delegate.data_received(chunk)
+ return self.delegate.data_received(chunk)
def finish(self):
if self.delegate is None:
from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value, ErrorHandler, UIModule, MissingArgumentError, stream_request_body
import binascii
+import contextlib
import datetime
import email.utils
import logging
stream.write(b"4\r\nasdf\r\n")
data = yield gen.Task(stream.read_until_close)
self.assertTrue(data.startswith(b"HTTP/1.1 401"))
+
+
+class StreamingRequestFlowControlTest(WebTestCase):
+ def get_handlers(self):
+ from tornado.ioloop import IOLoop
+
+ # Each method in this handler returns a Future and yields to the
+ # IOLoop so the future is not immediately ready. Ensure that the
+ # Futures are respected and no method is called before the previous
+ # one has completed.
+ @stream_request_body
+ class FlowControlHandler(RequestHandler):
+ def initialize(self, test):
+ self.test = test
+ self.method = None
+ self.methods = []
+
+ @contextlib.contextmanager
+ def in_method(self, method):
+ if self.method is not None:
+ self.test.fail("entered method %s while in %s" %
+ (method, self.method))
+ self.method = method
+ self.methods.append(method)
+ try:
+ yield
+ finally:
+ self.method = None
+
+ @gen.coroutine
+ def prepare(self):
+ with self.in_method('prepare'):
+ yield gen.Task(IOLoop.current().add_callback)
+
+ @gen.coroutine
+ def data_received(self, data):
+ with self.in_method('data_received'):
+ yield gen.Task(IOLoop.current().add_callback)
+
+ @gen.coroutine
+ def post(self):
+ with self.in_method('post'):
+ yield gen.Task(IOLoop.current().add_callback)
+ self.write(dict(methods=self.methods))
+
+ return [('/', FlowControlHandler, dict(test=self))]
+
+ def get_httpserver_options(self):
+ # Use a small chunk size so flow control is relevant even though
+ # all the data arrives at once.
+ return dict(chunk_size=10)
+
+ def test_flow_control(self):
+ response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz',
+ method='POST')
+ response.rethrow()
+ self.assertEqual(json_decode(response.body),
+ dict(methods=['prepare', 'data_received',
+ 'data_received', 'data_received',
+ 'post']))
self._finished = False
self._auto_finish = True
self._transforms = None # will be set in _execute
+ self._prepared_future = None
self.path_args = None
self.path_kwargs = None
self.ui = ObjectDict((n, self._ui_method(m)) for n, m in
result = yield result
if result is not None:
raise TypeError("Expected None, got %r" % result)
+ if self._prepared_future is not None:
+ # Tell the Application we've finished with prepare()
+ # and are ready for the body to arrive.
+ self._prepared_future.set_result(None)
if self._finished:
return
self.finish()
except Exception as e:
self._handle_request_exception(e)
+ if (self._prepared_future is not None and
+ not self._prepared_future.done()):
+ # In case we failed before setting _prepared_future, do it
+ # now (to unblock the HTTP server). Note that this is not
+ # in a finally block to avoid GC issues prior to Python 3.4.
+ self._prepared_future.set_result(None)
def _log(self):
"""Logs the current request.
* The subclass must define a method ``data_received(self, data):``, which
will be called zero or more times as data is available. Note that
if the request has an empty body, ``data_received`` may not be called.
+ * ``prepare`` and ``data_received`` may return Futures (such as via
+ ``@gen.coroutine``, in which case the next method will not be called
+ until those futures have completed.
* The regular HTTP method (``post``, ``put``, etc) will be called after
the entire body has been read.
connection=self.connection, start_line=start_line, headers=headers))
if self.stream_request_body:
self.request.body = Future()
- self.execute()
+ return self.execute()
def set_request(self, request):
self.request = request
def data_received(self, data):
if self.stream_request_body:
- self.handler.data_received(data)
+ return self.handler.data_received(data)
else:
self.chunks.append(data)
self.handler = self.handler_class(self.application, self.request,
**self.handler_kwargs)
transforms = [t(self.request) for t in self.application.transforms]
+
+ if self.stream_request_body:
+ self.handler._prepared_future = Future()
# Note that if an exception escapes handler._execute it will be
# trapped in the Future it returns (which we are ignoring here).
# However, that shouldn't happen because _execute has a blanket
# except handler, and we cannot easily access the IOLoop here to
# call add_future.
self.handler._execute(transforms, *self.path_args, **self.path_kwargs)
+ # If we are streaming the request body, then execute() is finished
+ # when the handler has prepared to receive the body. If not,
+ # it doesn't matter when execute() finishes (so we return None)
+ return self.handler._prepared_future
+
class HTTPError(Exception):