for *cmd* are operating system dependent, and are available as constants
in the :mod:`fcntl` module, using the same names as used in the relevant C
header files. The argument *arg* can either be an integer value, a
- :class:`bytes` object, or a string.
+ :term:`bytes-like object`, or a string.
The type and size of *arg* must match the type and size of
the argument of the operation as specified in the relevant C documentation.
When *arg* is an integer, the function returns the integer
return value of the C :c:func:`fcntl` call.
- When the argument is bytes, it represents a binary structure,
+ When the argument is bytes-like object, it represents a binary structure,
for example, created by :func:`struct.pack`.
A string value is encoded to binary using the UTF-8 encoding.
The binary data is copied to a buffer whose address is
.. audit-event:: fcntl.fcntl fd,cmd,arg fcntl.fcntl
+ .. versionchanged:: next
+ Add support of arbitrary :term:`bytes-like objects <bytes-like object>`,
+ not only :class:`bytes`.
+
.. function:: ioctl(fd, request, arg=0, mutate_flag=True, /)
.. audit-event:: fcntl.ioctl fd,request,arg fcntl.ioctl
+ .. versionchanged:: next
+ The GIL is always released during a system call.
+ System calls failing with EINTR are automatically retried.
.. function:: flock(fd, operation, /)
fcntl_fcntl_impl(PyObject *module, int fd, int code, PyObject *arg)
/*[clinic end generated code: output=888fc93b51c295bd input=7955340198e5f334]*/
{
- unsigned int int_arg = 0;
int ret;
- char *str;
- Py_ssize_t len;
- char buf[1024];
int async_err = 0;
if (PySys_Audit("fcntl.fcntl", "iiO", fd, code, arg ? arg : Py_None) < 0) {
return NULL;
}
- if (arg != NULL) {
- int parse_result;
-
- if (PyArg_Parse(arg, "s#", &str, &len)) {
- if ((size_t)len > sizeof buf) {
- PyErr_SetString(PyExc_ValueError,
- "fcntl string arg too long");
+ if (arg == NULL || PyIndex_Check(arg)) {
+ unsigned int int_arg = 0;
+ if (arg != NULL) {
+ if (!PyArg_Parse(arg, "I", &int_arg)) {
return NULL;
}
- memcpy(buf, str, len);
- do {
- Py_BEGIN_ALLOW_THREADS
- ret = fcntl(fd, code, buf);
- Py_END_ALLOW_THREADS
- } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
- if (ret < 0) {
- return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
- }
- return PyBytes_FromStringAndSize(buf, len);
}
- PyErr_Clear();
- parse_result = PyArg_Parse(arg,
- "I;fcntl requires a file or file descriptor,"
- " an integer and optionally a third integer or a string",
- &int_arg);
- if (!parse_result) {
- return NULL;
+ do {
+ Py_BEGIN_ALLOW_THREADS
+ ret = fcntl(fd, code, (int)int_arg);
+ Py_END_ALLOW_THREADS
+ } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
+ if (ret < 0) {
+ return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
}
+ return PyLong_FromLong(ret);
}
+ if (PyUnicode_Check(arg) || PyObject_CheckBuffer(arg)) {
+#define FCNTL_BUFSZ 1024
+ Py_buffer view;
+ char buf[FCNTL_BUFSZ+1]; /* argument plus NUL byte */
- do {
- Py_BEGIN_ALLOW_THREADS
- ret = fcntl(fd, code, (int)int_arg);
- Py_END_ALLOW_THREADS
- } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
- if (ret < 0) {
- return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
+ if (!PyArg_Parse(arg, "s*", &view)) {
+ return NULL;
+ }
+ Py_ssize_t len = view.len;
+ if (len > FCNTL_BUFSZ) {
+ PyErr_SetString(PyExc_ValueError,
+ "fcntl argument 3 is too long");
+ PyBuffer_Release(&view);
+ return NULL;
+ }
+ memcpy(buf, view.buf, len);
+ buf[len] = '\0';
+ PyBuffer_Release(&view);
+
+ do {
+ Py_BEGIN_ALLOW_THREADS
+ ret = fcntl(fd, code, buf);
+ Py_END_ALLOW_THREADS
+ } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
+ if (ret < 0) {
+ return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
+ }
+ return PyBytes_FromStringAndSize(buf, len);
+#undef FCNTL_BUFSZ
}
- return PyLong_FromLong((long)ret);
+ PyErr_Format(PyExc_TypeError,
+ "fcntl() argument 3 must be an integer, "
+ "a bytes-like object, or a string, not %T",
+ arg);
+ return NULL;
}
fd: fildes
request as code: unsigned_long(bitwise=True)
- arg as ob_arg: object(c_default='NULL') = 0
+ arg: object(c_default='NULL') = 0
mutate_flag as mutate_arg: bool = True
/
[clinic start generated code]*/
static PyObject *
-fcntl_ioctl_impl(PyObject *module, int fd, unsigned long code,
- PyObject *ob_arg, int mutate_arg)
-/*[clinic end generated code: output=3d8eb6828666cea1 input=cee70f6a27311e58]*/
+fcntl_ioctl_impl(PyObject *module, int fd, unsigned long code, PyObject *arg,
+ int mutate_arg)
+/*[clinic end generated code: output=f72baba2454d7a62 input=9c6cca5e2c339622]*/
{
-#define IOCTL_BUFSZ 1024
/* We use the unsigned non-checked 'I' format for the 'code' parameter
because the system expects it to be a 32bit bit field value
regardless of it being passed as an int or unsigned long on
in their unsigned long ioctl codes this will break and need
special casing based on the platform being built on.
*/
- int arg = 0;
int ret;
- Py_buffer pstr;
- char *str;
- Py_ssize_t len;
- char buf[IOCTL_BUFSZ+1]; /* argument plus NUL byte */
+ int async_err = 0;
- if (PySys_Audit("fcntl.ioctl", "ikO", fd, code,
- ob_arg ? ob_arg : Py_None) < 0) {
+ if (PySys_Audit("fcntl.ioctl", "ikO", fd, code, arg ? arg : Py_None) < 0) {
return NULL;
}
- if (ob_arg != NULL) {
- if (PyArg_Parse(ob_arg, "w*:ioctl", &pstr)) {
- char *arg;
- str = pstr.buf;
- len = pstr.len;
-
- if (mutate_arg) {
- if (len <= IOCTL_BUFSZ) {
- memcpy(buf, str, len);
- buf[len] = '\0';
- arg = buf;
+ if (arg == NULL || PyIndex_Check(arg)) {
+ int int_arg = 0;
+ if (arg != NULL) {
+ if (!PyArg_Parse(arg, "i", &int_arg)) {
+ return NULL;
+ }
+ }
+
+ do {
+ Py_BEGIN_ALLOW_THREADS
+ ret = ioctl(fd, code, int_arg);
+ Py_END_ALLOW_THREADS
+ } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
+ if (ret < 0) {
+ return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
+ }
+ return PyLong_FromLong(ret);
+ }
+ if (PyUnicode_Check(arg) || PyObject_CheckBuffer(arg)) {
+ Py_buffer view;
+#define IOCTL_BUFSZ 1024
+ char buf[IOCTL_BUFSZ+1]; /* argument plus NUL byte */
+ if (mutate_arg && !PyBytes_Check(arg) && !PyUnicode_Check(arg)) {
+ if (PyObject_GetBuffer(arg, &view, PyBUF_WRITABLE) == 0) {
+ if (view.len <= IOCTL_BUFSZ) {
+ memcpy(buf, view.buf, view.len);
+ buf[view.len] = '\0';
+ do {
+ Py_BEGIN_ALLOW_THREADS
+ ret = ioctl(fd, code, buf);
+ Py_END_ALLOW_THREADS
+ } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
+ memcpy(view.buf, buf, view.len);
}
else {
- arg = str;
+ do {
+ Py_BEGIN_ALLOW_THREADS
+ ret = ioctl(fd, code, view.buf);
+ Py_END_ALLOW_THREADS
+ } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
}
- }
- else {
- if (len > IOCTL_BUFSZ) {
- PyBuffer_Release(&pstr);
- PyErr_SetString(PyExc_ValueError,
- "ioctl string arg too long");
+ if (ret < 0) {
+ if (!async_err) {
+ PyErr_SetFromErrno(PyExc_OSError);
+ }
+ PyBuffer_Release(&view);
return NULL;
}
- else {
- memcpy(buf, str, len);
- buf[len] = '\0';
- arg = buf;
- }
- }
- if (buf == arg) {
- Py_BEGIN_ALLOW_THREADS /* think array.resize() */
- ret = ioctl(fd, code, arg);
- Py_END_ALLOW_THREADS
- }
- else {
- ret = ioctl(fd, code, arg);
- }
- if (mutate_arg && (len <= IOCTL_BUFSZ)) {
- memcpy(str, buf, len);
- }
- if (ret < 0) {
- PyErr_SetFromErrno(PyExc_OSError);
- PyBuffer_Release(&pstr);
- return NULL;
- }
- PyBuffer_Release(&pstr);
- if (mutate_arg) {
+ PyBuffer_Release(&view);
return PyLong_FromLong(ret);
}
- else {
- return PyBytes_FromStringAndSize(buf, len);
+ if (!PyErr_ExceptionMatches(PyExc_BufferError)) {
+ return NULL;
}
+ PyErr_Clear();
}
- PyErr_Clear();
- if (PyArg_Parse(ob_arg, "s*:ioctl", &pstr)) {
- str = pstr.buf;
- len = pstr.len;
- if (len > IOCTL_BUFSZ) {
- PyBuffer_Release(&pstr);
- PyErr_SetString(PyExc_ValueError,
- "ioctl string arg too long");
- return NULL;
- }
- memcpy(buf, str, len);
- buf[len] = '\0';
+ if (!PyArg_Parse(arg, "s*", &view)) {
+ return NULL;
+ }
+ Py_ssize_t len = view.len;
+ if (len > IOCTL_BUFSZ) {
+ PyErr_SetString(PyExc_ValueError,
+ "ioctl argument 3 is too long");
+ PyBuffer_Release(&view);
+ return NULL;
+ }
+ memcpy(buf, view.buf, len);
+ buf[len] = '\0';
+ PyBuffer_Release(&view);
+
+ do {
Py_BEGIN_ALLOW_THREADS
ret = ioctl(fd, code, buf);
Py_END_ALLOW_THREADS
- if (ret < 0) {
- PyErr_SetFromErrno(PyExc_OSError);
- PyBuffer_Release(&pstr);
- return NULL;
- }
- PyBuffer_Release(&pstr);
- return PyBytes_FromStringAndSize(buf, len);
- }
-
- PyErr_Clear();
- if (!PyArg_Parse(ob_arg,
- "i;ioctl requires a file or file descriptor,"
- " an integer and optionally an integer or buffer argument",
- &arg)) {
- return NULL;
+ } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
+ if (ret < 0) {
+ return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
}
- // Fall-through to outside the 'if' statement.
- }
- Py_BEGIN_ALLOW_THREADS
- ret = ioctl(fd, code, arg);
- Py_END_ALLOW_THREADS
- if (ret < 0) {
- PyErr_SetFromErrno(PyExc_OSError);
- return NULL;
- }
- return PyLong_FromLong((long)ret);
+ return PyBytes_FromStringAndSize(buf, len);
#undef IOCTL_BUFSZ
+ }
+ PyErr_Format(PyExc_TypeError,
+ "ioctl() argument 3 must be an integer, "
+ "a bytes-like object, or a string, not %T",
+ arg);
+ return NULL;
}
/*[clinic input]