@gen.coroutine
def _read_message(self, delegate):
+ need_delegate_close = False
try:
header_future = self.stream.read_until_regex(
b"\r?\n\r?\n",
self._disconnect_on_finish = not self._can_keep_alive(
start_line, headers)
+ need_delegate_close = True
header_future = delegate.headers_received(start_line, headers)
if header_future is not None:
yield header_future
if self.stream is None:
# We've been detached.
+ need_delegate_close = False
raise gen.Return(False)
skip_body = False
if self.is_client:
raise gen.Return(False)
self._read_finished = True
if not self._write_finished or self.is_client:
+ need_delegate_close = False
delegate.finish()
yield self._finish_future
if self.is_client and self._disconnect_on_finish:
self.close()
raise gen.Return(False)
finally:
+ if need_delegate_close:
+ delegate.on_connection_close()
self._clear_callbacks()
raise gen.Return(True)
self.server.request_callback(self.request)
else:
self.delegate.finish()
+ self._cleanup()
+
+ def on_connection_close(self):
+ if self.delegate is None:
+ self._chunks = None
+ else:
+ self.delegate.on_connection_close()
+ self._cleanup()
+
+ def _cleanup(self):
if self.server.xheaders:
self.connection.context._unapply_xheaders()
def finish(self):
pass
+ def on_connection_close(self):
+ pass
+
def url_concat(url, args):
"""Concatenate url and argument dictionary regardless of whether
# the (non-existent) data_received.
raise HTTPError(401)
+ @stream_request_body
+ class CloseDetectionHandler(RequestHandler):
+ def initialize(self, test):
+ self.test = test
+
+ def on_connection_close(self):
+ self.test.close_future.set_result(None)
+
return [('/stream_body', StreamingBodyHandler, dict(test=self)),
- ('/early_return', EarlyReturnHandler)]
+ ('/early_return', EarlyReturnHandler),
+ ('/close_detection', CloseDetectionHandler, dict(test=self))]
def connect(self, url, connection_close):
# Use a raw connection so we can control the sending of data.
data = yield gen.Task(stream.read_until_close)
self.assertTrue(data.startswith(b"HTTP/1.1 401"))
+ @gen_test
+ def test_close_during_upload(self):
+ self.close_future = Future()
+ stream = self.connect(b"/close_detection", connection_close=False)
+ stream.close()
+ yield self.close_future
+
class StreamingRequestFlowControlTest(WebTestCase):
def get_handlers(self):
from tornado import escape
from tornado import gen
from tornado import httputil
+from tornado import iostream
from tornado import locale
from tornado.log import access_log, app_log, gen_log
from tornado import stack_context
may not be called promptly after the end user closes their
connection.
"""
- pass
+ if _has_stream_request_body(self.__class__):
+ if not self.request.body.done():
+ self.request.body.set_exception(iostream.StreamClosedError())
def clear(self):
"""Resets all headers and content for this response."""
# the body has been completely received. The Future has no
# result; the data has been passed to self.data_received
# instead.
- yield self.request.body
+ try:
+ yield self.request.body
+ except iostream.StreamClosedError:
+ return
method = getattr(self, self.request.method.lower())
result = method(*self.path_args, **self.path_kwargs)
self.request._parse_body()
self.execute()
+ def on_connection_close(self):
+ if self.stream_request_body:
+ self.handler.on_connection_close()
+ else:
+ self.chunks = None
+
def execute(self):
# If template cache is disabled (usually in the debug mode),
# re-compile templates and reload static files on every