"""
sys.audit("os.walk", top, topdown, onerror, followlinks)
- return _walk(fspath(top), topdown, onerror, followlinks)
-
-def _walk(top, topdown, onerror, followlinks):
- dirs = []
- nondirs = []
- walk_dirs = []
-
- # We may not have read permission for top, in which case we can't
- # get a list of the files the directory contains. os.walk
- # always suppressed the exception then, rather than blow up for a
- # minor reason when (say) a thousand readable directories are still
- # left to visit. That logic is copied here.
- try:
- # Note that scandir is global in this module due
- # to earlier import-*.
- scandir_it = scandir(top)
- except OSError as error:
- if onerror is not None:
- onerror(error)
- return
- with scandir_it:
- while True:
- try:
+ stack = [(False, fspath(top))]
+ islink, join = path.islink, path.join
+ while stack:
+ must_yield, top = stack.pop()
+ if must_yield:
+ yield top
+ continue
+
+ dirs = []
+ nondirs = []
+ walk_dirs = []
+
+ # We may not have read permission for top, in which case we can't
+ # get a list of the files the directory contains.
+ # We suppress the exception here, rather than blow up for a
+ # minor reason when (say) a thousand readable directories are still
+ # left to visit.
+ try:
+ scandir_it = scandir(top)
+ except OSError as error:
+ if onerror is not None:
+ onerror(error)
+ continue
+
+ cont = False
+ with scandir_it:
+ while True:
try:
- entry = next(scandir_it)
- except StopIteration:
+ try:
+ entry = next(scandir_it)
+ except StopIteration:
+ break
+ except OSError as error:
+ if onerror is not None:
+ onerror(error)
+ cont = True
break
- except OSError as error:
- if onerror is not None:
- onerror(error)
- return
- try:
- is_dir = entry.is_dir()
- except OSError:
- # If is_dir() raises an OSError, consider that the entry is not
- # a directory, same behaviour than os.path.isdir().
- is_dir = False
-
- if is_dir:
- dirs.append(entry.name)
- else:
- nondirs.append(entry.name)
+ try:
+ is_dir = entry.is_dir()
+ except OSError:
+ # If is_dir() raises an OSError, consider the entry not to
+ # be a directory, same behaviour as os.path.isdir().
+ is_dir = False
- if not topdown and is_dir:
- # Bottom-up: recurse into sub-directory, but exclude symlinks to
- # directories if followlinks is False
- if followlinks:
- walk_into = True
+ if is_dir:
+ dirs.append(entry.name)
else:
- try:
- is_symlink = entry.is_symlink()
- except OSError:
- # If is_symlink() raises an OSError, consider that the
- # entry is not a symbolic link, same behaviour than
- # os.path.islink().
- is_symlink = False
- walk_into = not is_symlink
-
- if walk_into:
- walk_dirs.append(entry.path)
-
- # Yield before recursion if going top down
- if topdown:
- yield top, dirs, nondirs
-
- # Recurse into sub-directories
- islink, join = path.islink, path.join
- for dirname in dirs:
- new_path = join(top, dirname)
- # Issue #23605: os.path.islink() is used instead of caching
- # entry.is_symlink() result during the loop on os.scandir() because
- # the caller can replace the directory entry during the "yield"
- # above.
- if followlinks or not islink(new_path):
- yield from _walk(new_path, topdown, onerror, followlinks)
- else:
- # Recurse into sub-directories
- for new_path in walk_dirs:
- yield from _walk(new_path, topdown, onerror, followlinks)
- # Yield after recursion if going bottom up
- yield top, dirs, nondirs
+ nondirs.append(entry.name)
+
+ if not topdown and is_dir:
+ # Bottom-up: traverse into sub-directory, but exclude
+ # symlinks to directories if followlinks is False
+ if followlinks:
+ walk_into = True
+ else:
+ try:
+ is_symlink = entry.is_symlink()
+ except OSError:
+ # If is_symlink() raises an OSError, consider the
+ # entry not to be a symbolic link, same behaviour
+ # as os.path.islink().
+ is_symlink = False
+ walk_into = not is_symlink
+
+ if walk_into:
+ walk_dirs.append(entry.path)
+ if cont:
+ continue
+
+ if topdown:
+ # Yield before sub-directory traversal if going top down
+ yield top, dirs, nondirs
+ # Traverse into sub-directories
+ for dirname in reversed(dirs):
+ new_path = join(top, dirname)
+ # bpo-23605: os.path.islink() is used instead of caching
+ # entry.is_symlink() result during the loop on os.scandir() because
+ # the caller can replace the directory entry during the "yield"
+ # above.
+ if followlinks or not islink(new_path):
+ stack.append((False, new_path))
+ else:
+ # Yield after sub-directory traversal if going bottom up
+ stack.append((True, (top, dirs, nondirs)))
+ # Traverse into sub-directories
+ for new_path in reversed(walk_dirs):
+ stack.append((False, new_path))
__all__.append("walk")
testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)
@contextlib.contextmanager
+def set_recursion_limit(limit):
+ """Temporarily change the recursion limit."""
+ original_limit = sys.getrecursionlimit()
+ try:
+ sys.setrecursionlimit(limit)
+ yield
+ finally:
+ sys.setrecursionlimit(original_limit)
+
def infinite_recursion(max_depth=75):
"""Set a lower limit for tests that interact with infinite recursions
(e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some
debug windows builds, due to not enough functions being inlined the
stack size might not handle the default recursion limit (1000). See
bpo-11105 for details."""
+ return set_recursion_limit(max_depth)
- original_depth = sys.getrecursionlimit()
- try:
- sys.setrecursionlimit(max_depth)
- yield
- finally:
- sys.setrecursionlimit(original_depth)
def ignore_deprecations_from(module: str, *, like: str) -> object:
token = object()
from test.support import import_helper
from test.support import os_helper
from test.support import socket_helper
+from test.support import set_recursion_limit
from test.support import warnings_helper
from platform import win32_is_iot
self.assertEqual(next(it), expected)
p = os.path.join(p, 'd')
+ def test_walk_above_recursion_limit(self):
+ depth = 50
+ os.makedirs(os.path.join(self.walk_path, *(['d'] * depth)))
+ with set_recursion_limit(depth - 5):
+ all = list(self.walk(self.walk_path))
+
+ sub2_path = self.sub2_tree[0]
+ for root, dirs, files in all:
+ if root == sub2_path:
+ dirs.sort()
+ files.sort()
+
+ d_entries = []
+ d_path = self.walk_path
+ for _ in range(depth):
+ d_path = os.path.join(d_path, "d")
+ d_entries.append((d_path, ["d"], []))
+ d_entries[-1][1].clear()
+
+ # Sub-sequences where the order is known
+ sections = {
+ "SUB1": [
+ (self.sub1_path, ["SUB11"], ["tmp2"]),
+ (self.sub11_path, [], []),
+ ],
+ "SUB2": [self.sub2_tree],
+ "d": d_entries,
+ }
+
+ # The ordering of sub-dirs is arbitrary but determines the order in
+ # which sub-sequences appear
+ dirs = all[0][1]
+ expected = [(self.walk_path, dirs, ["tmp1"])]
+ for d in dirs:
+ expected.extend(sections[d])
+
+ self.assertEqual(len(all), depth + 4)
+ self.assertEqual(sorted(dirs), ["SUB1", "SUB2", "d"])
+ self.assertEqual(all, expected)
+
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
# fwalk() keeps file descriptors open
test_walk_many_open_files = None
+ # fwalk() still uses recursion
+ test_walk_above_recursion_limit = None
class BytesWalkTests(WalkTests):