"""
import functools
-import io
import posixpath
-from errno import EINVAL
from glob import _PathGlobber, _no_recurse_symlinks
-from pathlib._os import copyfileobj
+from pathlib._os import magic_open, CopyReader, CopyWriter
@functools.cache
return path, names
-def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
- newline=None):
- """
- Open the file pointed to by this path and return a file object, as
- the built-in open() function does.
- """
- try:
- return io.open(path, mode, buffering, encoding, errors, newline)
- except TypeError:
- pass
- cls = type(path)
- text = 'b' not in mode
- mode = ''.join(sorted(c for c in mode if c not in 'bt'))
- if text:
- try:
- attr = getattr(cls, f'__open_{mode}__')
- except AttributeError:
- pass
- else:
- return attr(path, buffering, encoding, errors, newline)
-
- try:
- attr = getattr(cls, f'__open_{mode}b__')
- except AttributeError:
- pass
- else:
- stream = attr(path, buffering)
- if text:
- stream = io.TextIOWrapper(stream, encoding, errors, newline)
- return stream
-
- raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
-
-
-class CopyReader:
- """
- Class that implements the "read" part of copying between path objects.
- An instance of this class is available from the ReadablePath._copy_reader
- property.
- """
- __slots__ = ('_path',)
-
- def __init__(self, path):
- self._path = path
-
- _readable_metakeys = frozenset()
-
- def _read_metadata(self, metakeys, *, follow_symlinks=True):
- """
- Returns path metadata as a dict with string keys.
- """
- raise NotImplementedError
-
-
-class CopyWriter:
- """
- Class that implements the "write" part of copying between path objects. An
- instance of this class is available from the WritablePath._copy_writer
- property.
- """
- __slots__ = ('_path',)
-
- def __init__(self, path):
- self._path = path
-
- _writable_metakeys = frozenset()
-
- def _write_metadata(self, metadata, *, follow_symlinks=True):
- """
- Sets path metadata from the given dict with string keys.
- """
- raise NotImplementedError
-
- def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
- self._ensure_distinct_path(source)
- if preserve_metadata:
- metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
- else:
- metakeys = None
- if not follow_symlinks and source.is_symlink():
- self._create_symlink(source, metakeys)
- elif source.is_dir():
- self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
- else:
- self._create_file(source, metakeys)
- return self._path
-
- def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
- """Copy the given directory to our path."""
- children = list(source.iterdir())
- self._path.mkdir(exist_ok=dirs_exist_ok)
- for src in children:
- dst = self._path.joinpath(src.name)
- if not follow_symlinks and src.is_symlink():
- dst._copy_writer._create_symlink(src, metakeys)
- elif src.is_dir():
- dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
- else:
- dst._copy_writer._create_file(src, metakeys)
- if metakeys:
- metadata = source._copy_reader._read_metadata(metakeys)
- if metadata:
- self._write_metadata(metadata)
-
- def _create_file(self, source, metakeys):
- """Copy the given file to our path."""
- self._ensure_different_file(source)
- with magic_open(source, 'rb') as source_f:
- try:
- with magic_open(self._path, 'wb') as target_f:
- copyfileobj(source_f, target_f)
- except IsADirectoryError as e:
- if not self._path.exists():
- # Raise a less confusing exception.
- raise FileNotFoundError(
- f'Directory does not exist: {self._path}') from e
- raise
- if metakeys:
- metadata = source._copy_reader._read_metadata(metakeys)
- if metadata:
- self._write_metadata(metadata)
-
- def _create_symlink(self, source, metakeys):
- """Copy the given symbolic link to our path."""
- self._path.symlink_to(source.readlink())
- if metakeys:
- metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
- if metadata:
- self._write_metadata(metadata, follow_symlinks=False)
-
- def _ensure_different_file(self, source):
- """
- Raise OSError(EINVAL) if both paths refer to the same file.
- """
- pass
-
- def _ensure_distinct_path(self, source):
- """
- Raise OSError(EINVAL) if the other path is within this path.
- """
- # Note: there is no straightforward, foolproof algorithm to determine
- # if one directory is within another (a particularly perverse example
- # would be a single network share mounted in one location via NFS, and
- # in another location via CIFS), so we simply checks whether the
- # other path is lexically equal to, or within, this path.
- if source == self._path:
- err = OSError(EINVAL, "Source and target are the same path")
- elif source in self._path.parents:
- err = OSError(EINVAL, "Source path is a parent of target path")
- else:
- return
- err.filename = str(source)
- err.filename2 = str(self._path)
- raise err
-
-
class JoinablePath:
"""Base class for pure path objects.
from errno import *
from glob import _StringGlobber, _no_recurse_symlinks
from itertools import chain
-from stat import S_IMODE, S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
+from stat import S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from _collections_abc import Sequence
try:
except ImportError:
grp = None
-from pathlib._os import copyfile, PathInfo, DirEntryInfo
-from pathlib._abc import CopyReader, CopyWriter, JoinablePath, ReadablePath, WritablePath
+from pathlib._os import LocalCopyReader, LocalCopyWriter, PathInfo, DirEntryInfo
+from pathlib._abc import JoinablePath, ReadablePath, WritablePath
__all__ = [
return "<{}.parents>".format(type(self._path).__name__)
-class _LocalCopyReader(CopyReader):
- """This object implements the "read" part of copying local paths. Don't
- try to construct it yourself.
- """
- __slots__ = ()
-
- _readable_metakeys = {'mode', 'times_ns'}
- if hasattr(os.stat_result, 'st_flags'):
- _readable_metakeys.add('flags')
- if hasattr(os, 'listxattr'):
- _readable_metakeys.add('xattrs')
- _readable_metakeys = frozenset(_readable_metakeys)
-
- def _read_metadata(self, metakeys, *, follow_symlinks=True):
- metadata = {}
- if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
- st = self._path.stat(follow_symlinks=follow_symlinks)
- if 'mode' in metakeys:
- metadata['mode'] = S_IMODE(st.st_mode)
- if 'times_ns' in metakeys:
- metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
- if 'flags' in metakeys:
- metadata['flags'] = st.st_flags
- if 'xattrs' in metakeys:
- try:
- metadata['xattrs'] = [
- (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
- for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
- except OSError as err:
- if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
- raise
- return metadata
-
-
-class _LocalCopyWriter(CopyWriter):
- """This object implements the "write" part of copying local paths. Don't
- try to construct it yourself.
- """
- __slots__ = ()
-
- _writable_metakeys = _LocalCopyReader._readable_metakeys
-
- def _write_metadata(self, metadata, *, follow_symlinks=True):
- def _nop(*args, ns=None, follow_symlinks=None):
- pass
-
- if follow_symlinks:
- # use the real function if it exists
- def lookup(name):
- return getattr(os, name, _nop)
- else:
- # use the real function only if it exists
- # *and* it supports follow_symlinks
- def lookup(name):
- fn = getattr(os, name, _nop)
- if fn in os.supports_follow_symlinks:
- return fn
- return _nop
-
- times_ns = metadata.get('times_ns')
- if times_ns is not None:
- lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
- # We must copy extended attributes before the file is (potentially)
- # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
- xattrs = metadata.get('xattrs')
- if xattrs is not None:
- for attr, value in xattrs:
- try:
- os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
- except OSError as e:
- if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
- raise
- mode = metadata.get('mode')
- if mode is not None:
- try:
- lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
- except NotImplementedError:
- # if we got a NotImplementedError, it's because
- # * follow_symlinks=False,
- # * lchown() is unavailable, and
- # * either
- # * fchownat() is unavailable or
- # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
- # (it returned ENOSUP.)
- # therefore we're out of options--we simply cannot chown the
- # symlink. give up, suppress the error.
- # (which is what shutil always did in this circumstance.)
- pass
- flags = metadata.get('flags')
- if flags is not None:
- try:
- lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
- except OSError as why:
- if why.errno not in (EOPNOTSUPP, ENOTSUP):
- raise
-
- if copyfile:
- # Use fast OS routine for local file copying where available.
- def _create_file(self, source, metakeys):
- """Copy the given file to the given target."""
- try:
- source = os.fspath(source)
- except TypeError:
- if not isinstance(source, WritablePath):
- raise
- super()._create_file(source, metakeys)
- else:
- copyfile(source, os.fspath(self._path))
-
- if os.name == 'nt':
- # Windows: symlink target might not exist yet if we're copying several
- # files, so ensure we pass is_dir to os.symlink().
- def _create_symlink(self, source, metakeys):
- """Copy the given symlink to the given target."""
- self._path.symlink_to(source.readlink(), source.is_dir())
- if metakeys:
- metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
- if metadata:
- self._write_metadata(metadata, follow_symlinks=False)
-
- def _ensure_different_file(self, source):
- """
- Raise OSError(EINVAL) if both paths refer to the same file.
- """
- try:
- if not self._path.samefile(source):
- return
- except (OSError, ValueError):
- return
- err = OSError(EINVAL, "Source and target are the same file")
- err.filename = str(source)
- err.filename2 = str(self._path)
- raise err
-
-
class PurePath(JoinablePath):
"""Base class for manipulating paths without I/O.
os.replace(self, target)
return self.with_segments(target)
- _copy_reader = property(_LocalCopyReader)
- _copy_writer = property(_LocalCopyWriter)
+ _copy_reader = property(LocalCopyReader)
+ _copy_writer = property(LocalCopyWriter)
def move(self, target):
"""
"""
from errno import *
-from stat import S_ISDIR, S_ISREG, S_ISLNK
+from stat import S_ISDIR, S_ISREG, S_ISLNK, S_IMODE
+import io
import os
import sys
try:
write_target(buf)
+def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
+ newline=None):
+ """
+ Open the file pointed to by this path and return a file object, as
+ the built-in open() function does.
+ """
+ try:
+ return io.open(path, mode, buffering, encoding, errors, newline)
+ except TypeError:
+ pass
+ cls = type(path)
+ text = 'b' not in mode
+ mode = ''.join(sorted(c for c in mode if c not in 'bt'))
+ if text:
+ try:
+ attr = getattr(cls, f'__open_{mode}__')
+ except AttributeError:
+ pass
+ else:
+ return attr(path, buffering, encoding, errors, newline)
+
+ try:
+ attr = getattr(cls, f'__open_{mode}b__')
+ except AttributeError:
+ pass
+ else:
+ stream = attr(path, buffering)
+ if text:
+ stream = io.TextIOWrapper(stream, encoding, errors, newline)
+ return stream
+
+ raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
+
+
+class CopyReader:
+ """
+ Class that implements the "read" part of copying between path objects.
+ An instance of this class is available from the ReadablePath._copy_reader
+ property.
+ """
+ __slots__ = ('_path',)
+
+ def __init__(self, path):
+ self._path = path
+
+ _readable_metakeys = frozenset()
+
+ def _read_metadata(self, metakeys, *, follow_symlinks=True):
+ """
+ Returns path metadata as a dict with string keys.
+ """
+ raise NotImplementedError
+
+
+class CopyWriter:
+ """
+ Class that implements the "write" part of copying between path objects. An
+ instance of this class is available from the WritablePath._copy_writer
+ property.
+ """
+ __slots__ = ('_path',)
+
+ def __init__(self, path):
+ self._path = path
+
+ _writable_metakeys = frozenset()
+
+ def _write_metadata(self, metadata, *, follow_symlinks=True):
+ """
+ Sets path metadata from the given dict with string keys.
+ """
+ raise NotImplementedError
+
+ def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
+ self._ensure_distinct_path(source)
+ if preserve_metadata:
+ metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
+ else:
+ metakeys = None
+ if not follow_symlinks and source.is_symlink():
+ self._create_symlink(source, metakeys)
+ elif source.is_dir():
+ self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
+ else:
+ self._create_file(source, metakeys)
+ return self._path
+
+ def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
+ """Copy the given directory to our path."""
+ children = list(source.iterdir())
+ self._path.mkdir(exist_ok=dirs_exist_ok)
+ for src in children:
+ dst = self._path.joinpath(src.name)
+ if not follow_symlinks and src.is_symlink():
+ dst._copy_writer._create_symlink(src, metakeys)
+ elif src.is_dir():
+ dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
+ else:
+ dst._copy_writer._create_file(src, metakeys)
+ if metakeys:
+ metadata = source._copy_reader._read_metadata(metakeys)
+ if metadata:
+ self._write_metadata(metadata)
+
+ def _create_file(self, source, metakeys):
+ """Copy the given file to our path."""
+ self._ensure_different_file(source)
+ with magic_open(source, 'rb') as source_f:
+ try:
+ with magic_open(self._path, 'wb') as target_f:
+ copyfileobj(source_f, target_f)
+ except IsADirectoryError as e:
+ if not self._path.exists():
+ # Raise a less confusing exception.
+ raise FileNotFoundError(
+ f'Directory does not exist: {self._path}') from e
+ raise
+ if metakeys:
+ metadata = source._copy_reader._read_metadata(metakeys)
+ if metadata:
+ self._write_metadata(metadata)
+
+ def _create_symlink(self, source, metakeys):
+ """Copy the given symbolic link to our path."""
+ self._path.symlink_to(source.readlink())
+ if metakeys:
+ metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
+ if metadata:
+ self._write_metadata(metadata, follow_symlinks=False)
+
+ def _ensure_different_file(self, source):
+ """
+ Raise OSError(EINVAL) if both paths refer to the same file.
+ """
+ pass
+
+ def _ensure_distinct_path(self, source):
+ """
+ Raise OSError(EINVAL) if the other path is within this path.
+ """
+ # Note: there is no straightforward, foolproof algorithm to determine
+ # if one directory is within another (a particularly perverse example
+ # would be a single network share mounted in one location via NFS, and
+ # in another location via CIFS), so we simply checks whether the
+ # other path is lexically equal to, or within, this path.
+ if source == self._path:
+ err = OSError(EINVAL, "Source and target are the same path")
+ elif source in self._path.parents:
+ err = OSError(EINVAL, "Source path is a parent of target path")
+ else:
+ return
+ err.filename = str(source)
+ err.filename2 = str(self._path)
+ raise err
+
+
+class LocalCopyReader(CopyReader):
+ """This object implements the "read" part of copying local paths. Don't
+ try to construct it yourself.
+ """
+ __slots__ = ()
+
+ _readable_metakeys = {'mode', 'times_ns'}
+ if hasattr(os.stat_result, 'st_flags'):
+ _readable_metakeys.add('flags')
+ if hasattr(os, 'listxattr'):
+ _readable_metakeys.add('xattrs')
+ _readable_metakeys = frozenset(_readable_metakeys)
+
+ def _read_metadata(self, metakeys, *, follow_symlinks=True):
+ metadata = {}
+ if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
+ st = self._path.stat(follow_symlinks=follow_symlinks)
+ if 'mode' in metakeys:
+ metadata['mode'] = S_IMODE(st.st_mode)
+ if 'times_ns' in metakeys:
+ metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
+ if 'flags' in metakeys:
+ metadata['flags'] = st.st_flags
+ if 'xattrs' in metakeys:
+ try:
+ metadata['xattrs'] = [
+ (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
+ for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
+ except OSError as err:
+ if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+ return metadata
+
+
+class LocalCopyWriter(CopyWriter):
+ """This object implements the "write" part of copying local paths. Don't
+ try to construct it yourself.
+ """
+ __slots__ = ()
+
+ _writable_metakeys = LocalCopyReader._readable_metakeys
+
+ def _write_metadata(self, metadata, *, follow_symlinks=True):
+ def _nop(*args, ns=None, follow_symlinks=None):
+ pass
+
+ if follow_symlinks:
+ # use the real function if it exists
+ def lookup(name):
+ return getattr(os, name, _nop)
+ else:
+ # use the real function only if it exists
+ # *and* it supports follow_symlinks
+ def lookup(name):
+ fn = getattr(os, name, _nop)
+ if fn in os.supports_follow_symlinks:
+ return fn
+ return _nop
+
+ times_ns = metadata.get('times_ns')
+ if times_ns is not None:
+ lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
+ # We must copy extended attributes before the file is (potentially)
+ # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+ xattrs = metadata.get('xattrs')
+ if xattrs is not None:
+ for attr, value in xattrs:
+ try:
+ os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
+ except OSError as e:
+ if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+ mode = metadata.get('mode')
+ if mode is not None:
+ try:
+ lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
+ except NotImplementedError:
+ # if we got a NotImplementedError, it's because
+ # * follow_symlinks=False,
+ # * lchown() is unavailable, and
+ # * either
+ # * fchownat() is unavailable or
+ # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
+ # (it returned ENOSUP.)
+ # therefore we're out of options--we simply cannot chown the
+ # symlink. give up, suppress the error.
+ # (which is what shutil always did in this circumstance.)
+ pass
+ flags = metadata.get('flags')
+ if flags is not None:
+ try:
+ lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
+ except OSError as why:
+ if why.errno not in (EOPNOTSUPP, ENOTSUP):
+ raise
+
+ if copyfile:
+ # Use fast OS routine for local file copying where available.
+ def _create_file(self, source, metakeys):
+ """Copy the given file to the given target."""
+ try:
+ source = os.fspath(source)
+ except TypeError:
+ super()._create_file(source, metakeys)
+ else:
+ copyfile(source, os.fspath(self._path))
+
+ if os.name == 'nt':
+ # Windows: symlink target might not exist yet if we're copying several
+ # files, so ensure we pass is_dir to os.symlink().
+ def _create_symlink(self, source, metakeys):
+ """Copy the given symlink to the given target."""
+ self._path.symlink_to(source.readlink(), source.is_dir())
+ if metakeys:
+ metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
+ if metadata:
+ self._write_metadata(metadata, follow_symlinks=False)
+
+ def _ensure_different_file(self, source):
+ """
+ Raise OSError(EINVAL) if both paths refer to the same file.
+ """
+ try:
+ if not self._path.samefile(source):
+ return
+ except (OSError, ValueError):
+ return
+ err = OSError(EINVAL, "Source and target are the same file")
+ err.filename = str(source)
+ err.filename2 = str(self._path)
+ raise err
+
+
class _PathInfoBase:
__slots__ = ()