From: Justin Bronder Date: Wed, 30 Jul 2025 18:11:28 +0000 (-0400) Subject: gh-135444: fix `asyncio.DatagramTransport.sendto` to account for datagram header... X-Git-Tag: v3.15.0a1~820 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=e3ea8613519bd08aa6ce7d142403e644ae32d843;p=thirdparty%2FPython%2Fcpython.git gh-135444: fix `asyncio.DatagramTransport.sendto` to account for datagram header size when data cannot be sent (#135445) Co-authored-by: Kumar Aditya --- diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 7eb55bd63ddb..f404273c3ae5 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -460,6 +460,8 @@ class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport): class _ProactorDatagramTransport(_ProactorBasePipeTransport, transports.DatagramTransport): max_size = 256 * 1024 + _header_size = 8 + def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): self._address = address @@ -499,7 +501,7 @@ class _ProactorDatagramTransport(_ProactorBasePipeTransport, # Ensure that what we buffer is immutable. self._buffer.append((bytes(data), addr)) - self._buffer_size += len(data) + 8 # include header bytes + self._buffer_size += len(data) + self._header_size if self._write_fut is None: # No current write operations are active, kick one off @@ -526,7 +528,7 @@ class _ProactorDatagramTransport(_ProactorBasePipeTransport, return data, addr = self._buffer.popleft() - self._buffer_size -= len(data) + self._buffer_size -= len(data) + self._header_size if self._address is not None: self._write_fut = self._loop._proactor.send(self._sock, data) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 6ad84044adf1..3505d4bb6bd1 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1212,6 +1212,7 @@ class _SelectorSocketTransport(_SelectorTransport): class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport): _buffer_factory = collections.deque + _header_size = 8 def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): @@ -1285,13 +1286,13 @@ class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTranspor # Ensure that what we buffer is immutable. self._buffer.append((bytes(data), addr)) - self._buffer_size += len(data) + 8 # include header bytes + self._buffer_size += len(data) + self._header_size self._maybe_pause_protocol() def _sendto_ready(self): while self._buffer: data, addr = self._buffer.popleft() - self._buffer_size -= len(data) + self._buffer_size -= len(data) + self._header_size try: if self._extra['peername']: self._sock.send(data) @@ -1299,7 +1300,7 @@ class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTranspor self._sock.sendto(data, addr) except (BlockingIOError, InterruptedError): self._buffer.appendleft((data, addr)) # Try again later. - self._buffer_size += len(data) + self._buffer_size += len(data) + self._header_size break except OSError as exc: self._protocol.error_received(exc) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index b25daaface08..edfad5e11db3 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -566,6 +566,8 @@ class ProactorDatagramTransportTests(test_utils.TestCase): self.assertTrue(self.proactor.sendto.called) self.proactor.sendto.assert_called_with( self.sock, data, addr=('0.0.0.0', 1234)) + self.assertFalse(transport._buffer) + self.assertEqual(0, transport._buffer_size) def test_sendto_bytearray(self): data = bytearray(b'data') diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 7b6d1bce5e46..9d77e7e5889d 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1497,6 +1497,47 @@ class SelectorDatagramTransportTests(test_utils.TestCase): transport.sendto(b'data', (1,)) self.assertEqual(transport._conn_lost, 2) + def test_sendto_sendto_ready(self): + data = b'data' + + # First queue up the buffer by having the socket blocked + self.sock.sendto.side_effect = BlockingIOError + transport = self.datagram_transport() + transport.sendto(data, ('0.0.0.0', 12345)) + self.loop.assert_writer(7, transport._sendto_ready) + self.assertEqual(1, len(transport._buffer)) + self.assertEqual(transport._buffer_size, len(data) + transport._header_size) + + # Now let the socket send the buffer + self.sock.sendto.side_effect = None + transport._sendto_ready() + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345))) + self.assertFalse(self.loop.writers) + self.assertFalse(transport._buffer) + self.assertEqual(transport._buffer_size, 0) + + def test_sendto_sendto_ready_blocked(self): + data = b'data' + + # First queue up the buffer by having the socket blocked + self.sock.sendto.side_effect = BlockingIOError + transport = self.datagram_transport() + transport.sendto(data, ('0.0.0.0', 12345)) + self.loop.assert_writer(7, transport._sendto_ready) + self.assertEqual(1, len(transport._buffer)) + self.assertEqual(transport._buffer_size, len(data) + transport._header_size) + + # Now try to send the buffer, it will be added to buffer again if it fails + transport._sendto_ready() + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345))) + self.assertTrue(self.loop.writers) + self.assertEqual(1, len(transport._buffer)) + self.assertEqual(transport._buffer_size, len(data) + transport._header_size) + def test_sendto_ready(self): data = b'data' self.sock.sendto.return_value = len(data) diff --git a/Misc/NEWS.d/next/Library/2025-06-16-12-37-02.gh-issue-135444.An2eeA.rst b/Misc/NEWS.d/next/Library/2025-06-16-12-37-02.gh-issue-135444.An2eeA.rst new file mode 100644 index 000000000000..e1182f56eb33 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-06-16-12-37-02.gh-issue-135444.An2eeA.rst @@ -0,0 +1,2 @@ +Fix :meth:`asyncio.DatagramTransport.sendto` to account for datagram header size when +data cannot be sent.