assert not self._read_callback, "Already reading"
loc = self._read_buffer.find(delimiter)
if loc != -1:
- callback(self._consume(loc + len(delimiter)))
+ self._run_callback(callback, self._consume(loc + len(delimiter)))
return
self._check_closed()
self._read_delimiter = delimiter
self.io_loop.remove_handler(self.socket.fileno())
self.socket.close()
self.socket = None
- if self._close_callback: self._close_callback()
+ if self._close_callback:
+ self._run_callback(self._close_callback)
def reading(self):
"""Returns true if we are currently reading from the stream."""
self._state = state
self.io_loop.update_handler(self.socket.fileno(), self._state)
+ def _run_callback(self, callback, *args, **kwargs):
+ try:
+ callback(*args, **kwargs)
+ except:
+ # Close the socket on an uncaught exception from a user callback
+ # (It would eventually get closed when the socket object is
+ # gc'd, but we don't want to rely on gc happening before we
+ # run out of file descriptors)
+ self.close()
+ # Re-raise the exception so that IOLoop.handle_callback_exception
+ # can see it and log the error
+ raise
+
def _handle_read(self):
try:
chunk = self.socket.recv(self.read_chunk_size)
callback = self._read_callback
self._read_callback = None
self._read_bytes = None
- callback(self._consume(num_bytes))
+ self._run_callback(callback, self._consume(num_bytes))
elif self._read_delimiter:
loc = self._read_buffer.find(self._read_delimiter)
if loc != -1:
delimiter_len = len(self._read_delimiter)
self._read_callback = None
self._read_delimiter = None
- callback(self._consume(loc + delimiter_len))
+ self._run_callback(callback,
+ self._consume(loc + delimiter_len))
def _handle_write(self):
while self._write_buffer:
if not self._write_buffer and self._write_callback:
callback = self._write_callback
self._write_callback = None
- callback()
+ self._run_callback(callback)
def _consume(self, loc):
result = self._read_buffer[:loc]