if hasattr(errno, "WSAEINPROGRESS"):
_ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,) # type: ignore
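+# socket.send() on Windows rejects very large buffers instead of doing a
+# partial send, so _handle_write() below caps each send at 128KB there.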
+_WINDOWS = sys.platform.startswith('win')
+
class StreamClosedError(IOError):
"""Exception raised by `IOStream` methods when the stream is closed.
self.max_buffer_size // 2)
self.max_write_buffer_size = max_write_buffer_size
self.error = None
- self._read_buffer = collections.deque()
- self._write_buffer = collections.deque()
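+        # The read and write buffers are flat bytearrays.  *_pos is the
+        # offset of the first byte that has not yet been consumed (read)
+        # or sent (written); *_size counts the bytes remaining past it.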
+ self._read_buffer = bytearray()
+ self._read_buffer_pos = 0
self._read_buffer_size = 0
+ self._write_buffer = bytearray()
+ self._write_buffer_pos = 0
self._write_buffer_size = 0
self._write_buffer_frozen = False
self._read_delimiter = None
"""
assert isinstance(data, bytes)
self._check_closed()
- # We use bool(_write_buffer) as a proxy for write_buffer_size>0,
- # so never put empty strings in the buffer.
if data:
if (self.max_write_buffer_size is not None and
self._write_buffer_size + len(data) > self.max_write_buffer_size):
raise StreamBufferFullError("Reached maximum write buffer size")
- # Break up large contiguous strings before inserting them in the
- # write buffer, so we don't have to recopy the entire thing
- # as we slice off pieces to send to the socket.
- WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
- for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
- self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
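+            # A flat bytearray removes the need to chunk large writes:
+            # the data grows the buffer in place, and _handle_write()
+            # sends slices through a memoryview without recopying.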
+ self._write_buffer += data
self._write_buffer_size += len(data)
if callback is not None:
self._write_callback = stack_context.wrap(callback)
future.add_done_callback(lambda f: f.exception())
if not self._connecting:
self._handle_write()
- if self._write_buffer:
+ if self._write_buffer_size:
self._add_io_state(self.io_loop.WRITE)
self._maybe_add_error_listener()
return future
# if the IOStream object is kept alive by a reference cycle.
# TODO: Clear the read buffer too; it currently breaks some tests.
self._write_buffer = None
+ self._write_buffer_size = 0
def reading(self):
"""Returns true if we are currently reading from the stream."""
def writing(self):
"""Returns true if we are currently writing to the stream."""
- return bool(self._write_buffer)
+ return self._write_buffer_size > 0
def closed(self):
"""Returns true if the stream has been closed."""
break
if chunk is None:
return 0
- self._read_buffer.append(chunk)
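+        # Extend the flat read buffer in place; no deque of chunks (and
+        # hence no later merging) is needed.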
+ self._read_buffer += chunk
self._read_buffer_size += len(chunk)
if self._read_buffer_size > self.max_buffer_size:
gen_log.error("Reached maximum read buffer size")
# since large merges are relatively expensive and get undone in
# _consume().
if self._read_buffer:
- while True:
- loc = self._read_buffer[0].find(self._read_delimiter)
- if loc != -1:
- delimiter_len = len(self._read_delimiter)
- self._check_max_bytes(self._read_delimiter,
- loc + delimiter_len)
- return loc + delimiter_len
- if len(self._read_buffer) == 1:
- break
- _double_prefix(self._read_buffer)
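+                # A single find() over the flat buffer, starting at the
+                # consumed offset, locates the delimiter without any
+                # prefix merging.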
+ loc = self._read_buffer.find(self._read_delimiter,
+ self._read_buffer_pos)
+ if loc != -1:
+ loc -= self._read_buffer_pos
+ delimiter_len = len(self._read_delimiter)
+ self._check_max_bytes(self._read_delimiter,
+ loc + delimiter_len)
+ return loc + delimiter_len
self._check_max_bytes(self._read_delimiter,
- len(self._read_buffer[0]))
+ self._read_buffer_size)
elif self._read_regex is not None:
if self._read_buffer:
- while True:
- m = self._read_regex.search(self._read_buffer[0])
- if m is not None:
- self._check_max_bytes(self._read_regex, m.end())
- return m.end()
- if len(self._read_buffer) == 1:
- break
- _double_prefix(self._read_buffer)
- self._check_max_bytes(self._read_regex,
- len(self._read_buffer[0]))
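+                # Compiled patterns accept a start position, so the regex
+                # is matched against the buffer in place from the consumed
+                # offset.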
+ m = self._read_regex.search(self._read_buffer,
+ self._read_buffer_pos)
+ if m is not None:
+ loc = m.end() - self._read_buffer_pos
+ self._check_max_bytes(self._read_regex, loc)
+ return loc
+ self._check_max_bytes(self._read_regex, self._read_buffer_size)
return None
def _check_max_bytes(self, delimiter, size):
delimiter, self._read_max_bytes))
def _handle_write(self):
- while self._write_buffer:
+ while self._write_buffer_size:
try:
- if not self._write_buffer_frozen:
+ start = self._write_buffer_pos
+ if _WINDOWS:
# On windows, socket.send blows up if given a
# write buffer that's too large, instead of just
# returning the number of bytes it was able to
# process. Therefore we must not call socket.send
# with more than 128KB at a time.
- _merge_prefix(self._write_buffer, 128 * 1024)
- num_bytes = self.write_to_fd(self._write_buffer[0])
+ stop = start + 128 * 1024
+ else:
+ stop = None
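+                # Send a zero-copy view of the unsent part of the buffer
+                # instead of slicing out a new byte string.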
+ num_bytes = self.write_to_fd(
+ memoryview(self._write_buffer)[start:stop])
+ assert self._write_buffer_size >= 0
if num_bytes == 0:
# With OpenSSL, if we couldn't write the entire buffer,
# the very same string object must be used on the
self._write_buffer_frozen = True
break
self._write_buffer_frozen = False
- _merge_prefix(self._write_buffer, num_bytes)
- self._write_buffer.popleft()
+ self._write_buffer_pos += num_bytes
self._write_buffer_size -= num_bytes
+ # Amortized O(1) shrink
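+                # (copy the unsent remainder to the front only once the
+                # already-sent prefix exceeds it, so the copying cost is
+                # amortized over the bytes that have been written)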
+ if self._write_buffer_pos > self._write_buffer_size:
+ self._write_buffer = self._write_buffer[self._write_buffer_pos:]
+ self._write_buffer_pos = 0
except (socket.error, IOError, OSError) as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
self._write_buffer_frozen = True
self.fileno(), e)
self.close(exc_info=True)
return
- if not self._write_buffer:
+ if not self._write_buffer_size:
if self._write_callback:
callback = self._write_callback
self._write_callback = None
future.set_result(None)
def _consume(self, loc):
+ # Consume loc bytes from the read buffer and return them
if loc == 0:
return b""
- _merge_prefix(self._read_buffer, loc)
+ assert loc <= self._read_buffer_size
+ # Slice the bytearray buffer into bytes, without intermediate copying
+ b = (memoryview(self._read_buffer)
+ [self._read_buffer_pos:self._read_buffer_pos + loc]
+ ).tobytes()
+ self._read_buffer_pos += loc
self._read_buffer_size -= loc
- return self._read_buffer.popleft()
+ # Amortized O(1) shrink
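+        # (same strategy as the write buffer: trim the consumed prefix
+        # only once it exceeds the unread data, keeping the copy cost
+        # amortized)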
+ if self._read_buffer_pos > self._read_buffer_size:
+ self._read_buffer = self._read_buffer[self._read_buffer_pos:]
+ self._read_buffer_pos = 0
+ return b
def _check_closed(self):
if self.closed():
return chunk
-def _double_prefix(deque):
- """Grow by doubling, but don't split the second chunk just because the
- first one is small.
- """
- new_len = max(len(deque[0]) * 2,
- (len(deque[0]) + len(deque[1])))
- _merge_prefix(deque, new_len)
-
-
-def _merge_prefix(deque, size):
- """Replace the first entries in a deque of strings with a single
- string of up to size bytes.
-
- >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
- >>> _merge_prefix(d, 5); print(d)
- deque(['abcde', 'fghi', 'j'])
-
- Strings will be split as necessary to reach the desired size.
- >>> _merge_prefix(d, 7); print(d)
- deque(['abcdefg', 'hi', 'j'])
-
- >>> _merge_prefix(d, 3); print(d)
- deque(['abc', 'defg', 'hi', 'j'])
-
- >>> _merge_prefix(d, 100); print(d)
- deque(['abcdefghij'])
- """
- if len(deque) == 1 and len(deque[0]) <= size:
- return
- prefix = []
- remaining = size
- while deque and remaining > 0:
- chunk = deque.popleft()
- if len(chunk) > remaining:
- deque.appendleft(chunk[remaining:])
- chunk = chunk[:remaining]
- prefix.append(chunk)
- remaining -= len(chunk)
- # This data structure normally just contains byte strings, but
- # the unittest gets messy if it doesn't use the default str() type,
- # so do the merge based on the type of data that's actually present.
- if prefix:
- deque.appendleft(type(prefix[0])().join(prefix))
- if not deque:
- deque.appendleft(b"")
-
-
-def doctests():
-    import doctest
-    return doctest.DocTestSuite()