def test_read(self):
self.check(os.read, 1)
+ @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
+ def test_readv(self):
+ buf = bytearray(10)
+ self.check(os.readv, [buf])
+
@unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
def test_tcsetpgrpt(self):
self.check(os.tcsetpgrp, 0)
def test_write(self):
self.check(os.write, b" ")
+ @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
+ def test_writev(self):
+ self.check(os.writev, [b'abc'])
+
class LinkTests(unittest.TestCase):
def setUp(self):
def test_writev(self):
fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
try:
- os.writev(fd, (b'test1', b'tt2', b't3'))
+ n = os.writev(fd, (b'test1', b'tt2', b't3'))
+ self.assertEqual(n, 10)
+
os.lseek(fd, 0, os.SEEK_SET)
self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
+
+ # Issue #20113: empty list of buffers should not crash
+ self.assertEqual(posix.writev(fd, []), 0)
finally:
os.close(fd)
buf = [bytearray(i) for i in [5, 3, 2]]
self.assertEqual(posix.readv(fd, buf), 10)
self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
+
+ # Issue #20113: empty list of buffers should not crash
+ self.assertEqual(posix.readv(fd, []), 0)
finally:
os.close(fd)
*iov = PyMem_New(struct iovec, cnt);
if (*iov == NULL) {
PyErr_NoMemory();
- return total;
+ return -1;
}
*buf = PyMem_New(Py_buffer, cnt);
if (*buf == NULL) {
PyMem_Del(*iov);
PyErr_NoMemory();
- return total;
+ return -1;
}
for (i = 0; i < cnt; i++) {
PyBuffer_Release(&(*buf)[j]);
}
PyMem_Del(*buf);
- return 0;
+ return -1;
}
static void
}
cnt = PySequence_Size(seq);
- if (!iov_setup(&iov, &buf, seq, cnt, PyBUF_WRITABLE))
+ if (iov_setup(&iov, &buf, seq, cnt, PyBUF_WRITABLE) < 0)
return NULL;
Py_BEGIN_ALLOW_THREADS
Py_END_ALLOW_THREADS
iov_cleanup(iov, buf, cnt);
+ if (n < 0)
+ return posix_error();
+
return PyLong_FromSsize_t(n);
}
#endif
Py_ssize_t i = 0; /* Avoid uninitialized warning */
sf.hdr_cnt = PySequence_Size(headers);
if (sf.hdr_cnt > 0 &&
- !(i = iov_setup(&(sf.headers), &hbuf,
- headers, sf.hdr_cnt, PyBUF_SIMPLE)))
+ (i = iov_setup(&(sf.headers), &hbuf,
+ headers, sf.hdr_cnt, PyBUF_SIMPLE)) < 0)
return NULL;
#ifdef __APPLE__
sbytes += i;
Py_ssize_t i = 0; /* Avoid uninitialized warning */
sf.trl_cnt = PySequence_Size(trailers);
if (sf.trl_cnt > 0 &&
- !(i = iov_setup(&(sf.trailers), &tbuf,
- trailers, sf.trl_cnt, PyBUF_SIMPLE)))
+ (i = iov_setup(&(sf.trailers), &tbuf,
+ trailers, sf.trl_cnt, PyBUF_SIMPLE)) < 0)
return NULL;
#ifdef __APPLE__
sbytes += i;
}
cnt = PySequence_Size(seq);
- if (!iov_setup(&iov, &buf, seq, cnt, PyBUF_SIMPLE)) {
+ if (iov_setup(&iov, &buf, seq, cnt, PyBUF_SIMPLE) < 0) {
return NULL;
}
Py_END_ALLOW_THREADS
iov_cleanup(iov, buf, cnt);
+ if (res < 0)
+ return posix_error();
+
return PyLong_FromSsize_t(res);
}
#endif