class _ProactorDatagramTransport(_ProactorBasePipeTransport,
transports.DatagramTransport):
max_size = 256 * 1024
+ _header_size = 8
+
def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
self._address = address
# Ensure that what we buffer is immutable.
self._buffer.append((bytes(data), addr))
- self._buffer_size += len(data) + 8 # include header bytes
+ self._buffer_size += len(data) + self._header_size
if self._write_fut is None:
# No current write operations are active, kick one off
return
data, addr = self._buffer.popleft()
- self._buffer_size -= len(data)
+ self._buffer_size -= len(data) + self._header_size
if self._address is not None:
self._write_fut = self._loop._proactor.send(self._sock,
data)
class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
_buffer_factory = collections.deque
+ _header_size = 8
def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
# Ensure that what we buffer is immutable.
self._buffer.append((bytes(data), addr))
- self._buffer_size += len(data) + 8 # include header bytes
+ self._buffer_size += len(data) + self._header_size
self._maybe_pause_protocol()
def _sendto_ready(self):
while self._buffer:
data, addr = self._buffer.popleft()
- self._buffer_size -= len(data)
+ self._buffer_size -= len(data) + self._header_size
try:
if self._extra['peername']:
self._sock.send(data)
self._sock.sendto(data, addr)
except (BlockingIOError, InterruptedError):
self._buffer.appendleft((data, addr)) # Try again later.
- self._buffer_size += len(data)
+ self._buffer_size += len(data) + self._header_size
break
except OSError as exc:
self._protocol.error_received(exc)
self.assertTrue(self.proactor.sendto.called)
self.proactor.sendto.assert_called_with(
self.sock, data, addr=('0.0.0.0', 1234))
+ self.assertFalse(transport._buffer)
+ self.assertEqual(0, transport._buffer_size)
def test_sendto_bytearray(self):
data = bytearray(b'data')
transport.sendto(b'data', (1,))
self.assertEqual(transport._conn_lost, 2)
+ def test_sendto_sendto_ready(self):
+ data = b'data'
+
+ # First queue up the buffer by having the socket blocked
+ self.sock.sendto.side_effect = BlockingIOError
+ transport = self.datagram_transport()
+ transport.sendto(data, ('0.0.0.0', 12345))
+ self.loop.assert_writer(7, transport._sendto_ready)
+ self.assertEqual(1, len(transport._buffer))
+ self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
+
+ # Now let the socket send the buffer
+ self.sock.sendto.side_effect = None
+ transport._sendto_ready()
+ self.assertTrue(self.sock.sendto.called)
+ self.assertEqual(
+ self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
+ self.assertFalse(self.loop.writers)
+ self.assertFalse(transport._buffer)
+ self.assertEqual(transport._buffer_size, 0)
+
+ def test_sendto_sendto_ready_blocked(self):
+ data = b'data'
+
+ # First queue up the buffer by having the socket blocked
+ self.sock.sendto.side_effect = BlockingIOError
+ transport = self.datagram_transport()
+ transport.sendto(data, ('0.0.0.0', 12345))
+ self.loop.assert_writer(7, transport._sendto_ready)
+ self.assertEqual(1, len(transport._buffer))
+ self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
+
+ # Now try to send the buffer, it will be added to buffer again if it fails
+ transport._sendto_ready()
+ self.assertTrue(self.sock.sendto.called)
+ self.assertEqual(
+ self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
+ self.assertTrue(self.loop.writers)
+ self.assertEqual(1, len(transport._buffer))
+ self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
+
def test_sendto_ready(self):
data = b'data'
self.sock.sendto.return_value = len(data)
--- /dev/null
+Fix :meth:`asyncio.DatagramTransport.sendto` to account for datagram header size when
+data cannot be sent.