]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Use bytearray buffers in IOStream
authorAntoine Pitrou <antoine@python.org>
Tue, 1 Nov 2016 20:28:17 +0000 (21:28 +0100)
committerAntoine Pitrou <antoine@python.org>
Tue, 1 Nov 2016 20:28:17 +0000 (21:28 +0100)
tornado/iostream.py

index bcf444148c0ff2b639571ccb8796d569d8c7bf53..aee114d6636f13a33922da0dd1475a7da0fa0b79 100644 (file)
@@ -82,6 +82,8 @@ _ERRNO_INPROGRESS = (errno.EINPROGRESS,)
 if hasattr(errno, "WSAEINPROGRESS"):
     _ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,)  # type: ignore
 
+_WINDOWS = sys.platform.startswith('win')
+
 
 class StreamClosedError(IOError):
     """Exception raised by `IOStream` methods when the stream is closed.
@@ -158,9 +160,11 @@ class BaseIOStream(object):
                                    self.max_buffer_size // 2)
         self.max_write_buffer_size = max_write_buffer_size
         self.error = None
-        self._read_buffer = collections.deque()
-        self._write_buffer = collections.deque()
+        self._read_buffer = bytearray()
+        self._read_buffer_pos = 0
         self._read_buffer_size = 0
+        self._write_buffer = bytearray()
+        self._write_buffer_pos = 0
         self._write_buffer_size = 0
         self._write_buffer_frozen = False
         self._read_delimiter = None
@@ -375,18 +379,11 @@ class BaseIOStream(object):
         """
         assert isinstance(data, bytes)
         self._check_closed()
-        # We use bool(_write_buffer) as a proxy for write_buffer_size>0,
-        # so never put empty strings in the buffer.
         if data:
             if (self.max_write_buffer_size is not None and
                     self._write_buffer_size + len(data) > self.max_write_buffer_size):
                 raise StreamBufferFullError("Reached maximum write buffer size")
-            # Break up large contiguous strings before inserting them in the
-            # write buffer, so we don't have to recopy the entire thing
-            # as we slice off pieces to send to the socket.
-            WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
-            for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
-                self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
+            self._write_buffer += data
             self._write_buffer_size += len(data)
         if callback is not None:
             self._write_callback = stack_context.wrap(callback)
@@ -396,7 +393,7 @@ class BaseIOStream(object):
             future.add_done_callback(lambda f: f.exception())
         if not self._connecting:
             self._handle_write()
-            if self._write_buffer:
+            if self._write_buffer_size:
                 self._add_io_state(self.io_loop.WRITE)
             self._maybe_add_error_listener()
         return future
@@ -466,6 +463,7 @@ class BaseIOStream(object):
             # if the IOStream object is kept alive by a reference cycle.
             # TODO: Clear the read buffer too; it currently breaks some tests.
             self._write_buffer = None
+            self._write_buffer_size = 0
 
     def reading(self):
         """Returns true if we are currently reading from the stream."""
@@ -473,7 +471,7 @@ class BaseIOStream(object):
 
     def writing(self):
         """Returns true if we are currently writing to the stream."""
-        return bool(self._write_buffer)
+        return self._write_buffer_size > 0
 
     def closed(self):
         """Returns true if the stream has been closed."""
@@ -743,7 +741,7 @@ class BaseIOStream(object):
             break
         if chunk is None:
             return 0
-        self._read_buffer.append(chunk)
+        self._read_buffer += chunk
         self._read_buffer_size += len(chunk)
         if self._read_buffer_size > self.max_buffer_size:
             gen_log.error("Reached maximum read buffer size")
@@ -791,30 +789,25 @@ class BaseIOStream(object):
             # since large merges are relatively expensive and get undone in
             # _consume().
             if self._read_buffer:
-                while True:
-                    loc = self._read_buffer[0].find(self._read_delimiter)
-                    if loc != -1:
-                        delimiter_len = len(self._read_delimiter)
-                        self._check_max_bytes(self._read_delimiter,
-                                              loc + delimiter_len)
-                        return loc + delimiter_len
-                    if len(self._read_buffer) == 1:
-                        break
-                    _double_prefix(self._read_buffer)
+                loc = self._read_buffer.find(self._read_delimiter,
+                                             self._read_buffer_pos)
+                if loc != -1:
+                    loc -= self._read_buffer_pos
+                    delimiter_len = len(self._read_delimiter)
+                    self._check_max_bytes(self._read_delimiter,
+                                          loc + delimiter_len)
+                    return loc + delimiter_len
                 self._check_max_bytes(self._read_delimiter,
-                                      len(self._read_buffer[0]))
+                                      self._read_buffer_size)
         elif self._read_regex is not None:
             if self._read_buffer:
-                while True:
-                    m = self._read_regex.search(self._read_buffer[0])
-                    if m is not None:
-                        self._check_max_bytes(self._read_regex, m.end())
-                        return m.end()
-                    if len(self._read_buffer) == 1:
-                        break
-                    _double_prefix(self._read_buffer)
-                self._check_max_bytes(self._read_regex,
-                                      len(self._read_buffer[0]))
+                m = self._read_regex.search(self._read_buffer,
+                                            self._read_buffer_pos)
+                if m is not None:
+                    loc = m.end() - self._read_buffer_pos
+                    self._check_max_bytes(self._read_regex, loc)
+                    return loc
+                self._check_max_bytes(self._read_regex, self._read_buffer_size)
         return None
 
     def _check_max_bytes(self, delimiter, size):
@@ -825,16 +818,21 @@ class BaseIOStream(object):
                     delimiter, self._read_max_bytes))
 
     def _handle_write(self):
-        while self._write_buffer:
+        while self._write_buffer_size:
             try:
-                if not self._write_buffer_frozen:
+                start = self._write_buffer_pos
+                if _WINDOWS:
                     # On windows, socket.send blows up if given a
                     # write buffer that's too large, instead of just
                     # returning the number of bytes it was able to
                     # process.  Therefore we must not call socket.send
                     # with more than 128KB at a time.
-                    _merge_prefix(self._write_buffer, 128 * 1024)
-                num_bytes = self.write_to_fd(self._write_buffer[0])
+                    stop = start + 128 * 1024
+                else:
+                    stop = None
+                num_bytes = self.write_to_fd(
+                    memoryview(self._write_buffer)[start:stop])
+                assert self._write_buffer_size >= 0
                 if num_bytes == 0:
                     # With OpenSSL, if we couldn't write the entire buffer,
                     # the very same string object must be used on the
@@ -847,9 +845,12 @@ class BaseIOStream(object):
                     self._write_buffer_frozen = True
                     break
                 self._write_buffer_frozen = False
-                _merge_prefix(self._write_buffer, num_bytes)
-                self._write_buffer.popleft()
+                self._write_buffer_pos += num_bytes
                 self._write_buffer_size -= num_bytes
+                # Amortized O(1) shrink
+                if self._write_buffer_pos > self._write_buffer_size:
+                    self._write_buffer = self._write_buffer[self._write_buffer_pos:]
+                    self._write_buffer_pos = 0
             except (socket.error, IOError, OSError) as e:
                 if e.args[0] in _ERRNO_WOULDBLOCK:
                     self._write_buffer_frozen = True
@@ -863,7 +864,7 @@ class BaseIOStream(object):
                                         self.fileno(), e)
                     self.close(exc_info=True)
                     return
-        if not self._write_buffer:
+        if not self._write_buffer_size:
             if self._write_callback:
                 callback = self._write_callback
                 self._write_callback = None
@@ -874,11 +875,21 @@ class BaseIOStream(object):
                 future.set_result(None)
 
     def _consume(self, loc):
+        # Consume loc bytes from the read buffer and return them
         if loc == 0:
             return b""
-        _merge_prefix(self._read_buffer, loc)
+        assert loc <= self._read_buffer_size
+        # Slice the bytearray buffer into bytes, without intermediate copying
+        b = (memoryview(self._read_buffer)
+                       [self._read_buffer_pos:self._read_buffer_pos + loc]
+                       ).tobytes()
+        self._read_buffer_pos += loc
         self._read_buffer_size -= loc
-        return self._read_buffer.popleft()
+        # Amortized O(1) shrink
+        if self._read_buffer_pos > self._read_buffer_size:
+            self._read_buffer = self._read_buffer[self._read_buffer_pos:]
+            self._read_buffer_pos = 0
+        return b
 
     def _check_closed(self):
         if self.closed():
@@ -1498,53 +1509,6 @@ class PipeIOStream(BaseIOStream):
         return chunk
 
 
-def _double_prefix(deque):
-    """Grow by doubling, but don't split the second chunk just because the
-    first one is small.
-    """
-    new_len = max(len(deque[0]) * 2,
-                  (len(deque[0]) + len(deque[1])))
-    _merge_prefix(deque, new_len)
-
-
-def _merge_prefix(deque, size):
-    """Replace the first entries in a deque of strings with a single
-    string of up to size bytes.
-
-    >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
-    >>> _merge_prefix(d, 5); print(d)
-    deque(['abcde', 'fghi', 'j'])
-
-    Strings will be split as necessary to reach the desired size.
-    >>> _merge_prefix(d, 7); print(d)
-    deque(['abcdefg', 'hi', 'j'])
-
-    >>> _merge_prefix(d, 3); print(d)
-    deque(['abc', 'defg', 'hi', 'j'])
-
-    >>> _merge_prefix(d, 100); print(d)
-    deque(['abcdefghij'])
-    """
-    if len(deque) == 1 and len(deque[0]) <= size:
-        return
-    prefix = []
-    remaining = size
-    while deque and remaining > 0:
-        chunk = deque.popleft()
-        if len(chunk) > remaining:
-            deque.appendleft(chunk[remaining:])
-            chunk = chunk[:remaining]
-        prefix.append(chunk)
-        remaining -= len(chunk)
-    # This data structure normally just contains byte strings, but
-    # the unittest gets messy if it doesn't use the default str() type,
-    # so do the merge based on the type of data that's actually present.
-    if prefix:
-        deque.appendleft(type(prefix[0])().join(prefix))
-    if not deque:
-        deque.appendleft(b"")
-
-
 def doctests():
     import doctest
     return doctest.DocTestSuite()