From: Ben Darnell Date: Mon, 24 Mar 2014 02:22:39 +0000 (-0400) Subject: When a streaming-body request finishes early, stop reading the request data. X-Git-Tag: v4.0.0b1~91^2~37 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=42df0361e53e0e40e11e1c50c56e3fa8ad0944be;p=thirdparty%2Ftornado.git When a streaming-body request finishes early, stop reading the request data. delegate.data_received should no longer be called after conn.finish(). --- diff --git a/tornado/http1connection.py b/tornado/http1connection.py index a8c9498e0..745e4772a 100644 --- a/tornado/http1connection.py +++ b/tornado/http1connection.py @@ -61,6 +61,13 @@ class HTTP1Connection(object): self._finish_future = None self._version = None self._chunking = None + # True if we have read HTTP headers but have not yet read the + # corresponding body. + self._reading = False + # This is set in _read_message. It's ugly to have this here, + # but we need to be able to reset the delegate in finish() to divert + # remaining input data to a null delegate when the request is aborted. + self.message_delegate = None def start_serving(self, delegate, gzip=False): assert isinstance(delegate, httputil.HTTPServerConnectionDelegate) @@ -91,8 +98,10 @@ class HTTP1Connection(object): @gen.coroutine def _read_message(self, delegate, is_client, method=None): assert isinstance(delegate, httputil.HTTPMessageDelegate) + self.message_delegate = delegate try: header_data = yield self.stream.read_until_regex(b"\r?\n\r?\n") + self._reading = True self._finish_future = Future() start_line, headers = self._parse_headers(header_data) if is_client: @@ -105,7 +114,7 @@ class HTTP1Connection(object): self._disconnect_on_finish = not self._can_keep_alive( start_line, headers) - delegate.headers_received(start_line, headers) + self.message_delegate.headers_received(start_line, headers) if self.stream is None: # We've been detached. # TODO: where else do we need to check for detach? @@ -118,15 +127,19 @@ class HTTP1Connection(object): if code == 304: skip_body = True if code >= 100 and code < 200: - yield self._read_message(delegate, is_client, method=method) + # TODO: client delegates will get headers_received twice + # in the case of a 100-continue. Document or change? + yield self._read_message(self.message_delegate, + is_client, method=method) else: if headers.get("Expect") == "100-continue": self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n") if not skip_body: - body_future = self._read_body(is_client, headers, delegate) + body_future = self._read_body(is_client, headers) if body_future is not None: yield body_future - delegate.finish() + self._reading = False + self.message_delegate.finish() yield self._finish_future if self.stream is None: raise gen.Return(False) @@ -230,6 +243,14 @@ class HTTP1Connection(object): self.stream.write(b"0\r\n\r\n", self._on_write_complete) self._chunking = False self._request_finished = True + # If the app finished the request while we're still reading, + # divert any remaining input to a null delegate and close the + # connection when we're done sending our response. Closing + # the connection is the only way to avoid reading the whole + # input body. + if self._reading: + self.message_delegate = httputil.HTTPMessageDelegate() + self._disconnect_on_finish = True # No more data is coming, so instruct TCP to send any remaining # data immediately instead of waiting for a full packet or ack. self.stream.set_nodelay(True) @@ -286,26 +307,26 @@ class HTTP1Connection(object): data[eol:100]) return start_line, headers - def _read_body(self, is_client, headers, delegate): + def _read_body(self, is_client, headers): content_length = headers.get("Content-Length") if content_length: content_length = int(content_length) if content_length > self.stream.max_buffer_size: raise httputil.HTTPMessageException("Content-Length too long") - return self._read_fixed_body(content_length, delegate) + return self._read_fixed_body(content_length) if headers.get("Transfer-Encoding") == "chunked": - return self._read_chunked_body(delegate) + return self._read_chunked_body() if is_client: - return self._read_body_until_close(delegate) + return self._read_body_until_close() return None @gen.coroutine - def _read_fixed_body(self, content_length, delegate): + def _read_fixed_body(self, content_length): body = yield self.stream.read_bytes(content_length) - delegate.data_received(body) + self.message_delegate.data_received(body) @gen.coroutine - def _read_chunked_body(self, delegate): + def _read_chunked_body(self): # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1 while True: chunk_len = yield self.stream.read_until(b"\r\n") @@ -315,12 +336,12 @@ class HTTP1Connection(object): # chunk ends with \r\n chunk = yield self.stream.read_bytes(chunk_len + 2) assert chunk[-2:] == b"\r\n" - delegate.data_received(chunk[:-2]) + self.message_delegate.data_received(chunk[:-2]) @gen.coroutine - def _read_body_until_close(self, delegate): + def _read_body_until_close(self): body = yield self.stream.read_until_close() - delegate.data_received(body) + self.message_delegate.data_received(body) class _GzipMessageDelegate(httputil.HTTPMessageDelegate): diff --git a/tornado/test/web_test.py b/tornado/test/web_test.py index 4dc55f34e..b080ddf56 100644 --- a/tornado/test/web_test.py +++ b/tornado/test/web_test.py @@ -1820,21 +1820,34 @@ class StreamingRequestBodyTest(WebTestCase): self.test.finished.set_result(None) self.write({}) - return [('/', StreamingBodyHandler, dict(test=self))] + @stream_request_body + class EarlyReturnHandler(RequestHandler): + def prepare(self): + # If we finish the response in prepare, it won't continue to + # the (non-existent) data_received. + raise HTTPError(401) - @gen_test - def test_streaming_body(self): - self.prepared = Future() - self.data = Future() - self.finished = Future() + return [('/stream_body', StreamingBodyHandler, dict(test=self)), + ('/early_return', EarlyReturnHandler)] + def connect(self, url, connection_close): # Use a raw connection so we can control the sending of data. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) s.connect(("localhost", self.get_http_port())) stream = IOStream(s, io_loop=self.io_loop) - stream.write(b"GET / HTTP/1.1\r\n") - stream.write(b"Connection: close\r\n") + stream.write(b"GET " + url + b" HTTP/1.1\r\n") + if connection_close: + stream.write(b"Connection: close\r\n") stream.write(b"Transfer-Encoding: chunked\r\n\r\n") + return stream + + @gen_test + def test_streaming_body(self): + self.prepared = Future() + self.data = Future() + self.finished = Future() + + stream = self.connect(b"/stream_body", connection_close=True) yield self.prepared stream.write(b"4\r\nasdf\r\n") # Ensure the first chunk is received before we send the second. @@ -1850,3 +1863,16 @@ class StreamingRequestBodyTest(WebTestCase): # This would ideally use an HTTP1Connection to read the response. self.assertTrue(data.endswith(b"{}")) stream.close() + + @gen_test + def test_early_return(self): + stream = self.connect(b"/early_return", connection_close=False) + data = yield gen.Task(stream.read_until_close) + self.assertTrue(data.startswith(b"HTTP/1.1 401")) + + @gen_test + def test_early_return_with_data(self): + stream = self.connect(b"/early_return", connection_close=False) + stream.write(b"4\r\nasdf\r\n") + data = yield gen.Task(stream.read_until_close) + self.assertTrue(data.startswith(b"HTTP/1.1 401"))