self.loop.run_until_complete(
self._basetest_datagram_recvfrom_into(server_address))
+ async def _basetest_datagram_recvfrom_into_wrong_size(self, server_address):
+ # Call sock_sendto() with a size larger than the buffer
+ with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
+ sock.setblocking(False)
+
+ buf = bytearray(5000)
+ data = b'\x01' * 4096
+ wrong_size = len(buf) + 1
+ await self.loop.sock_sendto(sock, data, server_address)
+ with self.assertRaises(ValueError):
+ await self.loop.sock_recvfrom_into(
+ sock, buf, wrong_size)
+
+ size, addr = await self.loop.sock_recvfrom_into(sock, buf)
+ self.assertEqual(buf[:size], data)
+
+ def test_recvfrom_into_wrong_size(self):
+ with test_utils.run_udp_echo_server() as server_address:
+ self.loop.run_until_complete(
+ self._basetest_datagram_recvfrom_into_wrong_size(server_address))
+
async def _basetest_datagram_sendto_blocking(self, server_address):
# Sad path, sock.sendto() raises BlockingIOError
# This involves patching sock.sendto() to raise BlockingIOError but
}
#endif
+ if (bufobj->len < (Py_ssize_t)size) {
+ PyErr_SetString(PyExc_ValueError, "nbytes is greater than the length of the buffer");
+ return NULL;
+ }
+
wsabuf.buf = bufobj->buf;
wsabuf.len = size;