"""
import functools
+import io
import operator
import posixpath
from errno import EINVAL
return path, names
+def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
+ newline=None):
+ """
+ Open the file pointed to by this path and return a file object, as
+ the built-in open() function does.
+ """
+ try:
+ return io.open(path, mode, buffering, encoding, errors, newline)
+ except TypeError:
+ pass
+ cls = type(path)
+ text = 'b' not in mode
+ mode = ''.join(sorted(c for c in mode if c not in 'bt'))
+ if text:
+ try:
+ attr = getattr(cls, f'__open_{mode}__')
+ except AttributeError:
+ pass
+ else:
+ return attr(path, buffering, encoding, errors, newline)
+
+ try:
+ attr = getattr(cls, f'__open_{mode}b__')
+ except AttributeError:
+ pass
+ else:
+ stream = attr(path, buffering)
+ if text:
+ stream = io.TextIOWrapper(stream, encoding, errors, newline)
+ return stream
+
+ raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
+
+
class PathGlobber(_GlobberBase):
"""
Class providing shell-style globbing for path objects.
class CopyReader:
"""
- Class that implements copying between path objects. An instance of this
- class is available from the ReadablePath.copy property; it's made callable
- so that ReadablePath.copy() can be treated as a method.
-
- The target path's CopyWriter drives the process from its _create() method.
- Files and directories are exchanged by calling methods on the source and
- target paths, and metadata is exchanged by calling
- source.copy._read_metadata() and target.copy._write_metadata().
+ Class that implements the "read" part of copying between path objects.
+ An instance of this class is available from the ReadablePath._copy_reader
+ property.
"""
__slots__ = ('_path',)
def __init__(self, path):
self._path = path
- def __call__(self, target, follow_symlinks=True, dirs_exist_ok=False,
- preserve_metadata=False):
- """
- Recursively copy this file or directory tree to the given destination.
- """
- if not isinstance(target, ReadablePath):
- target = self._path.with_segments(target)
-
- # Delegate to the target path's CopyWriter object.
- try:
- create = target.copy._create
- except AttributeError:
- raise TypeError(f"Target is not writable: {target}") from None
- return create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)
-
_readable_metakeys = frozenset()
def _read_metadata(self, metakeys, *, follow_symlinks=True):
raise NotImplementedError
-class CopyWriter(CopyReader):
- __slots__ = ()
+class CopyWriter:
+ """
+ Class that implements the "write" part of copying between path objects. An
+ instance of this class is available from the WritablePath._copy_writer
+ property.
+ """
+ __slots__ = ('_path',)
+
+ def __init__(self, path):
+ self._path = path
_writable_metakeys = frozenset()
def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
self._ensure_distinct_path(source)
if preserve_metadata:
- metakeys = self._writable_metakeys & source.copy._readable_metakeys
+ metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
else:
metakeys = None
if not follow_symlinks and source.is_symlink():
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
- dst.copy._create_symlink(src, metakeys)
+ dst._copy_writer._create_symlink(src, metakeys)
elif src.is_dir():
- dst.copy._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
+ dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
else:
- dst.copy._create_file(src, metakeys)
+ dst._copy_writer._create_file(src, metakeys)
if metakeys:
- metadata = source.copy._read_metadata(metakeys)
+ metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)
def _create_file(self, source, metakeys):
"""Copy the given file to our path."""
self._ensure_different_file(source)
- with source.open('rb') as source_f:
+ with magic_open(source, 'rb') as source_f:
try:
- with self._path.open('wb') as target_f:
+ with magic_open(self._path, 'wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
f'Directory does not exist: {self._path}') from e
raise
if metakeys:
- metadata = source.copy._read_metadata(metakeys)
+ metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if metakeys:
- metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
+ metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)
"""
raise NotImplementedError
- def open(self, mode='r', buffering=-1, encoding=None,
- errors=None, newline=None):
+ def __open_rb__(self, buffering=-1):
"""
- Open the file pointed to by this path and return a file object, as
- the built-in open() function does.
+ Open the file pointed to by this path for reading in binary mode and
+ return a file object, like open(mode='rb').
"""
raise NotImplementedError
"""
Open the file in bytes mode, read it, and close the file.
"""
- with self.open(mode='rb', buffering=0) as f:
+ with magic_open(self, mode='rb', buffering=0) as f:
return f.read()
def read_text(self, encoding=None, errors=None, newline=None):
"""
Open the file in text mode, read it, and close the file.
"""
- with self.open(mode='r', encoding=encoding, errors=errors, newline=newline) as f:
+ with magic_open(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f:
return f.read()
def _scandir(self):
"""
raise NotImplementedError
- copy = property(CopyReader, doc=CopyReader.__call__.__doc__)
+ _copy_reader = property(CopyReader)
+
+ def copy(self, target, follow_symlinks=True, dirs_exist_ok=False,
+ preserve_metadata=False):
+ """
+ Recursively copy this file or directory tree to the given destination.
+ """
+ if not hasattr(target, '_copy_writer'):
+ target = self.with_segments(target)
+
+ # Delegate to the target path's CopyWriter object.
+ try:
+ create = target._copy_writer._create
+ except AttributeError:
+ raise TypeError(f"Target is not writable: {target}") from None
+ return create(self, follow_symlinks, dirs_exist_ok, preserve_metadata)
def copy_into(self, target_dir, *, follow_symlinks=True,
dirs_exist_ok=False, preserve_metadata=False):
name = self.name
if not name:
raise ValueError(f"{self!r} has an empty name")
- elif isinstance(target_dir, ReadablePath):
+ elif hasattr(target_dir, '_copy_writer'):
target = target_dir / name
else:
target = self.with_segments(target_dir, name)
preserve_metadata=preserve_metadata)
-class WritablePath(ReadablePath):
+class WritablePath(JoinablePath):
__slots__ = ()
def symlink_to(self, target, target_is_directory=False):
"""
raise NotImplementedError
+ def __open_wb__(self, buffering=-1):
+ """
+ Open the file pointed to by this path for writing in binary mode and
+ return a file object, like open(mode='wb').
+ """
+ raise NotImplementedError
+
def write_bytes(self, data):
"""
Open the file in bytes mode, write to it, and close the file.
"""
# type-check for the buffer interface before truncating the file
view = memoryview(data)
- with self.open(mode='wb') as f:
+ with magic_open(self, mode='wb') as f:
return f.write(view)
def write_text(self, data, encoding=None, errors=None, newline=None):
if not isinstance(data, str):
raise TypeError('data must be str, not %s' %
data.__class__.__name__)
- with self.open(mode='w', encoding=encoding, errors=errors, newline=newline) as f:
+ with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
return f.write(data)
- copy = property(CopyWriter, doc=CopyWriter.__call__.__doc__)
+ _copy_writer = property(CopyWriter)
grp = None
from pathlib._os import copyfile
-from pathlib._abc import CopyWriter, JoinablePath, WritablePath
+from pathlib._abc import CopyReader, CopyWriter, JoinablePath, ReadablePath, WritablePath
__all__ = [
return "<{}.parents>".format(type(self._path).__name__)
-class _LocalCopyWriter(CopyWriter):
- """This object implements the Path.copy callable. Don't try to construct
- it yourself."""
+class _LocalCopyReader(CopyReader):
+ """This object implements the "read" part of copying local paths. Don't
+ try to construct it yourself.
+ """
__slots__ = ()
_readable_metakeys = {'mode', 'times_ns'}
_readable_metakeys.add('flags')
if hasattr(os, 'listxattr'):
_readable_metakeys.add('xattrs')
- _readable_metakeys = _writable_metakeys = frozenset(_readable_metakeys)
+ _readable_metakeys = frozenset(_readable_metakeys)
def _read_metadata(self, metakeys, *, follow_symlinks=True):
metadata = {}
raise
return metadata
+
+class _LocalCopyWriter(CopyWriter):
+ """This object implements the "write" part of copying local paths. Don't
+ try to construct it yourself.
+ """
+ __slots__ = ()
+
+ _writable_metakeys = _LocalCopyReader._readable_metakeys
+
def _write_metadata(self, metadata, *, follow_symlinks=True):
def _nop(*args, ns=None, follow_symlinks=None):
pass
"""Copy the given symlink to the given target."""
self._path.symlink_to(source.readlink(), source.is_dir())
if metakeys:
- metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
+ metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)
__slots__ = ()
-class Path(WritablePath, PurePath):
+class Path(WritablePath, ReadablePath, PurePath):
"""PurePath subclass that can make system calls.
Path represents a filesystem path but unlike PurePath, also offers
encoding = io.text_encoding(encoding)
return io.open(self, mode, buffering, encoding, errors, newline)
+ def read_bytes(self):
+ """
+ Open the file in bytes mode, read it, and close the file.
+ """
+ with self.open(mode='rb', buffering=0) as f:
+ return f.read()
+
def read_text(self, encoding=None, errors=None, newline=None):
"""
Open the file in text mode, read it, and close the file.
# Call io.text_encoding() here to ensure any warning is raised at an
# appropriate stack level.
encoding = io.text_encoding(encoding)
- return super().read_text(encoding, errors, newline)
+ with self.open(mode='r', encoding=encoding, errors=errors, newline=newline) as f:
+ return f.read()
+
+ def write_bytes(self, data):
+ """
+ Open the file in bytes mode, write to it, and close the file.
+ """
+ # type-check for the buffer interface before truncating the file
+ view = memoryview(data)
+ with self.open(mode='wb') as f:
+ return f.write(view)
def write_text(self, data, encoding=None, errors=None, newline=None):
"""
# Call io.text_encoding() here to ensure any warning is raised at an
# appropriate stack level.
encoding = io.text_encoding(encoding)
- return super().write_text(data, encoding, errors, newline)
+ if not isinstance(data, str):
+ raise TypeError('data must be str, not %s' %
+ data.__class__.__name__)
+ with self.open(mode='w', encoding=encoding, errors=errors, newline=newline) as f:
+ return f.write(data)
_remove_leading_dot = operator.itemgetter(slice(2, None))
_remove_trailing_slash = operator.itemgetter(slice(-1))
os.replace(self, target)
return self.with_segments(target)
- copy = property(_LocalCopyWriter, doc=_LocalCopyWriter.__call__.__doc__)
+ _copy_reader = property(_LocalCopyReader)
+ _copy_writer = property(_LocalCopyWriter)
def move(self, target):
"""
except TypeError:
pass
else:
- if not isinstance(target, WritablePath):
+ if not hasattr(target, '_copy_writer'):
target = self.with_segments(target_str)
- target.copy._ensure_different_file(self)
+ target._copy_writer._ensure_different_file(self)
try:
os.replace(self, target_str)
return target
name = self.name
if not name:
raise ValueError(f"{self!r} has an empty name")
- elif isinstance(target_dir, WritablePath):
+ elif hasattr(target_dir, '_copy_writer'):
target = target_dir / name
else:
target = self.with_segments(target_dir, name)
# Tests for the concrete classes.
#
-class PathTest(test_pathlib_abc.DummyWritablePathTest, PurePathTest):
+class PathTest(test_pathlib_abc.DummyRWPathTest, PurePathTest):
"""Tests for the FS-accessing functionalities of the Path classes."""
cls = pathlib.Path
can_symlink = os_helper.can_symlink()
for dirpath, dirnames, filenames in p.walk():
self.assertEqual(42, dirpath.session_id)
+ def test_open_common(self):
+ p = self.cls(self.base)
+ with (p / 'fileA').open('r') as f:
+ self.assertIsInstance(f, io.TextIOBase)
+ self.assertEqual(f.read(), "this is file A\n")
+ with (p / 'fileA').open('rb') as f:
+ self.assertIsInstance(f, io.BufferedIOBase)
+ self.assertEqual(f.read().strip(), b"this is file A")
+
def test_open_unbuffered(self):
p = self.cls(self.base)
with (p / 'fileA').open('rb', buffering=0) as f:
import errno
import unittest
-from pathlib._abc import JoinablePath, ReadablePath, WritablePath
+from pathlib._abc import JoinablePath, ReadablePath, WritablePath, magic_open
from pathlib._types import Parser
import posixpath
class DummyWritablePathIO(io.BytesIO):
"""
- Used by DummyWritablePath to implement `open('w')`
+ Used by DummyWritablePath to implement `__open_wb__()`
"""
def __init__(self, files, path):
super().close()
-class DummyReadablePath(ReadablePath):
+class DummyReadablePath(ReadablePath, DummyJoinablePath):
"""
Simple implementation of DummyReadablePath that keeps files and
directories in memory.
"""
- __slots__ = ('_segments')
+ __slots__ = ()
_files = {}
_directories = {}
- def __init__(self, *segments):
- self._segments = segments
-
- def __str__(self):
- if self._segments:
- return self.parser.join(*self._segments)
- return ''
-
- def __eq__(self, other):
- if not isinstance(other, DummyReadablePath):
- return NotImplemented
- return str(self) == str(other)
-
- def __hash__(self):
- return hash(str(self))
-
- def __repr__(self):
- return "{}({!r})".format(self.__class__.__name__, str(self))
-
- def with_segments(self, *pathsegments):
- return type(self)(*pathsegments)
-
def exists(self, *, follow_symlinks=True):
return self.is_dir() or self.is_file()
def is_symlink(self):
return False
- def open(self, mode='r', buffering=-1, encoding=None,
- errors=None, newline=None):
- if buffering != -1 and not (buffering == 0 and 'b' in mode):
- raise NotImplementedError
+ def __open_rb__(self, buffering=-1):
path = str(self)
if path in self._directories:
raise IsADirectoryError(errno.EISDIR, "Is a directory", path)
-
- text = 'b' not in mode
- mode = ''.join(c for c in mode if c not in 'btU')
- if mode == 'r':
- if path not in self._files:
- raise FileNotFoundError(errno.ENOENT, "File not found", path)
- stream = io.BytesIO(self._files[path])
- elif mode == 'w':
- # FIXME: move to DummyWritablePath
- parent, name = posixpath.split(path)
- if parent not in self._directories:
- raise FileNotFoundError(errno.ENOENT, "File not found", parent)
- stream = DummyWritablePathIO(self._files, path)
- self._files[path] = b''
- self._directories[parent].add(name)
- else:
- raise NotImplementedError
- if text:
- stream = io.TextIOWrapper(stream, encoding=encoding, errors=errors, newline=newline)
- return stream
+ elif path not in self._files:
+ raise FileNotFoundError(errno.ENOENT, "File not found", path)
+ return io.BytesIO(self._files[path])
def iterdir(self):
path = str(self).rstrip('/')
raise FileNotFoundError(errno.ENOENT, "File not found", path)
-class DummyWritablePath(DummyReadablePath, WritablePath):
+class DummyWritablePath(WritablePath, DummyJoinablePath):
__slots__ = ()
+ def __open_wb__(self, buffering=-1):
+ path = str(self)
+ if path in self._directories:
+ raise IsADirectoryError(errno.EISDIR, "Is a directory", path)
+ parent, name = posixpath.split(path)
+ if parent not in self._directories:
+ raise FileNotFoundError(errno.ENOENT, "File not found", parent)
+ self._files[path] = b''
+ self._directories[parent].add(name)
+ return DummyWritablePathIO(self._files, path)
+
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
path = str(self)
parent = str(self.parent)
self.assertIs(False, P(self.base + '\udfff').exists())
self.assertIs(False, P(self.base + '\x00').exists())
- def test_open_common(self):
+ def test_magic_open(self):
p = self.cls(self.base)
- with (p / 'fileA').open('r') as f:
+ with magic_open(p / 'fileA', 'r') as f:
self.assertIsInstance(f, io.TextIOBase)
self.assertEqual(f.read(), "this is file A\n")
- with (p / 'fileA').open('rb') as f:
+ with magic_open(p / 'fileA', 'rb') as f:
self.assertIsInstance(f, io.BufferedIOBase)
self.assertEqual(f.read().strip(), b"this is file A")
self.assertIs((P / 'linkA\x00').is_file(), False)
-class DummyWritablePathTest(DummyReadablePathTest):
+class DummyWritablePathTest(DummyJoinablePathTest):
cls = DummyWritablePath
+
+class DummyRWPath(DummyWritablePath, DummyReadablePath):
+ __slots__ = ()
+
+
+class DummyRWPathTest(DummyWritablePathTest, DummyReadablePathTest):
+ cls = DummyRWPath
+ can_symlink = False
+
def test_read_write_bytes(self):
p = self.cls(self.base)
(p / 'fileA').write_bytes(b'abcdefg')