git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-73991: Add `pathlib.Path.rmtree()` (#119060)
author: Barney Gale <barney.gale@gmail.com>
Sat, 20 Jul 2024 20:14:13 +0000 (21:14 +0100)
committer: GitHub <noreply@github.com>
Sat, 20 Jul 2024 20:14:13 +0000 (20:14 +0000)
Add a `Path.rmtree()` method that removes an entire directory tree, like
`shutil.rmtree()`. The signature of the optional *on_error* argument
matches the `Path.walk()` argument of the same name, but differs from the
*onexc* and *onerror* arguments to `shutil.rmtree()`. Consistency within
pathlib is probably more important.

In the private pathlib ABCs, we add an implementation based on `walk()`.

Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
Doc/library/pathlib.rst
Doc/whatsnew/3.14.rst
Lib/pathlib/_abc.py
Lib/pathlib/_local.py
Lib/test/test_pathlib/test_pathlib.py
Lib/test/test_pathlib/test_pathlib_abc.py
Misc/NEWS.d/next/Library/2024-05-15-01-21-44.gh-issue-73991.bNDqQN.rst [new file with mode: 0644]

index f139abd2454d69ed1cff9a33d9da09b812870191..a74b1321cb4b1dd29313b80b0f338ad9093a760c 100644 (file)
@@ -1645,6 +1645,34 @@ Copying, renaming and deleting
    Remove this directory.  The directory must be empty.
 
 
+.. method:: Path.rmtree(ignore_errors=False, on_error=None)
+
+   Recursively delete this entire directory tree. The path must not refer to a symlink.
+
+   If *ignore_errors* is true, errors resulting from failed removals will be
+   ignored. If *ignore_errors* is false or omitted, and a function is given to
+   *on_error*, it will be called each time an exception is raised. If neither
+   *ignore_errors* nor *on_error* are supplied, exceptions are propagated to
+   the caller.
+
+   .. note::
+
+      On platforms that support the necessary fd-based functions, a symlink
+      attack-resistant version of :meth:`~Path.rmtree` is used by default. On
+      other platforms, the :meth:`~Path.rmtree` implementation is susceptible
+      to a symlink attack: given proper timing and circumstances, attackers
+      can manipulate symlinks on the filesystem to delete files they would not
+      be able to access otherwise.
+
+   If the optional argument *on_error* is specified, it should be a callable;
+   it will be called with one argument of type :exc:`OSError`. The
+   callable can handle the error to continue the deletion process or re-raise
+   it to stop. Note that the filename is available as the :attr:`~OSError.filename`
+   attribute of the exception object.
+
+   .. versionadded:: 3.14
+
+
 Permissions and ownership
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
index 21eb7c3e6efa4a0e49bf7ba3e271db218419d4d7..6f57733470565e6601261a70c94b18e2fb2330eb 100644 (file)
@@ -118,11 +118,15 @@ os
 pathlib
 -------
 
-* Add :meth:`pathlib.Path.copy`, which copies the content of one file to
-  another, like :func:`shutil.copyfile`.
-  (Contributed by Barney Gale in :gh:`73991`.)
-* Add :meth:`pathlib.Path.copytree`, which copies one directory tree to
-  another.
+* Add methods to :class:`pathlib.Path` to recursively copy or remove files:
+
+  * :meth:`~pathlib.Path.copy` copies the content of one file to another, like
+    :func:`shutil.copyfile`.
+  * :meth:`~pathlib.Path.copytree` copies one directory tree to another, like
+    :func:`shutil.copytree`.
+  * :meth:`~pathlib.Path.rmtree` recursively removes a directory tree, like
+    :func:`shutil.rmtree`.
+
   (Contributed by Barney Gale in :gh:`73991`.)
 
 pdb
index 05f55badd77c584bb2af45bedfc77ae06ca2780e..49e8e4ca13782c6b84f04143e0f274e3b0cc4550 100644 (file)
@@ -915,6 +915,47 @@ class PathBase(PurePathBase):
         """
         raise UnsupportedOperation(self._unsupported_msg('rmdir()'))
 
+    def rmtree(self, ignore_errors=False, on_error=None):
+        """
+        Recursively delete this directory tree.
+
+        If *ignore_errors* is true, exceptions raised from scanning the tree
+        and removing files and directories are ignored. Otherwise, if
+        *on_error* is set, it will be called to handle the error. If neither
+        *ignore_errors* nor *on_error* are set, exceptions are propagated to
+        the caller.
+        """
+        if ignore_errors:
+            def on_error(err):
+                pass
+        elif on_error is None:
+            def on_error(err):
+                raise err
+        try:
+            if self.is_symlink():
+                raise OSError("Cannot call rmtree on a symbolic link")
+            elif self.is_junction():
+                raise OSError("Cannot call rmtree on a junction")
+            results = self.walk(
+                on_error=on_error,
+                top_down=False,  # Bottom-up so we rmdir() empty directories.
+                follow_symlinks=False)
+            for dirpath, dirnames, filenames in results:
+                for name in filenames:
+                    try:
+                        dirpath.joinpath(name).unlink()
+                    except OSError as err:
+                        on_error(err)
+                for name in dirnames:
+                    try:
+                        dirpath.joinpath(name).rmdir()
+                    except OSError as err:
+                        on_error(err)
+            self.rmdir()
+        except OSError as err:
+            err.filename = str(self)
+            on_error(err)
+
     def owner(self, *, follow_symlinks=True):
         """
         Return the login name of the file owner.
index eae8a30c876f1972711aa8cfe4b1bcf83f803f88..4fd5279f9fe9ce51fa28b7fd1d7f2a95e0734366 100644 (file)
@@ -830,6 +830,25 @@ class Path(PathBase, PurePath):
         """
         os.rmdir(self)
 
+    def rmtree(self, ignore_errors=False, on_error=None):
+        """
+        Recursively delete this directory tree.
+
+        If *ignore_errors* is true, exceptions raised from scanning the tree
+        and removing files and directories are ignored. Otherwise, if
+        *on_error* is set, it will be called to handle the error. If neither
+        *ignore_errors* nor *on_error* are set, exceptions are propagated to
+        the caller.
+        """
+        if on_error:
+            def onexc(func, filename, err):
+                err.filename = filename
+                on_error(err)
+        else:
+            onexc = None
+        import shutil
+        shutil.rmtree(str(self), ignore_errors, onexc=onexc)
+
     def rename(self, target):
         """
         Rename this path to the target path.
index 1328a8695b0cca23ed2eabb121877d4ad3135726..e17e7d71b6ab461dfa1bc1ce3b263d6aa0e425f4 100644 (file)
@@ -16,6 +16,7 @@ from urllib.request import pathname2url
 from test.support import import_helper
 from test.support import is_emscripten, is_wasi
 from test.support import infinite_recursion
+from test.support import swap_attr
 from test.support import os_helper
 from test.support.os_helper import TESTFN, FakePath
 from test.test_pathlib import test_pathlib_abc
@@ -31,6 +32,10 @@ root_in_posix = False
 if hasattr(os, 'geteuid'):
     root_in_posix = (os.geteuid() == 0)
 
+rmtree_use_fd_functions = (
+    {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
+    os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks)
+
 #
 # Tests for the pure classes.
 #
@@ -827,6 +832,252 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
         self.assertEqual(expected_gid, gid_2)
         self.assertEqual(expected_name, link.group(follow_symlinks=False))
 
+    def test_rmtree_uses_safe_fd_version_if_available(self):
+        if rmtree_use_fd_functions:
+            d = self.cls(self.base, 'a')
+            d.mkdir()
+            try:
+                real_open = os.open
+
+                class Called(Exception):
+                    pass
+
+                def _raiser(*args, **kwargs):
+                    raise Called
+
+                os.open = _raiser
+                self.assertRaises(Called, d.rmtree)
+            finally:
+                os.open = real_open
+
+    @unittest.skipIf(sys.platform[:6] == 'cygwin',
+                     "This test can't be run on Cygwin (issue #1071513).")
+    @os_helper.skip_if_dac_override
+    @os_helper.skip_unless_working_chmod
+    def test_rmtree_unwritable(self):
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        child_file_path = tmp / 'a'
+        child_dir_path = tmp / 'b'
+        child_file_path.write_text("")
+        child_dir_path.mkdir()
+        old_dir_mode = tmp.stat().st_mode
+        old_child_file_mode = child_file_path.stat().st_mode
+        old_child_dir_mode = child_dir_path.stat().st_mode
+        # Make unwritable.
+        new_mode = stat.S_IREAD | stat.S_IEXEC
+        try:
+            child_file_path.chmod(new_mode)
+            child_dir_path.chmod(new_mode)
+            tmp.chmod(new_mode)
+
+            errors = []
+            tmp.rmtree(on_error=errors.append)
+            # Test whether on_error has actually been called.
+            print(errors)
+            self.assertEqual(len(errors), 3)
+        finally:
+            tmp.chmod(old_dir_mode)
+            child_file_path.chmod(old_child_file_mode)
+            child_dir_path.chmod(old_child_dir_mode)
+
+    @needs_windows
+    def test_rmtree_inner_junction(self):
+        import _winapi
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        dir1 = tmp / 'dir1'
+        dir2 = dir1 / 'dir2'
+        dir3 = tmp / 'dir3'
+        for d in dir1, dir2, dir3:
+            d.mkdir()
+        file1 = tmp / 'file1'
+        file1.write_text('foo')
+        link1 = dir1 / 'link1'
+        _winapi.CreateJunction(str(dir2), str(link1))
+        link2 = dir1 / 'link2'
+        _winapi.CreateJunction(str(dir3), str(link2))
+        link3 = dir1 / 'link3'
+        _winapi.CreateJunction(str(file1), str(link3))
+        # make sure junctions are removed but not followed
+        dir1.rmtree()
+        self.assertFalse(dir1.exists())
+        self.assertTrue(dir3.exists())
+        self.assertTrue(file1.exists())
+
+    @needs_windows
+    def test_rmtree_outer_junction(self):
+        import _winapi
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        try:
+            src = tmp / 'cheese'
+            dst = tmp / 'shop'
+            src.mkdir()
+            spam = src / 'spam'
+            spam.write_text('')
+            _winapi.CreateJunction(str(src), str(dst))
+            self.assertRaises(OSError, dst.rmtree)
+            dst.rmtree(ignore_errors=True)
+        finally:
+            tmp.rmtree(ignore_errors=True)
+
+    @needs_windows
+    def test_rmtree_outer_junction_on_error(self):
+        import _winapi
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        dir_ = tmp / 'dir'
+        dir_.mkdir()
+        link = tmp / 'link'
+        _winapi.CreateJunction(str(dir_), str(link))
+        try:
+            self.assertRaises(OSError, link.rmtree)
+            self.assertTrue(dir_.exists())
+            self.assertTrue(link.exists(follow_symlinks=False))
+            errors = []
+
+            def on_error(error):
+                errors.append(error)
+
+            link.rmtree(on_error=on_error)
+            self.assertEqual(len(errors), 1)
+            self.assertIsInstance(errors[0], OSError)
+            self.assertEqual(errors[0].filename, str(link))
+        finally:
+            os.unlink(str(link))
+
+    @unittest.skipUnless(rmtree_use_fd_functions, "requires safe rmtree")
+    def test_rmtree_fails_on_close(self):
+        # Test that the error handler is called for failed os.close() and that
+        # os.close() is only called once for a file descriptor.
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        dir1 = tmp / 'dir1'
+        dir1.mkdir()
+        dir2 = dir1 / 'dir2'
+        dir2.mkdir()
+
+        def close(fd):
+            orig_close(fd)
+            nonlocal close_count
+            close_count += 1
+            raise OSError
+
+        close_count = 0
+        with swap_attr(os, 'close', close) as orig_close:
+            with self.assertRaises(OSError):
+                dir1.rmtree()
+        self.assertTrue(dir2.is_dir())
+        self.assertEqual(close_count, 2)
+
+        close_count = 0
+        errors = []
+
+        with swap_attr(os, 'close', close) as orig_close:
+            dir1.rmtree(on_error=errors.append)
+        print(errors)
+        self.assertEqual(len(errors), 2)
+        self.assertEqual(errors[0].filename, str(dir2))
+        self.assertEqual(errors[1].filename, str(dir1))
+        self.assertEqual(close_count, 2)
+
+    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+    @unittest.skipIf(sys.platform == "vxworks",
+                     "fifo requires special path on VxWorks")
+    def test_rmtree_on_named_pipe(self):
+        p = self.cls(self.base, 'pipe')
+        os.mkfifo(p)
+        try:
+            with self.assertRaises(NotADirectoryError):
+                p.rmtree()
+            self.assertTrue(p.exists())
+        finally:
+            p.unlink()
+
+        p = self.cls(self.base, 'dir')
+        p.mkdir()
+        os.mkfifo(p / 'mypipe')
+        p.rmtree()
+        self.assertFalse(p.exists())
+
+    @unittest.skipIf(sys.platform[:6] == 'cygwin',
+                     "This test can't be run on Cygwin (issue #1071513).")
+    @os_helper.skip_if_dac_override
+    @os_helper.skip_unless_working_chmod
+    def test_rmtree_deleted_race_condition(self):
+        # bpo-37260
+        #
+        # Test that a file or a directory deleted after it is enumerated
+        # by scandir() but before unlink() or rmdir() is called doesn't
+        # generate any errors.
+        def on_error(exc):
+            assert exc.filename
+            if not isinstance(exc, PermissionError):
+                raise
+            # Make the parent and the children writeable.
+            for p, mode in zip(paths, old_modes):
+                p.chmod(mode)
+            # Remove other dirs except one.
+            keep = next(p for p in dirs if str(p) != exc.filename)
+            for p in dirs:
+                if p != keep:
+                    p.rmdir()
+            # Remove other files except one.
+            keep = next(p for p in files if str(p) != exc.filename)
+            for p in files:
+                if p != keep:
+                    p.unlink()
+
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        paths = [tmp] + [tmp / f'child{i}' for i in range(6)]
+        dirs = paths[1::2]
+        files = paths[2::2]
+        for path in dirs:
+            path.mkdir()
+        for path in files:
+            path.write_text('')
+
+        old_modes = [path.stat().st_mode for path in paths]
+
+        # Make the parent and the children non-writeable.
+        new_mode = stat.S_IREAD | stat.S_IEXEC
+        for path in reversed(paths):
+            path.chmod(new_mode)
+
+        try:
+            tmp.rmtree(on_error=on_error)
+        except:
+            # Test failed, so cleanup artifacts.
+            for path, mode in zip(paths, old_modes):
+                try:
+                    path.chmod(mode)
+                except OSError:
+                    pass
+            tmp.rmtree()
+            raise
+
+    def test_rmtree_does_not_choke_on_failing_lstat(self):
+        try:
+            orig_lstat = os.lstat
+            tmp = self.cls(self.base, 'rmtree')
+
+            def raiser(fn, *args, **kwargs):
+                if fn != str(tmp):
+                    raise OSError()
+                else:
+                    return orig_lstat(fn)
+
+            os.lstat = raiser
+
+            tmp.mkdir()
+            foo = tmp / 'foo'
+            foo.write_text('')
+            tmp.rmtree()
+        finally:
+            os.lstat = orig_lstat
+
     @os_helper.skip_unless_hardlink
     def test_hardlink_to(self):
         P = self.cls(self.base)
index 28c9664cc90fe1970fd99ec87413c7680d21f119..37678c5d799e9a2f29d87b8a96c6f961181bf4a3 100644 (file)
@@ -2641,6 +2641,105 @@ class DummyPathTest(DummyPurePathTest):
         self.assertFileNotFound(p.stat)
         self.assertFileNotFound(p.unlink)
 
+    def test_rmtree(self):
+        base = self.cls(self.base)
+        base.joinpath('dirA').rmtree()
+        self.assertRaises(FileNotFoundError, base.joinpath('dirA').stat)
+        self.assertRaises(FileNotFoundError, base.joinpath('dirA', 'linkC').lstat)
+        base.joinpath('dirB').rmtree()
+        self.assertRaises(FileNotFoundError, base.joinpath('dirB').stat)
+        self.assertRaises(FileNotFoundError, base.joinpath('dirB', 'fileB').stat)
+        self.assertRaises(FileNotFoundError, base.joinpath('dirB', 'linkD').lstat)
+        base.joinpath('dirC').rmtree()
+        self.assertRaises(FileNotFoundError, base.joinpath('dirC').stat)
+        self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'dirD').stat)
+        self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'dirD', 'fileD').stat)
+        self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'fileC').stat)
+        self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'novel.txt').stat)
+
+    def test_rmtree_errors(self):
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        # filename is guaranteed not to exist
+        filename = tmp / 'foo'
+        self.assertRaises(FileNotFoundError, filename.rmtree)
+        # test that ignore_errors option is honored
+        filename.rmtree(ignore_errors=True)
+
+        # existing file
+        filename = tmp / "tstfile"
+        filename.write_text("")
+        with self.assertRaises(NotADirectoryError) as cm:
+            filename.rmtree()
+        self.assertEqual(cm.exception.filename, str(filename))
+        self.assertTrue(filename.exists())
+        # test that ignore_errors option is honored
+        filename.rmtree(ignore_errors=True)
+        self.assertTrue(filename.exists())
+
+    def test_rmtree_on_error(self):
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        filename = tmp / "tstfile"
+        filename.write_text("")
+        errors = []
+
+        def on_error(error):
+            errors.append(error)
+
+        filename.rmtree(on_error=on_error)
+        self.assertEqual(len(errors), 2)
+        # First from scandir()
+        self.assertIsInstance(errors[0], NotADirectoryError)
+        self.assertEqual(errors[0].filename, str(filename))
+        # Then from rmdir()
+        self.assertIsInstance(errors[1], NotADirectoryError)
+        self.assertEqual(errors[1].filename, str(filename))
+
+    @needs_symlinks
+    def test_rmtree_outer_symlink(self):
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        dir_ = tmp / 'dir'
+        dir_.mkdir()
+        link = tmp / 'link'
+        link.symlink_to(dir_)
+        self.assertRaises(OSError, link.rmtree)
+        self.assertTrue(dir_.exists())
+        self.assertTrue(link.exists(follow_symlinks=False))
+        errors = []
+
+        def on_error(error):
+            errors.append(error)
+
+        link.rmtree(on_error=on_error)
+        self.assertEqual(len(errors), 1)
+        self.assertIsInstance(errors[0], OSError)
+        self.assertEqual(errors[0].filename, str(link))
+
+    @needs_symlinks
+    def test_rmtree_inner_symlink(self):
+        tmp = self.cls(self.base, 'rmtree')
+        tmp.mkdir()
+        dir1 = tmp / 'dir1'
+        dir2 = dir1 / 'dir2'
+        dir3 = tmp / 'dir3'
+        for d in dir1, dir2, dir3:
+            d.mkdir()
+        file1 = tmp / 'file1'
+        file1.write_text('foo')
+        link1 = dir1 / 'link1'
+        link1.symlink_to(dir2)
+        link2 = dir1 / 'link2'
+        link2.symlink_to(dir3)
+        link3 = dir1 / 'link3'
+        link3.symlink_to(file1)
+        # make sure symlinks are removed but not followed
+        dir1.rmtree()
+        self.assertFalse(dir1.exists())
+        self.assertTrue(dir3.exists())
+        self.assertTrue(file1.exists())
+
     def setUpWalk(self):
         # Build:
         #     TESTFN/
diff --git a/Misc/NEWS.d/next/Library/2024-05-15-01-21-44.gh-issue-73991.bNDqQN.rst b/Misc/NEWS.d/next/Library/2024-05-15-01-21-44.gh-issue-73991.bNDqQN.rst
new file mode 100644 (file)
index 0000000..9aa7a7d
--- /dev/null
@@ -0,0 +1 @@
+Add :meth:`pathlib.Path.rmtree`, which recursively removes a directory.