]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-128520: pathlib ABCs: improve protocol for 'openable' objects (#134101)
authorBarney Gale <barney.gale@gmail.com>
Fri, 12 Sep 2025 21:25:18 +0000 (22:25 +0100)
committerGitHub <noreply@github.com>
Fri, 12 Sep 2025 21:25:18 +0000 (22:25 +0100)
Rename `pathlib._os.magic_open()` to `vfsopen()`. The new name is a bit
less abstract, and it aligns with the `vfspath()` method added in 5dbd27d.

Per discussion on discourse[^1], adjust `vfsopen()` so that the following
methods may be called:

- `__open_reader__()`
- `__open_writer__(mode)`
- `__open_updater__(mode)`

These three methods return readable, writable, and full duplex file objects
respectively. In the 'writer' method, *mode* is either 'a', 'w' or 'x'. In
the 'updater' method, *mode* is either 'r' or 'w'.

In the pathlib ABCs, replace `ReadablePath.__open_rb__()` with
`__open_reader__()`, and replace `WritablePath.__open_wb__()` with
`__open_writer__()`.

[^1]: https://discuss.python.org/t/open-able-objects/90238

Co-authored-by: Petr Viktorin <encukou@gmail.com>
Lib/pathlib/__init__.py
Lib/pathlib/_os.py
Lib/pathlib/types.py
Lib/test/test_pathlib/support/local_path.py
Lib/test/test_pathlib/support/zip_path.py
Lib/test/test_pathlib/test_read.py
Lib/test/test_pathlib/test_write.py

index cea1a9fe57eedf237e9f8ec4127acd14baf84880..bc39a30c6538ce69b6acbd374fea9c1f4fdb69fc 100644 (file)
@@ -28,7 +28,7 @@ except ImportError:
 
 from pathlib._os import (
     PathInfo, DirEntryInfo,
-    magic_open, vfspath,
+    vfsopen, vfspath,
     ensure_different_files, ensure_distinct_paths,
     copyfile2, copyfileobj, copy_info,
 )
@@ -1129,7 +1129,7 @@ class Path(PurePath):
 
     def _copy_from_file(self, source, preserve_metadata=False):
         ensure_different_files(source, self)
-        with magic_open(source, 'rb') as source_f:
+        with vfsopen(source, 'rb') as source_f:
             with open(self, 'wb') as target_f:
                 copyfileobj(source_f, target_f)
         if preserve_metadata:
index fbcbfb979d1278f0aa0842a03b81a87e3307383b..6508a9bca0d72b41ab9d4052c8f54c63620dc22b 100644 (file)
@@ -166,48 +166,86 @@ def copyfileobj(source_f, target_f):
         write_target(buf)
 
 
-def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
-               newline=None):
+def _open_reader(obj):
+    cls = type(obj)
+    try:
+        open_reader = cls.__open_reader__
+    except AttributeError:
+        cls_name = cls.__name__
+        raise TypeError(f"{cls_name} can't be opened for reading") from None
+    else:
+        return open_reader(obj)
+
+
+def _open_writer(obj, mode):
+    cls = type(obj)
+    try:
+        open_writer = cls.__open_writer__
+    except AttributeError:
+        cls_name = cls.__name__
+        raise TypeError(f"{cls_name} can't be opened for writing") from None
+    else:
+        return open_writer(obj, mode)
+
+
+def _open_updater(obj, mode):
+    cls = type(obj)
+    try:
+        open_updater = cls.__open_updater__
+    except AttributeError:
+        cls_name = cls.__name__
+        raise TypeError(f"{cls_name} can't be opened for updating") from None
+    else:
+        return open_updater(obj, mode)
+
+
+def vfsopen(obj, mode='r', buffering=-1, encoding=None, errors=None,
+            newline=None):
     """
     Open the file pointed to by this path and return a file object, as
     the built-in open() function does.
+
+    Unlike the built-in open() function, this function additionally accepts
+    'openable' objects, which are objects with any of these special methods:
+
+        __open_reader__()
+        __open_writer__(mode)
+        __open_updater__(mode)
+
+    '__open_reader__' is called for 'r' mode; '__open_writer__' for 'a', 'w'
+    and 'x' modes; and '__open_updater__' for 'r+' and 'w+' modes. If text
+    mode is requested, the result is wrapped in an io.TextIOWrapper object.
     """
+    if buffering != -1:
+        raise ValueError("buffer size can't be customized")
     text = 'b' not in mode
     if text:
         # Call io.text_encoding() here to ensure any warning is raised at an
         # appropriate stack level.
         encoding = text_encoding(encoding)
     try:
-        return open(path, mode, buffering, encoding, errors, newline)
+        return open(obj, mode, buffering, encoding, errors, newline)
     except TypeError:
         pass
-    cls = type(path)
+    if not text:
+        if encoding is not None:
+            raise ValueError("binary mode doesn't take an encoding argument")
+        if errors is not None:
+            raise ValueError("binary mode doesn't take an errors argument")
+        if newline is not None:
+            raise ValueError("binary mode doesn't take a newline argument")
     mode = ''.join(sorted(c for c in mode if c not in 'bt'))
-    if text:
-        try:
-            attr = getattr(cls, f'__open_{mode}__')
-        except AttributeError:
-            pass
-        else:
-            return attr(path, buffering, encoding, errors, newline)
-    elif encoding is not None:
-        raise ValueError("binary mode doesn't take an encoding argument")
-    elif errors is not None:
-        raise ValueError("binary mode doesn't take an errors argument")
-    elif newline is not None:
-        raise ValueError("binary mode doesn't take a newline argument")
-
-    try:
-        attr = getattr(cls, f'__open_{mode}b__')
-    except AttributeError:
-        pass
+    if mode == 'r':
+        stream = _open_reader(obj)
+    elif mode in ('a', 'w', 'x'):
+        stream = _open_writer(obj, mode)
+    elif mode in ('+r', '+w'):
+        stream = _open_updater(obj, mode[1])
     else:
-        stream = attr(path, buffering)
-        if text:
-            stream = TextIOWrapper(stream, encoding, errors, newline)
-        return stream
-
-    raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
+        raise ValueError(f'invalid mode: {mode}')
+    if text:
+        stream = TextIOWrapper(stream, encoding, errors, newline)
+    return stream
 
 
 def vfspath(obj):
index 42b80221608bcc642baf7d733e96c21d6fa70e9b..fea0dd305fe2a3aa8ddfaa7da6d2e9e368da0e1c 100644 (file)
@@ -13,7 +13,7 @@ Protocols for supporting classes in pathlib.
 from abc import ABC, abstractmethod
 from glob import _GlobberBase
 from io import text_encoding
-from pathlib._os import (magic_open, vfspath, ensure_distinct_paths,
+from pathlib._os import (vfsopen, vfspath, ensure_distinct_paths,
                          ensure_different_files, copyfileobj)
 from pathlib import PurePath, Path
 from typing import Optional, Protocol, runtime_checkable
@@ -264,10 +264,10 @@ class _ReadablePath(_JoinablePath):
         raise NotImplementedError
 
     @abstractmethod
-    def __open_rb__(self, buffering=-1):
+    def __open_reader__(self):
         """
         Open the file pointed to by this path for reading in binary mode and
-        return a file object, like open(mode='rb').
+        return a file object.
         """
         raise NotImplementedError
 
@@ -275,7 +275,7 @@ class _ReadablePath(_JoinablePath):
         """
         Open the file in bytes mode, read it, and close the file.
         """
-        with magic_open(self, mode='rb', buffering=0) as f:
+        with vfsopen(self, mode='rb') as f:
             return f.read()
 
     def read_text(self, encoding=None, errors=None, newline=None):
@@ -285,7 +285,7 @@ class _ReadablePath(_JoinablePath):
         # Call io.text_encoding() here to ensure any warning is raised at an
         # appropriate stack level.
         encoding = text_encoding(encoding)
-        with magic_open(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f:
+        with vfsopen(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f:
             return f.read()
 
     @abstractmethod
@@ -394,10 +394,10 @@ class _WritablePath(_JoinablePath):
         raise NotImplementedError
 
     @abstractmethod
-    def __open_wb__(self, buffering=-1):
+    def __open_writer__(self, mode):
         """
         Open the file pointed to by this path for writing in binary mode and
-        return a file object, like open(mode='wb').
+        return a file object.
         """
         raise NotImplementedError
 
@@ -407,7 +407,7 @@ class _WritablePath(_JoinablePath):
         """
         # type-check for the buffer interface before truncating the file
         view = memoryview(data)
-        with magic_open(self, mode='wb') as f:
+        with vfsopen(self, mode='wb') as f:
             return f.write(view)
 
     def write_text(self, data, encoding=None, errors=None, newline=None):
@@ -420,7 +420,7 @@ class _WritablePath(_JoinablePath):
         if not isinstance(data, str):
             raise TypeError('data must be str, not %s' %
                             data.__class__.__name__)
-        with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
+        with vfsopen(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
             return f.write(data)
 
     def _copy_from(self, source, follow_symlinks=True):
@@ -439,8 +439,8 @@ class _WritablePath(_JoinablePath):
                     stack.append((child, dst.joinpath(child.name)))
             else:
                 ensure_different_files(src, dst)
-                with magic_open(src, 'rb') as source_f:
-                    with magic_open(dst, 'wb') as target_f:
+                with vfsopen(src, 'rb') as source_f:
+                    with vfsopen(dst, 'wb') as target_f:
                         copyfileobj(source_f, target_f)
 
 
index c1423c545bfd0016335be61dab7b2a971093b0fb..ddfd6fd419533c776b38b94de2a994ae8cd4f3b6 100644 (file)
@@ -145,7 +145,7 @@ class ReadableLocalPath(_ReadablePath, LexicalPath):
         super().__init__(*pathsegments)
         self.info = LocalPathInfo(self)
 
-    def __open_rb__(self, buffering=-1):
+    def __open_reader__(self):
         return open(self, 'rb')
 
     def iterdir(self):
@@ -163,8 +163,8 @@ class WritableLocalPath(_WritablePath, LexicalPath):
     __slots__ = ()
     __fspath__ = LexicalPath.__vfspath__
 
-    def __open_wb__(self, buffering=-1):
-        return open(self, 'wb')
+    def __open_writer__(self, mode):
+        return open(self, f'{mode}b')
 
     def mkdir(self, mode=0o777):
         os.mkdir(self, mode)
index 2bfe89b36595b0e0422fde8fdb228fbb2b1d7508..90b939b6a590109017d269a2137c340901752bae 100644 (file)
@@ -264,13 +264,13 @@ class ReadableZipPath(_ReadablePath):
         tree = self.zip_file.filelist.tree
         return tree.resolve(vfspath(self), follow_symlinks=False)
 
-    def __open_rb__(self, buffering=-1):
+    def __open_reader__(self):
         info = self.info.resolve()
         if not info.exists():
             raise FileNotFoundError(errno.ENOENT, "File not found", self)
         elif info.is_dir():
             raise IsADirectoryError(errno.EISDIR, "Is a directory", self)
-        return self.zip_file.open(info.zip_info, 'r')
+        return self.zip_file.open(info.zip_info)
 
     def iterdir(self):
         info = self.info.resolve()
@@ -320,8 +320,8 @@ class WritableZipPath(_WritablePath):
     def with_segments(self, *pathsegments):
         return type(self)(*pathsegments, zip_file=self.zip_file)
 
-    def __open_wb__(self, buffering=-1):
-        return self.zip_file.open(vfspath(self), 'w')
+    def __open_writer__(self, mode):
+        return self.zip_file.open(vfspath(self), mode)
 
     def mkdir(self, mode=0o777):
         zinfo = zipfile.ZipInfo(vfspath(self) + '/')
index 482203c290a3c4e136d11ae04ad52b1f15a2b277..16fb555b2aee0517b73608a13d5b3cd9dae0de02 100644 (file)
@@ -13,10 +13,10 @@ from .support.zip_path import ReadableZipPath, ZipPathGround
 
 if is_pypi:
     from pathlib_abc import PathInfo, _ReadablePath
-    from pathlib_abc._os import magic_open
+    from pathlib_abc._os import vfsopen
 else:
     from pathlib.types import PathInfo, _ReadablePath
-    from pathlib._os import magic_open
+    from pathlib._os import vfsopen
 
 
 class ReadTestBase:
@@ -32,10 +32,16 @@ class ReadTestBase:
 
     def test_open_r(self):
         p = self.root / 'fileA'
-        with magic_open(p, 'r', encoding='utf-8') as f:
+        with vfsopen(p, 'r', encoding='utf-8') as f:
             self.assertIsInstance(f, io.TextIOBase)
             self.assertEqual(f.read(), 'this is file A\n')
 
+    def test_open_r_buffering_error(self):
+        p = self.root / 'fileA'
+        self.assertRaises(ValueError, vfsopen, p, 'r', buffering=0)
+        self.assertRaises(ValueError, vfsopen, p, 'r', buffering=1)
+        self.assertRaises(ValueError, vfsopen, p, 'r', buffering=1024)
+
     @unittest.skipIf(
         not getattr(sys.flags, 'warn_default_encoding', 0),
         "Requires warn_default_encoding",
@@ -43,17 +49,17 @@ class ReadTestBase:
     def test_open_r_encoding_warning(self):
         p = self.root / 'fileA'
         with self.assertWarns(EncodingWarning) as wc:
-            with magic_open(p, 'r'):
+            with vfsopen(p, 'r'):
                 pass
         self.assertEqual(wc.filename, __file__)
 
     def test_open_rb(self):
         p = self.root / 'fileA'
-        with magic_open(p, 'rb') as f:
+        with vfsopen(p, 'rb') as f:
             self.assertEqual(f.read(), b'this is file A\n')
-        self.assertRaises(ValueError, magic_open, p, 'rb', encoding='utf8')
-        self.assertRaises(ValueError, magic_open, p, 'rb', errors='strict')
-        self.assertRaises(ValueError, magic_open, p, 'rb', newline='')
+        self.assertRaises(ValueError, vfsopen, p, 'rb', encoding='utf8')
+        self.assertRaises(ValueError, vfsopen, p, 'rb', errors='strict')
+        self.assertRaises(ValueError, vfsopen, p, 'rb', newline='')
 
     def test_read_bytes(self):
         p = self.root / 'fileA'
index b958490d0a834f64c81eaa15c605b0bb3a6325f1..c9c1d64656c9be055a493fd7727b5e7687e6d3ae 100644 (file)
@@ -13,10 +13,10 @@ from .support.zip_path import WritableZipPath, ZipPathGround
 
 if is_pypi:
     from pathlib_abc import _WritablePath
-    from pathlib_abc._os import magic_open
+    from pathlib_abc._os import vfsopen
 else:
     from pathlib.types import _WritablePath
-    from pathlib._os import magic_open
+    from pathlib._os import vfsopen
 
 
 class WriteTestBase:
@@ -31,11 +31,17 @@ class WriteTestBase:
 
     def test_open_w(self):
         p = self.root / 'fileA'
-        with magic_open(p, 'w', encoding='utf-8') as f:
+        with vfsopen(p, 'w', encoding='utf-8') as f:
             self.assertIsInstance(f, io.TextIOBase)
             f.write('this is file A\n')
         self.assertEqual(self.ground.readtext(p), 'this is file A\n')
 
+    def test_open_w_buffering_error(self):
+        p = self.root / 'fileA'
+        self.assertRaises(ValueError, vfsopen, p, 'w', buffering=0)
+        self.assertRaises(ValueError, vfsopen, p, 'w', buffering=1)
+        self.assertRaises(ValueError, vfsopen, p, 'w', buffering=1024)
+
     @unittest.skipIf(
         not getattr(sys.flags, 'warn_default_encoding', 0),
         "Requires warn_default_encoding",
@@ -43,19 +49,19 @@ class WriteTestBase:
     def test_open_w_encoding_warning(self):
         p = self.root / 'fileA'
         with self.assertWarns(EncodingWarning) as wc:
-            with magic_open(p, 'w'):
+            with vfsopen(p, 'w'):
                 pass
         self.assertEqual(wc.filename, __file__)
 
     def test_open_wb(self):
         p = self.root / 'fileA'
-        with magic_open(p, 'wb') as f:
+        with vfsopen(p, 'wb') as f:
             #self.assertIsInstance(f, io.BufferedWriter)
             f.write(b'this is file A\n')
         self.assertEqual(self.ground.readbytes(p), b'this is file A\n')
-        self.assertRaises(ValueError, magic_open, p, 'wb', encoding='utf8')
-        self.assertRaises(ValueError, magic_open, p, 'wb', errors='strict')
-        self.assertRaises(ValueError, magic_open, p, 'wb', newline='')
+        self.assertRaises(ValueError, vfsopen, p, 'wb', encoding='utf8')
+        self.assertRaises(ValueError, vfsopen, p, 'wb', errors='strict')
+        self.assertRaises(ValueError, vfsopen, p, 'wb', newline='')
 
     def test_write_bytes(self):
         p = self.root / 'fileA'