self._finish_future = None
self._version = None
self._chunking = None
+ # True if we have read HTTP headers but have not yet read the
+ # corresponding body.
+ self._reading = False
+ # This is set in _read_message. It's ugly to have this here,
+ # but we need to be able to reset the delegate in finish() to divert
+ # remaining input data to a null delegate when the request is aborted.
+ self.message_delegate = None
def start_serving(self, delegate, gzip=False):
assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
@gen.coroutine
def _read_message(self, delegate, is_client, method=None):
assert isinstance(delegate, httputil.HTTPMessageDelegate)
+ self.message_delegate = delegate
try:
header_data = yield self.stream.read_until_regex(b"\r?\n\r?\n")
+ self._reading = True
self._finish_future = Future()
start_line, headers = self._parse_headers(header_data)
if is_client:
self._disconnect_on_finish = not self._can_keep_alive(
start_line, headers)
- delegate.headers_received(start_line, headers)
+ self.message_delegate.headers_received(start_line, headers)
if self.stream is None:
# We've been detached.
# TODO: where else do we need to check for detach?
if code == 304:
skip_body = True
if code >= 100 and code < 200:
- yield self._read_message(delegate, is_client, method=method)
+ # TODO: client delegates will get headers_received twice
+ # in the case of a 100-continue. Document or change?
+ yield self._read_message(self.message_delegate,
+ is_client, method=method)
else:
if headers.get("Expect") == "100-continue":
self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
if not skip_body:
- body_future = self._read_body(is_client, headers, delegate)
+ body_future = self._read_body(is_client, headers)
if body_future is not None:
yield body_future
- delegate.finish()
+ self._reading = False
+ self.message_delegate.finish()
yield self._finish_future
if self.stream is None:
raise gen.Return(False)
self.stream.write(b"0\r\n\r\n", self._on_write_complete)
self._chunking = False
self._request_finished = True
+ # If the app finished the request while we're still reading,
+ # divert any remaining input to a null delegate and close the
+ # connection when we're done sending our response. Closing
+ # the connection is the only way to avoid reading the whole
+ # input body.
+ if self._reading:
+ self.message_delegate = httputil.HTTPMessageDelegate()
+ self._disconnect_on_finish = True
# No more data is coming, so instruct TCP to send any remaining
# data immediately instead of waiting for a full packet or ack.
self.stream.set_nodelay(True)
data[eol:100])
return start_line, headers
- def _read_body(self, is_client, headers, delegate):
+ def _read_body(self, is_client, headers):
content_length = headers.get("Content-Length")
if content_length:
content_length = int(content_length)
if content_length > self.stream.max_buffer_size:
raise httputil.HTTPMessageException("Content-Length too long")
- return self._read_fixed_body(content_length, delegate)
+ return self._read_fixed_body(content_length)
if headers.get("Transfer-Encoding") == "chunked":
- return self._read_chunked_body(delegate)
+ return self._read_chunked_body()
if is_client:
- return self._read_body_until_close(delegate)
+ return self._read_body_until_close()
return None
@gen.coroutine
- def _read_fixed_body(self, content_length, delegate):
+ def _read_fixed_body(self, content_length):
body = yield self.stream.read_bytes(content_length)
- delegate.data_received(body)
+ self.message_delegate.data_received(body)
@gen.coroutine
- def _read_chunked_body(self, delegate):
+ def _read_chunked_body(self):
# TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
while True:
chunk_len = yield self.stream.read_until(b"\r\n")
# chunk ends with \r\n
chunk = yield self.stream.read_bytes(chunk_len + 2)
assert chunk[-2:] == b"\r\n"
- delegate.data_received(chunk[:-2])
+ self.message_delegate.data_received(chunk[:-2])
@gen.coroutine
- def _read_body_until_close(self, delegate):
+ def _read_body_until_close(self):
body = yield self.stream.read_until_close()
- delegate.data_received(body)
+ self.message_delegate.data_received(body)
class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
self.test.finished.set_result(None)
self.write({})
- return [('/', StreamingBodyHandler, dict(test=self))]
+ @stream_request_body
+ class EarlyReturnHandler(RequestHandler):
+ def prepare(self):
+ # If we finish the response in prepare, it won't continue to
+ # the (non-existent) data_received.
+ raise HTTPError(401)
- @gen_test
- def test_streaming_body(self):
- self.prepared = Future()
- self.data = Future()
- self.finished = Future()
+ return [('/stream_body', StreamingBodyHandler, dict(test=self)),
+ ('/early_return', EarlyReturnHandler)]
+ def connect(self, url, connection_close):
# Use a raw connection so we can control the sending of data.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.connect(("localhost", self.get_http_port()))
stream = IOStream(s, io_loop=self.io_loop)
- stream.write(b"GET / HTTP/1.1\r\n")
- stream.write(b"Connection: close\r\n")
+ stream.write(b"GET " + url + b" HTTP/1.1\r\n")
+ if connection_close:
+ stream.write(b"Connection: close\r\n")
stream.write(b"Transfer-Encoding: chunked\r\n\r\n")
+ return stream
+
+ @gen_test
+ def test_streaming_body(self):
+ self.prepared = Future()
+ self.data = Future()
+ self.finished = Future()
+
+ stream = self.connect(b"/stream_body", connection_close=True)
yield self.prepared
stream.write(b"4\r\nasdf\r\n")
# Ensure the first chunk is received before we send the second.
# This would ideally use an HTTP1Connection to read the response.
self.assertTrue(data.endswith(b"{}"))
stream.close()
+
+ @gen_test
+ def test_early_return(self):
+ stream = self.connect(b"/early_return", connection_close=False)
+ data = yield gen.Task(stream.read_until_close)
+ self.assertTrue(data.startswith(b"HTTP/1.1 401"))
+
+ @gen_test
+ def test_early_return_with_data(self):
+ stream = self.connect(b"/early_return", connection_close=False)
+ stream.write(b"4\r\nasdf\r\n")
+ data = yield gen.Task(stream.read_until_close)
+ self.assertTrue(data.startswith(b"HTTP/1.1 401"))