from errno import *
from glob import _StringGlobber, _no_recurse_symlinks
from itertools import chain
-from stat import S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
+from stat import (
+ S_IMODE, S_ISDIR, S_ISREG, S_ISLNK, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO,
+)
from _collections_abc import Sequence
try:
grp = None
from pathlib._os import (
- PathInfo, DirEntryInfo,
vfsopen, vfspath,
ensure_different_files, ensure_distinct_paths,
- copyfile2, copyfileobj, copy_info,
+ copyfile2, copyfileobj,
)
__slots__ = ()
+class _Info:
+ __slots__ = ('_path',)
+
+ def __init__(self, path):
+ self._path = path
+
+ def __repr__(self):
+ path_type = "WindowsPath" if os.name == "nt" else "PosixPath"
+ return f"<{path_type}.info>"
+
+ def _stat(self, *, follow_symlinks=True):
+ """Return the status as an os.stat_result."""
+ raise NotImplementedError
+
+ def _posix_permissions(self, *, follow_symlinks=True):
+ """Return the POSIX file permissions."""
+ return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode)
+
+ def _file_id(self, *, follow_symlinks=True):
+ """Returns the identifier of the file."""
+ st = self._stat(follow_symlinks=follow_symlinks)
+ return st.st_dev, st.st_ino
+
+ def _access_time_ns(self, *, follow_symlinks=True):
+ """Return the access time in nanoseconds."""
+ return self._stat(follow_symlinks=follow_symlinks).st_atime_ns
+
+ def _mod_time_ns(self, *, follow_symlinks=True):
+ """Return the modify time in nanoseconds."""
+ return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns
+
+ if hasattr(os.stat_result, 'st_flags'):
+ def _bsd_flags(self, *, follow_symlinks=True):
+ """Return the flags."""
+ return self._stat(follow_symlinks=follow_symlinks).st_flags
+
+ if hasattr(os, 'listxattr'):
+ def _xattrs(self, *, follow_symlinks=True):
+ """Return the xattrs as a list of (attr, value) pairs, or an empty
+ list if extended attributes aren't supported."""
+ try:
+ return [
+ (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
+ for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
+ except OSError as err:
+ if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+ return []
+
+
+_STAT_RESULT_ERROR = [] # falsy sentinel indicating stat() failed.
+
+
+class _StatResultInfo(_Info):
+ """Implementation of pathlib.types.PathInfo that provides status
+ information by querying a wrapped os.stat_result object. Don't try to
+ construct it yourself."""
+ __slots__ = ('_stat_result', '_lstat_result')
+
+ def __init__(self, path):
+ super().__init__(path)
+ self._stat_result = None
+ self._lstat_result = None
+
+ def _stat(self, *, follow_symlinks=True):
+ """Return the status as an os.stat_result."""
+ if follow_symlinks:
+ if not self._stat_result:
+ try:
+ self._stat_result = os.stat(self._path)
+ except (OSError, ValueError):
+ self._stat_result = _STAT_RESULT_ERROR
+ raise
+ return self._stat_result
+ else:
+ if not self._lstat_result:
+ try:
+ self._lstat_result = os.lstat(self._path)
+ except (OSError, ValueError):
+ self._lstat_result = _STAT_RESULT_ERROR
+ raise
+ return self._lstat_result
+
+ def exists(self, *, follow_symlinks=True):
+ """Whether this path exists."""
+ if follow_symlinks:
+ if self._stat_result is _STAT_RESULT_ERROR:
+ return False
+ else:
+ if self._lstat_result is _STAT_RESULT_ERROR:
+ return False
+ try:
+ self._stat(follow_symlinks=follow_symlinks)
+ except (OSError, ValueError):
+ return False
+ return True
+
+ def is_dir(self, *, follow_symlinks=True):
+ """Whether this path is a directory."""
+ if follow_symlinks:
+ if self._stat_result is _STAT_RESULT_ERROR:
+ return False
+ else:
+ if self._lstat_result is _STAT_RESULT_ERROR:
+ return False
+ try:
+ st = self._stat(follow_symlinks=follow_symlinks)
+ except (OSError, ValueError):
+ return False
+ return S_ISDIR(st.st_mode)
+
+ def is_file(self, *, follow_symlinks=True):
+ """Whether this path is a regular file."""
+ if follow_symlinks:
+ if self._stat_result is _STAT_RESULT_ERROR:
+ return False
+ else:
+ if self._lstat_result is _STAT_RESULT_ERROR:
+ return False
+ try:
+ st = self._stat(follow_symlinks=follow_symlinks)
+ except (OSError, ValueError):
+ return False
+ return S_ISREG(st.st_mode)
+
+ def is_symlink(self):
+ """Whether this path is a symbolic link."""
+ if self._lstat_result is _STAT_RESULT_ERROR:
+ return False
+ try:
+ st = self._stat(follow_symlinks=False)
+ except (OSError, ValueError):
+ return False
+ return S_ISLNK(st.st_mode)
+
+
+class _DirEntryInfo(_Info):
+ """Implementation of pathlib.types.PathInfo that provides status
+ information by querying a wrapped os.DirEntry object. Don't try to
+ construct it yourself."""
+ __slots__ = ('_entry',)
+
+ def __init__(self, entry):
+ super().__init__(entry.path)
+ self._entry = entry
+
+ def _stat(self, *, follow_symlinks=True):
+ """Return the status as an os.stat_result."""
+ return self._entry.stat(follow_symlinks=follow_symlinks)
+
+ def exists(self, *, follow_symlinks=True):
+ """Whether this path exists."""
+ if not follow_symlinks:
+ return True
+ try:
+ self._stat(follow_symlinks=follow_symlinks)
+ except OSError:
+ return False
+ return True
+
+ def is_dir(self, *, follow_symlinks=True):
+ """Whether this path is a directory."""
+ try:
+ return self._entry.is_dir(follow_symlinks=follow_symlinks)
+ except OSError:
+ return False
+
+ def is_file(self, *, follow_symlinks=True):
+ """Whether this path is a regular file."""
+ try:
+ return self._entry.is_file(follow_symlinks=follow_symlinks)
+ except OSError:
+ return False
+
+ def is_symlink(self):
+ """Whether this path is a symbolic link."""
+ try:
+ return self._entry.is_symlink()
+ except OSError:
+ return False
+
+
+def _copy_info(info, target, follow_symlinks=True):
+ """Copy metadata from the given PathInfo to the given local path."""
+ copy_times_ns = (
+ hasattr(info, '_access_time_ns') and
+ hasattr(info, '_mod_time_ns') and
+ (follow_symlinks or os.utime in os.supports_follow_symlinks))
+ if copy_times_ns:
+ t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
+ t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
+ os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)
+
+ # We must copy extended attributes before the file is (potentially)
+ # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+ copy_xattrs = (
+ hasattr(info, '_xattrs') and
+ hasattr(os, 'setxattr') and
+ (follow_symlinks or os.setxattr in os.supports_follow_symlinks))
+ if copy_xattrs:
+ xattrs = info._xattrs(follow_symlinks=follow_symlinks)
+ for attr, value in xattrs:
+ try:
+ os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
+ except OSError as e:
+ if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+
+ copy_posix_permissions = (
+ hasattr(info, '_posix_permissions') and
+ (follow_symlinks or os.chmod in os.supports_follow_symlinks))
+ if copy_posix_permissions:
+ posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
+ try:
+ os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
+ except NotImplementedError:
+ # if we got a NotImplementedError, it's because
+ # * follow_symlinks=False,
+ # * lchown() is unavailable, and
+ # * either
+ # * fchownat() is unavailable or
+ # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
+ # (it returned ENOSUP.)
+ # therefore we're out of options--we simply cannot chown the
+ # symlink. give up, suppress the error.
+ # (which is what shutil always did in this circumstance.)
+ pass
+
+ copy_bsd_flags = (
+ hasattr(info, '_bsd_flags') and
+ hasattr(os, 'chflags') and
+ (follow_symlinks or os.chflags in os.supports_follow_symlinks))
+ if copy_bsd_flags:
+ bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
+ try:
+ os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
+ except OSError as why:
+ if why.errno not in (EOPNOTSUPP, ENOTSUP):
+ raise
+
+
class Path(PurePath):
"""PurePath subclass that can make system calls.
try:
return self._info
except AttributeError:
- self._info = PathInfo(self)
+ self._info = _StatResultInfo(str(self))
return self._info
def stat(self, *, follow_symlinks=True):
def _from_dir_entry(self, dir_entry, path_str):
path = self.with_segments(path_str)
path._str = path_str
- path._info = DirEntryInfo(dir_entry)
+ path._info = _DirEntryInfo(dir_entry)
return path
def iterdir(self):
self.joinpath(child.name)._copy_from(
child, follow_symlinks, preserve_metadata)
if preserve_metadata:
- copy_info(source.info, self)
+ _copy_info(source.info, self)
else:
self._copy_from_file(source, preserve_metadata)
with open(self, 'wb') as target_f:
copyfileobj(source_f, target_f)
if preserve_metadata:
- copy_info(source.info, self)
+ _copy_info(source.info, self)
if copyfile2:
# Use fast OS routine for local file copying where available.
def _copy_from_symlink(self, source, preserve_metadata=False):
os.symlink(vfspath(source.readlink()), self, source.info.is_dir())
if preserve_metadata:
- copy_info(source.info, self, follow_symlinks=False)
+ _copy_info(source.info, self, follow_symlinks=False)
else:
def _copy_from_symlink(self, source, preserve_metadata=False):
os.symlink(vfspath(source.readlink()), self)
if preserve_metadata:
- copy_info(source.info, self, follow_symlinks=False)
+ _copy_info(source.info, self, follow_symlinks=False)
def move(self, target):
"""
from errno import *
from io import TextIOWrapper, text_encoding
-from stat import S_ISDIR, S_ISREG, S_ISLNK, S_IMODE
import os
import sys
try:
err.filename = vfspath(source)
err.filename2 = vfspath(target)
raise err
-
-
-def copy_info(info, target, follow_symlinks=True):
- """Copy metadata from the given PathInfo to the given local path."""
- copy_times_ns = (
- hasattr(info, '_access_time_ns') and
- hasattr(info, '_mod_time_ns') and
- (follow_symlinks or os.utime in os.supports_follow_symlinks))
- if copy_times_ns:
- t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
- t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
- os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)
-
- # We must copy extended attributes before the file is (potentially)
- # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
- copy_xattrs = (
- hasattr(info, '_xattrs') and
- hasattr(os, 'setxattr') and
- (follow_symlinks or os.setxattr in os.supports_follow_symlinks))
- if copy_xattrs:
- xattrs = info._xattrs(follow_symlinks=follow_symlinks)
- for attr, value in xattrs:
- try:
- os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
- except OSError as e:
- if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
- raise
-
- copy_posix_permissions = (
- hasattr(info, '_posix_permissions') and
- (follow_symlinks or os.chmod in os.supports_follow_symlinks))
- if copy_posix_permissions:
- posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
- try:
- os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
- except NotImplementedError:
- # if we got a NotImplementedError, it's because
- # * follow_symlinks=False,
- # * lchown() is unavailable, and
- # * either
- # * fchownat() is unavailable or
- # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
- # (it returned ENOSUP.)
- # therefore we're out of options--we simply cannot chown the
- # symlink. give up, suppress the error.
- # (which is what shutil always did in this circumstance.)
- pass
-
- copy_bsd_flags = (
- hasattr(info, '_bsd_flags') and
- hasattr(os, 'chflags') and
- (follow_symlinks or os.chflags in os.supports_follow_symlinks))
- if copy_bsd_flags:
- bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
- try:
- os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
- except OSError as why:
- if why.errno not in (EOPNOTSUPP, ENOTSUP):
- raise
-
-
-class _PathInfoBase:
- __slots__ = ('_path', '_stat_result', '_lstat_result')
-
- def __init__(self, path):
- self._path = str(path)
-
- def __repr__(self):
- path_type = "WindowsPath" if os.name == "nt" else "PosixPath"
- return f"<{path_type}.info>"
-
- def _stat(self, *, follow_symlinks=True, ignore_errors=False):
- """Return the status as an os.stat_result, or None if stat() fails and
- ignore_errors is true."""
- if follow_symlinks:
- try:
- result = self._stat_result
- except AttributeError:
- pass
- else:
- if ignore_errors or result is not None:
- return result
- try:
- self._stat_result = os.stat(self._path)
- except (OSError, ValueError):
- self._stat_result = None
- if not ignore_errors:
- raise
- return self._stat_result
- else:
- try:
- result = self._lstat_result
- except AttributeError:
- pass
- else:
- if ignore_errors or result is not None:
- return result
- try:
- self._lstat_result = os.lstat(self._path)
- except (OSError, ValueError):
- self._lstat_result = None
- if not ignore_errors:
- raise
- return self._lstat_result
-
- def _posix_permissions(self, *, follow_symlinks=True):
- """Return the POSIX file permissions."""
- return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode)
-
- def _file_id(self, *, follow_symlinks=True):
- """Returns the identifier of the file."""
- st = self._stat(follow_symlinks=follow_symlinks)
- return st.st_dev, st.st_ino
-
- def _access_time_ns(self, *, follow_symlinks=True):
- """Return the access time in nanoseconds."""
- return self._stat(follow_symlinks=follow_symlinks).st_atime_ns
-
- def _mod_time_ns(self, *, follow_symlinks=True):
- """Return the modify time in nanoseconds."""
- return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns
-
- if hasattr(os.stat_result, 'st_flags'):
- def _bsd_flags(self, *, follow_symlinks=True):
- """Return the flags."""
- return self._stat(follow_symlinks=follow_symlinks).st_flags
-
- if hasattr(os, 'listxattr'):
- def _xattrs(self, *, follow_symlinks=True):
- """Return the xattrs as a list of (attr, value) pairs, or an empty
- list if extended attributes aren't supported."""
- try:
- return [
- (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
- for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
- except OSError as err:
- if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
- raise
- return []
-
-
-class _WindowsPathInfo(_PathInfoBase):
- """Implementation of pathlib.types.PathInfo that provides status
- information for Windows paths. Don't try to construct it yourself."""
- __slots__ = ('_exists', '_is_dir', '_is_file', '_is_symlink')
-
- def exists(self, *, follow_symlinks=True):
- """Whether this path exists."""
- if not follow_symlinks and self.is_symlink():
- return True
- try:
- return self._exists
- except AttributeError:
- if os.path.exists(self._path):
- self._exists = True
- return True
- else:
- self._exists = self._is_dir = self._is_file = False
- return False
-
- def is_dir(self, *, follow_symlinks=True):
- """Whether this path is a directory."""
- if not follow_symlinks and self.is_symlink():
- return False
- try:
- return self._is_dir
- except AttributeError:
- if os.path.isdir(self._path):
- self._is_dir = self._exists = True
- return True
- else:
- self._is_dir = False
- return False
-
- def is_file(self, *, follow_symlinks=True):
- """Whether this path is a regular file."""
- if not follow_symlinks and self.is_symlink():
- return False
- try:
- return self._is_file
- except AttributeError:
- if os.path.isfile(self._path):
- self._is_file = self._exists = True
- return True
- else:
- self._is_file = False
- return False
-
- def is_symlink(self):
- """Whether this path is a symbolic link."""
- try:
- return self._is_symlink
- except AttributeError:
- self._is_symlink = os.path.islink(self._path)
- return self._is_symlink
-
-
-class _PosixPathInfo(_PathInfoBase):
- """Implementation of pathlib.types.PathInfo that provides status
- information for POSIX paths. Don't try to construct it yourself."""
- __slots__ = ()
-
- def exists(self, *, follow_symlinks=True):
- """Whether this path exists."""
- st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
- if st is None:
- return False
- return True
-
- def is_dir(self, *, follow_symlinks=True):
- """Whether this path is a directory."""
- st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
- if st is None:
- return False
- return S_ISDIR(st.st_mode)
-
- def is_file(self, *, follow_symlinks=True):
- """Whether this path is a regular file."""
- st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
- if st is None:
- return False
- return S_ISREG(st.st_mode)
-
- def is_symlink(self):
- """Whether this path is a symbolic link."""
- st = self._stat(follow_symlinks=False, ignore_errors=True)
- if st is None:
- return False
- return S_ISLNK(st.st_mode)
-
-
-PathInfo = _WindowsPathInfo if os.name == 'nt' else _PosixPathInfo
-
-
-class DirEntryInfo(_PathInfoBase):
- """Implementation of pathlib.types.PathInfo that provides status
- information by querying a wrapped os.DirEntry object. Don't try to
- construct it yourself."""
- __slots__ = ('_entry',)
-
- def __init__(self, entry):
- super().__init__(entry.path)
- self._entry = entry
-
- def _stat(self, *, follow_symlinks=True, ignore_errors=False):
- try:
- return self._entry.stat(follow_symlinks=follow_symlinks)
- except OSError:
- if not ignore_errors:
- raise
- return None
-
- def exists(self, *, follow_symlinks=True):
- """Whether this path exists."""
- if not follow_symlinks:
- return True
- return self._stat(ignore_errors=True) is not None
-
- def is_dir(self, *, follow_symlinks=True):
- """Whether this path is a directory."""
- try:
- return self._entry.is_dir(follow_symlinks=follow_symlinks)
- except OSError:
- return False
-
- def is_file(self, *, follow_symlinks=True):
- """Whether this path is a regular file."""
- try:
- return self._entry.is_file(follow_symlinks=follow_symlinks)
- except OSError:
- return False
-
- def is_symlink(self):
- """Whether this path is a symbolic link."""
- try:
- return self._entry.is_symlink()
- except OSError:
- return False