]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-127807: pathlib ABCs: move private copying methods to dedicated class (#127810)
authorBarney Gale <barney.gale@gmail.com>
Sun, 22 Dec 2024 02:22:08 +0000 (02:22 +0000)
committerGitHub <noreply@github.com>
Sun, 22 Dec 2024 02:22:08 +0000 (02:22 +0000)
Move 9 private `PathBase` attributes and methods into a new `CopyWorker`
class. Change `PathBase.copy` from a method to a `CopyWorker` instance.

The methods remain private in the `CopyWorker` class. In future we might
make some/all of them public so that user subclasses of `PathBase` can
customize the copying process (in particular reading/writing of metadata,)
but we'd need to make `PathBase` public first.

Lib/pathlib/_abc.py
Lib/pathlib/_local.py
Lib/pathlib/_os.py

index b521c757561a99beb1b87e42a6320a9f818aa4ff..6acc29ebab2bc532722f8a1af3329c99c027315c 100644 (file)
@@ -57,6 +57,132 @@ class PathGlobber(_GlobberBase):
         return path.with_segments(str(path) + text)
 
 
+class CopyWorker:
+    """
+    Class that implements copying between path objects. An instance of this
+    class is available from the PathBase.copy property; it's made callable so
+    that PathBase.copy() can be treated as a method.
+
+    The target path's CopyWorker drives the process from its _create() method.
+    Files and directories are exchanged by calling methods on the source and
+    target paths, and metadata is exchanged by calling
+    source.copy._read_metadata() and target.copy._write_metadata().
+    """
+    __slots__ = ('_path',)
+
+    def __init__(self, path):
+        self._path = path
+
+    def __call__(self, target, follow_symlinks=True, dirs_exist_ok=False,
+             preserve_metadata=False):
+        """
+        Recursively copy this file or directory tree to the given destination.
+        """
+        if not isinstance(target, PathBase):
+            target = self._path.with_segments(target)
+
+        # Delegate to the target path's CopyWorker object.
+        return target.copy._create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)
+
+    _readable_metakeys = frozenset()
+
+    def _read_metadata(self, metakeys, *, follow_symlinks=True):
+        """
+        Returns path metadata as a dict with string keys.
+        """
+        raise NotImplementedError
+
+    _writable_metakeys = frozenset()
+
+    def _write_metadata(self, metadata, *, follow_symlinks=True):
+        """
+        Sets path metadata from the given dict with string keys.
+        """
+        raise NotImplementedError
+
+    def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
+        self._ensure_distinct_path(source)
+        if preserve_metadata:
+            metakeys = self._writable_metakeys & source.copy._readable_metakeys
+        else:
+            metakeys = None
+        if not follow_symlinks and source.is_symlink():
+            self._create_symlink(source, metakeys)
+        elif source.is_dir():
+            self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
+        else:
+            self._create_file(source, metakeys)
+        return self._path
+
+    def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
+        """Copy the given directory to our path."""
+        children = list(source.iterdir())
+        self._path.mkdir(exist_ok=dirs_exist_ok)
+        for src in children:
+            dst = self._path.joinpath(src.name)
+            if not follow_symlinks and src.is_symlink():
+                dst.copy._create_symlink(src, metakeys)
+            elif src.is_dir():
+                dst.copy._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
+            else:
+                dst.copy._create_file(src, metakeys)
+        if metakeys:
+            metadata = source.copy._read_metadata(metakeys)
+            if metadata:
+                self._write_metadata(metadata)
+
+    def _create_file(self, source, metakeys):
+        """Copy the given file to our path."""
+        self._ensure_different_file(source)
+        with source.open('rb') as source_f:
+            try:
+                with self._path.open('wb') as target_f:
+                    copyfileobj(source_f, target_f)
+            except IsADirectoryError as e:
+                if not self._path.exists():
+                    # Raise a less confusing exception.
+                    raise FileNotFoundError(
+                        f'Directory does not exist: {self._path}') from e
+                raise
+        if metakeys:
+            metadata = source.copy._read_metadata(metakeys)
+            if metadata:
+                self._write_metadata(metadata)
+
+    def _create_symlink(self, source, metakeys):
+        """Copy the given symbolic link to our path."""
+        self._path.symlink_to(source.readlink())
+        if metakeys:
+            metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
+            if metadata:
+                self._write_metadata(metadata, follow_symlinks=False)
+
+    def _ensure_different_file(self, source):
+        """
+        Raise OSError(EINVAL) if both paths refer to the same file.
+        """
+        pass
+
+    def _ensure_distinct_path(self, source):
+        """
+        Raise OSError(EINVAL) if the other path is within this path.
+        """
+        # Note: there is no straightforward, foolproof algorithm to determine
+        # if one directory is within another (a particularly perverse example
+        # would be a single network share mounted in one location via NFS, and
+        # in another location via CIFS), so we simply checks whether the
+        # other path is lexically equal to, or within, this path.
+        if source == self._path:
+            err = OSError(EINVAL, "Source and target are the same path")
+        elif source in self._path.parents:
+            err = OSError(EINVAL, "Source path is a parent of target path")
+        else:
+            return
+        err.filename = str(source)
+        err.filename2 = str(self._path)
+        raise err
+
+
 class PurePathBase:
     """Base class for pure path objects.
 
@@ -374,31 +500,6 @@ class PathBase(PurePathBase):
         except (OSError, ValueError):
             return False
 
-    def _ensure_different_file(self, other_path):
-        """
-        Raise OSError(EINVAL) if both paths refer to the same file.
-        """
-        pass
-
-    def _ensure_distinct_path(self, other_path):
-        """
-        Raise OSError(EINVAL) if the other path is within this path.
-        """
-        # Note: there is no straightforward, foolproof algorithm to determine
-        # if one directory is within another (a particularly perverse example
-        # would be a single network share mounted in one location via NFS, and
-        # in another location via CIFS), so we simply checks whether the
-        # other path is lexically equal to, or within, this path.
-        if self == other_path:
-            err = OSError(EINVAL, "Source and target are the same path")
-        elif self in other_path.parents:
-            err = OSError(EINVAL, "Source path is a parent of target path")
-        else:
-            return
-        err.filename = str(self)
-        err.filename2 = str(other_path)
-        raise err
-
     def open(self, mode='r', buffering=-1, encoding=None,
              errors=None, newline=None):
         """
@@ -537,88 +638,13 @@ class PathBase(PurePathBase):
         """
         raise NotImplementedError
 
-    def _symlink_to_target_of(self, link):
-        """
-        Make this path a symlink with the same target as the given link. This
-        is used by copy().
-        """
-        self.symlink_to(link.readlink())
-
     def mkdir(self, mode=0o777, parents=False, exist_ok=False):
         """
         Create a new directory at this given path.
         """
         raise NotImplementedError
 
-    # Metadata keys supported by this path type.
-    _readable_metadata = _writable_metadata = frozenset()
-
-    def _read_metadata(self, keys=None, *, follow_symlinks=True):
-        """
-        Returns path metadata as a dict with string keys.
-        """
-        raise NotImplementedError
-
-    def _write_metadata(self, metadata, *, follow_symlinks=True):
-        """
-        Sets path metadata from the given dict with string keys.
-        """
-        raise NotImplementedError
-
-    def _copy_metadata(self, target, *, follow_symlinks=True):
-        """
-        Copies metadata (permissions, timestamps, etc) from this path to target.
-        """
-        # Metadata types supported by both source and target.
-        keys = self._readable_metadata & target._writable_metadata
-        if keys:
-            metadata = self._read_metadata(keys, follow_symlinks=follow_symlinks)
-            target._write_metadata(metadata, follow_symlinks=follow_symlinks)
-
-    def _copy_file(self, target):
-        """
-        Copy the contents of this file to the given target.
-        """
-        self._ensure_different_file(target)
-        with self.open('rb') as source_f:
-            try:
-                with target.open('wb') as target_f:
-                    copyfileobj(source_f, target_f)
-            except IsADirectoryError as e:
-                if not target.exists():
-                    # Raise a less confusing exception.
-                    raise FileNotFoundError(
-                        f'Directory does not exist: {target}') from e
-                else:
-                    raise
-
-    def copy(self, target, *, follow_symlinks=True, dirs_exist_ok=False,
-             preserve_metadata=False):
-        """
-        Recursively copy this file or directory tree to the given destination.
-        """
-        if not isinstance(target, PathBase):
-            target = self.with_segments(target)
-        self._ensure_distinct_path(target)
-        stack = [(self, target)]
-        while stack:
-            src, dst = stack.pop()
-            if not follow_symlinks and src.is_symlink():
-                dst._symlink_to_target_of(src)
-                if preserve_metadata:
-                    src._copy_metadata(dst, follow_symlinks=False)
-            elif src.is_dir():
-                children = src.iterdir()
-                dst.mkdir(exist_ok=dirs_exist_ok)
-                stack.extend((child, dst.joinpath(child.name))
-                             for child in children)
-                if preserve_metadata:
-                    src._copy_metadata(dst)
-            else:
-                src._copy_file(dst)
-                if preserve_metadata:
-                    src._copy_metadata(dst)
-        return target
+    copy = property(CopyWorker, doc=CopyWorker.__call__.__doc__)
 
     def copy_into(self, target_dir, *, follow_symlinks=True,
                   dirs_exist_ok=False, preserve_metadata=False):
index 4897149d7e8a8e9452be6d7ca0189f8adbbfe794..915402e6c65b29908cbd18e2d987d82556fd77a0 100644 (file)
@@ -4,10 +4,10 @@ import operator
 import os
 import posixpath
 import sys
-from errno import EINVAL, EXDEV
+from errno import *
 from glob import _StringGlobber, _no_recurse_symlinks
 from itertools import chain
-from stat import S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
+from stat import S_IMODE, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
 from _collections_abc import Sequence
 
 try:
@@ -19,9 +19,8 @@ try:
 except ImportError:
     grp = None
 
-from pathlib._os import (copyfile, file_metadata_keys, read_file_metadata,
-                         write_file_metadata)
-from pathlib._abc import PurePathBase, PathBase
+from pathlib._os import copyfile
+from pathlib._abc import CopyWorker, PurePathBase, PathBase
 
 
 __all__ = [
@@ -66,6 +65,131 @@ class _PathParents(Sequence):
         return "<{}.parents>".format(type(self._path).__name__)
 
 
+class _LocalCopyWorker(CopyWorker):
+    """This object implements the Path.copy callable.  Don't try to construct
+    it yourself."""
+    __slots__ = ()
+
+    _readable_metakeys = {'mode', 'times_ns'}
+    if hasattr(os.stat_result, 'st_flags'):
+        _readable_metakeys.add('flags')
+    if hasattr(os, 'listxattr'):
+        _readable_metakeys.add('xattrs')
+    _readable_metakeys = _writable_metakeys = frozenset(_readable_metakeys)
+
+    def _read_metadata(self, metakeys, *, follow_symlinks=True):
+        metadata = {}
+        if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
+            st = self._path.stat(follow_symlinks=follow_symlinks)
+            if 'mode' in metakeys:
+                metadata['mode'] = S_IMODE(st.st_mode)
+            if 'times_ns' in metakeys:
+                metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
+            if 'flags' in metakeys:
+                metadata['flags'] = st.st_flags
+        if 'xattrs' in metakeys:
+            try:
+                metadata['xattrs'] = [
+                    (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
+                    for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
+            except OSError as err:
+                if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+                    raise
+        return metadata
+
+    def _write_metadata(self, metadata, *, follow_symlinks=True):
+        def _nop(*args, ns=None, follow_symlinks=None):
+            pass
+
+        if follow_symlinks:
+            # use the real function if it exists
+            def lookup(name):
+                return getattr(os, name, _nop)
+        else:
+            # use the real function only if it exists
+            # *and* it supports follow_symlinks
+            def lookup(name):
+                fn = getattr(os, name, _nop)
+                if fn in os.supports_follow_symlinks:
+                    return fn
+                return _nop
+
+        times_ns = metadata.get('times_ns')
+        if times_ns is not None:
+            lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
+        # We must copy extended attributes before the file is (potentially)
+        # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+        xattrs = metadata.get('xattrs')
+        if xattrs is not None:
+            for attr, value in xattrs:
+                try:
+                    os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
+                except OSError as e:
+                    if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+                        raise
+        mode = metadata.get('mode')
+        if mode is not None:
+            try:
+                lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
+            except NotImplementedError:
+                # if we got a NotImplementedError, it's because
+                #   * follow_symlinks=False,
+                #   * lchown() is unavailable, and
+                #   * either
+                #       * fchownat() is unavailable or
+                #       * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
+                #         (it returned ENOSUP.)
+                # therefore we're out of options--we simply cannot chown the
+                # symlink.  give up, suppress the error.
+                # (which is what shutil always did in this circumstance.)
+                pass
+        flags = metadata.get('flags')
+        if flags is not None:
+            try:
+                lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
+            except OSError as why:
+                if why.errno not in (EOPNOTSUPP, ENOTSUP):
+                    raise
+
+    if copyfile:
+        # Use fast OS routine for local file copying where available.
+        def _create_file(self, source, metakeys):
+            """Copy the given file to the given target."""
+            try:
+                source = os.fspath(source)
+            except TypeError:
+                if not isinstance(source, PathBase):
+                    raise
+                super()._create_file(source, metakeys)
+            else:
+                copyfile(source, os.fspath(self._path))
+
+    if os.name == 'nt':
+        # Windows: symlink target might not exist yet if we're copying several
+        # files, so ensure we pass is_dir to os.symlink().
+        def _create_symlink(self, source, metakeys):
+            """Copy the given symlink to the given target."""
+            self._path.symlink_to(source.readlink(), source.is_dir())
+            if metakeys:
+                metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
+                if metadata:
+                    self._write_metadata(metadata, follow_symlinks=False)
+
+    def _ensure_different_file(self, source):
+        """
+        Raise OSError(EINVAL) if both paths refer to the same file.
+        """
+        try:
+            if not self._path.samefile(source):
+                return
+        except (OSError, ValueError):
+            return
+        err = OSError(EINVAL, "Source and target are the same file")
+        err.filename = str(source)
+        err.filename2 = str(self._path)
+        raise err
+
+
 class PurePath(PurePathBase):
     """Base class for manipulating paths without I/O.
 
@@ -678,20 +802,6 @@ class Path(PathBase, PurePath):
         return (st.st_ino == other_st.st_ino and
                 st.st_dev == other_st.st_dev)
 
-    def _ensure_different_file(self, other_path):
-        """
-        Raise OSError(EINVAL) if both paths refer to the same file.
-        """
-        try:
-            if not self.samefile(other_path):
-                return
-        except (OSError, ValueError):
-            return
-        err = OSError(EINVAL, "Source and target are the same file")
-        err.filename = str(self)
-        err.filename2 = str(other_path)
-        raise err
-
     def open(self, mode='r', buffering=-1, encoding=None,
              errors=None, newline=None):
         """
@@ -932,24 +1042,6 @@ class Path(PathBase, PurePath):
             if not exist_ok or not self.is_dir():
                 raise
 
-    _readable_metadata = _writable_metadata = file_metadata_keys
-    _read_metadata = read_file_metadata
-    _write_metadata = write_file_metadata
-
-    if copyfile:
-        def _copy_file(self, target):
-            """
-            Copy the contents of this file to the given target.
-            """
-            try:
-                target = os.fspath(target)
-            except TypeError:
-                if not isinstance(target, PathBase):
-                    raise
-                PathBase._copy_file(self, target)
-            else:
-                copyfile(os.fspath(self), target)
-
     def chmod(self, mode, *, follow_symlinks=True):
         """
         Change the permissions of the path, like os.chmod().
@@ -1019,16 +1111,17 @@ class Path(PathBase, PurePath):
         os.replace(self, target)
         return self.with_segments(target)
 
+    copy = property(_LocalCopyWorker, doc=_LocalCopyWorker.__call__.__doc__)
+
     def move(self, target):
         """
         Recursively move this file or directory tree to the given destination.
         """
-        self._ensure_different_file(target)
+        if not isinstance(target, PathBase):
+            target = self.with_segments(target)
+        target.copy._ensure_different_file(self)
         try:
             return self.replace(target)
-        except TypeError:
-            if not isinstance(target, PathBase):
-                raise
         except OSError as err:
             if err.errno != EXDEV:
                 raise
@@ -1051,14 +1144,6 @@ class Path(PathBase, PurePath):
             f = f"{type(self).__name__}.symlink_to()"
             raise UnsupportedOperation(f"{f} is unsupported on this system")
 
-    if os.name == 'nt':
-        def _symlink_to_target_of(self, link):
-            """
-            Make this path a symlink with the same target as the given link.
-            This is used by copy().
-            """
-            self.symlink_to(link.readlink(), link.is_dir())
-
     if hasattr(os, "link"):
         def hardlink_to(self, target):
             """
index 642b3a57c59a1d876a7336273116aa019e9b5698..57bcaf3d680138797d6327bbb146dc67acaad152 100644 (file)
@@ -4,7 +4,6 @@ Low-level OS functionality wrappers used by pathlib.
 
 from errno import *
 import os
-import stat
 import sys
 try:
     import fcntl
@@ -163,100 +162,3 @@ def copyfileobj(source_f, target_f):
     write_target = target_f.write
     while buf := read_source(1024 * 1024):
         write_target(buf)
-
-
-# Kinds of metadata supported by the operating system.
-file_metadata_keys = {'mode', 'times_ns'}
-if hasattr(os.stat_result, 'st_flags'):
-    file_metadata_keys.add('flags')
-if hasattr(os, 'listxattr'):
-    file_metadata_keys.add('xattrs')
-file_metadata_keys = frozenset(file_metadata_keys)
-
-
-def read_file_metadata(path, keys=None, *, follow_symlinks=True):
-    """
-    Returns local path metadata as a dict with string keys.
-    """
-    if keys is None:
-        keys = file_metadata_keys
-    assert keys.issubset(file_metadata_keys)
-    result = {}
-    for key in keys:
-        if key == 'xattrs':
-            try:
-                result['xattrs'] = [
-                    (attr, os.getxattr(path, attr, follow_symlinks=follow_symlinks))
-                    for attr in os.listxattr(path, follow_symlinks=follow_symlinks)]
-            except OSError as err:
-                if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
-                    raise
-            continue
-        st = os.stat(path, follow_symlinks=follow_symlinks)
-        if key == 'mode':
-            result['mode'] = stat.S_IMODE(st.st_mode)
-        elif key == 'times_ns':
-            result['times_ns'] = st.st_atime_ns, st.st_mtime_ns
-        elif key == 'flags':
-            result['flags'] = st.st_flags
-    return result
-
-
-def write_file_metadata(path, metadata, *, follow_symlinks=True):
-    """
-    Sets local path metadata from the given dict with string keys.
-    """
-    assert frozenset(metadata.keys()).issubset(file_metadata_keys)
-
-    def _nop(*args, ns=None, follow_symlinks=None):
-        pass
-
-    if follow_symlinks:
-        # use the real function if it exists
-        def lookup(name):
-            return getattr(os, name, _nop)
-    else:
-        # use the real function only if it exists
-        # *and* it supports follow_symlinks
-        def lookup(name):
-            fn = getattr(os, name, _nop)
-            if fn in os.supports_follow_symlinks:
-                return fn
-            return _nop
-
-    times_ns = metadata.get('times_ns')
-    if times_ns is not None:
-        lookup("utime")(path, ns=times_ns, follow_symlinks=follow_symlinks)
-    # We must copy extended attributes before the file is (potentially)
-    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
-    xattrs = metadata.get('xattrs')
-    if xattrs is not None:
-        for attr, value in xattrs:
-            try:
-                os.setxattr(path, attr, value, follow_symlinks=follow_symlinks)
-            except OSError as e:
-                if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
-                    raise
-    mode = metadata.get('mode')
-    if mode is not None:
-        try:
-            lookup("chmod")(path, mode, follow_symlinks=follow_symlinks)
-        except NotImplementedError:
-            # if we got a NotImplementedError, it's because
-            #   * follow_symlinks=False,
-            #   * lchown() is unavailable, and
-            #   * either
-            #       * fchownat() is unavailable or
-            #       * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
-            #         (it returned ENOSUP.)
-            # therefore we're out of options--we simply cannot chown the
-            # symlink.  give up, suppress the error.
-            # (which is what shutil always did in this circumstance.)
-            pass
-    flags = metadata.get('flags')
-    if flags is not None:
-        try:
-            lookup("chflags")(path, flags, follow_symlinks=follow_symlinks)
-        except OSError as why:
-            if why.errno not in (EOPNOTSUPP, ENOTSUP):
-                raise