]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.12] gh-111495: Add PyFile tests (#129449) (#129477) (#129501)
authorVictor Stinner <vstinner@python.org>
Fri, 31 Jan 2025 09:27:35 +0000 (10:27 +0100)
committerGitHub <noreply@github.com>
Fri, 31 Jan 2025 09:27:35 +0000 (10:27 +0100)
[3.13] gh-111495: Add PyFile tests (#129449) (#129477)

gh-111495: Add PyFile tests (#129449)

Add tests for the following functions in test_capi.test_file:

* PyFile_FromFd()
* PyFile_GetLine()
* PyFile_NewStdPrinter()
* PyFile_WriteObject()
* PyFile_WriteString()
* PyObject_AsFileDescriptor()

Remove test_embed.StdPrinterTests which became redundant.

(cherry picked from commit 4ca9fc08f89bf7172d41e523d9e520eb1729ee8c)
(cherry picked from commit 9a59a51733e58b6091ca9157fd43cc9d0f93a96f)

Lib/test/test_capi/test_file.py [new file with mode: 0644]
Lib/test/test_embed.py
Modules/_testcapi/clinic/file.c.h [new file with mode: 0644]
Modules/_testcapi/file.c

diff --git a/Lib/test/test_capi/test_file.py b/Lib/test/test_capi/test_file.py
new file mode 100644 (file)
index 0000000..8042c8e
--- /dev/null
@@ -0,0 +1,231 @@
+import io
+import os
+import unittest
+import warnings
+from test import support
+from test.support import import_helper, os_helper, warnings_helper
+
+
+_testcapi = import_helper.import_module('_testcapi')
+_io = import_helper.import_module('_io')
+NULL = None
+STDOUT_FD = 1
+
+with open(__file__, 'rb') as fp:
+    FIRST_LINE = next(fp).decode()
+FIRST_LINE_NORM = FIRST_LINE.rstrip() + '\n'
+
+
+class CAPIFileTest(unittest.TestCase):
+    def test_pyfile_fromfd(self):
+        # Test PyFile_FromFd() which is a thin wrapper to _io.open()
+        pyfile_fromfd = _testcapi.pyfile_fromfd
+        filename = __file__
+        with open(filename, "rb") as fp:
+            fd = fp.fileno()
+
+            # FileIO
+            fp.seek(0)
+            obj = pyfile_fromfd(fd, filename, "rb", 0, NULL, NULL, NULL, 0)
+            try:
+                self.assertIsInstance(obj, _io.FileIO)
+                self.assertEqual(obj.readline(), FIRST_LINE.encode())
+            finally:
+                obj.close()
+
+            # BufferedReader
+            fp.seek(0)
+            obj = pyfile_fromfd(fd, filename, "rb", 1024, NULL, NULL, NULL, 0)
+            try:
+                self.assertIsInstance(obj, _io.BufferedReader)
+                self.assertEqual(obj.readline(), FIRST_LINE.encode())
+            finally:
+                obj.close()
+
+            # TextIOWrapper
+            fp.seek(0)
+            obj = pyfile_fromfd(fd, filename, "r", 1,
+                                "utf-8", "replace", NULL, 0)
+            try:
+                self.assertIsInstance(obj, _io.TextIOWrapper)
+                self.assertEqual(obj.encoding, "utf-8")
+                self.assertEqual(obj.errors, "replace")
+                self.assertEqual(obj.readline(), FIRST_LINE_NORM)
+            finally:
+                obj.close()
+
+    def test_pyfile_getline(self):
+        # Test PyFile_GetLine(file, n): call file.readline()
+        # and strip "\n" suffix if n < 0.
+        pyfile_getline = _testcapi.pyfile_getline
+
+        # Test Unicode
+        with open(__file__, "r") as fp:
+            fp.seek(0)
+            self.assertEqual(pyfile_getline(fp, -1),
+                             FIRST_LINE_NORM.rstrip('\n'))
+            fp.seek(0)
+            self.assertEqual(pyfile_getline(fp, 0),
+                             FIRST_LINE_NORM)
+            fp.seek(0)
+            self.assertEqual(pyfile_getline(fp, 6),
+                             FIRST_LINE_NORM[:6])
+
+        # Test bytes
+        with open(__file__, "rb") as fp:
+            fp.seek(0)
+            self.assertEqual(pyfile_getline(fp, -1),
+                             FIRST_LINE.rstrip('\n').encode())
+            fp.seek(0)
+            self.assertEqual(pyfile_getline(fp, 0),
+                             FIRST_LINE.encode())
+            fp.seek(0)
+            self.assertEqual(pyfile_getline(fp, 6),
+                             FIRST_LINE.encode()[:6])
+
+    def test_pyfile_writestring(self):
+        # Test PyFile_WriteString(str, file): call file.write(str)
+        writestr = _testcapi.pyfile_writestring
+
+        with io.StringIO() as fp:
+            self.assertEqual(writestr("a\xe9\u20ac\U0010FFFF".encode(), fp), 0)
+            with self.assertRaises(UnicodeDecodeError):
+                writestr(b"\xff", fp)
+            with self.assertRaises(UnicodeDecodeError):
+                writestr("\udc80".encode("utf-8", "surrogatepass"), fp)
+
+            text = fp.getvalue()
+            self.assertEqual(text, "a\xe9\u20ac\U0010FFFF")
+
+        with self.assertRaises(SystemError):
+            writestr(b"abc", NULL)
+
+    def test_pyfile_writeobject(self):
+        # Test PyFile_WriteObject(obj, file, flags):
+        # - Call file.write(str(obj)) if flags equals Py_PRINT_RAW.
+        # - Call file.write(repr(obj)) otherwise.
+        writeobject = _testcapi.pyfile_writeobject
+        Py_PRINT_RAW = 1
+
+        with io.StringIO() as fp:
+            # Test flags=Py_PRINT_RAW
+            self.assertEqual(writeobject("raw", fp, Py_PRINT_RAW), 0)
+            writeobject(NULL, fp, Py_PRINT_RAW)
+
+            # Test flags=0
+            self.assertEqual(writeobject("repr", fp, 0), 0)
+            writeobject(NULL, fp, 0)
+
+            text = fp.getvalue()
+            self.assertEqual(text, "raw<NULL>'repr'<NULL>")
+
+        # invalid file type
+        for invalid_file in (123, "abc", object()):
+            with self.subTest(file=invalid_file):
+                with self.assertRaises(AttributeError):
+                    writeobject("abc", invalid_file, Py_PRINT_RAW)
+
+        with self.assertRaises(TypeError):
+            writeobject("abc", NULL, 0)
+
+    def test_pyobject_asfiledescriptor(self):
+        # Test PyObject_AsFileDescriptor(obj):
+        # - Return obj if obj is an integer.
+        # - Return obj.fileno() otherwise.
+        # File descriptor must be >= 0.
+        asfd = _testcapi.pyobject_asfiledescriptor
+
+        self.assertEqual(asfd(123), 123)
+        self.assertEqual(asfd(0), 0)
+
+        with open(__file__, "rb") as fp:
+            self.assertEqual(asfd(fp), fp.fileno())
+
+        self.assertEqual(asfd(False), 0)
+        self.assertEqual(asfd(True), 1)
+
+        class FakeFile:
+            def __init__(self, fd):
+                self.fd = fd
+            def fileno(self):
+                return self.fd
+
+        # file descriptor must be positive
+        with self.assertRaises(ValueError):
+            asfd(-1)
+        with self.assertRaises(ValueError):
+            asfd(FakeFile(-1))
+
+        # fileno() result must be an integer
+        with self.assertRaises(TypeError):
+            asfd(FakeFile("text"))
+
+        # unsupported types
+        for obj in ("string", ["list"], object()):
+            with self.subTest(obj=obj):
+                with self.assertRaises(TypeError):
+                    asfd(obj)
+
+        # CRASHES asfd(NULL)
+
+    def test_pyfile_newstdprinter(self):
+        # Test PyFile_NewStdPrinter()
+        pyfile_newstdprinter = _testcapi.pyfile_newstdprinter
+
+        file = pyfile_newstdprinter(STDOUT_FD)
+        self.assertEqual(file.closed, False)
+        self.assertIsNone(file.encoding)
+        self.assertEqual(file.mode, "w")
+
+        self.assertEqual(file.fileno(), STDOUT_FD)
+        self.assertEqual(file.isatty(), os.isatty(STDOUT_FD))
+
+        # flush() is a no-op
+        self.assertIsNone(file.flush())
+
+        # close() is a no-op
+        self.assertIsNone(file.close())
+        self.assertEqual(file.closed, False)
+
+        support.check_disallow_instantiation(self, type(file))
+
+    def test_pyfile_newstdprinter_write(self):
+        # Test the write() method of PyFile_NewStdPrinter()
+        pyfile_newstdprinter = _testcapi.pyfile_newstdprinter
+
+        filename = os_helper.TESTFN
+        self.addCleanup(os_helper.unlink, filename)
+
+        try:
+            old_stdout = os.dup(STDOUT_FD)
+        except OSError as exc:
+            # os.dup(STDOUT_FD) is not supported on WASI
+            self.skipTest(f"os.dup() failed with {exc!r}")
+
+        try:
+            with open(filename, "wb") as fp:
+                # PyFile_NewStdPrinter() only accepts fileno(stdout)
+                # or fileno(stderr) file descriptor.
+                fd = fp.fileno()
+                os.dup2(fd, STDOUT_FD)
+
+                file = pyfile_newstdprinter(STDOUT_FD)
+                self.assertEqual(file.write("text"), 4)
+                # The surrogate character is encoded with
+                # the "surrogateescape" error handler
+                self.assertEqual(file.write("[\udc80]"), 8)
+        finally:
+            os.dup2(old_stdout, STDOUT_FD)
+            os.close(old_stdout)
+
+        with open(filename, "r") as fp:
+            self.assertEqual(fp.read(), "text[\\udc80]")
+
+    # TODO: Test Py_UniversalNewlineFgets()
+
+    # PyFile_SetOpenCodeHook() and PyFile_OpenCode() are tested by
+    # test_embed.test_open_code_hook()
+
+
+if __name__ == "__main__":
+    unittest.main()
index c931d1603505456b9d61fc20193f89ec8aa01b97..6e728107b2f7f0e552ec919e60d5b031b4a9846b 100644 (file)
@@ -1866,56 +1866,5 @@ class MiscTests(EmbeddingTestsMixin, unittest.TestCase):
                 self.assertEqual(blocks, 0, out)
 
 
-class StdPrinterTests(EmbeddingTestsMixin, unittest.TestCase):
-    # Test PyStdPrinter_Type which is used by _PySys_SetPreliminaryStderr():
-    #   "Set up a preliminary stderr printer until we have enough
-    #    infrastructure for the io module in place."
-
-    STDOUT_FD = 1
-
-    def create_printer(self, fd):
-        ctypes = import_helper.import_module('ctypes')
-        PyFile_NewStdPrinter = ctypes.pythonapi.PyFile_NewStdPrinter
-        PyFile_NewStdPrinter.argtypes = (ctypes.c_int,)
-        PyFile_NewStdPrinter.restype = ctypes.py_object
-        return PyFile_NewStdPrinter(fd)
-
-    def test_write(self):
-        message = "unicode:\xe9-\u20ac-\udc80!\n"
-
-        stdout_fd = self.STDOUT_FD
-        stdout_fd_copy = os.dup(stdout_fd)
-        self.addCleanup(os.close, stdout_fd_copy)
-
-        rfd, wfd = os.pipe()
-        self.addCleanup(os.close, rfd)
-        self.addCleanup(os.close, wfd)
-        try:
-            # PyFile_NewStdPrinter() only accepts fileno(stdout)
-            # or fileno(stderr) file descriptor.
-            os.dup2(wfd, stdout_fd)
-
-            printer = self.create_printer(stdout_fd)
-            printer.write(message)
-        finally:
-            os.dup2(stdout_fd_copy, stdout_fd)
-
-        data = os.read(rfd, 100)
-        self.assertEqual(data, message.encode('utf8', 'backslashreplace'))
-
-    def test_methods(self):
-        fd = self.STDOUT_FD
-        printer = self.create_printer(fd)
-        self.assertEqual(printer.fileno(), fd)
-        self.assertEqual(printer.isatty(), os.isatty(fd))
-        printer.flush()  # noop
-        printer.close()  # noop
-
-    def test_disallow_instantiation(self):
-        fd = self.STDOUT_FD
-        printer = self.create_printer(fd)
-        support.check_disallow_instantiation(self, type(printer))
-
-
 if __name__ == "__main__":
     unittest.main()
diff --git a/Modules/_testcapi/clinic/file.c.h b/Modules/_testcapi/clinic/file.c.h
new file mode 100644 (file)
index 0000000..52e59f1
--- /dev/null
@@ -0,0 +1,112 @@
+/*[clinic input]
+preserve
+[clinic start generated code]*/
+
+#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
+#  include "pycore_gc.h"            // PyGC_Head
+#  include "pycore_runtime.h"       // _Py_ID()
+#endif
+
+
+PyDoc_STRVAR(_testcapi_pyfile_getline__doc__,
+"pyfile_getline($module, file, n, /)\n"
+"--\n"
+"\n");
+
+#define _TESTCAPI_PYFILE_GETLINE_METHODDEF    \
+    {"pyfile_getline", _PyCFunction_CAST(_testcapi_pyfile_getline), METH_FASTCALL, _testcapi_pyfile_getline__doc__},
+
+static PyObject *
+_testcapi_pyfile_getline_impl(PyObject *module, PyObject *file, int n);
+
+static PyObject *
+_testcapi_pyfile_getline(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
+{
+    PyObject *return_value = NULL;
+    PyObject *file;
+    int n;
+
+    if (!_PyArg_CheckPositional("pyfile_getline", nargs, 2, 2)) {
+        goto exit;
+    }
+    file = args[0];
+    n = _PyLong_AsInt(args[1]);
+    if (n == -1 && PyErr_Occurred()) {
+        goto exit;
+    }
+    return_value = _testcapi_pyfile_getline_impl(module, file, n);
+
+exit:
+    return return_value;
+}
+
+PyDoc_STRVAR(_testcapi_pyfile_writeobject__doc__,
+"pyfile_writeobject($module, obj, file, flags, /)\n"
+"--\n"
+"\n");
+
+#define _TESTCAPI_PYFILE_WRITEOBJECT_METHODDEF    \
+    {"pyfile_writeobject", _PyCFunction_CAST(_testcapi_pyfile_writeobject), METH_FASTCALL, _testcapi_pyfile_writeobject__doc__},
+
+static PyObject *
+_testcapi_pyfile_writeobject_impl(PyObject *module, PyObject *obj,
+                                  PyObject *file, int flags);
+
+static PyObject *
+_testcapi_pyfile_writeobject(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
+{
+    PyObject *return_value = NULL;
+    PyObject *obj;
+    PyObject *file;
+    int flags;
+
+    if (!_PyArg_CheckPositional("pyfile_writeobject", nargs, 3, 3)) {
+        goto exit;
+    }
+    obj = args[0];
+    file = args[1];
+    flags = _PyLong_AsInt(args[2]);
+    if (flags == -1 && PyErr_Occurred()) {
+        goto exit;
+    }
+    return_value = _testcapi_pyfile_writeobject_impl(module, obj, file, flags);
+
+exit:
+    return return_value;
+}
+
+PyDoc_STRVAR(_testcapi_pyobject_asfiledescriptor__doc__,
+"pyobject_asfiledescriptor($module, obj, /)\n"
+"--\n"
+"\n");
+
+#define _TESTCAPI_PYOBJECT_ASFILEDESCRIPTOR_METHODDEF    \
+    {"pyobject_asfiledescriptor", (PyCFunction)_testcapi_pyobject_asfiledescriptor, METH_O, _testcapi_pyobject_asfiledescriptor__doc__},
+
+PyDoc_STRVAR(_testcapi_pyfile_newstdprinter__doc__,
+"pyfile_newstdprinter($module, fd, /)\n"
+"--\n"
+"\n");
+
+#define _TESTCAPI_PYFILE_NEWSTDPRINTER_METHODDEF    \
+    {"pyfile_newstdprinter", (PyCFunction)_testcapi_pyfile_newstdprinter, METH_O, _testcapi_pyfile_newstdprinter__doc__},
+
+static PyObject *
+_testcapi_pyfile_newstdprinter_impl(PyObject *module, int fd);
+
+static PyObject *
+_testcapi_pyfile_newstdprinter(PyObject *module, PyObject *arg)
+{
+    PyObject *return_value = NULL;
+    int fd;
+
+    fd = _PyLong_AsInt(arg);
+    if (fd == -1 && PyErr_Occurred()) {
+        goto exit;
+    }
+    return_value = _testcapi_pyfile_newstdprinter_impl(module, fd);
+
+exit:
+    return return_value;
+}
+/*[clinic end generated code: output=bb69f2c213a490f6 input=a9049054013a1b77]*/
index 634563f6ea12cb4746f6ae07369ea4159d26f9d6..8763dbba62c0a830f4e02127024b625947cf820c 100644 (file)
+#define PY_SSIZE_T_CLEAN
 #include "parts.h"
 #include "util.h"
+#include "clinic/file.c.h"
+
+
+/*[clinic input]
+module _testcapi
+[clinic start generated code]*/
+/*[clinic end generated code: output=da39a3ee5e6b4b0d input=6361033e795369fc]*/
+
+
+static PyObject *
+pyfile_fromfd(PyObject *module, PyObject *args)
+{
+    int fd;
+    const char *name;
+    Py_ssize_t size;
+    const char *mode;
+    int buffering;
+    const char *encoding;
+    const char *errors;
+    const char *newline;
+    int closefd;
+    if (!PyArg_ParseTuple(args,
+                          "iz#z#"
+                          "iz#z#"
+                          "z#i",
+                          &fd, &name, &size, &mode, &size,
+                          &buffering, &encoding, &size, &errors, &size,
+                          &newline, &size, &closefd)) {
+        return NULL;
+    }
+
+    return PyFile_FromFd(fd, name, mode, buffering,
+                         encoding, errors, newline, closefd);
+}
+
+
+/*[clinic input]
+_testcapi.pyfile_getline
+
+    file: object
+    n: int
+    /
+
+[clinic start generated code]*/
+
+static PyObject *
+_testcapi_pyfile_getline_impl(PyObject *module, PyObject *file, int n)
+/*[clinic end generated code: output=137fde2774563266 input=df26686148b3657e]*/
+{
+    return PyFile_GetLine(file, n);
+}
+
+
+/*[clinic input]
+_testcapi.pyfile_writeobject
+
+    obj: object
+    file: object
+    flags: int
+    /
+
+[clinic start generated code]*/
+
+static PyObject *
+_testcapi_pyfile_writeobject_impl(PyObject *module, PyObject *obj,
+                                  PyObject *file, int flags)
+/*[clinic end generated code: output=ebb4d802e3db489c input=64a34a3e75b9935a]*/
+{
+    NULLABLE(obj);
+    NULLABLE(file);
+    RETURN_INT(PyFile_WriteObject(obj, file, flags));
+}
+
+
+static PyObject *
+pyfile_writestring(PyObject *module, PyObject *args)
+{
+    const char *str;
+    Py_ssize_t size;
+    PyObject *file;
+    if (!PyArg_ParseTuple(args, "z#O", &str, &size, &file)) {
+        return NULL;
+    }
+    NULLABLE(file);
+
+    RETURN_INT(PyFile_WriteString(str, file));
+}
+
+
+/*[clinic input]
+_testcapi.pyobject_asfiledescriptor
+
+    obj: object
+    /
+
+[clinic start generated code]*/
+
+static PyObject *
+_testcapi_pyobject_asfiledescriptor(PyObject *module, PyObject *obj)
+/*[clinic end generated code: output=2d640c6a1970c721 input=45fa1171d62b18d7]*/
+{
+    NULLABLE(obj);
+    RETURN_INT(PyObject_AsFileDescriptor(obj));
+}
+
+
+/*[clinic input]
+_testcapi.pyfile_newstdprinter
+
+    fd: int
+    /
+
+[clinic start generated code]*/
+
+static PyObject *
+_testcapi_pyfile_newstdprinter_impl(PyObject *module, int fd)
+/*[clinic end generated code: output=8a2d1c57b6892db3 input=442f1824142262ea]*/
+{
+    return PyFile_NewStdPrinter(fd);
+}
 
 
 static PyMethodDef test_methods[] = {
+    {"pyfile_fromfd", pyfile_fromfd, METH_VARARGS},
+    _TESTCAPI_PYFILE_GETLINE_METHODDEF
+    _TESTCAPI_PYFILE_WRITEOBJECT_METHODDEF
+    {"pyfile_writestring", pyfile_writestring, METH_VARARGS},
+    _TESTCAPI_PYOBJECT_ASFILEDESCRIPTOR_METHODDEF
+    _TESTCAPI_PYFILE_NEWSTDPRINTER_METHODDEF
     {NULL},
 };
 
 int
 _PyTestCapi_Init_File(PyObject *m)
 {
-    if (PyModule_AddFunctions(m, test_methods) < 0){
-        return -1;
-    }
-
-    return 0;
+    return PyModule_AddFunctions(m, test_methods);
 }