from tornado import stack_context
from tornado.util import GzipDecompressor
+class HTTP1ConnectionParameters(object):
+ def __init__(self, no_keep_alive=False, protocol=None, chunk_size=None,
+ max_header_size=None, header_timeout=None, max_body_size=None,
+ body_timeout=None, use_gzip=False):
+ self.no_keep_alive = no_keep_alive
+ self.protocol = protocol
+ self.chunk_size = chunk_size or 65536
+ self.max_header_size = max_header_size or 65536
+ self.header_timeout = header_timeout
+ self.max_body_size = max_body_size
+ self.body_timeout = body_timeout
+ self.use_gzip = use_gzip
class HTTP1Connection(object):
"""Handles a connection to an HTTP client, executing HTTP requests.
We parse HTTP headers and bodies, and execute the request callback
until the HTTP conection is closed.
"""
- def __init__(self, stream, address, is_client,
- no_keep_alive=False, protocol=None, chunk_size=None,
- max_header_size=None, header_timeout=None,
- max_body_size=None, body_timeout=None):
+ def __init__(self, stream, address, is_client, params=None, method=None):
self.is_client = is_client
self.stream = stream
self.address = address
+ if params is None:
+ params = HTTP1ConnectionParameters()
+ self.params = params
+ self.no_keep_alive = params.no_keep_alive
# Save the socket's address family now so we know how to
# interpret self.address even after the stream is closed
# and its socket attribute replaced with None.
- self.address_family = stream.socket.family
- self.no_keep_alive = no_keep_alive
+ if stream.socket is not None:
+ self.address_family = stream.socket.family
+ else:
+ self.address_family = None
# In HTTPServerRequest we want an IP, not a full socket address.
if (self.address_family in (socket.AF_INET, socket.AF_INET6) and
address is not None):
else:
# Unix (or other) socket; fake the remote address.
self.remote_ip = '0.0.0.0'
- if protocol:
- self.protocol = protocol
+ if self.params.protocol:
+ self.protocol = self.params.protocol
elif isinstance(stream, iostream.SSLIOStream):
self.protocol = "https"
else:
self.protocol = "http"
- self._chunk_size = chunk_size or 65536
- self._max_header_size = max_header_size or 65536
- self._header_timeout = header_timeout
- self._default_max_body_size = (max_body_size or
- self.stream.max_buffer_size)
- self._default_body_timeout = body_timeout
+ self._max_body_size = (self.params.max_body_size or
+ self.stream.max_buffer_size)
+ self._body_timeout = self.params.body_timeout
+ self._method = method
self._disconnect_on_finish = False
+ self._request_finished = False
self._clear_request_state()
self.stream.set_close_callback(self._on_connection_close)
self._finish_future = None
# True if we have read HTTP headers but have not yet read the
# corresponding body.
self._reading = False
- # This is set in _read_message. It's ugly to have this here,
- # but we need to be able to reset the delegate in finish() to divert
- # remaining input data to a null delegate when the request is aborted.
- self.message_delegate = None
- def start_serving(self, delegate, gzip=False):
- assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
- # Register the future on the IOLoop so its errors get logged.
- self.stream.io_loop.add_future(
- self._server_request_loop(delegate, gzip=gzip),
- lambda f: f.result())
+ def read_response(self, delegate):
+ if self.params.use_gzip:
+ delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
+ return self._read_message(delegate)
@gen.coroutine
- def _server_request_loop(self, delegate, gzip=False):
- while True:
- request_delegate = delegate.start_request(self)
- if gzip:
- request_delegate = _GzipMessageDelegate(request_delegate,
- self._chunk_size)
- try:
- ret = yield self._read_message(request_delegate)
- except iostream.StreamClosedError:
- self.close()
- return
- except Exception:
- # TODO: this is probably too broad; it would be better to
- # wrap all delegate calls in something that writes to app_log,
- # and then errors that reach this point can be gen_log.
- app_log.error("Uncaught exception", exc_info=True)
- self.close()
- return
- if not ret:
- return
-
- def read_response(self, delegate, method, use_gzip=False):
- if use_gzip:
- delegate = _GzipMessageDelegate(delegate, self._chunk_size)
- return self._read_message(delegate, method=method)
-
- @gen.coroutine
- def _read_message(self, delegate, method=None):
- assert isinstance(delegate, httputil.HTTPMessageDelegate)
- self.message_delegate = delegate
- self._max_body_size = self._default_max_body_size
- self._body_timeout = self._default_body_timeout
+ def _read_message(self, delegate):
try:
header_future = self.stream.read_until_regex(
b"\r?\n\r?\n",
- max_bytes=self._max_header_size)
- if self._header_timeout is None:
+ max_bytes=self.params.max_header_size)
+ if self.params.header_timeout is None:
header_data = yield header_future
else:
try:
header_data = yield gen.with_timeout(
- self.stream.io_loop.time() + self._header_timeout,
+ self.stream.io_loop.time() + self.params.header_timeout,
header_future,
io_loop=self.stream.io_loop)
except gen.TimeoutError:
self._disconnect_on_finish = not self._can_keep_alive(
start_line, headers)
- header_future = self.message_delegate.headers_received(
+ header_future = delegate.headers_received(
start_line, headers)
if header_future is not None:
yield header_future
raise gen.Return(False)
skip_body = False
if self.is_client:
- if method == 'HEAD':
+ if self._method == 'HEAD':
skip_body = True
code = start_line.code
if code == 304:
if code >= 100 and code < 200:
# TODO: client delegates will get headers_received twice
# in the case of a 100-continue. Document or change?
- yield self._read_message(self.message_delegate,
- method=method)
+ yield self._read_message(delegate)
else:
if headers.get("Expect") == "100-continue":
self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
if not skip_body:
- body_future = self._read_body(headers)
+ body_future = self._read_body(headers, delegate)
if body_future is not None:
if self._body_timeout is None:
yield body_future
self.stream.close()
raise gen.Return(False)
self._reading = False
- self.message_delegate.finish()
+ if not self._request_finished or self.is_client:
+ delegate.finish()
yield self._finish_future
if self.stream is None:
raise gen.Return(False)
raise gen.Return(False)
raise gen.Return(True)
-
def _clear_request_state(self):
"""Clears the per-request state.
and when the connection is closed (to break up cycles and
facilitate garbage collection in cpython).
"""
- self._request_finished = False
self._write_callback = None
self._close_callback = None
self._chunking = False
self._request_finished = True
# If the app finished the request while we're still reading,
- # divert any remaining input to a null delegate and close the
- # connection when we're done sending our response. Closing
- # the connection is the only way to avoid reading the whole
- # input body.
+ # divert any remaining data away from the delegate and
+ # close the connection when we're done sending our response.
+ # Closing the connection is the only way to avoid reading the
+ # whole input body.
if self._reading:
- self.message_delegate = httputil.HTTPMessageDelegate()
self._disconnect_on_finish = True
# No more data is coming, so instruct TCP to send any remaining
# data immediately instead of waiting for a full packet or ack.
self._finish_request()
def _can_keep_alive(self, start_line, headers):
- if self.no_keep_alive:
+ if self.params.no_keep_alive:
return False
connection_header = headers.get("Connection")
if connection_header is not None:
# Turn Nagle's algorithm back on, leaving the stream in its
# default state for the next request.
self.stream.set_nodelay(False)
- if self._finish_future is not None:
+ if self._finish_future is not None and not self._finish_future.done():
self._finish_future.set_result(None)
def _parse_headers(self, data):
data[eol:100])
return start_line, headers
- def _read_body(self, headers):
+ def _read_body(self, headers, delegate):
content_length = headers.get("Content-Length")
if content_length:
content_length = int(content_length)
if content_length > self._max_body_size:
raise httputil.HTTPInputException("Content-Length too long")
- return self._read_fixed_body(content_length)
+ return self._read_fixed_body(content_length, delegate)
if headers.get("Transfer-Encoding") == "chunked":
- return self._read_chunked_body()
+ return self._read_chunked_body(delegate)
if self.is_client:
- return self._read_body_until_close()
+ return self._read_body_until_close(delegate)
return None
@gen.coroutine
- def _read_fixed_body(self, content_length):
+ def _read_fixed_body(self, content_length, delegate):
while content_length > 0:
body = yield self.stream.read_bytes(
- min(self._chunk_size, content_length), partial=True)
+ min(self.params.chunk_size, content_length), partial=True)
content_length -= len(body)
- yield gen.maybe_future(self.message_delegate.data_received(body))
+ if not self._request_finished or self.is_client:
+ yield gen.maybe_future(delegate.data_received(body))
@gen.coroutine
- def _read_chunked_body(self):
+ def _read_chunked_body(self, delegate):
# TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
total_size = 0
while True:
bytes_to_read = chunk_len
while bytes_to_read:
chunk = yield self.stream.read_bytes(
- min(bytes_to_read, self._chunk_size), partial=True)
+ min(bytes_to_read, self.params.chunk_size), partial=True)
bytes_to_read -= len(chunk)
- yield gen.maybe_future(
- self.message_delegate.data_received(chunk))
+ if not self._request_finished or self.is_client:
+ yield gen.maybe_future(
+ delegate.data_received(chunk))
# chunk ends with \r\n
crlf = yield self.stream.read_bytes(2)
assert crlf == b"\r\n"
@gen.coroutine
- def _read_body_until_close(self):
+ def _read_body_until_close(self, delegate):
body = yield self.stream.read_until_close()
- self.message_delegate.data_received(body)
+ if not self._request_finished or self.is_client:
+ delegate.data_received(body)
class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
# anything, treat it as an extra chunk
self._delegate.data_received(tail)
return self._delegate.finish()
+
+
+class HTTP1ServerConnection(object):
+ def __init__(self, stream, address, params=None):
+ self.stream = stream
+ self.address = address
+ if params is None:
+ params = HTTP1ConnectionParameters()
+ self.params = params
+
+ def start_serving(self, delegate):
+ assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
+ # Register the future on the IOLoop so its errors get logged.
+ self.stream.io_loop.add_future(
+ self._server_request_loop(delegate),
+ lambda f: f.result())
+
+ @gen.coroutine
+ def _server_request_loop(self, delegate):
+ while True:
+ conn = HTTP1Connection(self.stream, self.address, is_client=False,
+ params=self.params)
+ request_delegate = delegate.start_request(conn)
+ try:
+ ret = yield conn.read_response(request_delegate)
+ conn._clear_request_state()
+ except iostream.StreamClosedError:
+ conn.close()
+ return
+ except Exception:
+ # TODO: this is probably too broad; it would be better to
+ # wrap all delegate calls in something that writes to app_log,
+ # and then errors that reach this point can be gen_log.
+ app_log.error("Uncaught exception", exc_info=True)
+ conn.close()
+ return
+ if not ret:
+ return