From: Ben Darnell Date: Sun, 20 Apr 2014 17:58:49 +0000 (-0400) Subject: Make HTTP1Connection single-use-only. X-Git-Tag: v4.0.0b1~91^2~16 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=292e72d5e5a4558c13a292446588ebc55918cc31;p=thirdparty%2Ftornado.git Make HTTP1Connection single-use-only. The server loop is now on a separate HTTP1ServerConnection class. --- diff --git a/tornado/http1connection.py b/tornado/http1connection.py index ded8724c9..c94598b36 100644 --- a/tornado/http1connection.py +++ b/tornado/http1connection.py @@ -27,6 +27,18 @@ from tornado.log import gen_log, app_log from tornado import stack_context from tornado.util import GzipDecompressor +class HTTP1ConnectionParameters(object): + def __init__(self, no_keep_alive=False, protocol=None, chunk_size=None, + max_header_size=None, header_timeout=None, max_body_size=None, + body_timeout=None, use_gzip=False): + self.no_keep_alive = no_keep_alive + self.protocol = protocol + self.chunk_size = chunk_size or 65536 + self.max_header_size = max_header_size or 65536 + self.header_timeout = header_timeout + self.max_body_size = max_body_size + self.body_timeout = body_timeout + self.use_gzip = use_gzip class HTTP1Connection(object): """Handles a connection to an HTTP client, executing HTTP requests. @@ -34,18 +46,21 @@ class HTTP1Connection(object): We parse HTTP headers and bodies, and execute the request callback until the HTTP conection is closed. """ - def __init__(self, stream, address, is_client, - no_keep_alive=False, protocol=None, chunk_size=None, - max_header_size=None, header_timeout=None, - max_body_size=None, body_timeout=None): + def __init__(self, stream, address, is_client, params=None, method=None): self.is_client = is_client self.stream = stream self.address = address + if params is None: + params = HTTP1ConnectionParameters() + self.params = params + self.no_keep_alive = params.no_keep_alive # Save the socket's address family now so we know how to # interpret self.address even after the stream is closed # and its socket attribute replaced with None. - self.address_family = stream.socket.family - self.no_keep_alive = no_keep_alive + if stream.socket is not None: + self.address_family = stream.socket.family + else: + self.address_family = None # In HTTPServerRequest we want an IP, not a full socket address. if (self.address_family in (socket.AF_INET, socket.AF_INET6) and address is not None): @@ -53,19 +68,18 @@ class HTTP1Connection(object): else: # Unix (or other) socket; fake the remote address. self.remote_ip = '0.0.0.0' - if protocol: - self.protocol = protocol + if self.params.protocol: + self.protocol = self.params.protocol elif isinstance(stream, iostream.SSLIOStream): self.protocol = "https" else: self.protocol = "http" - self._chunk_size = chunk_size or 65536 - self._max_header_size = max_header_size or 65536 - self._header_timeout = header_timeout - self._default_max_body_size = (max_body_size or - self.stream.max_buffer_size) - self._default_body_timeout = body_timeout + self._max_body_size = (self.params.max_body_size or + self.stream.max_buffer_size) + self._body_timeout = self.params.body_timeout + self._method = method self._disconnect_on_finish = False + self._request_finished = False self._clear_request_state() self.stream.set_close_callback(self._on_connection_close) self._finish_future = None @@ -75,61 +89,24 @@ class HTTP1Connection(object): # True if we have read HTTP headers but have not yet read the # corresponding body. 
self._reading = False - # This is set in _read_message. It's ugly to have this here, - # but we need to be able to reset the delegate in finish() to divert - # remaining input data to a null delegate when the request is aborted. - self.message_delegate = None - def start_serving(self, delegate, gzip=False): - assert isinstance(delegate, httputil.HTTPServerConnectionDelegate) - # Register the future on the IOLoop so its errors get logged. - self.stream.io_loop.add_future( - self._server_request_loop(delegate, gzip=gzip), - lambda f: f.result()) + def read_response(self, delegate): + if self.params.use_gzip: + delegate = _GzipMessageDelegate(delegate, self.params.chunk_size) + return self._read_message(delegate) @gen.coroutine - def _server_request_loop(self, delegate, gzip=False): - while True: - request_delegate = delegate.start_request(self) - if gzip: - request_delegate = _GzipMessageDelegate(request_delegate, - self._chunk_size) - try: - ret = yield self._read_message(request_delegate) - except iostream.StreamClosedError: - self.close() - return - except Exception: - # TODO: this is probably too broad; it would be better to - # wrap all delegate calls in something that writes to app_log, - # and then errors that reach this point can be gen_log. - app_log.error("Uncaught exception", exc_info=True) - self.close() - return - if not ret: - return - - def read_response(self, delegate, method, use_gzip=False): - if use_gzip: - delegate = _GzipMessageDelegate(delegate, self._chunk_size) - return self._read_message(delegate, method=method) - - @gen.coroutine - def _read_message(self, delegate, method=None): - assert isinstance(delegate, httputil.HTTPMessageDelegate) - self.message_delegate = delegate - self._max_body_size = self._default_max_body_size - self._body_timeout = self._default_body_timeout + def _read_message(self, delegate): try: header_future = self.stream.read_until_regex( b"\r?\n\r?\n", - max_bytes=self._max_header_size) - if self._header_timeout is None: + max_bytes=self.params.max_header_size) + if self.params.header_timeout is None: header_data = yield header_future else: try: header_data = yield gen.with_timeout( - self.stream.io_loop.time() + self._header_timeout, + self.stream.io_loop.time() + self.params.header_timeout, header_future, io_loop=self.stream.io_loop) except gen.TimeoutError: @@ -148,7 +125,7 @@ class HTTP1Connection(object): self._disconnect_on_finish = not self._can_keep_alive( start_line, headers) - header_future = self.message_delegate.headers_received( + header_future = delegate.headers_received( start_line, headers) if header_future is not None: yield header_future @@ -158,7 +135,7 @@ class HTTP1Connection(object): raise gen.Return(False) skip_body = False if self.is_client: - if method == 'HEAD': + if self._method == 'HEAD': skip_body = True code = start_line.code if code == 304: @@ -166,13 +143,12 @@ class HTTP1Connection(object): if code >= 100 and code < 200: # TODO: client delegates will get headers_received twice # in the case of a 100-continue. Document or change? 
- yield self._read_message(self.message_delegate, - method=method) + yield self._read_message(delegate) else: if headers.get("Expect") == "100-continue": self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n") if not skip_body: - body_future = self._read_body(headers) + body_future = self._read_body(headers, delegate) if body_future is not None: if self._body_timeout is None: yield body_future @@ -187,7 +163,8 @@ class HTTP1Connection(object): self.stream.close() raise gen.Return(False) self._reading = False - self.message_delegate.finish() + if not self._request_finished or self.is_client: + delegate.finish() yield self._finish_future if self.stream is None: raise gen.Return(False) @@ -198,7 +175,6 @@ class HTTP1Connection(object): raise gen.Return(False) raise gen.Return(True) - def _clear_request_state(self): """Clears the per-request state. @@ -207,7 +183,6 @@ class HTTP1Connection(object): and when the connection is closed (to break up cycles and facilitate garbage collection in cpython). """ - self._request_finished = False self._write_callback = None self._close_callback = None @@ -332,12 +307,11 @@ class HTTP1Connection(object): self._chunking = False self._request_finished = True # If the app finished the request while we're still reading, - # divert any remaining input to a null delegate and close the - # connection when we're done sending our response. Closing - # the connection is the only way to avoid reading the whole - # input body. + # divert any remaining data away from the delegate and + # close the connection when we're done sending our response. + # Closing the connection is the only way to avoid reading the + # whole input body. if self._reading: - self.message_delegate = httputil.HTTPMessageDelegate() self._disconnect_on_finish = True # No more data is coming, so instruct TCP to send any remaining # data immediately instead of waiting for a full packet or ack. @@ -361,7 +335,7 @@ class HTTP1Connection(object): self._finish_request() def _can_keep_alive(self, start_line, headers): - if self.no_keep_alive: + if self.params.no_keep_alive: return False connection_header = headers.get("Connection") if connection_header is not None: @@ -381,7 +355,7 @@ class HTTP1Connection(object): # Turn Nagle's algorithm back on, leaving the stream in its # default state for the next request. 
self.stream.set_nodelay(False) - if self._finish_future is not None: + if self._finish_future is not None and not self._finish_future.done(): self._finish_future.set_result(None) def _parse_headers(self, data): @@ -396,29 +370,30 @@ class HTTP1Connection(object): data[eol:100]) return start_line, headers - def _read_body(self, headers): + def _read_body(self, headers, delegate): content_length = headers.get("Content-Length") if content_length: content_length = int(content_length) if content_length > self._max_body_size: raise httputil.HTTPInputException("Content-Length too long") - return self._read_fixed_body(content_length) + return self._read_fixed_body(content_length, delegate) if headers.get("Transfer-Encoding") == "chunked": - return self._read_chunked_body() + return self._read_chunked_body(delegate) if self.is_client: - return self._read_body_until_close() + return self._read_body_until_close(delegate) return None @gen.coroutine - def _read_fixed_body(self, content_length): + def _read_fixed_body(self, content_length, delegate): while content_length > 0: body = yield self.stream.read_bytes( - min(self._chunk_size, content_length), partial=True) + min(self.params.chunk_size, content_length), partial=True) content_length -= len(body) - yield gen.maybe_future(self.message_delegate.data_received(body)) + if not self._request_finished or self.is_client: + yield gen.maybe_future(delegate.data_received(body)) @gen.coroutine - def _read_chunked_body(self): + def _read_chunked_body(self, delegate): # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1 total_size = 0 while True: @@ -432,18 +407,20 @@ class HTTP1Connection(object): bytes_to_read = chunk_len while bytes_to_read: chunk = yield self.stream.read_bytes( - min(bytes_to_read, self._chunk_size), partial=True) + min(bytes_to_read, self.params.chunk_size), partial=True) bytes_to_read -= len(chunk) - yield gen.maybe_future( - self.message_delegate.data_received(chunk)) + if not self._request_finished or self.is_client: + yield gen.maybe_future( + delegate.data_received(chunk)) # chunk ends with \r\n crlf = yield self.stream.read_bytes(2) assert crlf == b"\r\n" @gen.coroutine - def _read_body_until_close(self): + def _read_body_until_close(self, delegate): body = yield self.stream.read_until_close() - self.message_delegate.data_received(body) + if not self._request_finished or self.is_client: + delegate.data_received(body) class _GzipMessageDelegate(httputil.HTTPMessageDelegate): @@ -490,3 +467,41 @@ class _GzipMessageDelegate(httputil.HTTPMessageDelegate): # anything, treat it as an extra chunk self._delegate.data_received(tail) return self._delegate.finish() + + +class HTTP1ServerConnection(object): + def __init__(self, stream, address, params=None): + self.stream = stream + self.address = address + if params is None: + params = HTTP1ConnectionParameters() + self.params = params + + def start_serving(self, delegate): + assert isinstance(delegate, httputil.HTTPServerConnectionDelegate) + # Register the future on the IOLoop so its errors get logged. 
+ self.stream.io_loop.add_future( + self._server_request_loop(delegate), + lambda f: f.result()) + + @gen.coroutine + def _server_request_loop(self, delegate): + while True: + conn = HTTP1Connection(self.stream, self.address, is_client=False, + params=self.params) + request_delegate = delegate.start_request(conn) + try: + ret = yield conn.read_response(request_delegate) + conn._clear_request_state() + except iostream.StreamClosedError: + conn.close() + return + except Exception: + # TODO: this is probably too broad; it would be better to + # wrap all delegate calls in something that writes to app_log, + # and then errors that reach this point can be gen_log. + app_log.error("Uncaught exception", exc_info=True) + conn.close() + return + if not ret: + return diff --git a/tornado/httpserver.py b/tornado/httpserver.py index 556b891e1..036c15800 100644 --- a/tornado/httpserver.py +++ b/tornado/httpserver.py @@ -28,7 +28,7 @@ class except to start a server at the beginning of the process from __future__ import absolute_import, division, print_function, with_statement -from tornado.http1connection import HTTP1Connection +from tornado.http1connection import HTTP1ServerConnection, HTTP1ConnectionParameters from tornado import httputil from tornado import netutil from tornado.tcpserver import TCPServer @@ -140,28 +140,23 @@ class HTTPServer(TCPServer, httputil.HTTPServerConnectionDelegate): self.request_callback = request_callback self.no_keep_alive = no_keep_alive self.xheaders = xheaders - self.protocol = protocol - self.gzip = gzip - self.chunk_size = chunk_size - self.max_header_size = max_header_size - self.idle_connection_timeout = idle_connection_timeout or 3600 - self.body_timeout = body_timeout - self.max_body_size = max_body_size + self.conn_params = HTTP1ConnectionParameters( + protocol=protocol, + use_gzip=gzip, + chunk_size=chunk_size, + max_header_size=max_header_size, + header_timeout=idle_connection_timeout or 3600, + max_body_size=max_body_size, + body_timeout=body_timeout) TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options, max_buffer_size=max_buffer_size, read_chunk_size=chunk_size) def handle_stream(self, stream, address): - conn = HTTP1Connection( - stream, address=address, is_client=False, - no_keep_alive=self.no_keep_alive, - protocol=self.protocol, - chunk_size=self.chunk_size, - max_header_size=self.max_header_size, - header_timeout=self.idle_connection_timeout, - max_body_size=self.max_body_size, - body_timeout=self.body_timeout) - conn.start_serving(self, gzip=self.gzip) + conn = HTTP1ServerConnection( + stream, address=address, + params=self.conn_params) + conn.start_serving(self) def start_request(self, connection): return _ServerRequestAdapter(self, connection) diff --git a/tornado/simple_httpclient.py b/tornado/simple_httpclient.py index bb59b30f7..e5248d53a 100644 --- a/tornado/simple_httpclient.py +++ b/tornado/simple_httpclient.py @@ -5,7 +5,7 @@ from tornado.concurrent import is_future from tornado.escape import utf8, _unicode from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy from tornado import httputil -from tornado.http1connection import HTTP1Connection +from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters from tornado.iostream import IOStream, SSLIOStream, StreamClosedError from tornado.netutil import Resolver, OverrideResolver from tornado.log import gen_log @@ -334,8 +334,11 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): self.stream.set_nodelay(True) self.connection = 
HTTP1Connection( self.stream, self._sockaddr, is_client=True, - no_keep_alive=True, protocol=self.parsed.scheme, - max_header_size=self.max_header_size) + params=HTTP1ConnectionParameters( + no_keep_alive=True, protocol=self.parsed.scheme, + max_header_size=self.max_header_size, + use_gzip=self.request.use_gzip), + method=self.request.method) start_line = httputil.RequestStartLine(self.request.method, req_path, 'HTTP/1.1') self.connection.write_headers( @@ -361,8 +364,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): # Ensure that any exception raised in read_response ends up in our # stack context. self.io_loop.add_future( - self.connection.read_response(self, method=self.request.method, - use_gzip=self.request.use_gzip), + self.connection.read_response(self), lambda f: f.result()) def _release(self): diff --git a/tornado/test/httpserver_test.py b/tornado/test/httpserver_test.py index 0c46fb358..0e8fb36be 100644 --- a/tornado/test/httpserver_test.py +++ b/tornado/test/httpserver_test.py @@ -42,7 +42,7 @@ def read_stream_body(stream, callback): def finish(self): callback(b''.join(chunks)) conn = HTTP1Connection(stream, None, is_client=True) - conn.read_response(Delegate(), method='GET') + conn.read_response(Delegate()) class HandlerBaseTestCase(AsyncHTTPTestCase):
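For reference, a minimal sketch of how the refactored classes fit together, based only on the signatures introduced in this diff; `stream`, `address`/`sockaddr`, and the delegate arguments are placeholders for an already-connected IOStream, its peer address, and caller-supplied HTTPServerConnectionDelegate / HTTPMessageDelegate implementations (they are not part of this commit):

# Sketch of the post-change API; names marked as placeholders above are
# assumptions for illustration, everything else comes from this diff.
from tornado import gen, httputil
from tornado.http1connection import (HTTP1Connection,
                                     HTTP1ConnectionParameters,
                                     HTTP1ServerConnection)


def serve_stream(stream, address, server_delegate):
    # Server side: HTTPServer.handle_stream now wraps each accepted stream in
    # an HTTP1ServerConnection.  Its request loop creates a fresh, single-use
    # HTTP1Connection for every request and passes it to
    # server_delegate.start_request().
    params = HTTP1ConnectionParameters(no_keep_alive=False, use_gzip=False)
    conn = HTTP1ServerConnection(stream, address, params=params)
    conn.start_serving(server_delegate)


@gen.coroutine
def fetch_root(stream, sockaddr, message_delegate):
    # Client side: the request method and gzip flag move from read_response()
    # into the connection itself, so one HTTP1Connection now describes exactly
    # one request/response exchange.
    conn = HTTP1Connection(stream, sockaddr, is_client=True,
                           params=HTTP1ConnectionParameters(
                               no_keep_alive=True, use_gzip=True),
                           method='GET')
    conn.write_headers(httputil.RequestStartLine('GET', '/', 'HTTP/1.1'),
                       httputil.HTTPHeaders())
    conn.finish()
    yield conn.read_response(message_delegate)

Making the connection single-use lets the server loop drop the old per-request bookkeeping (the message_delegate reset and repeated _clear_request_state calls) simply by constructing a new HTTP1Connection for the next request.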