From: Antoine Pitrou
Date: Tue, 1 Nov 2016 20:28:17 +0000 (+0100)
Subject: Use bytearray buffers in IOStream
X-Git-Tag: v4.5.0~32^2~7
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=992a00e079ae5a874d0428ef2b6f5128462f3d1f;p=thirdparty%2Ftornado.git

Use bytearray buffers in IOStream
---

diff --git a/tornado/iostream.py b/tornado/iostream.py
index bcf444148..aee114d66 100644
--- a/tornado/iostream.py
+++ b/tornado/iostream.py
@@ -82,6 +82,8 @@ _ERRNO_INPROGRESS = (errno.EINPROGRESS,)
 if hasattr(errno, "WSAEINPROGRESS"):
     _ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,)  # type: ignore
 
+_WINDOWS = sys.platform.startswith('win')
+
 
 class StreamClosedError(IOError):
     """Exception raised by `IOStream` methods when the stream is closed.
@@ -158,9 +160,11 @@ class BaseIOStream(object):
                                          self.max_buffer_size // 2)
         self.max_write_buffer_size = max_write_buffer_size
         self.error = None
-        self._read_buffer = collections.deque()
-        self._write_buffer = collections.deque()
+        self._read_buffer = bytearray()
+        self._read_buffer_pos = 0
         self._read_buffer_size = 0
+        self._write_buffer = bytearray()
+        self._write_buffer_pos = 0
         self._write_buffer_size = 0
         self._write_buffer_frozen = False
         self._read_delimiter = None
@@ -375,18 +379,11 @@ class BaseIOStream(object):
         """
         assert isinstance(data, bytes)
         self._check_closed()
-        # We use bool(_write_buffer) as a proxy for write_buffer_size>0,
-        # so never put empty strings in the buffer.
         if data:
             if (self.max_write_buffer_size is not None and
                     self._write_buffer_size + len(data) > self.max_write_buffer_size):
                 raise StreamBufferFullError("Reached maximum write buffer size")
-            # Break up large contiguous strings before inserting them in the
-            # write buffer, so we don't have to recopy the entire thing
-            # as we slice off pieces to send to the socket.
-            WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
-            for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
-                self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
+            self._write_buffer += data
             self._write_buffer_size += len(data)
         if callback is not None:
             self._write_callback = stack_context.wrap(callback)
@@ -396,7 +393,7 @@ class BaseIOStream(object):
             future.add_done_callback(lambda f: f.exception())
         if not self._connecting:
             self._handle_write()
-            if self._write_buffer:
+            if self._write_buffer_size:
                 self._add_io_state(self.io_loop.WRITE)
             self._maybe_add_error_listener()
         return future
@@ -466,6 +463,7 @@ class BaseIOStream(object):
             # if the IOStream object is kept alive by a reference cycle.
             # TODO: Clear the read buffer too; it currently breaks some tests.
             self._write_buffer = None
+            self._write_buffer_size = 0
 
     def reading(self):
         """Returns true if we are currently reading from the stream."""
@@ -473,7 +471,7 @@ class BaseIOStream(object):
 
     def writing(self):
         """Returns true if we are currently writing to the stream."""
-        return bool(self._write_buffer)
+        return self._write_buffer_size > 0
 
     def closed(self):
         """Returns true if the stream has been closed."""
@@ -743,7 +741,7 @@ class BaseIOStream(object):
                 break
         if chunk is None:
             return 0
-        self._read_buffer.append(chunk)
+        self._read_buffer += chunk
         self._read_buffer_size += len(chunk)
         if self._read_buffer_size > self.max_buffer_size:
             gen_log.error("Reached maximum read buffer size")
@@ -791,30 +789,25 @@ class BaseIOStream(object):
             # since large merges are relatively expensive and get undone in
             # _consume().
             if self._read_buffer:
-                while True:
-                    loc = self._read_buffer[0].find(self._read_delimiter)
-                    if loc != -1:
-                        delimiter_len = len(self._read_delimiter)
-                        self._check_max_bytes(self._read_delimiter,
-                                              loc + delimiter_len)
-                        return loc + delimiter_len
-                    if len(self._read_buffer) == 1:
-                        break
-                    _double_prefix(self._read_buffer)
+                loc = self._read_buffer.find(self._read_delimiter,
+                                             self._read_buffer_pos)
+                if loc != -1:
+                    loc -= self._read_buffer_pos
+                    delimiter_len = len(self._read_delimiter)
+                    self._check_max_bytes(self._read_delimiter,
+                                          loc + delimiter_len)
+                    return loc + delimiter_len
                 self._check_max_bytes(self._read_delimiter,
-                                      len(self._read_buffer[0]))
+                                      self._read_buffer_size)
         elif self._read_regex is not None:
             if self._read_buffer:
-                while True:
-                    m = self._read_regex.search(self._read_buffer[0])
-                    if m is not None:
-                        self._check_max_bytes(self._read_regex, m.end())
-                        return m.end()
-                    if len(self._read_buffer) == 1:
-                        break
-                    _double_prefix(self._read_buffer)
-                self._check_max_bytes(self._read_regex,
-                                      len(self._read_buffer[0]))
+                m = self._read_regex.search(self._read_buffer,
+                                            self._read_buffer_pos)
+                if m is not None:
+                    loc = m.end() - self._read_buffer_pos
+                    self._check_max_bytes(self._read_regex, loc)
+                    return loc
+                self._check_max_bytes(self._read_regex, self._read_buffer_size)
         return None
 
     def _check_max_bytes(self, delimiter, size):
@@ -825,16 +818,21 @@ class BaseIOStream(object):
                     delimiter, self._read_max_bytes))
 
     def _handle_write(self):
-        while self._write_buffer:
+        while self._write_buffer_size:
             try:
-                if not self._write_buffer_frozen:
+                start = self._write_buffer_pos
+                if _WINDOWS:
                     # On windows, socket.send blows up if given a
                     # write buffer that's too large, instead of just
                     # returning the number of bytes it was able to
                     # process.  Therefore we must not call socket.send
                     # with more than 128KB at a time.
-                    _merge_prefix(self._write_buffer, 128 * 1024)
-                num_bytes = self.write_to_fd(self._write_buffer[0])
+                    stop = start + 128 * 1024
+                else:
+                    stop = None
+                num_bytes = self.write_to_fd(
+                    memoryview(self._write_buffer)[start:stop])
+                assert self._write_buffer_size >= 0
                 if num_bytes == 0:
                     # With OpenSSL, if we couldn't write the entire buffer,
                     # the very same string object must be used on the
@@ -847,9 +845,12 @@ class BaseIOStream(object):
                     self._write_buffer_frozen = True
                     break
                 self._write_buffer_frozen = False
-                _merge_prefix(self._write_buffer, num_bytes)
-                self._write_buffer.popleft()
+                self._write_buffer_pos += num_bytes
                 self._write_buffer_size -= num_bytes
+                # Amortized O(1) shrink
+                if self._write_buffer_pos > self._write_buffer_size:
+                    self._write_buffer = self._write_buffer[self._write_buffer_pos:]
+                    self._write_buffer_pos = 0
             except (socket.error, IOError, OSError) as e:
                 if e.args[0] in _ERRNO_WOULDBLOCK:
                     self._write_buffer_frozen = True
@@ -863,7 +864,7 @@ class BaseIOStream(object):
                                 self.fileno(), e)
                 self.close(exc_info=True)
                 return
-        if not self._write_buffer:
+        if not self._write_buffer_size:
             if self._write_callback:
                 callback = self._write_callback
                 self._write_callback = None
@@ -874,11 +875,21 @@ class BaseIOStream(object):
                 future.set_result(None)
 
     def _consume(self, loc):
+        # Consume loc bytes from the read buffer and return them
         if loc == 0:
             return b""
-        _merge_prefix(self._read_buffer, loc)
+        assert loc <= self._read_buffer_size
+        # Slice the bytearray buffer into bytes, without intermediate copying
+        b = (memoryview(self._read_buffer)
+             [self._read_buffer_pos:self._read_buffer_pos + loc]
+             ).tobytes()
+        self._read_buffer_pos += loc
         self._read_buffer_size -= loc
-        return self._read_buffer.popleft()
+        # Amortized O(1) shrink
+        if self._read_buffer_pos > self._read_buffer_size:
+            self._read_buffer = self._read_buffer[self._read_buffer_pos:]
+            self._read_buffer_pos = 0
+        return b
 
     def _check_closed(self):
         if self.closed():
@@ -1498,53 +1509,6 @@ class PipeIOStream(BaseIOStream):
         return chunk
 
 
-def _double_prefix(deque):
-    """Grow by doubling, but don't split the second chunk just because the
-    first one is small.
-    """
-    new_len = max(len(deque[0]) * 2,
-                  (len(deque[0]) + len(deque[1])))
-    _merge_prefix(deque, new_len)
-
-
-def _merge_prefix(deque, size):
-    """Replace the first entries in a deque of strings with a single
-    string of up to size bytes.
-
-    >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
-    >>> _merge_prefix(d, 5); print(d)
-    deque(['abcde', 'fghi', 'j'])
-
-    Strings will be split as necessary to reach the desired size.
-    >>> _merge_prefix(d, 7); print(d)
-    deque(['abcdefg', 'hi', 'j'])
-
-    >>> _merge_prefix(d, 3); print(d)
-    deque(['abc', 'defg', 'hi', 'j'])
-
-    >>> _merge_prefix(d, 100); print(d)
-    deque(['abcdefghij'])
-    """
-    if len(deque) == 1 and len(deque[0]) <= size:
-        return
-    prefix = []
-    remaining = size
-    while deque and remaining > 0:
-        chunk = deque.popleft()
-        if len(chunk) > remaining:
-            deque.appendleft(chunk[remaining:])
-            chunk = chunk[:remaining]
-        prefix.append(chunk)
-        remaining -= len(chunk)
-    # This data structure normally just contains byte strings, but
-    # the unittest gets messy if it doesn't use the default str() type,
-    # so do the merge based on the type of data that's actually present.
-    if prefix:
-        deque.appendleft(type(prefix[0])().join(prefix))
-    if not deque:
-        deque.appendleft(b"")
-
-
 def doctests():
     import doctest
     return doctest.DocTestSuite()
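
The heart of the patch is the buffer representation: each deque of byte chunks
becomes a single bytearray plus a position index, and already-consumed bytes
are compacted away only once they outnumber the live bytes. A minimal
standalone sketch of the same pattern, outside Tornado (the SimpleBuffer name
is illustrative, not part of the library):

    class SimpleBuffer(object):
        def __init__(self):
            self._data = bytearray()
            self._pos = 0   # index of the first unconsumed byte
            self._size = 0  # number of unconsumed bytes

        def append(self, chunk):
            # bytearray += bytes extends in place; the existing
            # contents are not recopied on every append.
            self._data += chunk
            self._size += len(chunk)

        def consume(self, n):
            assert n <= self._size
            # A memoryview slice copies the requested bytes exactly once.
            result = memoryview(self._data)[self._pos:self._pos + n].tobytes()
            self._pos += n
            self._size -= n
            # Compact only when the dead prefix outweighs the live data,
            # so each byte is moved O(1) times over its lifetime.
            if self._pos > self._size:
                self._data = self._data[self._pos:]
                self._pos = 0
            return result

    buf = SimpleBuffer()
    buf.append(b"hello world")
    assert buf.consume(6) == b"hello "
    assert buf.consume(5) == b"world"

Deferring the compaction until self._pos > self._size is what makes consume
amortized O(1): a run of many small reads advances the index cheaply, and the
occasional slice that rebuilds the bytearray touches each byte a bounded
number of times.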
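
Because the read buffer is now contiguous, _find_read_pos can pass a start
offset to bytearray.find and to a compiled regex instead of repeatedly merging
deque chunks with _double_prefix. A sketch of the same calls in isolation (the
buffer contents and pos value here are made up for illustration):

    import re

    buf = bytearray(b"consumed...HTTP/1.1 200 OK\r\n\r\nbody")
    pos = 11  # bytes before this index have already been consumed

    # bytearray.find takes a start index, so the dead prefix is
    # skipped without copying anything.
    loc = buf.find(b"\r\n\r\n", pos)
    if loc != -1:
        # As in the patch, report positions relative to unconsumed data.
        end_of_headers = loc - pos + len(b"\r\n\r\n")

    # Compiled bytes patterns search bytes-like objects such as
    # bytearray, and search() accepts an explicit start position.
    m = re.compile(b"\r\n\r\n").search(buf, pos)
    if m is not None:
        end_of_headers = m.end() - pos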
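
On the write side, _handle_write now hands the socket a memoryview slice of
the pending bytes, so a partial send copies nothing, and the slice is capped
at 128KB only on Windows, where (per the comment in the diff) socket.send
raises on oversized buffers instead of returning a short count. A sketch of
the idea under those assumptions (send_some is a hypothetical helper, not
Tornado's write_to_fd):

    import socket
    import sys

    _WINDOWS = sys.platform.startswith('win')

    def send_some(sock, buf, pos):
        # Send the unconsumed tail of `buf` (a bytearray) starting at
        # `pos` without copying it; returns the byte count the kernel
        # accepted, which may be less than was offered.
        start = pos
        # Cap the slice on Windows, where send() can raise on very
        # large buffers instead of doing a short write.
        stop = start + 128 * 1024 if _WINDOWS else None
        # Slicing a memoryview yields a view, not a copy of the data.
        return sock.send(memoryview(buf)[start:stop])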