self.max_write_buffer_size = max_write_buffer_size
self.error = None # type: Optional[BaseException]
self._read_buffer = bytearray()
- self._read_buffer_pos = 0
self._read_buffer_size = 0
self._user_read_buffer = False
self._after_user_read_buffer = None # type: Optional[bytearray]
available_bytes = self._read_buffer_size
n = len(buf)
if available_bytes >= n:
- end = self._read_buffer_pos + n
- buf[:] = memoryview(self._read_buffer)[self._read_buffer_pos : end]
- del self._read_buffer[:end]
+ buf[:] = memoryview(self._read_buffer)[:n]
+ del self._read_buffer[:n]
self._after_user_read_buffer = self._read_buffer
elif available_bytes > 0:
- buf[:available_bytes] = memoryview(self._read_buffer)[
- self._read_buffer_pos :
- ]
+ buf[:available_bytes] = memoryview(self._read_buffer)[:]
# Set up the supplied buffer as our temporary read buffer.
# The original (if it had any data remaining) has been
# saved for later.
self._user_read_buffer = True
self._read_buffer = buf
- self._read_buffer_pos = 0
self._read_buffer_size = available_bytes
self._read_bytes = n
self._read_partial = partial
if self._user_read_buffer:
self._read_buffer = self._after_user_read_buffer or bytearray()
self._after_user_read_buffer = None
- self._read_buffer_pos = 0
self._read_buffer_size = len(self._read_buffer)
self._user_read_buffer = False
result = size # type: Union[int, bytes]
# since large merges are relatively expensive and get undone in
# _consume().
if self._read_buffer:
- loc = self._read_buffer.find(
- self._read_delimiter, self._read_buffer_pos
- )
+ loc = self._read_buffer.find(self._read_delimiter)
if loc != -1:
- loc -= self._read_buffer_pos
delimiter_len = len(self._read_delimiter)
self._check_max_bytes(self._read_delimiter, loc + delimiter_len)
return loc + delimiter_len
self._check_max_bytes(self._read_delimiter, self._read_buffer_size)
elif self._read_regex is not None:
if self._read_buffer:
- m = self._read_regex.search(self._read_buffer, self._read_buffer_pos)
+ m = self._read_regex.search(self._read_buffer)
if m is not None:
- loc = m.end() - self._read_buffer_pos
+ loc = m.end()
self._check_max_bytes(self._read_regex, loc)
return loc
self._check_max_bytes(self._read_regex, self._read_buffer_size)
return b""
assert loc <= self._read_buffer_size
# Slice the bytearray buffer into bytes, without intermediate copying
- b = (
- memoryview(self._read_buffer)[
- self._read_buffer_pos : self._read_buffer_pos + loc
- ]
- ).tobytes()
- self._read_buffer_pos += loc
+ b = (memoryview(self._read_buffer)[:loc]).tobytes()
self._read_buffer_size -= loc
- del self._read_buffer[: self._read_buffer_pos]
- self._read_buffer_pos = 0
+ del self._read_buffer[:loc]
return b
def _check_closed(self) -> None: