We parse HTTP headers and bodies, and execute the request callback
until the HTTP connection is closed.
"""
- def __init__(self, stream, address, is_client, params=None, method=None):
+ def __init__(self, stream, address, is_client, params=None):
self.is_client = is_client
self.stream = stream
self.address = address
self.protocol = "https"
else:
self.protocol = "http"
+ # The body limits can be altered by the delegate, so save them
+ # here instead of just referencing self.params later.
self._max_body_size = (self.params.max_body_size or
self.stream.max_buffer_size)
self._body_timeout = self.params.body_timeout
- self._method = method
+ # _write_finished is set to True when finish() has been called,
+ # i.e. there will be no more data sent. Data may still be in the
+ # stream's write buffer.
+ self._write_finished = False
+ # True when we have read the entire incoming body.
+ self._read_finished = False
+ # _finish_future resolves when all data has been written and flushed
+ # to the IOStream.
+ self._finish_future = Future()
+ # If true, the connection should be closed after this request
+ # (after the response has been written in the server side,
+ # and after it has been read in the client)
self._disconnect_on_finish = False
- self._request_finished = False
- self._clear_request_state()
+ self._clear_callbacks()
self.stream.set_close_callback(self._on_connection_close)
- self._finish_future = None
+ # Save the start lines after we read or write them; they
+ # affect later processing (e.g. 304 responses and HEAD methods
+ # have content-length but no bodies)
self._request_start_line = None
- self._chunking = None
+ self._response_start_line = None
+ # True if we are writing output with chunked encoding.
+ self._chunking_output = None
+ # While reading a body with a content-length, this is the
+ # amount left to read.
self._expected_content_remaining = None
- # True if we have read HTTP headers but have not yet read the
- # corresponding body.
- self._reading = False
def read_response(self, delegate):
if self.params.use_gzip:
except gen.TimeoutError:
self.close()
raise gen.Return(False)
- self._reading = True
- self._finish_future = Future()
start_line, headers = self._parse_headers(header_data)
if self.is_client:
start_line = httputil.parse_response_start_line(start_line)
+ self._response_start_line = start_line
else:
start_line = httputil.parse_request_start_line(start_line)
- # It's kind of ugly to set this here, but we need it in
- # write_header().
- self._request_start_line = start_line
+ self._request_start_line = start_line
self._disconnect_on_finish = not self._can_keep_alive(
start_line, headers)
- header_future = delegate.headers_received(
- start_line, headers)
+ header_future = delegate.headers_received(start_line, headers)
if header_future is not None:
yield header_future
if self.stream is None:
# We've been detached.
- # TODO: where else do we need to check for detach?
raise gen.Return(False)
skip_body = False
if self.is_client:
- if self._method == 'HEAD':
+ if (self._request_start_line is not None and
+ self._request_start_line.method == 'HEAD'):
skip_body = True
code = start_line.code
if code == 304:
self.address)
self.stream.close()
raise gen.Return(False)
- self._reading = False
- if not self._request_finished or self.is_client:
+ self._read_finished = True
+ if not self._write_finished or self.is_client:
delegate.finish()
yield self._finish_future
+ if self.is_client and self._disconnect_on_finish:
+ self.close()
if self.stream is None:
raise gen.Return(False)
except httputil.HTTPInputException as e:
self.address, e)
self.close()
raise gen.Return(False)
+ finally:
+ self._clear_callbacks()
raise gen.Return(True)
- def _clear_request_state(self):
- """Clears the per-request state.
+ def _clear_callbacks(self):
+ """Clears the callback attributes.
- This is run in between requests to allow the previous handler
- to be garbage collected (and prevent spurious close callbacks),
- and when the connection is closed (to break up cycles and
- facilitate garbage collection in cpython).
+ This allows the request handler to be garbage collected more
+ quickly in CPython by breaking up reference cycles.
"""
self._write_callback = None
self._close_callback = None
callback = self._close_callback
self._close_callback = None
callback()
- if self._finish_future is not None and not self._finish_future.done():
+ if not self._finish_future.done():
self._finish_future.set_result(None)
- # Delete any unfinished callbacks to break up reference cycles.
- self._clear_request_state()
+ self._clear_callbacks()
def close(self):
self.stream.close()
- # Remove this reference to self, which would otherwise cause a
- # cycle and delay garbage collection of this connection.
- self._clear_request_state()
+ self._clear_callbacks()
def detach(self):
stream = self.stream
def write_headers(self, start_line, headers, chunk=None, callback=None,
has_body=True):
if self.is_client:
+ self._request_start_line = start_line
# Client requests with a non-empty body must have either a
# Content-Length or a Transfer-Encoding.
- self._chunking = (
+ self._chunking_output = (
has_body and
'Content-Length' not in headers and
'Transfer-Encoding' not in headers)
else:
- self._chunking = (
+ self._response_start_line = start_line
+ self._chunking_output = (
has_body and
# TODO: should this use
# self._request_start_line.version or
# Applications are discouraged from touching Transfer-Encoding,
# but if they do, leave it alone.
'Transfer-Encoding' not in headers)
- if self._chunking:
+ if self._chunking_output:
headers['Transfer-Encoding'] = 'chunked'
if (not self.is_client and
(self._request_start_line.method == 'HEAD' or
self.stream.close()
raise httputil.HTTPOutputException(
"Tried to write more data than Content-Length")
- if self._chunking and chunk:
+ if self._chunking_output and chunk:
# Don't write out empty chunks because that means END-OF-STREAM
# with chunked encoding
return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
raise httputil.HTTPOutputException(
"Tried to write %d bytes less than Content-Length" %
self._expected_content_remaining)
- if self._chunking:
+ if self._chunking_output:
if not self.stream.closed():
self.stream.write(b"0\r\n\r\n", self._on_write_complete)
- self._chunking = False
- self._request_finished = True
+ self._write_finished = True
# If the app finished the request while we're still reading,
# divert any remaining data away from the delegate and
# close the connection when we're done sending our response.
# Closing the connection is the only way to avoid reading the
# whole input body.
- if self._reading:
+ if not self._read_finished:
self._disconnect_on_finish = True
# No more data is coming, so instruct TCP to send any remaining
# data immediately instead of waiting for a full packet or ack.
# there is still data in the IOStream, a future
# _on_write_complete will be responsible for calling
# _finish_request.
- if self._request_finished and not self.stream.writing():
+ if self._write_finished and not self.stream.writing():
self._finish_request()
def _can_keep_alive(self, start_line, headers):
return False
def _finish_request(self):
- self._clear_request_state()
- if self._disconnect_on_finish:
+ self._clear_callbacks()
+ if not self.is_client and self._disconnect_on_finish:
self.close()
return
# Turn Nagle's algorithm back on, leaving the stream in its
# default state for the next request.
self.stream.set_nodelay(False)
- if self._finish_future is not None and not self._finish_future.done():
+ if not self._finish_future.done():
self._finish_future.set_result(None)
def _parse_headers(self, data):
body = yield self.stream.read_bytes(
min(self.params.chunk_size, content_length), partial=True)
content_length -= len(body)
- if not self._request_finished or self.is_client:
+ if not self._write_finished or self.is_client:
yield gen.maybe_future(delegate.data_received(body))
@gen.coroutine
chunk = yield self.stream.read_bytes(
min(bytes_to_read, self.params.chunk_size), partial=True)
bytes_to_read -= len(chunk)
- if not self._request_finished or self.is_client:
+ if not self._write_finished or self.is_client:
yield gen.maybe_future(
delegate.data_received(chunk))
# chunk ends with \r\n
@gen.coroutine
def _read_body_until_close(self, delegate):
body = yield self.stream.read_until_close()
- if not self._request_finished or self.is_client:
+ if not self._write_finished or self.is_client:
delegate.data_received(body)
@gen.coroutine
def _server_request_loop(self, delegate):
while True:
- conn = HTTP1Connection(self.stream, self.address, is_client=False,
- params=self.params)
+ conn = HTTP1Connection(self.stream, self.address, False,
+ self.params)
request_delegate = delegate.start_request(conn)
try:
ret = yield conn.read_response(request_delegate)
- conn._clear_request_state()
except iostream.StreamClosedError:
- conn.close()
return
except Exception:
# TODO: this is probably too broad; it would be better to