yield path
except OSError:
pass
+
+ @classmethod
+ def walk(cls, root, top_down, on_error, follow_symlinks):
+ """Walk the directory tree from the given root, similar to os.walk().
+ """
+ paths = [root]
+ while paths:
+ path = paths.pop()
+ if isinstance(path, tuple):
+ yield path
+ continue
+ try:
+ with cls.scandir(path) as scandir_it:
+ dirnames = []
+ filenames = []
+ if not top_down:
+ paths.append((path, dirnames, filenames))
+ for entry in scandir_it:
+ name = entry.name
+ try:
+ if entry.is_dir(follow_symlinks=follow_symlinks):
+ if not top_down:
+ paths.append(cls.parse_entry(entry))
+ dirnames.append(name)
+ else:
+ filenames.append(name)
+ except OSError:
+ filenames.append(name)
+ except OSError as error:
+ if on_error is not None:
+ on_error(error)
+ else:
+ if top_down:
+ yield path, dirnames, filenames
+ if dirnames:
+ prefix = cls.add_slash(path)
+ paths += [cls.concat_path(prefix, d) for d in reversed(dirnames)]
"""
return (self._make_child_relpath(name) for name in os.listdir(self))
- def _scandir(self):
- return os.scandir(self)
-
- def _make_child_direntry(self, entry):
- # Transform an entry yielded from _scandir() into a path object.
- path_str = entry.name if str(self) == '.' else entry.path
- path = self.with_segments(path_str)
- path._str = path_str
- path._drv = self.drive
- path._root = self.root
- path._tail_cached = self._tail + [entry.name]
- return path
def _make_child_relpath(self, name):
if not name:
def walk(self, top_down=True, on_error=None, follow_symlinks=False):
"""Walk the directory tree from this directory, similar to os.walk()."""
sys.audit("pathlib.Path.walk", self, on_error, follow_symlinks)
- return _abc.PathBase.walk(
- self, top_down=top_down, on_error=on_error, follow_symlinks=follow_symlinks)
+ root_dir = str(self)
+ results = self._globber.walk(root_dir, top_down, on_error, follow_symlinks)
+ for path_str, dirnames, filenames in results:
+ if root_dir == '.':
+ path_str = path_str[2:]
+ yield self._from_parsed_string(path_str), dirnames, filenames
def absolute(self):
"""Return an absolute version of this path
class Globber(glob._Globber):
lstat = operator.methodcaller('lstat')
- scandir = operator.methodcaller('_scandir')
add_slash = operator.methodcaller('joinpath', '')
+ @staticmethod
+ def scandir(path):
+ # Emulate os.scandir(), which returns an object that can be used as a
+ # context manager. This method is called by walk() and glob().
+ from contextlib import nullcontext
+ return nullcontext(path.iterdir())
+
@staticmethod
def concat_path(path, text):
"""Appends text to the given path.
"""
raise UnsupportedOperation(self._unsupported_msg('iterdir()'))
- def _scandir(self):
- # Emulate os.scandir(), which returns an object that can be used as a
- # context manager. This method is called by walk() and glob().
- from contextlib import nullcontext
- return nullcontext(self.iterdir())
-
- def _make_child_direntry(self, entry):
- # Transform an entry yielded from _scandir() into a path object.
- # PathBase._scandir() yields PathBase objects, so this is a no-op.
- return entry
-
- def _make_child_relpath(self, name):
- return self.joinpath(name)
-
def _glob_selector(self, parts, case_sensitive, recurse_symlinks):
if case_sensitive is None:
case_sensitive = _is_case_sensitive(self.parser)
def walk(self, top_down=True, on_error=None, follow_symlinks=False):
"""Walk the directory tree from this directory, similar to os.walk()."""
- paths = [self]
-
- while paths:
- path = paths.pop()
- if isinstance(path, tuple):
- yield path
- continue
-
- # We may not have read permission for self, in which case we can't
- # get a list of the files the directory contains. os.walk()
- # always suppressed the exception in that instance, rather than
- # blow up for a minor reason when (say) a thousand readable
- # directories are still left to visit. That logic is copied here.
- try:
- scandir_obj = path._scandir()
- except OSError as error:
- if on_error is not None:
- on_error(error)
- continue
-
- with scandir_obj as scandir_it:
- dirnames = []
- filenames = []
- if not top_down:
- paths.append((path, dirnames, filenames))
- for entry in scandir_it:
- try:
- is_dir = entry.is_dir(follow_symlinks=follow_symlinks)
- except OSError:
- # Carried over from os.path.isdir().
- is_dir = False
-
- if is_dir:
- if not top_down:
- paths.append(path._make_child_direntry(entry))
- dirnames.append(entry.name)
- else:
- filenames.append(entry.name)
-
- if top_down:
- yield path, dirnames, filenames
- paths += [path._make_child_relpath(d) for d in reversed(dirnames)]
+ return self._globber.walk(self, top_down, on_error, follow_symlinks)
def absolute(self):
"""Return an absolute version of this path