+import io
import os
import unittest
+import warnings
from test import support
-from test.support import import_helper, os_helper
+from test.support import import_helper, os_helper, warnings_helper
-_testcapi = import_helper.import_module('_testcapi')
+_testcapi = import_helper.import_module('_testcapi')
+_testlimitedcapi = import_helper.import_module('_testlimitedcapi')
+_io = import_helper.import_module('_io')
NULL = None
+STDOUT_FD = 1
+
+with open(__file__, 'rb') as fp:
+ FIRST_LINE = next(fp).decode()
+FIRST_LINE_NORM = FIRST_LINE.rstrip() + '\n'
class CAPIFileTest(unittest.TestCase):
+ def test_pyfile_fromfd(self):
+ # Test PyFile_FromFd() which is a thin wrapper to _io.open()
+ pyfile_fromfd = _testlimitedcapi.pyfile_fromfd
+ filename = __file__
+ with open(filename, "rb") as fp:
+ fd = fp.fileno()
+
+ # FileIO
+ fp.seek(0)
+ obj = pyfile_fromfd(fd, filename, "rb", 0, NULL, NULL, NULL, 0)
+ try:
+ self.assertIsInstance(obj, _io.FileIO)
+ self.assertEqual(obj.readline(), FIRST_LINE.encode())
+ finally:
+ obj.close()
+
+ # BufferedReader
+ fp.seek(0)
+ obj = pyfile_fromfd(fd, filename, "rb", 1024, NULL, NULL, NULL, 0)
+ try:
+ self.assertIsInstance(obj, _io.BufferedReader)
+ self.assertEqual(obj.readline(), FIRST_LINE.encode())
+ finally:
+ obj.close()
+
+ # TextIOWrapper
+ fp.seek(0)
+ obj = pyfile_fromfd(fd, filename, "r", 1,
+ "utf-8", "replace", NULL, 0)
+ try:
+ self.assertIsInstance(obj, _io.TextIOWrapper)
+ self.assertEqual(obj.encoding, "utf-8")
+ self.assertEqual(obj.errors, "replace")
+ self.assertEqual(obj.readline(), FIRST_LINE_NORM)
+ finally:
+ obj.close()
+
+ def test_pyfile_getline(self):
+ # Test PyFile_GetLine(file, n): call file.readline()
+ # and strip "\n" suffix if n < 0.
+ pyfile_getline = _testlimitedcapi.pyfile_getline
+
+ # Test Unicode
+ with open(__file__, "r") as fp:
+ fp.seek(0)
+ self.assertEqual(pyfile_getline(fp, -1),
+ FIRST_LINE_NORM.rstrip('\n'))
+ fp.seek(0)
+ self.assertEqual(pyfile_getline(fp, 0),
+ FIRST_LINE_NORM)
+ fp.seek(0)
+ self.assertEqual(pyfile_getline(fp, 6),
+ FIRST_LINE_NORM[:6])
+
+ # Test bytes
+ with open(__file__, "rb") as fp:
+ fp.seek(0)
+ self.assertEqual(pyfile_getline(fp, -1),
+ FIRST_LINE.rstrip('\n').encode())
+ fp.seek(0)
+ self.assertEqual(pyfile_getline(fp, 0),
+ FIRST_LINE.encode())
+ fp.seek(0)
+ self.assertEqual(pyfile_getline(fp, 6),
+ FIRST_LINE.encode()[:6])
+
+ def test_pyfile_writestring(self):
+ # Test PyFile_WriteString(str, file): call file.write(str)
+ writestr = _testlimitedcapi.pyfile_writestring
+
+ with io.StringIO() as fp:
+ self.assertEqual(writestr("a\xe9\u20ac\U0010FFFF".encode(), fp), 0)
+ with self.assertRaises(UnicodeDecodeError):
+ writestr(b"\xff", fp)
+ with self.assertRaises(UnicodeDecodeError):
+ writestr("\udc80".encode("utf-8", "surrogatepass"), fp)
+
+ text = fp.getvalue()
+ self.assertEqual(text, "a\xe9\u20ac\U0010FFFF")
+
+ with self.assertRaises(SystemError):
+ writestr(b"abc", NULL)
+
+ def test_pyfile_writeobject(self):
+ # Test PyFile_WriteObject(obj, file, flags):
+ # - Call file.write(str(obj)) if flags equals Py_PRINT_RAW.
+ # - Call file.write(repr(obj)) otherwise.
+ writeobject = _testlimitedcapi.pyfile_writeobject
+ Py_PRINT_RAW = 1
+
+ with io.StringIO() as fp:
+ # Test flags=Py_PRINT_RAW
+ self.assertEqual(writeobject("raw", fp, Py_PRINT_RAW), 0)
+ writeobject(NULL, fp, Py_PRINT_RAW)
+
+ # Test flags=0
+ self.assertEqual(writeobject("repr", fp, 0), 0)
+ writeobject(NULL, fp, 0)
+
+ text = fp.getvalue()
+ self.assertEqual(text, "raw<NULL>'repr'<NULL>")
+
+ # invalid file type
+ for invalid_file in (123, "abc", object()):
+ with self.subTest(file=invalid_file):
+ with self.assertRaises(AttributeError):
+ writeobject("abc", invalid_file, Py_PRINT_RAW)
+
+ with self.assertRaises(TypeError):
+ writeobject("abc", NULL, 0)
+
+ def test_pyobject_asfiledescriptor(self):
+ # Test PyObject_AsFileDescriptor(obj):
+ # - Return obj if obj is an integer.
+ # - Return obj.fileno() otherwise.
+ # File descriptor must be >= 0.
+ asfd = _testlimitedcapi.pyobject_asfiledescriptor
+
+ self.assertEqual(asfd(123), 123)
+ self.assertEqual(asfd(0), 0)
+
+ with open(__file__, "rb") as fp:
+ self.assertEqual(asfd(fp), fp.fileno())
+
+ # bool emits RuntimeWarning
+ msg = r"bool is used as a file descriptor"
+ with warnings_helper.check_warnings((msg, RuntimeWarning)):
+ self.assertEqual(asfd(True), 1)
+
+ class FakeFile:
+ def __init__(self, fd):
+ self.fd = fd
+ def fileno(self):
+ return self.fd
+
+ # file descriptor must be positive
+ with self.assertRaises(ValueError):
+ asfd(-1)
+ with self.assertRaises(ValueError):
+ asfd(FakeFile(-1))
+
+ # fileno() result must be an integer
+ with self.assertRaises(TypeError):
+ asfd(FakeFile("text"))
+
+ # unsupported types
+ for obj in ("string", ["list"], object()):
+ with self.subTest(obj=obj):
+ with self.assertRaises(TypeError):
+ asfd(obj)
+
+ # CRASHES asfd(NULL)
+
+ def test_pyfile_newstdprinter(self):
+ # Test PyFile_NewStdPrinter()
+ pyfile_newstdprinter = _testcapi.pyfile_newstdprinter
+
+ file = pyfile_newstdprinter(STDOUT_FD)
+ self.assertEqual(file.closed, False)
+ self.assertIsNone(file.encoding)
+ self.assertEqual(file.mode, "w")
+
+ self.assertEqual(file.fileno(), STDOUT_FD)
+ self.assertEqual(file.isatty(), os.isatty(STDOUT_FD))
+
+ # flush() is a no-op
+ self.assertIsNone(file.flush())
+
+ # close() is a no-op
+ self.assertIsNone(file.close())
+ self.assertEqual(file.closed, False)
+
+ support.check_disallow_instantiation(self, type(file))
+
+ def test_pyfile_newstdprinter_write(self):
+ # Test the write() method of PyFile_NewStdPrinter()
+ pyfile_newstdprinter = _testcapi.pyfile_newstdprinter
+
+ filename = os_helper.TESTFN
+ self.addCleanup(os_helper.unlink, filename)
+
+ try:
+ old_stdout = os.dup(STDOUT_FD)
+ except OSError as exc:
+ # os.dup(STDOUT_FD) is not supported on WASI
+ self.skipTest(f"os.dup() failed with {exc!r}")
+
+ try:
+ with open(filename, "wb") as fp:
+ # PyFile_NewStdPrinter() only accepts fileno(stdout)
+ # or fileno(stderr) file descriptor.
+ fd = fp.fileno()
+ os.dup2(fd, STDOUT_FD)
+
+ file = pyfile_newstdprinter(STDOUT_FD)
+ self.assertEqual(file.write("text"), 4)
+ # The surrogate character is encoded with
+ # the "surrogateescape" error handler
+ self.assertEqual(file.write("[\udc80]"), 8)
+ finally:
+ os.dup2(old_stdout, STDOUT_FD)
+ os.close(old_stdout)
+
+ with open(filename, "r") as fp:
+ self.assertEqual(fp.read(), "text[\\udc80]")
+
def test_py_fopen(self):
# Test Py_fopen() and Py_fclose()
+ py_fopen = _testcapi.py_fopen
with open(__file__, "rb") as fp:
source = fp.read()
for filename in (__file__, os.fsencode(__file__)):
with self.subTest(filename=filename):
- data = _testcapi.py_fopen(filename, "rb")
+ data = py_fopen(filename, "rb")
self.assertEqual(data, source[:256])
- data = _testcapi.py_fopen(os_helper.FakePath(filename), "rb")
+ data = py_fopen(os_helper.FakePath(filename), "rb")
self.assertEqual(data, source[:256])
filenames = [
filename = None
continue
try:
- data = _testcapi.py_fopen(filename, "rb")
+ data = py_fopen(filename, "rb")
self.assertEqual(data, source[:256])
finally:
os_helper.unlink(filename)
# embedded null character/byte in the filename
with self.assertRaises(ValueError):
- _testcapi.py_fopen("a\x00b", "rb")
+ py_fopen("a\x00b", "rb")
with self.assertRaises(ValueError):
- _testcapi.py_fopen(b"a\x00b", "rb")
+ py_fopen(b"a\x00b", "rb")
# non-ASCII mode failing with "Invalid argument"
with self.assertRaises(OSError):
- _testcapi.py_fopen(__file__, b"\xc2\x80")
+ py_fopen(__file__, b"\xc2\x80")
with self.assertRaises(OSError):
# \x98 is invalid in cp1250, cp1251, cp1257
# \x9d is invalid in cp1252-cp1255, cp1258
- _testcapi.py_fopen(__file__, b"\xc2\x98\xc2\x9d")
+ py_fopen(__file__, b"\xc2\x98\xc2\x9d")
# UnicodeDecodeError can come from the audit hook code
with self.assertRaises((UnicodeDecodeError, OSError)):
- _testcapi.py_fopen(__file__, b"\x98\x9d")
+ py_fopen(__file__, b"\x98\x9d")
# invalid filename type
for invalid_type in (123, object()):
with self.subTest(filename=invalid_type):
with self.assertRaises(TypeError):
- _testcapi.py_fopen(invalid_type, "rb")
+ py_fopen(invalid_type, "rb")
if support.MS_WINDOWS:
with self.assertRaises(OSError):
# On Windows, the file mode is limited to 10 characters
- _testcapi.py_fopen(__file__, "rt+, ccs=UTF-8")
+ py_fopen(__file__, "rt+, ccs=UTF-8")
+
+ # CRASHES py_fopen(NULL, 'rb')
+ # CRASHES py_fopen(__file__, NULL)
+
+ # TODO: Test Py_UniversalNewlineFgets()
- # CRASHES _testcapi.py_fopen(NULL, 'rb')
- # CRASHES _testcapi.py_fopen(__file__, NULL)
+ # PyFile_SetOpenCodeHook() and PyFile_OpenCode() are tested by
+ # test_embed.test_open_code_hook()
if __name__ == "__main__":
--- /dev/null
+/*[clinic input]
+preserve
+[clinic start generated code]*/
+
+PyDoc_STRVAR(_testcapi_pyfile_getline__doc__,
+"pyfile_getline($module, file, n, /)\n"
+"--\n"
+"\n");
+
+#define _TESTCAPI_PYFILE_GETLINE_METHODDEF \
+ {"pyfile_getline", (PyCFunction)(void(*)(void))_testcapi_pyfile_getline, METH_FASTCALL, _testcapi_pyfile_getline__doc__},
+
+static PyObject *
+_testcapi_pyfile_getline_impl(PyObject *module, PyObject *file, int n);
+
+static PyObject *
+_testcapi_pyfile_getline(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
+{
+ PyObject *return_value = NULL;
+ PyObject *file;
+ int n;
+
+ if (nargs != 2) {
+ PyErr_Format(PyExc_TypeError, "pyfile_getline expected 2 arguments, got %zd", nargs);
+ goto exit;
+ }
+ file = args[0];
+ n = PyLong_AsInt(args[1]);
+ if (n == -1 && PyErr_Occurred()) {
+ goto exit;
+ }
+ return_value = _testcapi_pyfile_getline_impl(module, file, n);
+
+exit:
+ return return_value;
+}
+
+PyDoc_STRVAR(_testcapi_pyfile_writeobject__doc__,
+"pyfile_writeobject($module, obj, file, flags, /)\n"
+"--\n"
+"\n");
+
+#define _TESTCAPI_PYFILE_WRITEOBJECT_METHODDEF \
+ {"pyfile_writeobject", (PyCFunction)(void(*)(void))_testcapi_pyfile_writeobject, METH_FASTCALL, _testcapi_pyfile_writeobject__doc__},
+
+static PyObject *
+_testcapi_pyfile_writeobject_impl(PyObject *module, PyObject *obj,
+ PyObject *file, int flags);
+
+static PyObject *
+_testcapi_pyfile_writeobject(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
+{
+ PyObject *return_value = NULL;
+ PyObject *obj;
+ PyObject *file;
+ int flags;
+
+ if (nargs != 3) {
+ PyErr_Format(PyExc_TypeError, "pyfile_writeobject expected 3 arguments, got %zd", nargs);
+ goto exit;
+ }
+ obj = args[0];
+ file = args[1];
+ flags = PyLong_AsInt(args[2]);
+ if (flags == -1 && PyErr_Occurred()) {
+ goto exit;
+ }
+ return_value = _testcapi_pyfile_writeobject_impl(module, obj, file, flags);
+
+exit:
+ return return_value;
+}
+
+PyDoc_STRVAR(_testcapi_pyobject_asfiledescriptor__doc__,
+"pyobject_asfiledescriptor($module, obj, /)\n"
+"--\n"
+"\n");
+
+#define _TESTCAPI_PYOBJECT_ASFILEDESCRIPTOR_METHODDEF \
+ {"pyobject_asfiledescriptor", (PyCFunction)_testcapi_pyobject_asfiledescriptor, METH_O, _testcapi_pyobject_asfiledescriptor__doc__},
+/*[clinic end generated code: output=ea572aaaa01aec7b input=a9049054013a1b77]*/
--- /dev/null
+#include "pyconfig.h" // Py_GIL_DISABLED
+#ifndef Py_GIL_DISABLED
+ // Need limited C API 3.13 for PyLong_AsInt()
+# define Py_LIMITED_API 0x030d0000
+#endif
+
+#include "parts.h"
+#include "util.h"
+#include "clinic/file.c.h"
+
+
+/*[clinic input]
+module _testcapi
+[clinic start generated code]*/
+/*[clinic end generated code: output=da39a3ee5e6b4b0d input=6361033e795369fc]*/
+
+
+static PyObject *
+pyfile_fromfd(PyObject *module, PyObject *args)
+{
+ int fd;
+ const char *name;
+ Py_ssize_t size;
+ const char *mode;
+ int buffering;
+ const char *encoding;
+ const char *errors;
+ const char *newline;
+ int closefd;
+ if (!PyArg_ParseTuple(args,
+ "iz#z#"
+ "iz#z#"
+ "z#i",
+ &fd, &name, &size, &mode, &size,
+ &buffering, &encoding, &size, &errors, &size,
+ &newline, &size, &closefd)) {
+ return NULL;
+ }
+
+ return PyFile_FromFd(fd, name, mode, buffering,
+ encoding, errors, newline, closefd);
+}
+
+
+/*[clinic input]
+_testcapi.pyfile_getline
+
+ file: object
+ n: int
+ /
+
+[clinic start generated code]*/
+
+static PyObject *
+_testcapi_pyfile_getline_impl(PyObject *module, PyObject *file, int n)
+/*[clinic end generated code: output=137fde2774563266 input=df26686148b3657e]*/
+{
+ return PyFile_GetLine(file, n);
+}
+
+
+/*[clinic input]
+_testcapi.pyfile_writeobject
+
+ obj: object
+ file: object
+ flags: int
+ /
+
+[clinic start generated code]*/
+
+static PyObject *
+_testcapi_pyfile_writeobject_impl(PyObject *module, PyObject *obj,
+ PyObject *file, int flags)
+/*[clinic end generated code: output=ebb4d802e3db489c input=64a34a3e75b9935a]*/
+{
+ NULLABLE(obj);
+ NULLABLE(file);
+ RETURN_INT(PyFile_WriteObject(obj, file, flags));
+}
+
+
+static PyObject *
+pyfile_writestring(PyObject *module, PyObject *args)
+{
+ const char *str;
+ Py_ssize_t size;
+ PyObject *file;
+ if (!PyArg_ParseTuple(args, "z#O", &str, &size, &file)) {
+ return NULL;
+ }
+ NULLABLE(file);
+
+ RETURN_INT(PyFile_WriteString(str, file));
+}
+
+
+/*[clinic input]
+_testcapi.pyobject_asfiledescriptor
+
+ obj: object
+ /
+
+[clinic start generated code]*/
+
+static PyObject *
+_testcapi_pyobject_asfiledescriptor(PyObject *module, PyObject *obj)
+/*[clinic end generated code: output=2d640c6a1970c721 input=45fa1171d62b18d7]*/
+{
+ NULLABLE(obj);
+ RETURN_INT(PyObject_AsFileDescriptor(obj));
+}
+
+
+static PyMethodDef test_methods[] = {
+ {"pyfile_fromfd", pyfile_fromfd, METH_VARARGS},
+ _TESTCAPI_PYFILE_GETLINE_METHODDEF
+ _TESTCAPI_PYFILE_WRITEOBJECT_METHODDEF
+ {"pyfile_writestring", pyfile_writestring, METH_VARARGS},
+ _TESTCAPI_PYOBJECT_ASFILEDESCRIPTOR_METHODDEF
+ {NULL},
+};
+
+int
+_PyTestLimitedCAPI_Init_File(PyObject *m)
+{
+ return PyModule_AddFunctions(m, test_methods);
+}