]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.12] gh-91227: Ignore ERROR_PORT_UNREACHABLE in proactor recvfrom() (GH-32011)...
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Mon, 25 Mar 2024 10:21:31 +0000 (11:21 +0100)
committerGitHub <noreply@github.com>
Mon, 25 Mar 2024 10:21:31 +0000 (12:21 +0200)
(cherry picked from commit f11d0d8be8af28e1368c3c7c116218cf65ddf93e)

Co-authored-by: Erik Soma <stillusingirc@gmail.com>
Lib/asyncio/windows_events.py
Lib/test/test_asyncio/test_events.py
Lib/test/test_asyncio/test_sock_lowlevel.py
Misc/NEWS.d/next/Windows/2024-02-24-23-03-43.gh-issue-91227.sL4zWC.rst [new file with mode: 0644]
Modules/overlapped.c

index c71f40159d2bdc8aeb2551df93daab813da66235..cb613451a58304989f7dd0f8e6944e51c116c8d6 100644 (file)
@@ -8,6 +8,7 @@ if sys.platform != 'win32':  # pragma: no cover
 import _overlapped
 import _winapi
 import errno
+from functools import partial
 import math
 import msvcrt
 import socket
@@ -466,6 +467,18 @@ class IocpProactor:
             else:
                 raise
 
+    @classmethod
+    def _finish_recvfrom(cls, trans, key, ov, *, empty_result):
+        try:
+            return cls.finish_socket_func(trans, key, ov)
+        except OSError as exc:
+            # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
+            # socket is used to send to an address that is not listening.
+            if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
+                return empty_result, None
+            else:
+                raise
+
     def recv(self, conn, nbytes, flags=0):
         self._register_with_iocp(conn)
         ov = _overlapped.Overlapped(NULL)
@@ -500,7 +513,8 @@ class IocpProactor:
         except BrokenPipeError:
             return self._result((b'', None))
 
-        return self._register(ov, conn, self.finish_socket_func)
+        return self._register(ov, conn, partial(self._finish_recvfrom,
+                                                empty_result=b''))
 
     def recvfrom_into(self, conn, buf, flags=0):
         self._register_with_iocp(conn)
@@ -510,17 +524,8 @@ class IocpProactor:
         except BrokenPipeError:
             return self._result((0, None))
 
-        def finish_recv(trans, key, ov):
-            try:
-                return ov.getresult()
-            except OSError as exc:
-                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
-                                    _overlapped.ERROR_OPERATION_ABORTED):
-                    raise ConnectionResetError(*exc.args)
-                else:
-                    raise
-
-        return self._register(ov, conn, finish_recv)
+        return self._register(ov, conn, partial(self._finish_recvfrom,
+                                                empty_result=0))
 
     def sendto(self, conn, buf, flags=0, addr=None):
         self._register_with_iocp(conn)
index 81e2956c1711a0a74a52c200072b8286f72709a4..f25580371a27d746e226a9f6b1249486f9f9c342 100644 (file)
@@ -1379,6 +1379,80 @@ class EventLoopTestsMixin:
         tr.close()
         self.loop.run_until_complete(pr.done)
 
+    def test_datagram_send_to_non_listening_address(self):
+        # see:
+        #   https://github.com/python/cpython/issues/91227
+        #   https://github.com/python/cpython/issues/88906
+        #   https://bugs.python.org/issue47071
+        #   https://bugs.python.org/issue44743
+        # The Proactor event loop would fail to receive datagram messages after
+        # sending a message to an address that wasn't listening.
+        loop = self.loop
+
+        class Protocol(asyncio.DatagramProtocol):
+
+            _received_datagram = None
+
+            def datagram_received(self, data, addr):
+                self._received_datagram.set_result(data)
+
+            async def wait_for_datagram_received(self):
+                self._received_datagram = loop.create_future()
+                result = await asyncio.wait_for(self._received_datagram, 10)
+                self._received_datagram = None
+                return result
+
+        def create_socket():
+            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+            sock.setblocking(False)
+            sock.bind(('127.0.0.1', 0))
+            return sock
+
+        socket_1 = create_socket()
+        transport_1, protocol_1 = loop.run_until_complete(
+            loop.create_datagram_endpoint(Protocol, sock=socket_1)
+        )
+        addr_1 = socket_1.getsockname()
+
+        socket_2 = create_socket()
+        transport_2, protocol_2 = loop.run_until_complete(
+            loop.create_datagram_endpoint(Protocol, sock=socket_2)
+        )
+        addr_2 = socket_2.getsockname()
+
+        # creating and immediately closing this to try to get an address that
+        # is not listening
+        socket_3 = create_socket()
+        transport_3, protocol_3 = loop.run_until_complete(
+            loop.create_datagram_endpoint(Protocol, sock=socket_3)
+        )
+        addr_3 = socket_3.getsockname()
+        transport_3.abort()
+
+        transport_1.sendto(b'a', addr=addr_2)
+        self.assertEqual(loop.run_until_complete(
+            protocol_2.wait_for_datagram_received()
+        ), b'a')
+
+        transport_2.sendto(b'b', addr=addr_1)
+        self.assertEqual(loop.run_until_complete(
+            protocol_1.wait_for_datagram_received()
+        ), b'b')
+
+        # this should send to an address that isn't listening
+        transport_1.sendto(b'c', addr=addr_3)
+        loop.run_until_complete(asyncio.sleep(0))
+
+        # transport 1 should still be able to receive messages after sending to
+        # an address that wasn't listening
+        transport_2.sendto(b'd', addr=addr_1)
+        self.assertEqual(loop.run_until_complete(
+            protocol_1.wait_for_datagram_received()
+        ), b'd')
+
+        transport_1.close()
+        transport_2.close()
+
     def test_internal_fds(self):
         loop = self.create_event_loop()
         if not isinstance(loop, selector_events.BaseSelectorEventLoop):
index 075113cbe8e4a63a3e47ecec35f02fe66bfa6814..acef24a703ba38b1a5ae0ce0d6110e39a771156e 100644 (file)
@@ -555,12 +555,93 @@ if sys.platform == 'win32':
         def create_event_loop(self):
             return asyncio.SelectorEventLoop()
 
+
     class ProactorEventLoopTests(BaseSockTestsMixin,
                                  test_utils.TestCase):
 
         def create_event_loop(self):
             return asyncio.ProactorEventLoop()
 
+
+        async def _basetest_datagram_send_to_non_listening_address(self,
+                                                                   recvfrom):
+            # see:
+            #   https://github.com/python/cpython/issues/91227
+            #   https://github.com/python/cpython/issues/88906
+            #   https://bugs.python.org/issue47071
+            #   https://bugs.python.org/issue44743
+            # The Proactor event loop would fail to receive datagram messages
+            # after sending a message to an address that wasn't listening.
+
+            def create_socket():
+                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+                sock.setblocking(False)
+                sock.bind(('127.0.0.1', 0))
+                return sock
+
+            socket_1 = create_socket()
+            addr_1 = socket_1.getsockname()
+
+            socket_2 = create_socket()
+            addr_2 = socket_2.getsockname()
+
+            # creating and immediately closing this to try to get an address
+            # that is not listening
+            socket_3 = create_socket()
+            addr_3 = socket_3.getsockname()
+            socket_3.shutdown(socket.SHUT_RDWR)
+            socket_3.close()
+
+            socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
+            socket_2_recv_task = self.loop.create_task(recvfrom(socket_2))
+            await asyncio.sleep(0)
+
+            await self.loop.sock_sendto(socket_1, b'a', addr_2)
+            self.assertEqual(await socket_2_recv_task, b'a')
+
+            await self.loop.sock_sendto(socket_2, b'b', addr_1)
+            self.assertEqual(await socket_1_recv_task, b'b')
+            socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
+            await asyncio.sleep(0)
+
+            # this should send to an address that isn't listening
+            await self.loop.sock_sendto(socket_1, b'c', addr_3)
+            self.assertEqual(await socket_1_recv_task, b'')
+            socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
+            await asyncio.sleep(0)
+
+            # socket 1 should still be able to receive messages after sending
+            # to an address that wasn't listening
+            socket_2.sendto(b'd', addr_1)
+            self.assertEqual(await socket_1_recv_task, b'd')
+
+            socket_1.shutdown(socket.SHUT_RDWR)
+            socket_1.close()
+            socket_2.shutdown(socket.SHUT_RDWR)
+            socket_2.close()
+
+
+        def test_datagram_send_to_non_listening_address_recvfrom(self):
+            async def recvfrom(socket):
+                data, _ = await self.loop.sock_recvfrom(socket, 4096)
+                return data
+
+            self.loop.run_until_complete(
+                self._basetest_datagram_send_to_non_listening_address(
+                    recvfrom))
+
+
+        def test_datagram_send_to_non_listening_address_recvfrom_into(self):
+            async def recvfrom_into(socket):
+                buf = bytearray(4096)
+                length, _ = await self.loop.sock_recvfrom_into(socket, buf,
+                                                               4096)
+                return buf[:length]
+
+            self.loop.run_until_complete(
+                self._basetest_datagram_send_to_non_listening_address(
+                    recvfrom_into))
+
 else:
     import selectors
 
diff --git a/Misc/NEWS.d/next/Windows/2024-02-24-23-03-43.gh-issue-91227.sL4zWC.rst b/Misc/NEWS.d/next/Windows/2024-02-24-23-03-43.gh-issue-91227.sL4zWC.rst
new file mode 100644 (file)
index 0000000..8e53afd
--- /dev/null
@@ -0,0 +1 @@
+Fix the asyncio ProactorEventLoop implementation so that sending a datagram to an address that is not listening does not prevent receiving any more datagrams.
index b53780205eb9e14247b8e5f7a326c5c7d921eefb..686ecf8b33b9309cfbd31da9a12fd01ad4f53116 100644 (file)
@@ -2057,6 +2057,7 @@ overlapped_exec(PyObject *module)
     WINAPI_CONSTANT(F_DWORD,  ERROR_OPERATION_ABORTED);
     WINAPI_CONSTANT(F_DWORD,  ERROR_SEM_TIMEOUT);
     WINAPI_CONSTANT(F_DWORD,  ERROR_PIPE_BUSY);
+    WINAPI_CONSTANT(F_DWORD,  ERROR_PORT_UNREACHABLE);
     WINAPI_CONSTANT(F_DWORD,  INFINITE);
     WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
     WINAPI_CONSTANT(F_HANDLE, NULL);