import array
return sock.sendmsg(buffers, [(_socket.SOL_SOCKET,
- _socket.SCM_RIGHTS, array.array("i", fds))])
+ _socket.SCM_RIGHTS, array.array("i", fds))], flags, address)
__all__.append("send_fds")
if hasattr(_socket.socket, "recvmsg"):
data = os.read(rfd, 100)
self.assertEqual(data, str(index).encode())
+ def testSendAndRecvFdsByAddress(self):
+ rfd, wfd = os.pipe()
+ self.addCleanup(os.close, rfd)
+ self.addCleanup(os.close, wfd)
+
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ address = socket_helper.create_unix_domain_name()
+ self.addCleanup(os_helper.unlink, address)
+ socket_helper.bind_unix_socket(sock, address)
+
+ socket.send_fds(sock, [MSG], [rfd], 0, address)
+
+ # request more data and file descriptors than expected
+ msg, (rfd2,), flags, addr = socket.recv_fds(sock, len(MSG) * 2, 2)
+ self.addCleanup(os.close, rfd2)
+ self.assertEqual(msg, MSG)
+ self.assertEqual(flags, 0)
+ self.assertEqual(addr, address)
+
+ # test that the file descriptor is connected
+ os.write(wfd, b'data')
+ data = os.read(rfd2, 100)
+ self.assertEqual(data, b'data')
+
class FreeThreadingTests(unittest.TestCase):