Remove this directory. The directory must be empty.
+.. method:: Path.rmtree(ignore_errors=False, on_error=None)
+
+ Recursively delete this entire directory tree. The path must not refer to a symlink.
+
+ If *ignore_errors* is true, errors resulting from failed removals will be
+ ignored. If *ignore_errors* is false or omitted, and a function is given to
+ *on_error*, it will be called each time an exception is raised. If neither
+ *ignore_errors* nor *on_error* are supplied, exceptions are propagated to
+ the caller.
+
+ .. note::
+
+ On platforms that support the necessary fd-based functions, a symlink
+ attack-resistant version of :meth:`~Path.rmtree` is used by default. On
+ other platforms, the :func:`~Path.rmtree` implementation is susceptible
+ to a symlink attack: given proper timing and circumstances, attackers
+ can manipulate symlinks on the filesystem to delete files they would not
+ be able to access otherwise.
+
+ If the optional argument *on_error* is specified, it should be a callable;
+ it will be called with one argument of type :exc:`OSError`. The
+ callable can handle the error to continue the deletion process or re-raise
+ it to stop. Note that the filename is available as the :attr:`~OSError.filename`
+ attribute of the exception object.
+
+ .. versionadded:: 3.14
+
+
Permissions and ownership
^^^^^^^^^^^^^^^^^^^^^^^^^
pathlib
-------
-* Add :meth:`pathlib.Path.copy`, which copies the content of one file to
- another, like :func:`shutil.copyfile`.
- (Contributed by Barney Gale in :gh:`73991`.)
-* Add :meth:`pathlib.Path.copytree`, which copies one directory tree to
- another.
+* Add methods to :class:`pathlib.Path` to recursively copy or remove files:
+
+ * :meth:`~pathlib.Path.copy` copies the content of one file to another, like
+ :func:`shutil.copyfile`.
+ * :meth:`~pathlib.Path.copytree` copies one directory tree to another, like
+ :func:`shutil.copytree`.
+ * :meth:`~pathlib.Path.rmtree` recursively removes a directory tree, like
+ :func:`shutil.rmtree`.
+
(Contributed by Barney Gale in :gh:`73991`.)
pdb
"""
raise UnsupportedOperation(self._unsupported_msg('rmdir()'))
+ def rmtree(self, ignore_errors=False, on_error=None):
+ """
+ Recursively delete this directory tree.
+
+ If *ignore_errors* is true, exceptions raised from scanning the tree
+ and removing files and directories are ignored. Otherwise, if
+ *on_error* is set, it will be called to handle the error. If neither
+ *ignore_errors* nor *on_error* are set, exceptions are propagated to
+ the caller.
+ """
+ if ignore_errors:
+ def on_error(err):
+ pass
+ elif on_error is None:
+ def on_error(err):
+ raise err
+ try:
+ if self.is_symlink():
+ raise OSError("Cannot call rmtree on a symbolic link")
+ elif self.is_junction():
+ raise OSError("Cannot call rmtree on a junction")
+ results = self.walk(
+ on_error=on_error,
+ top_down=False, # Bottom-up so we rmdir() empty directories.
+ follow_symlinks=False)
+ for dirpath, dirnames, filenames in results:
+ for name in filenames:
+ try:
+ dirpath.joinpath(name).unlink()
+ except OSError as err:
+ on_error(err)
+ for name in dirnames:
+ try:
+ dirpath.joinpath(name).rmdir()
+ except OSError as err:
+ on_error(err)
+ self.rmdir()
+ except OSError as err:
+ err.filename = str(self)
+ on_error(err)
+
def owner(self, *, follow_symlinks=True):
"""
Return the login name of the file owner.
"""
os.rmdir(self)
+ def rmtree(self, ignore_errors=False, on_error=None):
+ """
+ Recursively delete this directory tree.
+
+ If *ignore_errors* is true, exceptions raised from scanning the tree
+ and removing files and directories are ignored. Otherwise, if
+ *on_error* is set, it will be called to handle the error. If neither
+ *ignore_errors* nor *on_error* are set, exceptions are propagated to
+ the caller.
+ """
+ if on_error:
+ def onexc(func, filename, err):
+ err.filename = filename
+ on_error(err)
+ else:
+ onexc = None
+ import shutil
+ shutil.rmtree(str(self), ignore_errors, onexc=onexc)
+
def rename(self, target):
"""
Rename this path to the target path.
from test.support import import_helper
from test.support import is_emscripten, is_wasi
from test.support import infinite_recursion
+from test.support import swap_attr
from test.support import os_helper
from test.support.os_helper import TESTFN, FakePath
from test.test_pathlib import test_pathlib_abc
if hasattr(os, 'geteuid'):
root_in_posix = (os.geteuid() == 0)
+rmtree_use_fd_functions = (
+ {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
+ os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks)
+
#
# Tests for the pure classes.
#
self.assertEqual(expected_gid, gid_2)
self.assertEqual(expected_name, link.group(follow_symlinks=False))
+ def test_rmtree_uses_safe_fd_version_if_available(self):
+ if rmtree_use_fd_functions:
+ d = self.cls(self.base, 'a')
+ d.mkdir()
+ try:
+ real_open = os.open
+
+ class Called(Exception):
+ pass
+
+ def _raiser(*args, **kwargs):
+ raise Called
+
+ os.open = _raiser
+ self.assertRaises(Called, d.rmtree)
+ finally:
+ os.open = real_open
+
+ @unittest.skipIf(sys.platform[:6] == 'cygwin',
+ "This test can't be run on Cygwin (issue #1071513).")
+ @os_helper.skip_if_dac_override
+ @os_helper.skip_unless_working_chmod
+ def test_rmtree_unwritable(self):
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ child_file_path = tmp / 'a'
+ child_dir_path = tmp / 'b'
+ child_file_path.write_text("")
+ child_dir_path.mkdir()
+ old_dir_mode = tmp.stat().st_mode
+ old_child_file_mode = child_file_path.stat().st_mode
+ old_child_dir_mode = child_dir_path.stat().st_mode
+ # Make unwritable.
+ new_mode = stat.S_IREAD | stat.S_IEXEC
+ try:
+ child_file_path.chmod(new_mode)
+ child_dir_path.chmod(new_mode)
+ tmp.chmod(new_mode)
+
+ errors = []
+ tmp.rmtree(on_error=errors.append)
+ # Test whether onerror has actually been called.
+ print(errors)
+ self.assertEqual(len(errors), 3)
+ finally:
+ tmp.chmod(old_dir_mode)
+ child_file_path.chmod(old_child_file_mode)
+ child_dir_path.chmod(old_child_dir_mode)
+
+ @needs_windows
+ def test_rmtree_inner_junction(self):
+ import _winapi
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ dir1 = tmp / 'dir1'
+ dir2 = dir1 / 'dir2'
+ dir3 = tmp / 'dir3'
+ for d in dir1, dir2, dir3:
+ d.mkdir()
+ file1 = tmp / 'file1'
+ file1.write_text('foo')
+ link1 = dir1 / 'link1'
+ _winapi.CreateJunction(str(dir2), str(link1))
+ link2 = dir1 / 'link2'
+ _winapi.CreateJunction(str(dir3), str(link2))
+ link3 = dir1 / 'link3'
+ _winapi.CreateJunction(str(file1), str(link3))
+ # make sure junctions are removed but not followed
+ dir1.rmtree()
+ self.assertFalse(dir1.exists())
+ self.assertTrue(dir3.exists())
+ self.assertTrue(file1.exists())
+
+ @needs_windows
+ def test_rmtree_outer_junction(self):
+ import _winapi
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ try:
+ src = tmp / 'cheese'
+ dst = tmp / 'shop'
+ src.mkdir()
+ spam = src / 'spam'
+ spam.write_text('')
+ _winapi.CreateJunction(str(src), str(dst))
+ self.assertRaises(OSError, dst.rmtree)
+ dst.rmtree(ignore_errors=True)
+ finally:
+ tmp.rmtree(ignore_errors=True)
+
+ @needs_windows
+ def test_rmtree_outer_junction_on_error(self):
+ import _winapi
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ dir_ = tmp / 'dir'
+ dir_.mkdir()
+ link = tmp / 'link'
+ _winapi.CreateJunction(str(dir_), str(link))
+ try:
+ self.assertRaises(OSError, link.rmtree)
+ self.assertTrue(dir_.exists())
+ self.assertTrue(link.exists(follow_symlinks=False))
+ errors = []
+
+ def on_error(error):
+ errors.append(error)
+
+ link.rmtree(on_error=on_error)
+ self.assertEqual(len(errors), 1)
+ self.assertIsInstance(errors[0], OSError)
+ self.assertEqual(errors[0].filename, str(link))
+ finally:
+ os.unlink(str(link))
+
+ @unittest.skipUnless(rmtree_use_fd_functions, "requires safe rmtree")
+ def test_rmtree_fails_on_close(self):
+ # Test that the error handler is called for failed os.close() and that
+ # os.close() is only called once for a file descriptor.
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ dir1 = tmp / 'dir1'
+ dir1.mkdir()
+ dir2 = dir1 / 'dir2'
+ dir2.mkdir()
+
+ def close(fd):
+ orig_close(fd)
+ nonlocal close_count
+ close_count += 1
+ raise OSError
+
+ close_count = 0
+ with swap_attr(os, 'close', close) as orig_close:
+ with self.assertRaises(OSError):
+ dir1.rmtree()
+ self.assertTrue(dir2.is_dir())
+ self.assertEqual(close_count, 2)
+
+ close_count = 0
+ errors = []
+
+ with swap_attr(os, 'close', close) as orig_close:
+ dir1.rmtree(on_error=errors.append)
+ print(errors)
+ self.assertEqual(len(errors), 2)
+ self.assertEqual(errors[0].filename, str(dir2))
+ self.assertEqual(errors[1].filename, str(dir1))
+ self.assertEqual(close_count, 2)
+
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ @unittest.skipIf(sys.platform == "vxworks",
+ "fifo requires special path on VxWorks")
+ def test_rmtree_on_named_pipe(self):
+ p = self.cls(self.base, 'pipe')
+ os.mkfifo(p)
+ try:
+ with self.assertRaises(NotADirectoryError):
+ p.rmtree()
+ self.assertTrue(p.exists())
+ finally:
+ p.unlink()
+
+ p = self.cls(self.base, 'dir')
+ p.mkdir()
+ os.mkfifo(p / 'mypipe')
+ p.rmtree()
+ self.assertFalse(p.exists())
+
+ @unittest.skipIf(sys.platform[:6] == 'cygwin',
+ "This test can't be run on Cygwin (issue #1071513).")
+ @os_helper.skip_if_dac_override
+ @os_helper.skip_unless_working_chmod
+ def test_rmtree_deleted_race_condition(self):
+ # bpo-37260
+ #
+ # Test that a file or a directory deleted after it is enumerated
+ # by scandir() but before unlink() or rmdr() is called doesn't
+ # generate any errors.
+ def on_error(exc):
+ assert exc.filename
+ if not isinstance(exc, PermissionError):
+ raise
+ # Make the parent and the children writeable.
+ for p, mode in zip(paths, old_modes):
+ p.chmod(mode)
+ # Remove other dirs except one.
+ keep = next(p for p in dirs if str(p) != exc.filename)
+ for p in dirs:
+ if p != keep:
+ p.rmdir()
+ # Remove other files except one.
+ keep = next(p for p in files if str(p) != exc.filename)
+ for p in files:
+ if p != keep:
+ p.unlink()
+
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ paths = [tmp] + [tmp / f'child{i}' for i in range(6)]
+ dirs = paths[1::2]
+ files = paths[2::2]
+ for path in dirs:
+ path.mkdir()
+ for path in files:
+ path.write_text('')
+
+ old_modes = [path.stat().st_mode for path in paths]
+
+ # Make the parent and the children non-writeable.
+ new_mode = stat.S_IREAD | stat.S_IEXEC
+ for path in reversed(paths):
+ path.chmod(new_mode)
+
+ try:
+ tmp.rmtree(on_error=on_error)
+ except:
+ # Test failed, so cleanup artifacts.
+ for path, mode in zip(paths, old_modes):
+ try:
+ path.chmod(mode)
+ except OSError:
+ pass
+ tmp.rmtree()
+ raise
+
+ def test_rmtree_does_not_choke_on_failing_lstat(self):
+ try:
+ orig_lstat = os.lstat
+ tmp = self.cls(self.base, 'rmtree')
+
+ def raiser(fn, *args, **kwargs):
+ if fn != str(tmp):
+ raise OSError()
+ else:
+ return orig_lstat(fn)
+
+ os.lstat = raiser
+
+ tmp.mkdir()
+ foo = tmp / 'foo'
+ foo.write_text('')
+ tmp.rmtree()
+ finally:
+ os.lstat = orig_lstat
+
@os_helper.skip_unless_hardlink
def test_hardlink_to(self):
P = self.cls(self.base)
self.assertFileNotFound(p.stat)
self.assertFileNotFound(p.unlink)
+ def test_rmtree(self):
+ base = self.cls(self.base)
+ base.joinpath('dirA').rmtree()
+ self.assertRaises(FileNotFoundError, base.joinpath('dirA').stat)
+ self.assertRaises(FileNotFoundError, base.joinpath('dirA', 'linkC').lstat)
+ base.joinpath('dirB').rmtree()
+ self.assertRaises(FileNotFoundError, base.joinpath('dirB').stat)
+ self.assertRaises(FileNotFoundError, base.joinpath('dirB', 'fileB').stat)
+ self.assertRaises(FileNotFoundError, base.joinpath('dirB', 'linkD').lstat)
+ base.joinpath('dirC').rmtree()
+ self.assertRaises(FileNotFoundError, base.joinpath('dirC').stat)
+ self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'dirD').stat)
+ self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'dirD', 'fileD').stat)
+ self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'fileC').stat)
+ self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'novel.txt').stat)
+
+ def test_rmtree_errors(self):
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ # filename is guaranteed not to exist
+ filename = tmp / 'foo'
+ self.assertRaises(FileNotFoundError, filename.rmtree)
+ # test that ignore_errors option is honored
+ filename.rmtree(ignore_errors=True)
+
+ # existing file
+ filename = tmp / "tstfile"
+ filename.write_text("")
+ with self.assertRaises(NotADirectoryError) as cm:
+ filename.rmtree()
+ self.assertEqual(cm.exception.filename, str(filename))
+ self.assertTrue(filename.exists())
+ # test that ignore_errors option is honored
+ filename.rmtree(ignore_errors=True)
+ self.assertTrue(filename.exists())
+
+ def test_rmtree_on_error(self):
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ filename = tmp / "tstfile"
+ filename.write_text("")
+ errors = []
+
+ def on_error(error):
+ errors.append(error)
+
+ filename.rmtree(on_error=on_error)
+ self.assertEqual(len(errors), 2)
+ # First from scandir()
+ self.assertIsInstance(errors[0], NotADirectoryError)
+ self.assertEqual(errors[0].filename, str(filename))
+ # Then from munlink()
+ self.assertIsInstance(errors[1], NotADirectoryError)
+ self.assertEqual(errors[1].filename, str(filename))
+
+ @needs_symlinks
+ def test_rmtree_outer_symlink(self):
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ dir_ = tmp / 'dir'
+ dir_.mkdir()
+ link = tmp / 'link'
+ link.symlink_to(dir_)
+ self.assertRaises(OSError, link.rmtree)
+ self.assertTrue(dir_.exists())
+ self.assertTrue(link.exists(follow_symlinks=False))
+ errors = []
+
+ def on_error(error):
+ errors.append(error)
+
+ link.rmtree(on_error=on_error)
+ self.assertEqual(len(errors), 1)
+ self.assertIsInstance(errors[0], OSError)
+ self.assertEqual(errors[0].filename, str(link))
+
+ @needs_symlinks
+ def test_rmtree_inner_symlink(self):
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ dir1 = tmp / 'dir1'
+ dir2 = dir1 / 'dir2'
+ dir3 = tmp / 'dir3'
+ for d in dir1, dir2, dir3:
+ d.mkdir()
+ file1 = tmp / 'file1'
+ file1.write_text('foo')
+ link1 = dir1 / 'link1'
+ link1.symlink_to(dir2)
+ link2 = dir1 / 'link2'
+ link2.symlink_to(dir3)
+ link3 = dir1 / 'link3'
+ link3.symlink_to(file1)
+ # make sure symlinks are removed but not followed
+ dir1.rmtree()
+ self.assertFalse(dir1.exists())
+ self.assertTrue(dir3.exists())
+ self.assertTrue(file1.exists())
+
def setUpWalk(self):
# Build:
# TESTFN/
--- /dev/null
+Add :meth:`pathlib.Path.rmtree`, which recursively removes a directory.