self._state = None
self.socket.close()
self.socket = None
- if self._close_callback and self._pending_callbacks == 0:
- # if there are pending callbacks, don't run the close callback
- # until they're done (see _maybe_add_error_handler)
- cb = self._close_callback
- self._close_callback = None
- self._run_callback(cb)
+ self._maybe_run_close_callback()
+
+ def _maybe_run_close_callback(self):
+ if (self.socket is None and self._close_callback and
+ self._pending_callbacks == 0):
+ # if there are pending callbacks, don't run the close callback
+ # until they're done (see _maybe_add_error_handler)
+ cb = self._close_callback
+ self._close_callback = None
+ self._run_callback(cb)
def reading(self):
"""Returns true if we are currently reading from the stream."""
self.io_loop.add_callback(wrapper)
def _handle_read(self):
- while True:
+ try:
try:
- # Read from the socket until we get EWOULDBLOCK or equivalent.
- # SSL sockets do some internal buffering, and if the data is
- # sitting in the SSL object's buffer select() and friends
- # can't see it; the only way to find out if it's there is to
- # try to read it.
- result = self._read_to_buffer()
- except Exception:
- self.close()
- return
- if result == 0:
- break
- else:
- if self._read_from_buffer():
- return
+ # Pretend to have a pending callback so that an EOF in
+ # _read_to_buffer doesn't trigger an immediate close
+ # callback. At the end of this method we'll either
+ # estabilsh a real pending callback via
+ # _read_from_buffer or run the close callback.
+ #
+ # We need two try statements here so that
+ # pending_callbacks is decremented before the `except`
+ # clause below (which calls `close` and does need to
+ # trigger the callback)
+ self._pending_callbacks += 1
+ while True:
+ # Read from the socket until we get EWOULDBLOCK or equivalent.
+ # SSL sockets do some internal buffering, and if the data is
+ # sitting in the SSL object's buffer select() and friends
+ # can't see it; the only way to find out if it's there is to
+ # try to read it.
+ if self._read_to_buffer() == 0:
+ break
+ finally:
+ self._pending_callbacks -= 1
+ except Exception:
+ logging.warning("error on read", exc_info=True)
+ self.close()
+ return
+ if self._read_from_buffer():
+ return
+ else:
+ self._maybe_run_close_callback()
+
def _set_read_callback(self, callback):
assert not self._read_callback, "Already reading"
read callback on the next IOLoop iteration; otherwise starts
listening for reads on the socket.
"""
+ # See if we've already got the data from a previous read
+ if self._read_from_buffer():
+ return
+ self._check_closed()
while True:
- # See if we've already got the data from a previous read
- if self._read_from_buffer():
- return
- self._check_closed()
if self._read_to_buffer() == 0:
break
+ self._check_closed()
+ if self._read_from_buffer():
+ return
self._add_io_state(self.io_loop.READ)
def _read_from_socket(self):
def _maybe_add_error_listener(self):
if self._state is None and self._pending_callbacks == 0:
if self.socket is None:
- cb = self._close_callback
- if cb is not None:
- self._close_callback = None
- self._run_callback(cb)
+ self._maybe_run_close_callback()
else:
self._add_io_state(ioloop.IOLoop.READ)