From: Ben Darnell Date: Wed, 6 Jul 2011 05:51:37 +0000 (-0700) Subject: Document IOStream fast-path/slow-path, and allow for chains of fast-path ops. X-Git-Tag: v2.1.0~106 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=64d8ce60a9e224a30af3f91cc737229783b14f84;p=thirdparty%2Ftornado.git Document IOStream fast-path/slow-path, and allow for chains of fast-path ops. --- diff --git a/tornado/iostream.py b/tornado/iostream.py index b41800f10..abf34a2a5 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -97,6 +97,7 @@ class IOStream(object): self._connect_callback = None self._connecting = False self._state = None + self._pending_callbacks = 0 def connect(self, address, callback=None): """Connects the socket to a remote address without blocking. @@ -178,11 +179,7 @@ class IOStream(object): self._handle_write() if self._write_buffer: self._add_io_state(self.io_loop.WRITE) - elif callback is None and self._state is None: - # if callback is not None and we just completed the write - # in the fast path, _add_io_state will be called from - # _run_callback (see comments there for why) - self._add_io_state(0) + self._maybe_add_error_listener() def set_close_callback(self, callback): """Call the given callback when the stream is closed.""" @@ -255,6 +252,7 @@ class IOStream(object): def _run_callback(self, callback, *args): def wrapper(): + self._pending_callbacks -= 1 try: callback(*args) except Exception: @@ -268,14 +266,7 @@ class IOStream(object): # Re-raise the exception so that IOLoop.handle_callback_exception # can see it and log the error raise - if self._state is None: - # If we got here with no self._state, this callback must - # have been triggered by a fast-path read or write - # (and the callback we just executed did not start a - # slow-path operation). If we've never done a slow-path - # op, we can't tell if the connection is closed out from - # under us, so we must add the IOLoop callback here. - self._add_io_state(0) + self._maybe_add_error_listener() # We schedule callbacks to be run on the next IOLoop iteration # rather than running them directly for several reasons: # * Prevents unbounded stack growth when a callback calls an @@ -290,6 +281,7 @@ class IOStream(object): # important if the callback was pre-wrapped before entry to # IOStream (as in HTTPConnection._header_callback), as we could # capture and leak the wrong context here. + self._pending_callbacks += 1 self.io_loop.add_callback(wrapper) def _handle_read(self): @@ -446,7 +438,31 @@ class IOStream(object): if not self.socket: raise IOError("Stream is closed") + def _maybe_add_error_listener(self): + if self._state is None and self._pending_callbacks == 0: + self._add_io_state(0) + def _add_io_state(self, state): + """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler. + + Implementation notes: Reads and writes have a fast path and a + slow path. The fast path reads synchronously from socket + buffers, while the slow path uses `_add_io_state` to schedule + an IOLoop callback. Note that in both cases, the callback is + run asynchronously with `_run_callback`. + + To detect closed connections, we must have called + `_add_io_state` at some point, but we want to delay this as + much as possible so we don't have to set an `IOLoop.ERROR` + listener that will be overwritten by the next slow-path + operation. As long as there are callbacks scheduled for + fast-path ops, those callbacks may do more reads. + If a sequence of fast-path ops do not end in a slow-path op, + (e.g. for an @asynchronous long-poll request), we must add + the error handler. This is done in `_run_callback` and `write` + (since the write callback is optional so we can have a + fast-path write with no `_run_callback`) + """ if self.socket is None: # connection has been closed, so there can be no future events return