]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-73991: Support preserving metadata in `pathlib.Path.copy()` (#120806)
authorBarney Gale <barney.gale@gmail.com>
Sat, 6 Jul 2024 16:18:39 +0000 (17:18 +0100)
committerGitHub <noreply@github.com>
Sat, 6 Jul 2024 16:18:39 +0000 (17:18 +0100)
Add *preserve_metadata* keyword-only argument to `pathlib.Path.copy()`, defaulting to false. When set to true, we copy timestamps, permissions, extended attributes and flags where available, like `shutil.copystat()`. The argument has no effect on Windows, where metadata is always copied.

Internally (in the pathlib ABCs), path types gain `_readable_metadata` and `_writable_metadata` attributes. These sets of strings describe what kinds of metadata can be retrieved and stored. We take an intersection of `source._readable_metadata` and `target._writable_metadata` to minimise reads/writes. A new `_read_metadata()` method accepts a set of metadata keys and returns a dict with those keys, and a new `_write_metadata()` method accepts a dict of metadata. We *might* make these public in future, but it's hard to justify while the ABCs are still private.

Doc/library/pathlib.rst
Lib/pathlib/_abc.py
Lib/pathlib/_local.py
Lib/pathlib/_os.py
Lib/test/test_pathlib/test_pathlib.py

index d7fd56f4c4ff7fac3cabb1317f7a58c669b76926..f139abd2454d69ed1cff9a33d9da09b812870191 100644 (file)
@@ -1539,7 +1539,7 @@ Creating files and directories
 Copying, renaming and deleting
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-.. method:: Path.copy(target, *, follow_symlinks=True)
+.. method:: Path.copy(target, *, follow_symlinks=True, preserve_metadata=False)
 
    Copy the contents of this file to the *target* file. If *target* specifies
    a file that already exists, it will be replaced.
@@ -1548,11 +1548,11 @@ Copying, renaming and deleting
    will be created as a symbolic link. If *follow_symlinks* is true and this
    file is a symbolic link, *target* will be a copy of the symlink target.
 
-   .. note::
-      This method uses operating system functionality to copy file content
-      efficiently. The OS might also copy some metadata, such as file
-      permissions. After the copy is complete, users may wish to call
-      :meth:`Path.chmod` to set the permissions of the target file.
+   If *preserve_metadata* is false (the default), only the file data is
+   guaranteed to be copied. Set *preserve_metadata* to true to ensure that the
+   file mode (permissions), flags, last access and modification times, and
+   extended attributes are copied where supported. This argument has no effect
+   on Windows, where metadata is always preserved when copying.
 
    .. versionadded:: 3.14
 
index b5f903ec1f03ce606508495837348dbc07200a0d..05f55badd77c584bb2af45bedfc77ae06ca2780e 100644 (file)
@@ -781,7 +781,32 @@ class PathBase(PurePathBase):
         """
         raise UnsupportedOperation(self._unsupported_msg('mkdir()'))
 
-    def copy(self, target, follow_symlinks=True):
+    # Metadata keys supported by this path type.
+    _readable_metadata = _writable_metadata = frozenset()
+
+    def _read_metadata(self, keys=None, *, follow_symlinks=True):
+        """
+        Returns path metadata as a dict with string keys.
+        """
+        raise UnsupportedOperation(self._unsupported_msg('_read_metadata()'))
+
+    def _write_metadata(self, metadata, *, follow_symlinks=True):
+        """
+        Sets path metadata from the given dict with string keys.
+        """
+        raise UnsupportedOperation(self._unsupported_msg('_write_metadata()'))
+
+    def _copy_metadata(self, target, *, follow_symlinks=True):
+        """
+        Copies metadata (permissions, timestamps, etc) from this path to target.
+        """
+        # Metadata types supported by both source and target.
+        keys = self._readable_metadata & target._writable_metadata
+        if keys:
+            metadata = self._read_metadata(keys, follow_symlinks=follow_symlinks)
+            target._write_metadata(metadata, follow_symlinks=follow_symlinks)
+
+    def copy(self, target, *, follow_symlinks=True, preserve_metadata=False):
         """
         Copy the contents of this file to the given target. If this file is a
         symlink and follow_symlinks is false, a symlink will be created at the
@@ -793,6 +818,8 @@ class PathBase(PurePathBase):
             raise OSError(f"{self!r} and {target!r} are the same file")
         if not follow_symlinks and self.is_symlink():
             target.symlink_to(self.readlink())
+            if preserve_metadata:
+                self._copy_metadata(target, follow_symlinks=False)
             return
         with self.open('rb') as source_f:
             try:
@@ -805,6 +832,8 @@ class PathBase(PurePathBase):
                         f'Directory does not exist: {target}') from e
                 else:
                     raise
+        if preserve_metadata:
+            self._copy_metadata(target)
 
     def copytree(self, target, *, follow_symlinks=True, dirs_exist_ok=False,
                  ignore=None, on_error=None):
index acb57214b81865128c9c910538c6065b44750b25..eae8a30c876f1972711aa8cfe4b1bcf83f803f88 100644 (file)
@@ -17,7 +17,8 @@ try:
 except ImportError:
     grp = None
 
-from ._os import UnsupportedOperation, copyfile
+from ._os import (UnsupportedOperation, copyfile, file_metadata_keys,
+                  read_file_metadata, write_file_metadata)
 from ._abc import PurePathBase, PathBase
 
 
@@ -781,8 +782,12 @@ class Path(PathBase, PurePath):
             if not exist_ok or not self.is_dir():
                 raise
 
+    _readable_metadata = _writable_metadata = file_metadata_keys
+    _read_metadata = read_file_metadata
+    _write_metadata = write_file_metadata
+
     if copyfile:
-        def copy(self, target, follow_symlinks=True):
+        def copy(self, target, *, follow_symlinks=True, preserve_metadata=False):
             """
             Copy the contents of this file to the given target. If this file is a
             symlink and follow_symlinks is false, a symlink will be created at the
@@ -799,7 +804,8 @@ class Path(PathBase, PurePath):
                     return
                 except UnsupportedOperation:
                     pass  # Fall through to generic code.
-            PathBase.copy(self, target, follow_symlinks=follow_symlinks)
+            PathBase.copy(self, target, follow_symlinks=follow_symlinks,
+                          preserve_metadata=preserve_metadata)
 
     def chmod(self, mode, *, follow_symlinks=True):
         """
index 61923b5e410b5caee2d632c8bf364bea1373e816..164ee8e9034427c278d1769ea8ac695aad460295 100644 (file)
@@ -2,7 +2,7 @@
 Low-level OS functionality wrappers used by pathlib.
 """
 
-from errno import EBADF, EOPNOTSUPP, ETXTBSY, EXDEV
+from errno import *
 import os
 import stat
 import sys
@@ -178,3 +178,100 @@ def copyfileobj(source_f, target_f):
     write_target = target_f.write
     while buf := read_source(1024 * 1024):
         write_target(buf)
+
+
+# Kinds of metadata supported by the operating system.
+file_metadata_keys = {'mode', 'times_ns'}
+if hasattr(os.stat_result, 'st_flags'):
+    file_metadata_keys.add('flags')
+if hasattr(os, 'listxattr'):
+    file_metadata_keys.add('xattrs')
+file_metadata_keys = frozenset(file_metadata_keys)
+
+
+def read_file_metadata(path, keys=None, *, follow_symlinks=True):
+    """
+    Returns local path metadata as a dict with string keys.
+    """
+    if keys is None:
+        keys = file_metadata_keys
+    assert keys.issubset(file_metadata_keys)
+    result = {}
+    for key in keys:
+        if key == 'xattrs':
+            try:
+                result['xattrs'] = [
+                    (attr, os.getxattr(path, attr, follow_symlinks=follow_symlinks))
+                    for attr in os.listxattr(path, follow_symlinks=follow_symlinks)]
+            except OSError as err:
+                if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+                    raise
+            continue
+        st = os.stat(path, follow_symlinks=follow_symlinks)
+        if key == 'mode':
+            result['mode'] = stat.S_IMODE(st.st_mode)
+        elif key == 'times_ns':
+            result['times_ns'] = st.st_atime_ns, st.st_mtime_ns
+        elif key == 'flags':
+            result['flags'] = st.st_flags
+    return result
+
+
+def write_file_metadata(path, metadata, *, follow_symlinks=True):
+    """
+    Sets local path metadata from the given dict with string keys.
+    """
+    assert frozenset(metadata.keys()).issubset(file_metadata_keys)
+
+    def _nop(*args, ns=None, follow_symlinks=None):
+        pass
+
+    if follow_symlinks:
+        # use the real function if it exists
+        def lookup(name):
+            return getattr(os, name, _nop)
+    else:
+        # use the real function only if it exists
+        # *and* it supports follow_symlinks
+        def lookup(name):
+            fn = getattr(os, name, _nop)
+            if fn in os.supports_follow_symlinks:
+                return fn
+            return _nop
+
+    times_ns = metadata.get('times_ns')
+    if times_ns is not None:
+        lookup("utime")(path, ns=times_ns, follow_symlinks=follow_symlinks)
+    # We must copy extended attributes before the file is (potentially)
+    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+    xattrs = metadata.get('xattrs')
+    if xattrs is not None:
+        for attr, value in xattrs:
+            try:
+                os.setxattr(path, attr, value, follow_symlinks=follow_symlinks)
+            except OSError as e:
+                if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+                    raise
+    mode = metadata.get('mode')
+    if mode is not None:
+        try:
+            lookup("chmod")(path, mode, follow_symlinks=follow_symlinks)
+        except NotImplementedError:
+            # if we got a NotImplementedError, it's because
+            #   * follow_symlinks=False,
+            #   * lchown() is unavailable, and
+            #   * either
+            #       * fchownat() is unavailable or
+            #       * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
+            #         (it returned ENOSUP.)
+            # therefore we're out of options--we simply cannot chown the
+            # symlink.  give up, suppress the error.
+            # (which is what shutil always did in this circumstance.)
+            pass
+    flags = metadata.get('flags')
+    if flags is not None:
+        try:
+            lookup("chflags")(path, flags, follow_symlinks=follow_symlinks)
+        except OSError as why:
+            if why.errno not in (EOPNOTSUPP, ENOTSUP):
+                raise
index da6d82465d29cfa74a1ae8d8bd27aebc19015b9d..234e5746e544cdd4ae130d3478a18f73272ffb00 100644 (file)
@@ -653,6 +653,50 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
             self.assertIsInstance(f, io.RawIOBase)
             self.assertEqual(f.read().strip(), b"this is file A")
 
+    def test_copy_file_preserve_metadata(self):
+        base = self.cls(self.base)
+        source = base / 'fileA'
+        if hasattr(os, 'setxattr'):
+            os.setxattr(source, b'user.foo', b'42')
+        if hasattr(os, 'chmod'):
+            os.chmod(source, stat.S_IRWXU | stat.S_IRWXO)
+        if hasattr(os, 'chflags') and hasattr(stat, 'UF_NODUMP'):
+            os.chflags(source, stat.UF_NODUMP)
+        source_st = source.stat()
+        target = base / 'copyA'
+        source.copy(target, preserve_metadata=True)
+        self.assertTrue(target.exists())
+        self.assertEqual(source.read_text(), target.read_text())
+        target_st = target.stat()
+        self.assertLessEqual(source_st.st_atime, target_st.st_atime)
+        self.assertLessEqual(source_st.st_mtime, target_st.st_mtime)
+        if hasattr(os, 'getxattr'):
+            self.assertEqual(os.getxattr(target, b'user.foo'), b'42')
+        self.assertEqual(source_st.st_mode, target_st.st_mode)
+        if hasattr(source_st, 'st_flags'):
+            self.assertEqual(source_st.st_flags, target_st.st_flags)
+
+    @needs_symlinks
+    def test_copy_link_preserve_metadata(self):
+        base = self.cls(self.base)
+        source = base / 'linkA'
+        if hasattr(os, 'lchmod'):
+            os.lchmod(source, stat.S_IRWXU | stat.S_IRWXO)
+        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
+            os.lchflags(source, stat.UF_NODUMP)
+        source_st = source.lstat()
+        target = base / 'copyA'
+        source.copy(target, follow_symlinks=False, preserve_metadata=True)
+        self.assertTrue(target.exists())
+        self.assertTrue(target.is_symlink())
+        self.assertEqual(source.readlink(), target.readlink())
+        target_st = target.lstat()
+        self.assertLessEqual(source_st.st_atime, target_st.st_atime)
+        self.assertLessEqual(source_st.st_mtime, target_st.st_mtime)
+        self.assertEqual(source_st.st_mode, target_st.st_mode)
+        if hasattr(source_st, 'st_flags'):
+            self.assertEqual(source_st.st_flags, target_st.st_flags)
+
     @unittest.skipIf(sys.platform == "win32" or sys.platform == "wasi", "directories are always readable on Windows and WASI")
     @unittest.skipIf(root_in_posix, "test fails with root privilege")
     def test_copytree_no_read_permission(self):