self._connect_callback = None
self._connecting = False
self._state = None
+ self._pending_callbacks = 0
def connect(self, address, callback=None):
"""Connects the socket to a remote address without blocking.
self._handle_write()
if self._write_buffer:
self._add_io_state(self.io_loop.WRITE)
- elif callback is None and self._state is None:
- # if callback is not None and we just completed the write
- # in the fast path, _add_io_state will be called from
- # _run_callback (see comments there for why)
- self._add_io_state(0)
+ self._maybe_add_error_listener()
def set_close_callback(self, callback):
"""Call the given callback when the stream is closed."""
def _run_callback(self, callback, *args):
def wrapper():
+ self._pending_callbacks -= 1
try:
callback(*args)
except Exception:
# Re-raise the exception so that IOLoop.handle_callback_exception
# can see it and log the error
raise
- if self._state is None:
- # If we got here with no self._state, this callback must
- # have been triggered by a fast-path read or write
- # (and the callback we just executed did not start a
- # slow-path operation). If we've never done a slow-path
- # op, we can't tell if the connection is closed out from
- # under us, so we must add the IOLoop callback here.
- self._add_io_state(0)
+ self._maybe_add_error_listener()
# We schedule callbacks to be run on the next IOLoop iteration
# rather than running them directly for several reasons:
# * Prevents unbounded stack growth when a callback calls an
# important if the callback was pre-wrapped before entry to
# IOStream (as in HTTPConnection._header_callback), as we could
# capture and leak the wrong context here.
+ self._pending_callbacks += 1
self.io_loop.add_callback(wrapper)
def _handle_read(self):
if not self.socket:
raise IOError("Stream is closed")
+ def _maybe_add_error_listener(self):
+ if self._state is None and self._pending_callbacks == 0:
+ self._add_io_state(0)
+
def _add_io_state(self, state):
+ """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
+
+ Implementation notes: Reads and writes have a fast path and a
+ slow path. The fast path reads synchronously from socket
+ buffers, while the slow path uses `_add_io_state` to schedule
+ an IOLoop callback. Note that in both cases, the callback is
+ run asynchronously with `_run_callback`.
+
+ To detect closed connections, we must have called
+ `_add_io_state` at some point, but we want to delay this as
+ much as possible so we don't have to set an `IOLoop.ERROR`
+ listener that will be overwritten by the next slow-path
+ operation. As long as there are callbacks scheduled for
+ fast-path ops, those callbacks may do more reads.
+ If a sequence of fast-path ops do not end in a slow-path op,
+ (e.g. for an @asynchronous long-poll request), we must add
+ the error handler. This is done in `_run_callback` and `write`
+ (since the write callback is optional so we can have a
+ fast-path write with no `_run_callback`)
+ """
if self.socket is None:
# connection has been closed, so there can be no future events
return