from pathlib._os import (
PathInfo, DirEntryInfo,
- magic_open, vfspath,
+ vfsopen, vfspath,
ensure_different_files, ensure_distinct_paths,
copyfile2, copyfileobj, copy_info,
)
def _copy_from_file(self, source, preserve_metadata=False):
ensure_different_files(source, self)
- with magic_open(source, 'rb') as source_f:
+ with vfsopen(source, 'rb') as source_f:
with open(self, 'wb') as target_f:
copyfileobj(source_f, target_f)
if preserve_metadata:
write_target(buf)
-def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
- newline=None):
+def _open_reader(obj):
+ cls = type(obj)
+ try:
+ open_reader = cls.__open_reader__
+ except AttributeError:
+ cls_name = cls.__name__
+ raise TypeError(f"{cls_name} can't be opened for reading") from None
+ else:
+ return open_reader(obj)
+
+
+def _open_writer(obj, mode):
+ cls = type(obj)
+ try:
+ open_writer = cls.__open_writer__
+ except AttributeError:
+ cls_name = cls.__name__
+ raise TypeError(f"{cls_name} can't be opened for writing") from None
+ else:
+ return open_writer(obj, mode)
+
+
+def _open_updater(obj, mode):
+ cls = type(obj)
+ try:
+ open_updater = cls.__open_updater__
+ except AttributeError:
+ cls_name = cls.__name__
+ raise TypeError(f"{cls_name} can't be opened for updating") from None
+ else:
+ return open_updater(obj, mode)
+
+
+def vfsopen(obj, mode='r', buffering=-1, encoding=None, errors=None,
+ newline=None):
"""
Open the file pointed to by this path and return a file object, as
the built-in open() function does.
+
+ Unlike the built-in open() function, this function additionally accepts
+ 'openable' objects, which are objects with any of these special methods:
+
+ __open_reader__()
+ __open_writer__(mode)
+ __open_updater__(mode)
+
+ '__open_reader__' is called for 'r' mode; '__open_writer__' for 'a', 'w'
+ and 'x' modes; and '__open_updater__' for 'r+' and 'w+' modes. If text
+ mode is requested, the result is wrapped in an io.TextIOWrapper object.
"""
+ if buffering != -1:
+ raise ValueError("buffer size can't be customized")
text = 'b' not in mode
if text:
# Call io.text_encoding() here to ensure any warning is raised at an
# appropriate stack level.
encoding = text_encoding(encoding)
try:
- return open(path, mode, buffering, encoding, errors, newline)
+ return open(obj, mode, buffering, encoding, errors, newline)
except TypeError:
pass
- cls = type(path)
+ if not text:
+ if encoding is not None:
+ raise ValueError("binary mode doesn't take an encoding argument")
+ if errors is not None:
+ raise ValueError("binary mode doesn't take an errors argument")
+ if newline is not None:
+ raise ValueError("binary mode doesn't take a newline argument")
mode = ''.join(sorted(c for c in mode if c not in 'bt'))
- if text:
- try:
- attr = getattr(cls, f'__open_{mode}__')
- except AttributeError:
- pass
- else:
- return attr(path, buffering, encoding, errors, newline)
- elif encoding is not None:
- raise ValueError("binary mode doesn't take an encoding argument")
- elif errors is not None:
- raise ValueError("binary mode doesn't take an errors argument")
- elif newline is not None:
- raise ValueError("binary mode doesn't take a newline argument")
-
- try:
- attr = getattr(cls, f'__open_{mode}b__')
- except AttributeError:
- pass
+ if mode == 'r':
+ stream = _open_reader(obj)
+ elif mode in ('a', 'w', 'x'):
+ stream = _open_writer(obj, mode)
+ elif mode in ('+r', '+w'):
+ stream = _open_updater(obj, mode[1])
else:
- stream = attr(path, buffering)
- if text:
- stream = TextIOWrapper(stream, encoding, errors, newline)
- return stream
-
- raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
+ raise ValueError(f'invalid mode: {mode}')
+ if text:
+ stream = TextIOWrapper(stream, encoding, errors, newline)
+ return stream
def vfspath(obj):
from abc import ABC, abstractmethod
from glob import _GlobberBase
from io import text_encoding
-from pathlib._os import (magic_open, vfspath, ensure_distinct_paths,
+from pathlib._os import (vfsopen, vfspath, ensure_distinct_paths,
ensure_different_files, copyfileobj)
from pathlib import PurePath, Path
from typing import Optional, Protocol, runtime_checkable
raise NotImplementedError
@abstractmethod
- def __open_rb__(self, buffering=-1):
+ def __open_reader__(self):
"""
Open the file pointed to by this path for reading in binary mode and
- return a file object, like open(mode='rb').
+ return a file object.
"""
raise NotImplementedError
"""
Open the file in bytes mode, read it, and close the file.
"""
- with magic_open(self, mode='rb', buffering=0) as f:
+ with vfsopen(self, mode='rb') as f:
return f.read()
def read_text(self, encoding=None, errors=None, newline=None):
# Call io.text_encoding() here to ensure any warning is raised at an
# appropriate stack level.
encoding = text_encoding(encoding)
- with magic_open(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f:
+ with vfsopen(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f:
return f.read()
@abstractmethod
raise NotImplementedError
@abstractmethod
- def __open_wb__(self, buffering=-1):
+ def __open_writer__(self, mode):
"""
Open the file pointed to by this path for writing in binary mode and
- return a file object, like open(mode='wb').
+ return a file object.
"""
raise NotImplementedError
"""
# type-check for the buffer interface before truncating the file
view = memoryview(data)
- with magic_open(self, mode='wb') as f:
+ with vfsopen(self, mode='wb') as f:
return f.write(view)
def write_text(self, data, encoding=None, errors=None, newline=None):
if not isinstance(data, str):
raise TypeError('data must be str, not %s' %
data.__class__.__name__)
- with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
+ with vfsopen(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
return f.write(data)
def _copy_from(self, source, follow_symlinks=True):
stack.append((child, dst.joinpath(child.name)))
else:
ensure_different_files(src, dst)
- with magic_open(src, 'rb') as source_f:
- with magic_open(dst, 'wb') as target_f:
+ with vfsopen(src, 'rb') as source_f:
+ with vfsopen(dst, 'wb') as target_f:
copyfileobj(source_f, target_f)
super().__init__(*pathsegments)
self.info = LocalPathInfo(self)
- def __open_rb__(self, buffering=-1):
+ def __open_reader__(self):
return open(self, 'rb')
def iterdir(self):
__slots__ = ()
__fspath__ = LexicalPath.__vfspath__
- def __open_wb__(self, buffering=-1):
- return open(self, 'wb')
+ def __open_writer__(self, mode):
+ return open(self, f'{mode}b')
def mkdir(self, mode=0o777):
os.mkdir(self, mode)
tree = self.zip_file.filelist.tree
return tree.resolve(vfspath(self), follow_symlinks=False)
- def __open_rb__(self, buffering=-1):
+ def __open_reader__(self):
info = self.info.resolve()
if not info.exists():
raise FileNotFoundError(errno.ENOENT, "File not found", self)
elif info.is_dir():
raise IsADirectoryError(errno.EISDIR, "Is a directory", self)
- return self.zip_file.open(info.zip_info, 'r')
+ return self.zip_file.open(info.zip_info)
def iterdir(self):
info = self.info.resolve()
def with_segments(self, *pathsegments):
return type(self)(*pathsegments, zip_file=self.zip_file)
- def __open_wb__(self, buffering=-1):
- return self.zip_file.open(vfspath(self), 'w')
+ def __open_writer__(self, mode):
+ return self.zip_file.open(vfspath(self), mode)
def mkdir(self, mode=0o777):
zinfo = zipfile.ZipInfo(vfspath(self) + '/')
if is_pypi:
from pathlib_abc import PathInfo, _ReadablePath
- from pathlib_abc._os import magic_open
+ from pathlib_abc._os import vfsopen
else:
from pathlib.types import PathInfo, _ReadablePath
- from pathlib._os import magic_open
+ from pathlib._os import vfsopen
class ReadTestBase:
def test_open_r(self):
p = self.root / 'fileA'
- with magic_open(p, 'r', encoding='utf-8') as f:
+ with vfsopen(p, 'r', encoding='utf-8') as f:
self.assertIsInstance(f, io.TextIOBase)
self.assertEqual(f.read(), 'this is file A\n')
+ def test_open_r_buffering_error(self):
+ p = self.root / 'fileA'
+ self.assertRaises(ValueError, vfsopen, p, 'r', buffering=0)
+ self.assertRaises(ValueError, vfsopen, p, 'r', buffering=1)
+ self.assertRaises(ValueError, vfsopen, p, 'r', buffering=1024)
+
@unittest.skipIf(
not getattr(sys.flags, 'warn_default_encoding', 0),
"Requires warn_default_encoding",
def test_open_r_encoding_warning(self):
p = self.root / 'fileA'
with self.assertWarns(EncodingWarning) as wc:
- with magic_open(p, 'r'):
+ with vfsopen(p, 'r'):
pass
self.assertEqual(wc.filename, __file__)
def test_open_rb(self):
p = self.root / 'fileA'
- with magic_open(p, 'rb') as f:
+ with vfsopen(p, 'rb') as f:
self.assertEqual(f.read(), b'this is file A\n')
- self.assertRaises(ValueError, magic_open, p, 'rb', encoding='utf8')
- self.assertRaises(ValueError, magic_open, p, 'rb', errors='strict')
- self.assertRaises(ValueError, magic_open, p, 'rb', newline='')
+ self.assertRaises(ValueError, vfsopen, p, 'rb', encoding='utf8')
+ self.assertRaises(ValueError, vfsopen, p, 'rb', errors='strict')
+ self.assertRaises(ValueError, vfsopen, p, 'rb', newline='')
def test_read_bytes(self):
p = self.root / 'fileA'
if is_pypi:
from pathlib_abc import _WritablePath
- from pathlib_abc._os import magic_open
+ from pathlib_abc._os import vfsopen
else:
from pathlib.types import _WritablePath
- from pathlib._os import magic_open
+ from pathlib._os import vfsopen
class WriteTestBase:
def test_open_w(self):
p = self.root / 'fileA'
- with magic_open(p, 'w', encoding='utf-8') as f:
+ with vfsopen(p, 'w', encoding='utf-8') as f:
self.assertIsInstance(f, io.TextIOBase)
f.write('this is file A\n')
self.assertEqual(self.ground.readtext(p), 'this is file A\n')
+ def test_open_w_buffering_error(self):
+ p = self.root / 'fileA'
+ self.assertRaises(ValueError, vfsopen, p, 'w', buffering=0)
+ self.assertRaises(ValueError, vfsopen, p, 'w', buffering=1)
+ self.assertRaises(ValueError, vfsopen, p, 'w', buffering=1024)
+
@unittest.skipIf(
not getattr(sys.flags, 'warn_default_encoding', 0),
"Requires warn_default_encoding",
def test_open_w_encoding_warning(self):
p = self.root / 'fileA'
with self.assertWarns(EncodingWarning) as wc:
- with magic_open(p, 'w'):
+ with vfsopen(p, 'w'):
pass
self.assertEqual(wc.filename, __file__)
def test_open_wb(self):
p = self.root / 'fileA'
- with magic_open(p, 'wb') as f:
+ with vfsopen(p, 'wb') as f:
#self.assertIsInstance(f, io.BufferedWriter)
f.write(b'this is file A\n')
self.assertEqual(self.ground.readbytes(p), b'this is file A\n')
- self.assertRaises(ValueError, magic_open, p, 'wb', encoding='utf8')
- self.assertRaises(ValueError, magic_open, p, 'wb', errors='strict')
- self.assertRaises(ValueError, magic_open, p, 'wb', newline='')
+ self.assertRaises(ValueError, vfsopen, p, 'wb', encoding='utf8')
+ self.assertRaises(ValueError, vfsopen, p, 'wb', errors='strict')
+ self.assertRaises(ValueError, vfsopen, p, 'wb', newline='')
def test_write_bytes(self):
p = self.root / 'fileA'