self._streaming_callback = stack_context.wrap(streaming_callback)
if self.closed():
if self._streaming_callback is not None:
- self._run_callback(self._streaming_callback,
- self._consume(self._read_buffer_size))
- self._run_read_callback(self._consume(self._read_buffer_size))
+ self._run_read_callback(self._read_buffer_size, True)
+ self._run_read_callback(self._read_buffer_size, False)
return future
self._read_until_close = True
self._streaming_callback = stack_context.wrap(streaming_callback)
if self._read_until_close:
if (self._streaming_callback is not None and
self._read_buffer_size):
- self._run_callback(self._streaming_callback,
- self._consume(self._read_buffer_size))
+ self._run_read_callback(self._read_buffer_size, True)
self._read_until_close = False
- self._run_read_callback(self._consume(self._read_buffer_size))
+ self._run_read_callback(self._read_buffer_size, False)
if self._state is not None:
self.io_loop.remove_handler(self.fileno())
self._state = None
self._read_future = TracebackFuture()
return self._read_future
- def _run_read_callback(self, data):
- self._streaming_callback = None
+ def _run_read_callback(self, size, streaming):
+ if streaming:
+ callback = self._streaming_callback
+ else:
+ callback = self._read_callback
+ self._read_callback = self._streaming_callback = None
if self._read_future is not None:
+ assert callback is None
future = self._read_future
self._read_future = None
- future.set_result(data)
- if self._read_callback is not None:
- callback = self._read_callback
- self._read_callback = None
- self._run_callback(callback, data)
+ future.set_result(self._consume(size))
+ if callback is not None:
+ assert self._read_future is None
+ self._run_callback(callback, self._consume(size))
else:
# If we scheduled a callback, we will add the error listener
# afterwards. If we didn't, we have to do it now.
if self._read_bytes is not None:
bytes_to_consume = min(self._read_bytes, bytes_to_consume)
self._read_bytes -= bytes_to_consume
- self._run_callback(self._streaming_callback,
- self._consume(bytes_to_consume))
+ self._run_read_callback(bytes_to_consume, True)
if (self._read_bytes is not None and
(self._read_buffer_size >= self._read_bytes or
(self._read_partial and self._read_buffer_size > 0))):
num_bytes = min(self._read_bytes, self._read_buffer_size)
self._read_bytes = None
self._read_partial = False
- self._run_read_callback(self._consume(num_bytes))
+ self._run_read_callback(num_bytes, False)
return True
elif self._read_delimiter is not None:
# Multi-byte delimiters (e.g. '\r\n') may straddle two
# length) tend to be "line" oriented, the delimiter is likely
# to be in the first few chunks. Merge the buffer gradually
# since large merges are relatively expensive and get undone in
- # consume().
+ # _consume().
if self._read_buffer:
while True:
loc = self._read_buffer[0].find(self._read_delimiter)
self._check_max_bytes(self._read_delimiter,
loc + delimiter_len)
self._read_delimiter = None
- self._run_read_callback(
- self._consume(loc + delimiter_len))
+ self._run_read_callback(loc + delimiter_len, False)
return True
if len(self._read_buffer) == 1:
break
if m is not None:
self._check_max_bytes(self._read_regex, m.end())
self._read_regex = None
- self._run_read_callback(self._consume(m.end()))
+ self._run_read_callback(m.end(), False)
return True
if len(self._read_buffer) == 1:
break