self._read_bytes = None
self._read_until_close = False
self._read_callback = None
+ self._read_future = None
self._streaming_callback = None
self._write_callback = None
self._close_callback = None
"""
return None
- @_iostream_return_future
- def read_until_regex(self, regex, callback):
+ def read_until_regex(self, regex, callback=None):
"""Run ``callback`` when we read the given regex pattern.
The callback will get the data read (including the data that
matched the regex and anything that came before it) as an argument.
"""
- self._set_read_callback(callback)
+ future = self._set_read_callback(callback)
self._read_regex = re.compile(regex)
self._try_inline_read()
+ return future
- @_iostream_return_future
- def read_until(self, delimiter, callback):
+ def read_until(self, delimiter, callback=None):
"""Run ``callback`` when we read the given delimiter.
The callback will get the data read (including the delimiter)
as an argument.
"""
- self._set_read_callback(callback)
+ future = self._set_read_callback(callback)
self._read_delimiter = delimiter
self._try_inline_read()
+ return future
- @_iostream_return_future
- def read_bytes(self, num_bytes, callback, streaming_callback=None):
+ def read_bytes(self, num_bytes, callback=None, streaming_callback=None):
"""Run callback when we read the given number of bytes.
If a ``streaming_callback`` is given, it will be called with chunks
``callback`` will be empty. Otherwise, the ``callback`` gets
the data as an argument.
"""
- self._set_read_callback(callback)
+ future = self._set_read_callback(callback)
assert isinstance(num_bytes, numbers.Integral)
self._read_bytes = num_bytes
self._streaming_callback = stack_context.wrap(streaming_callback)
self._try_inline_read()
+ return future
- @_iostream_return_future
- def read_until_close(self, callback, streaming_callback=None):
+ def read_until_close(self, callback=None, streaming_callback=None):
"""Reads all data from the socket until it is closed.
If a ``streaming_callback`` is given, it will be called with chunks
Subject to ``max_buffer_size`` limit from `IOStream` constructor if
a ``streaming_callback`` is not used.
"""
- self._set_read_callback(callback)
+ future = self._set_read_callback(callback)
self._streaming_callback = stack_context.wrap(streaming_callback)
if self.closed():
if self._streaming_callback is not None:
self._run_callback(self._streaming_callback,
self._consume(self._read_buffer_size))
- self._run_callback(self._read_callback,
- self._consume(self._read_buffer_size))
- self._streaming_callback = None
- self._read_callback = None
- return
+ self._run_read_callback(self._consume(self._read_buffer_size))
+ return future
self._read_until_close = True
self._streaming_callback = stack_context.wrap(streaming_callback)
self._try_inline_read()
+ return future
@_iostream_return_future
def write(self, data, callback=None):
self._read_buffer_size):
self._run_callback(self._streaming_callback,
self._consume(self._read_buffer_size))
- callback = self._read_callback
- self._read_callback = None
self._read_until_close = False
- self._run_callback(callback,
- self._consume(self._read_buffer_size))
+ self._run_read_callback(self._consume(self._read_buffer_size))
if self._state is not None:
self.io_loop.remove_handler(self.fileno())
self._state = None
# from the set as it is closed.
for fut in list(self._pending_futures):
fut.set_exception(StreamClosedError())
+ if self._read_future is not None:
+ self._read_future.set_exception(StreamClosedError())
+ self._read_future = None
if self._close_callback is not None:
cb = self._close_callback
self._close_callback = None
self._maybe_run_close_callback()
def _set_read_callback(self, callback):
- assert not self._read_callback, "Already reading"
- self._read_callback = stack_context.wrap(callback)
+ assert self._read_callback is None, "Already reading"
+ assert self._read_future is None, "Already reading"
+ if callback is not None:
+ self._read_callback = stack_context.wrap(callback)
+ else:
+ self._read_future = TracebackFuture()
+ return self._read_future
+
+ def _run_read_callback(self, data):
+ self._streaming_callback = None
+ if self._read_future is not None:
+ self._read_future.set_result(data)
+ self._read_future = None
+ if self._read_callback is not None:
+ callback = self._read_callback
+ self._read_callback = None
+ self._run_callback(callback, data)
+
def _try_inline_read(self):
"""Attempt to complete the current read operation from buffered data.
self._consume(bytes_to_consume))
if self._read_bytes is not None and self._read_buffer_size >= self._read_bytes:
num_bytes = self._read_bytes
- callback = self._read_callback
- self._read_callback = None
- self._streaming_callback = None
self._read_bytes = None
- self._run_callback(callback, self._consume(num_bytes))
+ self._run_read_callback(self._consume(num_bytes))
return True
elif self._read_delimiter is not None:
# Multi-byte delimiters (e.g. '\r\n') may straddle two
while True:
loc = self._read_buffer[0].find(self._read_delimiter)
if loc != -1:
- callback = self._read_callback
delimiter_len = len(self._read_delimiter)
- self._read_callback = None
- self._streaming_callback = None
self._read_delimiter = None
- self._run_callback(callback,
- self._consume(loc + delimiter_len))
+ self._run_read_callback(
+ self._consume(loc + delimiter_len))
return True
if len(self._read_buffer) == 1:
break
while True:
m = self._read_regex.search(self._read_buffer[0])
if m is not None:
- callback = self._read_callback
- self._read_callback = None
- self._streaming_callback = None
self._read_regex = None
- self._run_callback(callback, self._consume(m.end()))
+ self._run_read_callback(self._consume(m.end()))
return True
if len(self._read_buffer) == 1:
break