]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Make HTTP1Connection single-use-only.
authorBen Darnell <ben@bendarnell.com>
Sun, 20 Apr 2014 17:58:49 +0000 (13:58 -0400)
committerBen Darnell <ben@bendarnell.com>
Sun, 20 Apr 2014 18:53:00 +0000 (14:53 -0400)
The server loop is now on a separate HTTP1ServerConnection class.

tornado/http1connection.py
tornado/httpserver.py
tornado/simple_httpclient.py
tornado/test/httpserver_test.py

index ded8724c9511c5bbe745d28989262ae4447b1734..c94598b36b6d4cefe9ecb40504891a3dc981fd06 100644 (file)
@@ -27,6 +27,18 @@ from tornado.log import gen_log, app_log
 from tornado import stack_context
 from tornado.util import GzipDecompressor
 
+class HTTP1ConnectionParameters(object):
+    def __init__(self, no_keep_alive=False, protocol=None, chunk_size=None,
+                 max_header_size=None, header_timeout=None, max_body_size=None,
+                 body_timeout=None, use_gzip=False):
+        self.no_keep_alive = no_keep_alive
+        self.protocol = protocol
+        self.chunk_size = chunk_size or 65536
+        self.max_header_size = max_header_size or 65536
+        self.header_timeout = header_timeout
+        self.max_body_size = max_body_size
+        self.body_timeout = body_timeout
+        self.use_gzip = use_gzip
 
 class HTTP1Connection(object):
     """Handles a connection to an HTTP client, executing HTTP requests.
@@ -34,18 +46,21 @@ class HTTP1Connection(object):
     We parse HTTP headers and bodies, and execute the request callback
     until the HTTP conection is closed.
     """
-    def __init__(self, stream, address, is_client,
-                 no_keep_alive=False, protocol=None, chunk_size=None,
-                 max_header_size=None, header_timeout=None,
-                 max_body_size=None, body_timeout=None):
+    def __init__(self, stream, address, is_client, params=None, method=None):
         self.is_client = is_client
         self.stream = stream
         self.address = address
+        if params is None:
+            params = HTTP1ConnectionParameters()
+        self.params = params
+        self.no_keep_alive = params.no_keep_alive
         # Save the socket's address family now so we know how to
         # interpret self.address even after the stream is closed
         # and its socket attribute replaced with None.
-        self.address_family = stream.socket.family
-        self.no_keep_alive = no_keep_alive
+        if stream.socket is not None:
+            self.address_family = stream.socket.family
+        else:
+            self.address_family = None
         # In HTTPServerRequest we want an IP, not a full socket address.
         if (self.address_family in (socket.AF_INET, socket.AF_INET6) and
             address is not None):
@@ -53,19 +68,18 @@ class HTTP1Connection(object):
         else:
             # Unix (or other) socket; fake the remote address.
             self.remote_ip = '0.0.0.0'
-        if protocol:
-            self.protocol = protocol
+        if self.params.protocol:
+            self.protocol = self.params.protocol
         elif isinstance(stream, iostream.SSLIOStream):
             self.protocol = "https"
         else:
             self.protocol = "http"
-        self._chunk_size = chunk_size or 65536
-        self._max_header_size = max_header_size or 65536
-        self._header_timeout = header_timeout
-        self._default_max_body_size = (max_body_size or
-                                       self.stream.max_buffer_size)
-        self._default_body_timeout = body_timeout
+        self._max_body_size = (self.params.max_body_size or
+                               self.stream.max_buffer_size)
+        self._body_timeout = self.params.body_timeout
+        self._method = method
         self._disconnect_on_finish = False
+        self._request_finished = False
         self._clear_request_state()
         self.stream.set_close_callback(self._on_connection_close)
         self._finish_future = None
@@ -75,61 +89,24 @@ class HTTP1Connection(object):
         # True if we have read HTTP headers but have not yet read the
         # corresponding body.
         self._reading = False
-        # This is set in _read_message.  It's ugly to have this here,
-        # but we need to be able to reset the delegate in finish() to divert
-        # remaining input data to a null delegate when the request is aborted.
-        self.message_delegate = None
 
-    def start_serving(self, delegate, gzip=False):
-        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
-        # Register the future on the IOLoop so its errors get logged.
-        self.stream.io_loop.add_future(
-            self._server_request_loop(delegate, gzip=gzip),
-            lambda f: f.result())
+    def read_response(self, delegate):
+        if self.params.use_gzip:
+            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
+        return self._read_message(delegate)
 
     @gen.coroutine
-    def _server_request_loop(self, delegate, gzip=False):
-        while True:
-            request_delegate = delegate.start_request(self)
-            if gzip:
-                request_delegate = _GzipMessageDelegate(request_delegate,
-                                                        self._chunk_size)
-            try:
-                ret = yield self._read_message(request_delegate)
-            except iostream.StreamClosedError:
-                self.close()
-                return
-            except Exception:
-                # TODO: this is probably too broad; it would be better to
-                # wrap all delegate calls in something that writes to app_log,
-                # and then errors that reach this point can be gen_log.
-                app_log.error("Uncaught exception", exc_info=True)
-                self.close()
-                return
-            if not ret:
-                return
-
-    def read_response(self, delegate, method, use_gzip=False):
-        if use_gzip:
-            delegate = _GzipMessageDelegate(delegate, self._chunk_size)
-        return self._read_message(delegate, method=method)
-
-    @gen.coroutine
-    def _read_message(self, delegate, method=None):
-        assert isinstance(delegate, httputil.HTTPMessageDelegate)
-        self.message_delegate = delegate
-        self._max_body_size = self._default_max_body_size
-        self._body_timeout = self._default_body_timeout
+    def _read_message(self, delegate):
         try:
             header_future = self.stream.read_until_regex(
                         b"\r?\n\r?\n",
-                        max_bytes=self._max_header_size)
-            if self._header_timeout is None:
+                        max_bytes=self.params.max_header_size)
+            if self.params.header_timeout is None:
                 header_data = yield header_future
             else:
                 try:
                     header_data = yield gen.with_timeout(
-                        self.stream.io_loop.time() + self._header_timeout,
+                        self.stream.io_loop.time() + self.params.header_timeout,
                         header_future,
                         io_loop=self.stream.io_loop)
                 except gen.TimeoutError:
@@ -148,7 +125,7 @@ class HTTP1Connection(object):
 
             self._disconnect_on_finish = not self._can_keep_alive(
                 start_line, headers)
-            header_future = self.message_delegate.headers_received(
+            header_future = delegate.headers_received(
                 start_line, headers)
             if header_future is not None:
                 yield header_future
@@ -158,7 +135,7 @@ class HTTP1Connection(object):
                 raise gen.Return(False)
             skip_body = False
             if self.is_client:
-                if method == 'HEAD':
+                if self._method == 'HEAD':
                     skip_body = True
                 code = start_line.code
                 if code == 304:
@@ -166,13 +143,12 @@ class HTTP1Connection(object):
                 if code >= 100 and code < 200:
                     # TODO: client delegates will get headers_received twice
                     # in the case of a 100-continue.  Document or change?
-                    yield self._read_message(self.message_delegate,
-                                             method=method)
+                    yield self._read_message(delegate)
             else:
                 if headers.get("Expect") == "100-continue":
                     self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
             if not skip_body:
-                body_future = self._read_body(headers)
+                body_future = self._read_body(headers, delegate)
                 if body_future is not None:
                     if self._body_timeout is None:
                         yield body_future
@@ -187,7 +163,8 @@ class HTTP1Connection(object):
                             self.stream.close()
                             raise gen.Return(False)
             self._reading = False
-            self.message_delegate.finish()
+            if not self._request_finished or self.is_client:
+                delegate.finish()
             yield self._finish_future
             if self.stream is None:
                 raise gen.Return(False)
@@ -198,7 +175,6 @@ class HTTP1Connection(object):
             raise gen.Return(False)
         raise gen.Return(True)
 
-
     def _clear_request_state(self):
         """Clears the per-request state.
 
@@ -207,7 +183,6 @@ class HTTP1Connection(object):
         and when the connection is closed (to break up cycles and
         facilitate garbage collection in cpython).
         """
-        self._request_finished = False
         self._write_callback = None
         self._close_callback = None
 
@@ -332,12 +307,11 @@ class HTTP1Connection(object):
             self._chunking = False
         self._request_finished = True
         # If the app finished the request while we're still reading,
-        # divert any remaining input to a null delegate and close the
-        # connection when we're done sending our response.  Closing
-        # the connection is the only way to avoid reading the whole
-        # input body.
+        # divert any remaining data away from the delegate and
+        # close the connection when we're done sending our response.
+        # Closing the connection is the only way to avoid reading the
+        # whole input body.
         if self._reading:
-            self.message_delegate = httputil.HTTPMessageDelegate()
             self._disconnect_on_finish = True
         # No more data is coming, so instruct TCP to send any remaining
         # data immediately instead of waiting for a full packet or ack.
@@ -361,7 +335,7 @@ class HTTP1Connection(object):
             self._finish_request()
 
     def _can_keep_alive(self, start_line, headers):
-        if self.no_keep_alive:
+        if self.params.no_keep_alive:
             return False
         connection_header = headers.get("Connection")
         if connection_header is not None:
@@ -381,7 +355,7 @@ class HTTP1Connection(object):
         # Turn Nagle's algorithm back on, leaving the stream in its
         # default state for the next request.
         self.stream.set_nodelay(False)
-        if self._finish_future is not None:
+        if self._finish_future is not None and not self._finish_future.done():
             self._finish_future.set_result(None)
 
     def _parse_headers(self, data):
@@ -396,29 +370,30 @@ class HTTP1Connection(object):
                                               data[eol:100])
         return start_line, headers
 
-    def _read_body(self, headers):
+    def _read_body(self, headers, delegate):
         content_length = headers.get("Content-Length")
         if content_length:
             content_length = int(content_length)
             if content_length > self._max_body_size:
                 raise httputil.HTTPInputException("Content-Length too long")
-            return self._read_fixed_body(content_length)
+            return self._read_fixed_body(content_length, delegate)
         if headers.get("Transfer-Encoding") == "chunked":
-            return self._read_chunked_body()
+            return self._read_chunked_body(delegate)
         if self.is_client:
-            return self._read_body_until_close()
+            return self._read_body_until_close(delegate)
         return None
 
     @gen.coroutine
-    def _read_fixed_body(self, content_length):
+    def _read_fixed_body(self, content_length, delegate):
         while content_length > 0:
             body = yield self.stream.read_bytes(
-                min(self._chunk_size, content_length), partial=True)
+                min(self.params.chunk_size, content_length), partial=True)
             content_length -= len(body)
-            yield gen.maybe_future(self.message_delegate.data_received(body))
+            if not self._request_finished or self.is_client:
+                yield gen.maybe_future(delegate.data_received(body))
 
     @gen.coroutine
-    def _read_chunked_body(self):
+    def _read_chunked_body(self, delegate):
         # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
         total_size = 0
         while True:
@@ -432,18 +407,20 @@ class HTTP1Connection(object):
             bytes_to_read = chunk_len
             while bytes_to_read:
                 chunk = yield self.stream.read_bytes(
-                    min(bytes_to_read, self._chunk_size), partial=True)
+                    min(bytes_to_read, self.params.chunk_size), partial=True)
                 bytes_to_read -= len(chunk)
-                yield gen.maybe_future(
-                    self.message_delegate.data_received(chunk))
+                if not self._request_finished or self.is_client:
+                    yield gen.maybe_future(
+                        delegate.data_received(chunk))
             # chunk ends with \r\n
             crlf = yield self.stream.read_bytes(2)
             assert crlf == b"\r\n"
 
     @gen.coroutine
-    def _read_body_until_close(self):
+    def _read_body_until_close(self, delegate):
         body = yield self.stream.read_until_close()
-        self.message_delegate.data_received(body)
+        if not self._request_finished or self.is_client:
+            delegate.data_received(body)
 
 
 class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
@@ -490,3 +467,41 @@ class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
                 # anything, treat it as an extra chunk
                 self._delegate.data_received(tail)
         return self._delegate.finish()
+
+
+class HTTP1ServerConnection(object):
+    def __init__(self, stream, address, params=None):
+        self.stream = stream
+        self.address = address
+        if params is None:
+            params = HTTP1ConnectionParameters()
+        self.params = params
+
+    def start_serving(self, delegate):
+        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
+        # Register the future on the IOLoop so its errors get logged.
+        self.stream.io_loop.add_future(
+            self._server_request_loop(delegate),
+            lambda f: f.result())
+
+    @gen.coroutine
+    def _server_request_loop(self, delegate):
+        while True:
+            conn = HTTP1Connection(self.stream, self.address, is_client=False,
+                                   params=self.params)
+            request_delegate = delegate.start_request(conn)
+            try:
+                ret = yield conn.read_response(request_delegate)
+                conn._clear_request_state()
+            except iostream.StreamClosedError:
+                conn.close()
+                return
+            except Exception:
+                # TODO: this is probably too broad; it would be better to
+                # wrap all delegate calls in something that writes to app_log,
+                # and then errors that reach this point can be gen_log.
+                app_log.error("Uncaught exception", exc_info=True)
+                conn.close()
+                return
+            if not ret:
+                return
index 556b891e1d8692500112ff91884e00ae2fd9ecc0..036c1580034c97f1c00c11bbd4b140b68ebcbb25 100644 (file)
@@ -28,7 +28,7 @@ class except to start a server at the beginning of the process
 
 from __future__ import absolute_import, division, print_function, with_statement
 
-from tornado.http1connection import HTTP1Connection
+from tornado.http1connection import HTTP1ServerConnection, HTTP1ConnectionParameters
 from tornado import httputil
 from tornado import netutil
 from tornado.tcpserver import TCPServer
@@ -140,28 +140,23 @@ class HTTPServer(TCPServer, httputil.HTTPServerConnectionDelegate):
         self.request_callback = request_callback
         self.no_keep_alive = no_keep_alive
         self.xheaders = xheaders
-        self.protocol = protocol
-        self.gzip = gzip
-        self.chunk_size = chunk_size
-        self.max_header_size = max_header_size
-        self.idle_connection_timeout = idle_connection_timeout or 3600
-        self.body_timeout = body_timeout
-        self.max_body_size = max_body_size
+        self.conn_params = HTTP1ConnectionParameters(
+            protocol=protocol,
+            use_gzip=gzip,
+            chunk_size=chunk_size,
+            max_header_size=max_header_size,
+            header_timeout=idle_connection_timeout or 3600,
+            max_body_size=max_body_size,
+            body_timeout=body_timeout)
         TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
                            max_buffer_size=max_buffer_size,
                            read_chunk_size=chunk_size)
 
     def handle_stream(self, stream, address):
-        conn = HTTP1Connection(
-            stream, address=address, is_client=False,
-            no_keep_alive=self.no_keep_alive,
-            protocol=self.protocol,
-            chunk_size=self.chunk_size,
-            max_header_size=self.max_header_size,
-            header_timeout=self.idle_connection_timeout,
-            max_body_size=self.max_body_size,
-            body_timeout=self.body_timeout)
-        conn.start_serving(self, gzip=self.gzip)
+        conn = HTTP1ServerConnection(
+            stream, address=address,
+            params=self.conn_params)
+        conn.start_serving(self)
 
     def start_request(self, connection):
         return _ServerRequestAdapter(self, connection)
index bb59b30f717d501af96f89e430e95f03b8507a6b..e5248d53a0d21907319488423491c39bef05766f 100644 (file)
@@ -5,7 +5,7 @@ from tornado.concurrent import is_future
 from tornado.escape import utf8, _unicode
 from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy
 from tornado import httputil
-from tornado.http1connection import HTTP1Connection
+from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
 from tornado.iostream import IOStream, SSLIOStream, StreamClosedError
 from tornado.netutil import Resolver, OverrideResolver
 from tornado.log import gen_log
@@ -334,8 +334,11 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
         self.stream.set_nodelay(True)
         self.connection = HTTP1Connection(
             self.stream, self._sockaddr, is_client=True,
-            no_keep_alive=True, protocol=self.parsed.scheme,
-            max_header_size=self.max_header_size)
+            params=HTTP1ConnectionParameters(
+                no_keep_alive=True, protocol=self.parsed.scheme,
+                max_header_size=self.max_header_size,
+                use_gzip=self.request.use_gzip),
+            method=self.request.method)
         start_line = httputil.RequestStartLine(self.request.method,
                                                req_path, 'HTTP/1.1')
         self.connection.write_headers(
@@ -361,8 +364,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
         # Ensure that any exception raised in read_response ends up in our
         # stack context.
         self.io_loop.add_future(
-            self.connection.read_response(self, method=self.request.method,
-                                          use_gzip=self.request.use_gzip),
+            self.connection.read_response(self),
             lambda f: f.result())
 
     def _release(self):
index 0c46fb358b664fd7627504be56707574749325e6..0e8fb36befe3a01ab4a54734961493681ab82e56 100644 (file)
@@ -42,7 +42,7 @@ def read_stream_body(stream, callback):
         def finish(self):
             callback(b''.join(chunks))
     conn = HTTP1Connection(stream, None, is_client=True)
-    conn.read_response(Delegate(), method='GET')
+    conn.read_response(Delegate())
 
 
 class HandlerBaseTestCase(AsyncHTTPTestCase):