self._server = server
self._buffer = collections.deque()
+ self._buffer_size = 0
self._conn_lost = 0 # Set when call to connection_lost scheduled.
self._closing = False # Set when close() called.
self._paused = False # Set when pause_reading() called
return
if self._buffer:
self._buffer.clear()
+ self._buffer_size = 0
self._loop._remove_writer(self._sock_fd)
if not self._closing:
self._closing = True
self._server = None
def get_write_buffer_size(self):
- return sum(map(len, self._buffer))
+ return self._buffer_size
def _add_reader(self, fd, callback, *args):
if not self.is_reading():
# Add it to the buffer.
self._buffer.append(data)
+ self._buffer_size += len(data)
self._maybe_pause_protocol()
def _get_sendmsg_buffer(self):
except BaseException as exc:
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
+ self._buffer_size = 0
self._fatal_error(exc, 'Fatal write error on socket transport')
if self._empty_waiter is not None:
self._empty_waiter.set_exception(exc)
self._sock.shutdown(socket.SHUT_WR)
def _adjust_leftover_buffer(self, nbytes: int) -> None:
+ self._buffer_size -= nbytes
buffer = self._buffer
while nbytes:
b = buffer.popleft()
if n != len(buffer):
# Not all data was written
self._buffer.appendleft(buffer[n:])
+ self._buffer_size -= n
except (BlockingIOError, InterruptedError):
- pass
+ self._buffer.appendleft(buffer)
+ return
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
+ self._buffer_size = 0
self._fatal_error(exc, 'Fatal write error on socket transport')
if self._empty_waiter is not None:
self._empty_waiter.set_exception(exc)
self._conn_lost += 1
return
- self._buffer.extend([memoryview(data) for data in list_of_data])
+ for data in list_of_data:
+ self._buffer.append(memoryview(data))
+ self._buffer_size += len(data)
self._write_ready()
# If the entire buffer couldn't be written, register a write handler
if self._buffer: