# include <stropts.h> // I_FLUSHBAND
#endif
+#define GUARDSZ 8
+// NUL followed by random bytes.
+static const char guard[GUARDSZ] = "\x00\xfa\x69\xc4\x67\xa3\x6c\x58";
+
/*[clinic input]
module fcntl
[clinic start generated code]*/
return PyLong_FromLong(ret);
}
if (PyUnicode_Check(arg) || PyObject_CheckBuffer(arg)) {
-#define FCNTL_BUFSZ 1024
Py_buffer view;
- char buf[FCNTL_BUFSZ+1]; /* argument plus NUL byte */
+#define FCNTL_BUFSZ 1024
+ /* argument plus NUL byte plus guard to detect a buffer overflow */
+ char buf[FCNTL_BUFSZ+GUARDSZ];
if (!PyArg_Parse(arg, "s*", &view)) {
return NULL;
return NULL;
}
memcpy(buf, view.buf, len);
- buf[len] = '\0';
+ memcpy(buf + len, guard, GUARDSZ);
PyBuffer_Release(&view);
do {
if (ret < 0) {
return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
}
+ if (memcmp(buf + len, guard, GUARDSZ) != 0) {
+ PyErr_SetString(PyExc_SystemError, "buffer overflow");
+ return NULL;
+ }
return PyBytes_FromStringAndSize(buf, len);
#undef FCNTL_BUFSZ
}
if (PyUnicode_Check(arg) || PyObject_CheckBuffer(arg)) {
Py_buffer view;
#define IOCTL_BUFSZ 1024
- char buf[IOCTL_BUFSZ+1]; /* argument plus NUL byte */
+ /* argument plus NUL byte plus guard to detect a buffer overflow */
+ char buf[IOCTL_BUFSZ+GUARDSZ];
if (mutate_arg && !PyBytes_Check(arg) && !PyUnicode_Check(arg)) {
if (PyObject_GetBuffer(arg, &view, PyBUF_WRITABLE) == 0) {
- if (view.len <= IOCTL_BUFSZ) {
- memcpy(buf, view.buf, view.len);
- buf[view.len] = '\0';
- do {
- Py_BEGIN_ALLOW_THREADS
- ret = ioctl(fd, code, buf);
- Py_END_ALLOW_THREADS
- } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
- memcpy(view.buf, buf, view.len);
- }
- else {
- do {
- Py_BEGIN_ALLOW_THREADS
- ret = ioctl(fd, code, view.buf);
- Py_END_ALLOW_THREADS
- } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
+ Py_ssize_t len = view.len;
+ void *ptr = view.buf;
+ if (len <= IOCTL_BUFSZ) {
+ memcpy(buf, ptr, len);
+ memcpy(buf + len, guard, GUARDSZ);
+ ptr = buf;
}
+ do {
+ Py_BEGIN_ALLOW_THREADS
+ ret = ioctl(fd, code, ptr);
+ Py_END_ALLOW_THREADS
+ } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
if (ret < 0) {
if (!async_err) {
PyErr_SetFromErrno(PyExc_OSError);
PyBuffer_Release(&view);
return NULL;
}
+ if (ptr == buf) {
+ memcpy(view.buf, buf, len);
+ }
PyBuffer_Release(&view);
+ if (ptr == buf && memcmp(buf + len, guard, GUARDSZ) != 0) {
+ PyErr_SetString(PyExc_SystemError, "buffer overflow");
+ return NULL;
+ }
return PyLong_FromLong(ret);
}
if (!PyErr_ExceptionMatches(PyExc_BufferError)) {
return NULL;
}
memcpy(buf, view.buf, len);
- buf[len] = '\0';
+ memcpy(buf + len, guard, GUARDSZ);
PyBuffer_Release(&view);
do {
if (ret < 0) {
return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
}
+ if (memcmp(buf + len, guard, GUARDSZ) != 0) {
+ PyErr_SetString(PyExc_SystemError, "buffer overflow");
+ return NULL;
+ }
return PyBytes_FromStringAndSize(buf, len);
#undef IOCTL_BUFSZ
}