git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
pathlib ABCs: remove duplicate `realpath()` implementation. (#119178)
author Barney Gale <barney.gale@gmail.com>
Wed, 5 Jun 2024 17:54:50 +0000 (18:54 +0100)
committer GitHub <noreply@github.com>
Wed, 5 Jun 2024 17:54:50 +0000 (18:54 +0100)
Add private `posixpath._realpath()` function, which is a generic version of `realpath()` that can be parameterised with string tokens (`sep`, `curdir`, `pardir`) and query functions (`getcwd`, `lstat`, `readlink`). Also add support for limiting the number of symlink traversals.

In the private `pathlib._abc.PathBase` class, call `posixpath._realpath()` and remove our re-implementation of the same algorithm.

No change to any public APIs, either in `posixpath` or `pathlib`.

Co-authored-by: Nice Zombies <nineteendo19d0@gmail.com>
Lib/pathlib/_abc.py
Lib/posixpath.py

index d7471b6927331d3a45f2c47ac17b2a0e9499775d..1a74f457c3f5a7b48cc9fac1dcceb16473abfcba 100644 (file)
@@ -12,8 +12,8 @@ resemble pathlib's PurePath and Path respectively.
 """
 
 import functools
+import posixpath
 from glob import _Globber, _no_recurse_symlinks
-from errno import ENOTDIR, ELOOP
 from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
 
 
@@ -696,65 +696,34 @@ class PathBase(PurePathBase):
         """
         if self._resolving:
             return self
-        path_root, parts = self._stack
-        path = self.with_segments(path_root)
-        try:
-            path = path.absolute()
-        except UnsupportedOperation:
-            path_tail = []
-        else:
-            path_root, path_tail = path._stack
-            path_tail.reverse()
-
-        # If the user has *not* overridden the `readlink()` method, then symlinks are unsupported
-        # and (in non-strict mode) we can improve performance by not calling `stat()`.
-        querying = strict or getattr(self.readlink, '_supported', True)
-        link_count = 0
-        while parts:
-            part = parts.pop()
-            if not part or part == '.':
-                continue
-            if part == '..':
-                if not path_tail:
-                    if path_root:
-                        # Delete '..' segment immediately following root
-                        continue
-                elif path_tail[-1] != '..':
-                    # Delete '..' segment and its predecessor
-                    path_tail.pop()
-                    continue
-            path_tail.append(part)
-            if querying and part != '..':
-                path = self.with_segments(path_root + self.parser.sep.join(path_tail))
+
+        def getcwd():
+            return str(self.with_segments().absolute())
+
+        if strict or getattr(self.readlink, '_supported', True):
+            def lstat(path_str):
+                path = self.with_segments(path_str)
                 path._resolving = True
-                try:
-                    st = path.stat(follow_symlinks=False)
-                    if S_ISLNK(st.st_mode):
-                        # Like Linux and macOS, raise OSError(errno.ELOOP) if too many symlinks are
-                        # encountered during resolution.
-                        link_count += 1
-                        if link_count >= self._max_symlinks:
-                            raise OSError(ELOOP, "Too many symbolic links in path", self._raw_path)
-                        target_root, target_parts = path.readlink()._stack
-                        # If the symlink target is absolute (like '/etc/hosts'), set the current
-                        # path to its uppermost parent (like '/').
-                        if target_root:
-                            path_root = target_root
-                            path_tail.clear()
-                        else:
-                            path_tail.pop()
-                        # Add the symlink target's reversed tail parts (like ['hosts', 'etc']) to
-                        # the stack of unresolved path parts.
-                        parts.extend(target_parts)
-                        continue
-                    elif parts and not S_ISDIR(st.st_mode):
-                        raise NotADirectoryError(ENOTDIR, "Not a directory", self._raw_path)
-                except OSError:
-                    if strict:
-                        raise
-                    else:
-                        querying = False
-        return self.with_segments(path_root + self.parser.sep.join(path_tail))
+                return path.lstat()
+
+            def readlink(path_str):
+                path = self.with_segments(path_str)
+                path._resolving = True
+                return str(path.readlink())
+        else:
+            # If the user has *not* overridden the `readlink()` method, then
+            # symlinks are unsupported and (in non-strict mode) we can improve
+            # performance by not calling `path.lstat()`.
+            def skip(path_str):
+                # This exception will be internally consumed by `_realpath()`.
+                raise OSError("Operation skipped.")
+
+            lstat = readlink = skip
+
+        return self.with_segments(posixpath._realpath(
+            str(self), strict, self.parser.sep,
+            getcwd=getcwd, lstat=lstat, readlink=readlink,
+            maxlinks=self._max_symlinks))
 
     def symlink_to(self, target, target_is_directory=False):
         """
index 47b2aa572e5c656c24a6eec064de65820f889bae..fccca4e066b76f1717cf6b9a25c44907f87448ab 100644 (file)
@@ -22,6 +22,7 @@ defpath = '/bin:/usr/bin'
 altsep = None
 devnull = '/dev/null'
 
+import errno
 import os
 import sys
 import stat
@@ -401,7 +402,10 @@ symbolic links encountered in the path."""
         curdir = '.'
         pardir = '..'
         getcwd = os.getcwd
+    return _realpath(filename, strict, sep, curdir, pardir, getcwd)
 
+def _realpath(filename, strict=False, sep=sep, curdir=curdir, pardir=pardir,
+              getcwd=os.getcwd, lstat=os.lstat, readlink=os.readlink, maxlinks=None):
     # The stack of unresolved path parts. When popped, a special value of None
     # indicates that a symlink target has been resolved, and that the original
     # symlink path can be retrieved by popping again. The [::-1] slice is a
@@ -418,6 +422,10 @@ symbolic links encountered in the path."""
     # the same links.
     seen = {}
 
+    # Number of symlinks traversed. When the number of traversals is limited
+    # by *maxlinks*, this is used instead of *seen* to detect symlink loops.
+    link_count = 0
+
     while rest:
         name = rest.pop()
         if name is None:
@@ -436,11 +444,19 @@ symbolic links encountered in the path."""
         else:
             newpath = path + sep + name
         try:
-            st = os.lstat(newpath)
+            st = lstat(newpath)
             if not stat.S_ISLNK(st.st_mode):
                 path = newpath
                 continue
-            if newpath in seen:
+            elif maxlinks is not None:
+                link_count += 1
+                if link_count > maxlinks:
+                    if strict:
+                        raise OSError(errno.ELOOP, os.strerror(errno.ELOOP),
+                                      newpath)
+                    path = newpath
+                    continue
+            elif newpath in seen:
                 # Already seen this path
                 path = seen[newpath]
                 if path is not None:
@@ -448,26 +464,28 @@ symbolic links encountered in the path."""
                     continue
                 # The symlink is not resolved, so we must have a symlink loop.
                 if strict:
-                    # Raise OSError(errno.ELOOP)
-                    os.stat(newpath)
+                    raise OSError(errno.ELOOP, os.strerror(errno.ELOOP),
+                                  newpath)
                 path = newpath
                 continue
-            target = os.readlink(newpath)
+            target = readlink(newpath)
         except OSError:
             if strict:
                 raise
             path = newpath
             continue
         # Resolve the symbolic link
-        seen[newpath] = None # not resolved symlink
         if target.startswith(sep):
             # Symlink target is absolute; reset resolved path.
             path = sep
-        # Push the symlink path onto the stack, and signal its specialness by
-        # also pushing None. When these entries are popped, we'll record the
-        # fully-resolved symlink target in the 'seen' mapping.
-        rest.append(newpath)
-        rest.append(None)
+        if maxlinks is None:
+            # Mark this symlink as seen but not fully resolved.
+            seen[newpath] = None
+            # Push the symlink path onto the stack, and signal its specialness
+            # by also pushing None. When these entries are popped, we'll
+            # record the fully-resolved symlink target in the 'seen' mapping.
+            rest.append(newpath)
+            rest.append(None)
         # Push the unresolved symlink target parts onto the stack.
         rest.extend(target.split(sep)[::-1])