# While reading a body with a content-length, this is the
# amount left to read.
self._expected_content_remaining = None
+ # A Future for our outgoing writes, returned by IOStream.write.
+ self._pending_write = None
def read_response(self, delegate):
"""Read a single HTTP response.
data = b"\r\n".join(lines) + b"\r\n\r\n"
if chunk:
data += self._format_chunk(chunk)
- self.stream.write(data, self._on_write_complete)
+ self._pending_write = self.stream.write(data)
+ self._pending_write.add_done_callback(self._on_write_complete)
return self._write_future
def _format_chunk(self, chunk):
skip `write_headers` and instead call `write()` with a
pre-encoded header block.
"""
+ future = None
if self.stream.closed():
- self._write_future = Future()
+ future = self._write_future = Future()
self._write_future.set_exception(iostream.StreamClosedError())
else:
if callback is not None:
self._write_callback = stack_context.wrap(callback)
else:
- self._write_future = Future()
- self.stream.write(self._format_chunk(chunk),
- self._on_write_complete)
- return self._write_future
+ future = self._write_future = Future()
+ self._pending_write = self.stream.write(self._format_chunk(chunk))
+ self._pending_write.add_done_callback(self._on_write_complete)
+ return future
def finish(self):
"""Implements `.HTTPConnection.finish`."""
self._expected_content_remaining)
if self._chunking_output:
if not self.stream.closed():
- self.stream.write(b"0\r\n\r\n", self._on_write_complete)
+ self._pending_write = self.stream.write(b"0\r\n\r\n")
+ self._pending_write.add_done_callback(self._on_write_complete)
self._write_finished = True
# If the app finished the request while we're still reading,
# divert any remaining data away from the delegate and
# No more data is coming, so instruct TCP to send any remaining
# data immediately instead of waiting for a full packet or ack.
self.stream.set_nodelay(True)
- if not self.stream.writing():
- self._finish_request()
+ if self._pending_write is None:
+ self._finish_request(None)
+ else:
+ self._pending_write.add_done_callback(self._finish_request)
- def _on_write_complete(self):
+ def _on_write_complete(self, future):
if self._write_callback is not None:
callback = self._write_callback
self._write_callback = None
- callback()
+ self.stream.io_loop.add_callback(callback)
if self._write_future is not None:
future = self._write_future
self._write_future = None
future.set_result(None)
- # _on_write_complete is enqueued on the IOLoop whenever the
- # IOStream's write buffer becomes empty, but it's possible for
- # another callback that runs on the IOLoop before it to
- # simultaneously write more data and finish the request. If
- # there is still data in the IOStream, a future
- # _on_write_complete will be responsible for calling
- # _finish_request.
- if self._write_finished and not self.stream.writing():
- self._finish_request()
def _can_keep_alive(self, start_line, headers):
if self.params.no_keep_alive:
return connection_header == "keep-alive"
return False
- def _finish_request(self):
+ def _finish_request(self, future):
self._clear_callbacks()
if not self.is_client and self._disconnect_on_finish:
self.close()