target_bytes = None
else:
target_bytes = 0
+ next_find_pos = 0
# Pretend to have a pending callback so that an EOF in
# _read_to_buffer doesn't trigger an immediate close
# callback. At the end of this method we'll either
if self._read_to_buffer() == 0:
break
+ self._run_streaming_callback()
+
# If we've read all the bytes we can use, break out of
- # this loop. It would be better to call
- # read_from_buffer here, but
- # A) this has subtle interactions with the
- # pending_callback and error_listener mechanisms
- # B) Simply calling read_from_buffer on every iteration
- # is inefficient for delimited reads.
- # TODO: restructure this so we can call read_from_buffer
- # for unbounded delimited reads.
+ # this loop. We can't just call read_from_buffer here
+ # because of subtle interactions with the
+ # pending_callback and error_listener mechanisms.
+ #
+ # If we've reached target_bytes, we know we're done.
if (target_bytes is not None and
self._read_buffer_size >= target_bytes):
break
+
+ # Otherwise, we need to call the more expensive find_read_pos.
+ # It's inefficient to do this on every read, so instead
+ # do it on the first read and whenever the read buffer
+ # size has doubled.
+ if self._read_buffer_size >= next_find_pos:
+ pos = self._find_read_pos()
+ if pos is not None:
+ return pos
+ next_find_pos = self._read_buffer_size * 2
+ return self._find_read_pos()
finally:
self._pending_callbacks -= 1
def _handle_read(self):
try:
- self._read_to_buffer_loop()
+ pos = self._read_to_buffer_loop()
except UnsatisfiableReadError:
raise
except Exception:
gen_log.warning("error on read", exc_info=True)
self.close(exc_info=True)
return
- if self._read_from_buffer():
+ if pos is not None:
+ self._read_from_buffer(pos)
return
else:
self._maybe_run_close_callback()
listening for reads on the socket.
"""
# See if we've already got the data from a previous read
- if self._read_from_buffer():
+ self._run_streaming_callback()
+ pos = self._find_read_pos()
+ if pos is not None:
+ self._read_from_buffer(pos)
return
self._check_closed()
try:
- self._read_to_buffer_loop()
+ pos = self._read_to_buffer_loop()
except Exception:
# If there was an in _read_to_buffer, we called close() already,
# but couldn't run the close callback because of _pending_callbacks.
# applicable.
self._maybe_run_close_callback()
raise
- if self._read_from_buffer():
+ if pos is not None:
+ self._read_from_buffer(pos)
return
# We couldn't satisfy the read inline, so either close the stream
# or listen for new data.
raise IOError("Reached maximum read buffer size")
return len(chunk)
- def _read_from_buffer(self):
- """Attempts to complete the currently-pending read from the buffer.
-
- Returns True if the read was completed.
- """
+ def _run_streaming_callback(self):
if self._streaming_callback is not None and self._read_buffer_size:
bytes_to_consume = self._read_buffer_size
if self._read_bytes is not None:
bytes_to_consume = min(self._read_bytes, bytes_to_consume)
self._read_bytes -= bytes_to_consume
self._run_read_callback(bytes_to_consume, True)
+
+ def _read_from_buffer(self, pos):
+ """Attempts to complete the currently-pending read from the buffer.
+
+ The argument is either a position in the read buffer or None,
+ as returned by _find_read_pos.
+ """
+ self._read_bytes = self._read_delimiter = self._read_regex = None
+ self._read_partial = False
+ self._run_read_callback(pos, False)
+
+ def _find_read_pos(self):
+ """Attempts to find a position in the read buffer that satisfies
+ the currently-pending read.
+
+ Returns a position in the buffer if the current read can be satisfied,
+ or None if it cannot.
+ """
if (self._read_bytes is not None and
(self._read_buffer_size >= self._read_bytes or
(self._read_partial and self._read_buffer_size > 0))):
num_bytes = min(self._read_bytes, self._read_buffer_size)
- self._read_bytes = None
- self._read_partial = False
- self._run_read_callback(num_bytes, False)
- return True
+ return num_bytes
elif self._read_delimiter is not None:
# Multi-byte delimiters (e.g. '\r\n') may straddle two
# chunks in the read buffer, so we can't easily find them
delimiter_len = len(self._read_delimiter)
self._check_max_bytes(self._read_delimiter,
loc + delimiter_len)
- self._read_delimiter = None
- self._run_read_callback(loc + delimiter_len, False)
- return True
+ return loc + delimiter_len
if len(self._read_buffer) == 1:
break
_double_prefix(self._read_buffer)
m = self._read_regex.search(self._read_buffer[0])
if m is not None:
self._check_max_bytes(self._read_regex, m.end())
- self._read_regex = None
- self._run_read_callback(m.end(), False)
- return True
+ return m.end()
if len(self._read_buffer) == 1:
break
_double_prefix(self._read_buffer)
self._check_max_bytes(self._read_regex,
len(self._read_buffer[0]))
- return False
+ return None
def _check_max_bytes(self, delimiter, size):
if (self._read_max_bytes is not None and