]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-125413: Move `pathlib.Path.copy()` implementation alongside `Path.info` (#129856)
authorBarney Gale <barney.gale@gmail.com>
Sun, 9 Feb 2025 12:05:23 +0000 (12:05 +0000)
committerGitHub <noreply@github.com>
Sun, 9 Feb 2025 12:05:23 +0000 (12:05 +0000)
Move pathlib's private `CopyReader`, `LocalCopyReader`, `CopyWriter` and
`LocalCopyWriter` classes into `pathlib._os`, where they can live alongside
the low-level copying functions (`copyfileobj()` etc) and high-level path
querying interface (`PathInfo`).

This sets the stage for merging `LocalCopyReader` into `PathInfo`.

No change of behaviour; just moving some code around.

Lib/pathlib/_abc.py
Lib/pathlib/_local.py
Lib/pathlib/_os.py

index 65d91e4d67b463a92441bddc5869cf71e1181539..800d1b4503d78d7bc1383ad4e7dece7ea09b0bff 100644 (file)
@@ -12,11 +12,9 @@ WritablePath.
 """
 
 import functools
-import io
 import posixpath
-from errno import EINVAL
 from glob import _PathGlobber, _no_recurse_symlinks
-from pathlib._os import copyfileobj
+from pathlib._os import magic_open, CopyReader, CopyWriter
 
 
 @functools.cache
@@ -41,162 +39,6 @@ def _explode_path(path):
     return path, names
 
 
-def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
-               newline=None):
-    """
-    Open the file pointed to by this path and return a file object, as
-    the built-in open() function does.
-    """
-    try:
-        return io.open(path, mode, buffering, encoding, errors, newline)
-    except TypeError:
-        pass
-    cls = type(path)
-    text = 'b' not in mode
-    mode = ''.join(sorted(c for c in mode if c not in 'bt'))
-    if text:
-        try:
-            attr = getattr(cls, f'__open_{mode}__')
-        except AttributeError:
-            pass
-        else:
-            return attr(path, buffering, encoding, errors, newline)
-
-    try:
-        attr = getattr(cls, f'__open_{mode}b__')
-    except AttributeError:
-        pass
-    else:
-        stream = attr(path, buffering)
-        if text:
-            stream = io.TextIOWrapper(stream, encoding, errors, newline)
-        return stream
-
-    raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
-
-
-class CopyReader:
-    """
-    Class that implements the "read" part of copying between path objects.
-    An instance of this class is available from the ReadablePath._copy_reader
-    property.
-    """
-    __slots__ = ('_path',)
-
-    def __init__(self, path):
-        self._path = path
-
-    _readable_metakeys = frozenset()
-
-    def _read_metadata(self, metakeys, *, follow_symlinks=True):
-        """
-        Returns path metadata as a dict with string keys.
-        """
-        raise NotImplementedError
-
-
-class CopyWriter:
-    """
-    Class that implements the "write" part of copying between path objects. An
-    instance of this class is available from the WritablePath._copy_writer
-    property.
-    """
-    __slots__ = ('_path',)
-
-    def __init__(self, path):
-        self._path = path
-
-    _writable_metakeys = frozenset()
-
-    def _write_metadata(self, metadata, *, follow_symlinks=True):
-        """
-        Sets path metadata from the given dict with string keys.
-        """
-        raise NotImplementedError
-
-    def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
-        self._ensure_distinct_path(source)
-        if preserve_metadata:
-            metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
-        else:
-            metakeys = None
-        if not follow_symlinks and source.is_symlink():
-            self._create_symlink(source, metakeys)
-        elif source.is_dir():
-            self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
-        else:
-            self._create_file(source, metakeys)
-        return self._path
-
-    def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
-        """Copy the given directory to our path."""
-        children = list(source.iterdir())
-        self._path.mkdir(exist_ok=dirs_exist_ok)
-        for src in children:
-            dst = self._path.joinpath(src.name)
-            if not follow_symlinks and src.is_symlink():
-                dst._copy_writer._create_symlink(src, metakeys)
-            elif src.is_dir():
-                dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
-            else:
-                dst._copy_writer._create_file(src, metakeys)
-        if metakeys:
-            metadata = source._copy_reader._read_metadata(metakeys)
-            if metadata:
-                self._write_metadata(metadata)
-
-    def _create_file(self, source, metakeys):
-        """Copy the given file to our path."""
-        self._ensure_different_file(source)
-        with magic_open(source, 'rb') as source_f:
-            try:
-                with magic_open(self._path, 'wb') as target_f:
-                    copyfileobj(source_f, target_f)
-            except IsADirectoryError as e:
-                if not self._path.exists():
-                    # Raise a less confusing exception.
-                    raise FileNotFoundError(
-                        f'Directory does not exist: {self._path}') from e
-                raise
-        if metakeys:
-            metadata = source._copy_reader._read_metadata(metakeys)
-            if metadata:
-                self._write_metadata(metadata)
-
-    def _create_symlink(self, source, metakeys):
-        """Copy the given symbolic link to our path."""
-        self._path.symlink_to(source.readlink())
-        if metakeys:
-            metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
-            if metadata:
-                self._write_metadata(metadata, follow_symlinks=False)
-
-    def _ensure_different_file(self, source):
-        """
-        Raise OSError(EINVAL) if both paths refer to the same file.
-        """
-        pass
-
-    def _ensure_distinct_path(self, source):
-        """
-        Raise OSError(EINVAL) if the other path is within this path.
-        """
-        # Note: there is no straightforward, foolproof algorithm to determine
-        # if one directory is within another (a particularly perverse example
-        # would be a single network share mounted in one location via NFS, and
-        # in another location via CIFS), so we simply checks whether the
-        # other path is lexically equal to, or within, this path.
-        if source == self._path:
-            err = OSError(EINVAL, "Source and target are the same path")
-        elif source in self._path.parents:
-            err = OSError(EINVAL, "Source path is a parent of target path")
-        else:
-            return
-        err.filename = str(source)
-        err.filename2 = str(self._path)
-        raise err
-
-
 class JoinablePath:
     """Base class for pure path objects.
 
index 6cdcd448991c8cd04b56c581de1fce26bb3090bd..956c1920bf6d78a804aaeed2a106241206e22450 100644 (file)
@@ -7,7 +7,7 @@ import sys
 from errno import *
 from glob import _StringGlobber, _no_recurse_symlinks
 from itertools import chain
-from stat import S_IMODE, S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
+from stat import S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
 from _collections_abc import Sequence
 
 try:
@@ -19,8 +19,8 @@ try:
 except ImportError:
     grp = None
 
-from pathlib._os import copyfile, PathInfo, DirEntryInfo
-from pathlib._abc import CopyReader, CopyWriter, JoinablePath, ReadablePath, WritablePath
+from pathlib._os import LocalCopyReader, LocalCopyWriter, PathInfo, DirEntryInfo
+from pathlib._abc import JoinablePath, ReadablePath, WritablePath
 
 
 __all__ = [
@@ -65,141 +65,6 @@ class _PathParents(Sequence):
         return "<{}.parents>".format(type(self._path).__name__)
 
 
-class _LocalCopyReader(CopyReader):
-    """This object implements the "read" part of copying local paths. Don't
-    try to construct it yourself.
-    """
-    __slots__ = ()
-
-    _readable_metakeys = {'mode', 'times_ns'}
-    if hasattr(os.stat_result, 'st_flags'):
-        _readable_metakeys.add('flags')
-    if hasattr(os, 'listxattr'):
-        _readable_metakeys.add('xattrs')
-    _readable_metakeys = frozenset(_readable_metakeys)
-
-    def _read_metadata(self, metakeys, *, follow_symlinks=True):
-        metadata = {}
-        if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
-            st = self._path.stat(follow_symlinks=follow_symlinks)
-            if 'mode' in metakeys:
-                metadata['mode'] = S_IMODE(st.st_mode)
-            if 'times_ns' in metakeys:
-                metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
-            if 'flags' in metakeys:
-                metadata['flags'] = st.st_flags
-        if 'xattrs' in metakeys:
-            try:
-                metadata['xattrs'] = [
-                    (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
-                    for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
-            except OSError as err:
-                if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
-                    raise
-        return metadata
-
-
-class _LocalCopyWriter(CopyWriter):
-    """This object implements the "write" part of copying local paths. Don't
-    try to construct it yourself.
-    """
-    __slots__ = ()
-
-    _writable_metakeys = _LocalCopyReader._readable_metakeys
-
-    def _write_metadata(self, metadata, *, follow_symlinks=True):
-        def _nop(*args, ns=None, follow_symlinks=None):
-            pass
-
-        if follow_symlinks:
-            # use the real function if it exists
-            def lookup(name):
-                return getattr(os, name, _nop)
-        else:
-            # use the real function only if it exists
-            # *and* it supports follow_symlinks
-            def lookup(name):
-                fn = getattr(os, name, _nop)
-                if fn in os.supports_follow_symlinks:
-                    return fn
-                return _nop
-
-        times_ns = metadata.get('times_ns')
-        if times_ns is not None:
-            lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
-        # We must copy extended attributes before the file is (potentially)
-        # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
-        xattrs = metadata.get('xattrs')
-        if xattrs is not None:
-            for attr, value in xattrs:
-                try:
-                    os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
-                except OSError as e:
-                    if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
-                        raise
-        mode = metadata.get('mode')
-        if mode is not None:
-            try:
-                lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
-            except NotImplementedError:
-                # if we got a NotImplementedError, it's because
-                #   * follow_symlinks=False,
-                #   * lchown() is unavailable, and
-                #   * either
-                #       * fchownat() is unavailable or
-                #       * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
-                #         (it returned ENOSUP.)
-                # therefore we're out of options--we simply cannot chown the
-                # symlink.  give up, suppress the error.
-                # (which is what shutil always did in this circumstance.)
-                pass
-        flags = metadata.get('flags')
-        if flags is not None:
-            try:
-                lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
-            except OSError as why:
-                if why.errno not in (EOPNOTSUPP, ENOTSUP):
-                    raise
-
-    if copyfile:
-        # Use fast OS routine for local file copying where available.
-        def _create_file(self, source, metakeys):
-            """Copy the given file to the given target."""
-            try:
-                source = os.fspath(source)
-            except TypeError:
-                if not isinstance(source, WritablePath):
-                    raise
-                super()._create_file(source, metakeys)
-            else:
-                copyfile(source, os.fspath(self._path))
-
-    if os.name == 'nt':
-        # Windows: symlink target might not exist yet if we're copying several
-        # files, so ensure we pass is_dir to os.symlink().
-        def _create_symlink(self, source, metakeys):
-            """Copy the given symlink to the given target."""
-            self._path.symlink_to(source.readlink(), source.is_dir())
-            if metakeys:
-                metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
-                if metadata:
-                    self._write_metadata(metadata, follow_symlinks=False)
-
-    def _ensure_different_file(self, source):
-        """
-        Raise OSError(EINVAL) if both paths refer to the same file.
-        """
-        try:
-            if not self._path.samefile(source):
-                return
-        except (OSError, ValueError):
-            return
-        err = OSError(EINVAL, "Source and target are the same file")
-        err.filename = str(source)
-        err.filename2 = str(self._path)
-        raise err
-
-
 class PurePath(JoinablePath):
     """Base class for manipulating paths without I/O.
 
@@ -1190,8 +1055,8 @@ class Path(WritablePath, ReadablePath, PurePath):
         os.replace(self, target)
         return self.with_segments(target)
 
-    _copy_reader = property(_LocalCopyReader)
-    _copy_writer = property(_LocalCopyWriter)
+    _copy_reader = property(LocalCopyReader)
+    _copy_writer = property(LocalCopyWriter)
 
     def move(self, target):
         """
index c2febb773cd83a030b0e28712330efe3e48f1c92..c0c81ada858cdfeb827bc7427501e1305ac1d056 100644 (file)
@@ -3,7 +3,8 @@ Low-level OS functionality wrappers used by pathlib.
 """
 
 from errno import *
-from stat import S_ISDIR, S_ISREG, S_ISLNK
+from stat import S_ISDIR, S_ISREG, S_ISLNK, S_IMODE
+import io
 import os
 import sys
 try:
@@ -165,6 +166,295 @@ def copyfileobj(source_f, target_f):
         write_target(buf)
 
 
+def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
+               newline=None):
+    """
+    Open the file pointed to by this path and return a file object, as
+    the built-in open() function does.
+    """
+    try:
+        return io.open(path, mode, buffering, encoding, errors, newline)
+    except TypeError:
+        pass
+    cls = type(path)
+    text = 'b' not in mode
+    mode = ''.join(sorted(c for c in mode if c not in 'bt'))
+    if text:
+        try:
+            attr = getattr(cls, f'__open_{mode}__')
+        except AttributeError:
+            pass
+        else:
+            return attr(path, buffering, encoding, errors, newline)
+
+    try:
+        attr = getattr(cls, f'__open_{mode}b__')
+    except AttributeError:
+        pass
+    else:
+        stream = attr(path, buffering)
+        if text:
+            stream = io.TextIOWrapper(stream, encoding, errors, newline)
+        return stream
+
+    raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
+
+
+class CopyReader:
+    """
+    Class that implements the "read" part of copying between path objects.
+    An instance of this class is available from the ReadablePath._copy_reader
+    property.
+    """
+    __slots__ = ('_path',)
+
+    def __init__(self, path):
+        self._path = path
+
+    _readable_metakeys = frozenset()
+
+    def _read_metadata(self, metakeys, *, follow_symlinks=True):
+        """
+        Returns path metadata as a dict with string keys.
+        """
+        raise NotImplementedError
+
+
+class CopyWriter:
+    """
+    Class that implements the "write" part of copying between path objects. An
+    instance of this class is available from the WritablePath._copy_writer
+    property.
+    """
+    __slots__ = ('_path',)
+
+    def __init__(self, path):
+        self._path = path
+
+    _writable_metakeys = frozenset()
+
+    def _write_metadata(self, metadata, *, follow_symlinks=True):
+        """
+        Sets path metadata from the given dict with string keys.
+        """
+        raise NotImplementedError
+
+    def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
+        self._ensure_distinct_path(source)
+        if preserve_metadata:
+            metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
+        else:
+            metakeys = None
+        if not follow_symlinks and source.is_symlink():
+            self._create_symlink(source, metakeys)
+        elif source.is_dir():
+            self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
+        else:
+            self._create_file(source, metakeys)
+        return self._path
+
+    def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
+        """Copy the given directory to our path."""
+        children = list(source.iterdir())
+        self._path.mkdir(exist_ok=dirs_exist_ok)
+        for src in children:
+            dst = self._path.joinpath(src.name)
+            if not follow_symlinks and src.is_symlink():
+                dst._copy_writer._create_symlink(src, metakeys)
+            elif src.is_dir():
+                dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
+            else:
+                dst._copy_writer._create_file(src, metakeys)
+        if metakeys:
+            metadata = source._copy_reader._read_metadata(metakeys)
+            if metadata:
+                self._write_metadata(metadata)
+
+    def _create_file(self, source, metakeys):
+        """Copy the given file to our path."""
+        self._ensure_different_file(source)
+        with magic_open(source, 'rb') as source_f:
+            try:
+                with magic_open(self._path, 'wb') as target_f:
+                    copyfileobj(source_f, target_f)
+            except IsADirectoryError as e:
+                if not self._path.exists():
+                    # Raise a less confusing exception.
+                    raise FileNotFoundError(
+                        f'Directory does not exist: {self._path}') from e
+                raise
+        if metakeys:
+            metadata = source._copy_reader._read_metadata(metakeys)
+            if metadata:
+                self._write_metadata(metadata)
+
+    def _create_symlink(self, source, metakeys):
+        """Copy the given symbolic link to our path."""
+        self._path.symlink_to(source.readlink())
+        if metakeys:
+            metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
+            if metadata:
+                self._write_metadata(metadata, follow_symlinks=False)
+
+    def _ensure_different_file(self, source):
+        """
+        Raise OSError(EINVAL) if both paths refer to the same file.
+        """
+        pass
+
+    def _ensure_distinct_path(self, source):
+        """
+        Raise OSError(EINVAL) if the other path is within this path.
+        """
+        # Note: there is no straightforward, foolproof algorithm to determine
+        # if one directory is within another (a particularly perverse example
+        # would be a single network share mounted in one location via NFS, and
+        # in another location via CIFS), so we simply checks whether the
+        # other path is lexically equal to, or within, this path.
+        if source == self._path:
+            err = OSError(EINVAL, "Source and target are the same path")
+        elif source in self._path.parents:
+            err = OSError(EINVAL, "Source path is a parent of target path")
+        else:
+            return
+        err.filename = str(source)
+        err.filename2 = str(self._path)
+        raise err
+
+
+class LocalCopyReader(CopyReader):
+    """This object implements the "read" part of copying local paths. Don't
+    try to construct it yourself.
+    """
+    __slots__ = ()
+
+    _readable_metakeys = {'mode', 'times_ns'}
+    if hasattr(os.stat_result, 'st_flags'):
+        _readable_metakeys.add('flags')
+    if hasattr(os, 'listxattr'):
+        _readable_metakeys.add('xattrs')
+    _readable_metakeys = frozenset(_readable_metakeys)
+
+    def _read_metadata(self, metakeys, *, follow_symlinks=True):
+        metadata = {}
+        if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
+            st = self._path.stat(follow_symlinks=follow_symlinks)
+            if 'mode' in metakeys:
+                metadata['mode'] = S_IMODE(st.st_mode)
+            if 'times_ns' in metakeys:
+                metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
+            if 'flags' in metakeys:
+                metadata['flags'] = st.st_flags
+        if 'xattrs' in metakeys:
+            try:
+                metadata['xattrs'] = [
+                    (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
+                    for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
+            except OSError as err:
+                if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+                    raise
+        return metadata
+
+
+class LocalCopyWriter(CopyWriter):
+    """This object implements the "write" part of copying local paths. Don't
+    try to construct it yourself.
+    """
+    __slots__ = ()
+
+    _writable_metakeys = LocalCopyReader._readable_metakeys
+
+    def _write_metadata(self, metadata, *, follow_symlinks=True):
+        def _nop(*args, ns=None, follow_symlinks=None):
+            pass
+
+        if follow_symlinks:
+            # use the real function if it exists
+            def lookup(name):
+                return getattr(os, name, _nop)
+        else:
+            # use the real function only if it exists
+            # *and* it supports follow_symlinks
+            def lookup(name):
+                fn = getattr(os, name, _nop)
+                if fn in os.supports_follow_symlinks:
+                    return fn
+                return _nop
+
+        times_ns = metadata.get('times_ns')
+        if times_ns is not None:
+            lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
+        # We must copy extended attributes before the file is (potentially)
+        # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+        xattrs = metadata.get('xattrs')
+        if xattrs is not None:
+            for attr, value in xattrs:
+                try:
+                    os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
+                except OSError as e:
+                    if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+                        raise
+        mode = metadata.get('mode')
+        if mode is not None:
+            try:
+                lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
+            except NotImplementedError:
+                # if we got a NotImplementedError, it's because
+                #   * follow_symlinks=False,
+                #   * lchown() is unavailable, and
+                #   * either
+                #       * fchownat() is unavailable or
+                #       * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
+                #         (it returned ENOSUP.)
+                # therefore we're out of options--we simply cannot chown the
+                # symlink.  give up, suppress the error.
+                # (which is what shutil always did in this circumstance.)
+                pass
+        flags = metadata.get('flags')
+        if flags is not None:
+            try:
+                lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
+            except OSError as why:
+                if why.errno not in (EOPNOTSUPP, ENOTSUP):
+                    raise
+
+    if copyfile:
+        # Use fast OS routine for local file copying where available.
+        def _create_file(self, source, metakeys):
+            """Copy the given file to the given target."""
+            try:
+                source = os.fspath(source)
+            except TypeError:
+                super()._create_file(source, metakeys)
+            else:
+                copyfile(source, os.fspath(self._path))
+
+    if os.name == 'nt':
+        # Windows: symlink target might not exist yet if we're copying several
+        # files, so ensure we pass is_dir to os.symlink().
+        def _create_symlink(self, source, metakeys):
+            """Copy the given symlink to the given target."""
+            self._path.symlink_to(source.readlink(), source.is_dir())
+            if metakeys:
+                metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
+                if metadata:
+                    self._write_metadata(metadata, follow_symlinks=False)
+
+    def _ensure_different_file(self, source):
+        """
+        Raise OSError(EINVAL) if both paths refer to the same file.
+        """
+        try:
+            if not self._path.samefile(source):
+                return
+        except (OSError, ValueError):
+            return
+        err = OSError(EINVAL, "Source and target are the same file")
+        err.filename = str(source)
+        err.filename2 = str(self._path)
+        raise err
+
+
 class _PathInfoBase:
     __slots__ = ()