source=_socket)
enum._test_simple_enum(CheckedAddressInfo, socket.AddressInfo)
+ @unittest.skipUnless(hasattr(socket.socket, "sendmsg"),"sendmsg not supported")
+ def test_sendmsg_reentrant_ancillary_mutation(self):
+
+ class Mut:
+ def __index__(self):
+ seq.clear()
+ return 0
+
+ seq = [
+ (socket.SOL_SOCKET, Mut(), b'x'),
+ (socket.SOL_SOCKET, 0, b'x'),
+ ]
+
+ left, right = socket.socketpair()
+ self.addCleanup(left.close)
+ self.addCleanup(right.close)
+ self.assertRaises(OSError, left.sendmsg, [b'x'], seq)
+
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class BasicCANTest(unittest.TestCase):
if (cmsg_arg == NULL)
ncmsgs = 0;
else {
- if ((cmsg_fast = PySequence_Fast(cmsg_arg,
- "sendmsg() argument 2 must be an "
- "iterable")) == NULL)
+ cmsg_fast = PySequence_Tuple(cmsg_arg);
+ if (cmsg_fast == NULL) {
+ PyErr_SetString(PyExc_TypeError,
+ "sendmsg() argument 2 must be an iterable");
goto finally;
- ncmsgs = PySequence_Fast_GET_SIZE(cmsg_fast);
+ }
+ ncmsgs = PyTuple_GET_SIZE(cmsg_fast);
}
#ifndef CMSG_SPACE
controllen = controllen_last = 0;
while (ncmsgbufs < ncmsgs) {
size_t bufsize, space;
+ PyObject *item = PyTuple_GET_ITEM(cmsg_fast, ncmsgbufs);
- if (!PyArg_Parse(PySequence_Fast_GET_ITEM(cmsg_fast, ncmsgbufs),
+ if (!PyArg_Parse(item,
"(iiy*):[sendmsg() ancillary data items]",
&cmsgs[ncmsgbufs].level,
&cmsgs[ncmsgbufs].type,