--- /dev/null
+"""
+Implementation of ReadablePath for local paths, for use in pathlib tests.
+
+LocalPathGround is also defined here. It helps establish the "ground truth"
+about local paths in tests.
+"""
+
+import os
+import pathlib.types
+
+from test.support import os_helper
+from test.test_pathlib.support.lexical_path import LexicalPath
+
+
+class LocalPathGround:
+ can_symlink = os_helper.can_symlink()
+
+ def __init__(self, path_cls):
+ self.path_cls = path_cls
+
+ def setup(self, local_suffix=""):
+ root = self.path_cls(os_helper.TESTFN + local_suffix)
+ os.mkdir(root)
+ return root
+
+ def teardown(self, root):
+ os_helper.rmtree(root)
+
+ def create_file(self, p, data=b''):
+ with open(p, 'wb') as f:
+ f.write(data)
+
+ def create_dir(self, p):
+ os.mkdir(p)
+
+ def create_symlink(self, p, target):
+ os.symlink(target, p)
+
+ def create_hierarchy(self, p):
+ os.mkdir(os.path.join(p, 'dirA'))
+ os.mkdir(os.path.join(p, 'dirB'))
+ os.mkdir(os.path.join(p, 'dirC'))
+ os.mkdir(os.path.join(p, 'dirC', 'dirD'))
+ with open(os.path.join(p, 'fileA'), 'wb') as f:
+ f.write(b"this is file A\n")
+ with open(os.path.join(p, 'dirB', 'fileB'), 'wb') as f:
+ f.write(b"this is file B\n")
+ with open(os.path.join(p, 'dirC', 'fileC'), 'wb') as f:
+ f.write(b"this is file C\n")
+ with open(os.path.join(p, 'dirC', 'novel.txt'), 'wb') as f:
+ f.write(b"this is a novel\n")
+ with open(os.path.join(p, 'dirC', 'dirD', 'fileD'), 'wb') as f:
+ f.write(b"this is file D\n")
+ if self.can_symlink:
+ # Relative symlinks.
+ os.symlink('fileA', os.path.join(p, 'linkA'))
+ os.symlink('non-existing', os.path.join(p, 'brokenLink'))
+ os.symlink('dirB',
+ os.path.join(p, 'linkB'),
+ target_is_directory=True)
+ os.symlink(os.path.join('..', 'dirB'),
+ os.path.join(p, 'dirA', 'linkC'),
+ target_is_directory=True)
+ # Broken symlink (pointing to itself).
+ os.symlink('brokenLinkLoop', os.path.join(p, 'brokenLinkLoop'))
+
+ isdir = staticmethod(os.path.isdir)
+ isfile = staticmethod(os.path.isfile)
+ islink = staticmethod(os.path.islink)
+ readlink = staticmethod(os.readlink)
+
+ def readtext(self, p):
+ with open(p, 'r') as f:
+ return f.read()
+
+ def readbytes(self, p):
+ with open(p, 'rb') as f:
+ return f.read()
+
+
+class LocalPathInfo(pathlib.types.PathInfo):
+ """
+ Simple implementation of PathInfo for a local path
+ """
+ __slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink')
+
+ def __init__(self, path):
+ self._path = str(path)
+ self._exists = None
+ self._is_dir = None
+ self._is_file = None
+ self._is_symlink = None
+
+ def exists(self, *, follow_symlinks=True):
+ """Whether this path exists."""
+ if not follow_symlinks and self.is_symlink():
+ return True
+ if self._exists is None:
+ self._exists = os.path.exists(self._path)
+ return self._exists
+
+ def is_dir(self, *, follow_symlinks=True):
+ """Whether this path is a directory."""
+ if not follow_symlinks and self.is_symlink():
+ return False
+ if self._is_dir is None:
+ self._is_dir = os.path.isdir(self._path)
+ return self._is_dir
+
+ def is_file(self, *, follow_symlinks=True):
+ """Whether this path is a regular file."""
+ if not follow_symlinks and self.is_symlink():
+ return False
+ if self._is_file is None:
+ self._is_file = os.path.isfile(self._path)
+ return self._is_file
+
+ def is_symlink(self):
+ """Whether this path is a symbolic link."""
+ if self._is_symlink is None:
+ self._is_symlink = os.path.islink(self._path)
+ return self._is_symlink
+
+
+class ReadableLocalPath(pathlib.types._ReadablePath, LexicalPath):
+ """
+ Simple implementation of a ReadablePath class for local filesystem paths.
+ """
+ __slots__ = ('info',)
+
+ def __init__(self, *pathsegments):
+ super().__init__(*pathsegments)
+ self.info = LocalPathInfo(self)
+
+ def __fspath__(self):
+ return str(self)
+
+ def __open_rb__(self, buffering=-1):
+ return open(self, 'rb')
+
+ def iterdir(self):
+ return (self / name for name in os.listdir(self))
+
+ def readlink(self):
+ return self.with_segments(os.readlink(self))
--- /dev/null
+"""
+Implementation of ReadablePath for zip file members, for use in pathlib tests.
+
+ZipPathGround is also defined here. It helps establish the "ground truth"
+about zip file members in tests.
+"""
+
+import errno
+import io
+import pathlib.types
+import posixpath
+import stat
+import zipfile
+from stat import S_IFMT, S_ISDIR, S_ISREG, S_ISLNK
+
+
+class ZipPathGround:
+ can_symlink = True
+
+ def __init__(self, path_cls):
+ self.path_cls = path_cls
+
+ def setup(self, local_suffix=""):
+ return self.path_cls(zip_file=zipfile.ZipFile(io.BytesIO(), "w"))
+
+ def teardown(self, root):
+ root.zip_file.close()
+
+ def create_file(self, path, data=b''):
+ path.zip_file.writestr(str(path), data)
+
+ def create_dir(self, path):
+ path.zip_file.mkdir(str(path))
+
+ def create_symlink(self, path, target):
+ zip_info = zipfile.ZipInfo(str(path))
+ zip_info.external_attr = stat.S_IFLNK << 16
+ path.zip_file.writestr(zip_info, target.encode())
+
+ def create_hierarchy(self, p):
+ # Add regular files
+ self.create_file(p.joinpath('fileA'), b'this is file A\n')
+ self.create_file(p.joinpath('dirB/fileB'), b'this is file B\n')
+ self.create_file(p.joinpath('dirC/fileC'), b'this is file C\n')
+ self.create_file(p.joinpath('dirC/dirD/fileD'), b'this is file D\n')
+ self.create_file(p.joinpath('dirC/novel.txt'), b'this is a novel\n')
+ # Add symlinks
+ self.create_symlink(p.joinpath('linkA'), 'fileA')
+ self.create_symlink(p.joinpath('linkB'), 'dirB')
+ self.create_symlink(p.joinpath('dirA/linkC'), '../dirB')
+ self.create_symlink(p.joinpath('brokenLink'), 'non-existing')
+ self.create_symlink(p.joinpath('brokenLinkLoop'), 'brokenLinkLoop')
+
+ def readtext(self, p):
+ with p.zip_file.open(str(p), 'r') as f:
+ f = io.TextIOWrapper(f)
+ return f.read()
+
+ def readbytes(self, p):
+ with p.zip_file.open(str(p), 'r') as f:
+ return f.read()
+
+ readlink = readtext
+
+ def isdir(self, p):
+ path_str = str(p) + "/"
+ return path_str in p.zip_file.NameToInfo
+
+ def isfile(self, p):
+ info = p.zip_file.NameToInfo.get(str(p))
+ if info is None:
+ return False
+ return not stat.S_ISLNK(info.external_attr >> 16)
+
+ def islink(self, p):
+ info = p.zip_file.NameToInfo.get(str(p))
+ if info is None:
+ return False
+ return stat.S_ISLNK(info.external_attr >> 16)
+
+
+class MissingZipPathInfo:
+ """
+ PathInfo implementation that is used when a zip file member is missing.
+ """
+ __slots__ = ()
+
+ def exists(self, follow_symlinks=True):
+ return False
+
+ def is_dir(self, follow_symlinks=True):
+ return False
+
+ def is_file(self, follow_symlinks=True):
+ return False
+
+ def is_symlink(self):
+ return False
+
+ def resolve(self):
+ return self
+
+
+missing_zip_path_info = MissingZipPathInfo()
+
+
+class ZipPathInfo:
+ """
+ PathInfo implementation for an existing zip file member.
+ """
+ __slots__ = ('zip_file', 'zip_info', 'parent', 'children')
+
+ def __init__(self, zip_file, parent=None):
+ self.zip_file = zip_file
+ self.zip_info = None
+ self.parent = parent or self
+ self.children = {}
+
+ def exists(self, follow_symlinks=True):
+ if follow_symlinks and self.is_symlink():
+ return self.resolve().exists()
+ return True
+
+ def is_dir(self, follow_symlinks=True):
+ if follow_symlinks and self.is_symlink():
+ return self.resolve().is_dir()
+ elif self.zip_info is None:
+ return True
+ elif fmt := S_IFMT(self.zip_info.external_attr >> 16):
+ return S_ISDIR(fmt)
+ else:
+ return self.zip_info.filename.endswith('/')
+
+ def is_file(self, follow_symlinks=True):
+ if follow_symlinks and self.is_symlink():
+ return self.resolve().is_file()
+ elif self.zip_info is None:
+ return False
+ elif fmt := S_IFMT(self.zip_info.external_attr >> 16):
+ return S_ISREG(fmt)
+ else:
+ return not self.zip_info.filename.endswith('/')
+
+ def is_symlink(self):
+ if self.zip_info is None:
+ return False
+ elif fmt := S_IFMT(self.zip_info.external_attr >> 16):
+ return S_ISLNK(fmt)
+ else:
+ return False
+
+ def resolve(self, path=None, create=False, follow_symlinks=True):
+ """
+ Traverse zip hierarchy (parents, children and symlinks) starting
+ from this PathInfo. This is called from three places:
+
+ - When a zip file member is added to ZipFile.filelist, this method
+ populates the ZipPathInfo tree (using create=True).
+ - When ReadableZipPath.info is accessed, this method is finds a
+ ZipPathInfo entry for the path without resolving any final symlink
+ (using follow_symlinks=False)
+ - When ZipPathInfo methods are called with follow_symlinks=True, this
+ method resolves any symlink in the final path position.
+ """
+ link_count = 0
+ stack = path.split('/')[::-1] if path else []
+ info = self
+ while True:
+ if info.is_symlink() and (follow_symlinks or stack):
+ link_count += 1
+ if link_count >= 40:
+ return missing_zip_path_info # Symlink loop!
+ path = info.zip_file.read(info.zip_info).decode()
+ stack += path.split('/')[::-1] if path else []
+ info = info.parent
+
+ if stack:
+ name = stack.pop()
+ else:
+ return info
+
+ if name == '..':
+ info = info.parent
+ elif name and name != '.':
+ if name not in info.children:
+ if create:
+ info.children[name] = ZipPathInfo(info.zip_file, info)
+ else:
+ return missing_zip_path_info # No such child!
+ info = info.children[name]
+
+
+class ZipFileList:
+ """
+ `list`-like object that we inject as `ZipFile.filelist`. We maintain a
+ tree of `ZipPathInfo` objects representing the zip file members.
+ """
+
+ __slots__ = ('tree', '_items')
+
+ def __init__(self, zip_file):
+ self.tree = ZipPathInfo(zip_file)
+ self._items = []
+ for item in zip_file.filelist:
+ self.append(item)
+
+ def __len__(self):
+ return len(self._items)
+
+ def __iter__(self):
+ return iter(self._items)
+
+ def append(self, item):
+ self._items.append(item)
+ self.tree.resolve(item.filename, create=True).zip_info = item
+
+
+class ReadableZipPath(pathlib.types._ReadablePath):
+ """
+ Simple implementation of a ReadablePath class for .zip files.
+ """
+
+ __slots__ = ('_segments', 'zip_file')
+ parser = posixpath
+
+ def __init__(self, *pathsegments, zip_file):
+ self._segments = pathsegments
+ self.zip_file = zip_file
+ if not isinstance(zip_file.filelist, ZipFileList):
+ zip_file.filelist = ZipFileList(zip_file)
+
+ def __hash__(self):
+ return hash((str(self), self.zip_file))
+
+ def __eq__(self, other):
+ if not isinstance(other, ReadableZipPath):
+ return NotImplemented
+ return str(self) == str(other) and self.zip_file is other.zip_file
+
+ def __str__(self):
+ if not self._segments:
+ return ''
+ return self.parser.join(*self._segments)
+
+ def __repr__(self):
+ return f'{type(self).__name__}({str(self)!r}, zip_file={self.zip_file!r})'
+
+ def with_segments(self, *pathsegments):
+ return type(self)(*pathsegments, zip_file=self.zip_file)
+
+ @property
+ def info(self):
+ tree = self.zip_file.filelist.tree
+ return tree.resolve(str(self), follow_symlinks=False)
+
+ def __open_rb__(self, buffering=-1):
+ info = self.info.resolve()
+ if not info.exists():
+ raise FileNotFoundError(errno.ENOENT, "File not found", self)
+ elif info.is_dir():
+ raise IsADirectoryError(errno.EISDIR, "Is a directory", self)
+ return self.zip_file.open(info.zip_info, 'r')
+
+ def iterdir(self):
+ info = self.info.resolve()
+ if not info.exists():
+ raise FileNotFoundError(errno.ENOENT, "File not found", self)
+ elif not info.is_dir():
+ raise NotADirectoryError(errno.ENOTDIR, "Not a directory", self)
+ return (self / name for name in info.children)
+
+ def readlink(self):
+ info = self.info
+ if not info.exists():
+ raise FileNotFoundError(errno.ENOENT, "File not found", self)
+ elif not info.is_symlink():
+ raise OSError(errno.EINVAL, "Not a symlink", self)
+ return self.with_segments(self.zip_file.read(info.zip_info).decode())
with self.assertRaises(pathlib.UnsupportedOperation):
q.symlink_to(p)
+ def test_info_exists_caching(self):
+ p = self.cls(self.base)
+ q = p / 'myfile'
+ self.assertFalse(q.info.exists())
+ self.assertFalse(q.info.exists(follow_symlinks=False))
+ q.write_text('hullo')
+ self.assertFalse(q.info.exists())
+ self.assertFalse(q.info.exists(follow_symlinks=False))
+
+ def test_info_is_dir_caching(self):
+ p = self.cls(self.base)
+ q = p / 'mydir'
+ self.assertFalse(q.info.is_dir())
+ self.assertFalse(q.info.is_dir(follow_symlinks=False))
+ q.mkdir()
+ self.assertFalse(q.info.is_dir())
+ self.assertFalse(q.info.is_dir(follow_symlinks=False))
+
+ def test_info_is_file_caching(self):
+ p = self.cls(self.base)
+ q = p / 'myfile'
+ self.assertFalse(q.info.is_file())
+ self.assertFalse(q.info.is_file(follow_symlinks=False))
+ q.write_text('hullo')
+ self.assertFalse(q.info.is_file())
+ self.assertFalse(q.info.is_file(follow_symlinks=False))
+
@needs_symlinks
def test_info_is_symlink_caching(self):
p = self.cls(self.base)
normcase = self.parser.normcase
self.assertEqual(normcase(path_a), normcase(path_b))
- def test_is_readable(self):
- p = self.cls(self.base)
- self.assertIsInstance(p, _ReadablePath)
-
- def test_magic_open(self):
- p = self.cls(self.base)
- with magic_open(p / 'fileA', 'r') as f:
- self.assertIsInstance(f, io.TextIOBase)
- self.assertEqual(f.read(), "this is file A\n")
- with magic_open(p / 'fileA', 'rb') as f:
- self.assertIsInstance(f, io.BufferedIOBase)
- self.assertEqual(f.read().strip(), b"this is file A")
-
- def test_iterdir(self):
- P = self.cls
- p = P(self.base)
- it = p.iterdir()
- paths = set(it)
- expected = ['dirA', 'dirB', 'dirC', 'dirE', 'fileA']
- if self.can_symlink:
- expected += ['linkA', 'linkB', 'brokenLink', 'brokenLinkLoop']
- self.assertEqual(paths, { P(self.base, q) for q in expected })
-
- def test_iterdir_nodir(self):
- # __iter__ on something that is not a directory.
- p = self.cls(self.base, 'fileA')
- with self.assertRaises(OSError) as cm:
- p.iterdir()
- # ENOENT or EINVAL under Windows, ENOTDIR otherwise
- # (see issue #12802).
- self.assertIn(cm.exception.errno, (errno.ENOTDIR,
- errno.ENOENT, errno.EINVAL))
-
- def test_iterdir_info(self):
- p = self.cls(self.base)
- for child in p.iterdir():
- self.assertIsInstance(child.info, PathInfo)
- self.assertTrue(child.info.exists(follow_symlinks=False))
-
- def test_glob_common(self):
- def _check(glob, expected):
- self.assertEqual(set(glob), { P(self.base, q) for q in expected })
- P = self.cls
- p = P(self.base)
- it = p.glob("fileA")
- self.assertIsInstance(it, collections.abc.Iterator)
- _check(it, ["fileA"])
- _check(p.glob("fileB"), [])
- _check(p.glob("dir*/file*"), ["dirB/fileB", "dirC/fileC"])
- if not self.can_symlink:
- _check(p.glob("*A"), ['dirA', 'fileA'])
- else:
- _check(p.glob("*A"), ['dirA', 'fileA', 'linkA'])
- if not self.can_symlink:
- _check(p.glob("*B/*"), ['dirB/fileB'])
- else:
- _check(p.glob("*B/*"), ['dirB/fileB', 'dirB/linkD',
- 'linkB/fileB', 'linkB/linkD'])
- if not self.can_symlink:
- _check(p.glob("*/fileB"), ['dirB/fileB'])
- else:
- _check(p.glob("*/fileB"), ['dirB/fileB', 'linkB/fileB'])
- if self.can_symlink:
- _check(p.glob("brokenLink"), ['brokenLink'])
-
- if not self.can_symlink:
- _check(p.glob("*/"), ["dirA/", "dirB/", "dirC/", "dirE/"])
- else:
- _check(p.glob("*/"), ["dirA/", "dirB/", "dirC/", "dirE/", "linkB/"])
-
@needs_posix
def test_glob_posix(self):
P = self.cls
self.assertEqual(set(p.glob("*a\\")), { P(self.base, "dirA/") })
self.assertEqual(set(p.glob("F*a")), { P(self.base, "fileA") })
- def test_glob_empty_pattern(self):
- P = self.cls
- p = P(self.base)
- self.assertEqual(list(p.glob("")), [p.joinpath("")])
-
- def test_info_exists(self):
- p = self.cls(self.base)
- self.assertTrue(p.info.exists())
- self.assertTrue((p / 'dirA').info.exists())
- self.assertTrue((p / 'dirA').info.exists(follow_symlinks=False))
- self.assertTrue((p / 'fileA').info.exists())
- self.assertTrue((p / 'fileA').info.exists(follow_symlinks=False))
- self.assertFalse((p / 'non-existing').info.exists())
- self.assertFalse((p / 'non-existing').info.exists(follow_symlinks=False))
- if self.can_symlink:
- self.assertTrue((p / 'linkA').info.exists())
- self.assertTrue((p / 'linkA').info.exists(follow_symlinks=False))
- self.assertTrue((p / 'linkB').info.exists())
- self.assertTrue((p / 'linkB').info.exists(follow_symlinks=True))
- self.assertFalse((p / 'brokenLink').info.exists())
- self.assertTrue((p / 'brokenLink').info.exists(follow_symlinks=False))
- self.assertFalse((p / 'brokenLinkLoop').info.exists())
- self.assertTrue((p / 'brokenLinkLoop').info.exists(follow_symlinks=False))
- self.assertFalse((p / 'fileA\udfff').info.exists())
- self.assertFalse((p / 'fileA\udfff').info.exists(follow_symlinks=False))
- self.assertFalse((p / 'fileA\x00').info.exists())
- self.assertFalse((p / 'fileA\x00').info.exists(follow_symlinks=False))
-
- def test_info_exists_caching(self):
- p = self.cls(self.base)
- q = p / 'myfile'
- self.assertFalse(q.info.exists())
- self.assertFalse(q.info.exists(follow_symlinks=False))
- if isinstance(self.cls, _WritablePath):
- q.write_text('hullo')
- self.assertFalse(q.info.exists())
- self.assertFalse(q.info.exists(follow_symlinks=False))
-
- def test_info_is_dir(self):
- p = self.cls(self.base)
- self.assertTrue((p / 'dirA').info.is_dir())
- self.assertTrue((p / 'dirA').info.is_dir(follow_symlinks=False))
- self.assertFalse((p / 'fileA').info.is_dir())
- self.assertFalse((p / 'fileA').info.is_dir(follow_symlinks=False))
- self.assertFalse((p / 'non-existing').info.is_dir())
- self.assertFalse((p / 'non-existing').info.is_dir(follow_symlinks=False))
- if self.can_symlink:
- self.assertFalse((p / 'linkA').info.is_dir())
- self.assertFalse((p / 'linkA').info.is_dir(follow_symlinks=False))
- self.assertTrue((p / 'linkB').info.is_dir())
- self.assertFalse((p / 'linkB').info.is_dir(follow_symlinks=False))
- self.assertFalse((p / 'brokenLink').info.is_dir())
- self.assertFalse((p / 'brokenLink').info.is_dir(follow_symlinks=False))
- self.assertFalse((p / 'brokenLinkLoop').info.is_dir())
- self.assertFalse((p / 'brokenLinkLoop').info.is_dir(follow_symlinks=False))
- self.assertFalse((p / 'dirA\udfff').info.is_dir())
- self.assertFalse((p / 'dirA\udfff').info.is_dir(follow_symlinks=False))
- self.assertFalse((p / 'dirA\x00').info.is_dir())
- self.assertFalse((p / 'dirA\x00').info.is_dir(follow_symlinks=False))
-
- def test_info_is_dir_caching(self):
- p = self.cls(self.base)
- q = p / 'mydir'
- self.assertFalse(q.info.is_dir())
- self.assertFalse(q.info.is_dir(follow_symlinks=False))
- if isinstance(self.cls, _WritablePath):
- q.mkdir()
- self.assertFalse(q.info.is_dir())
- self.assertFalse(q.info.is_dir(follow_symlinks=False))
-
- def test_info_is_file(self):
- p = self.cls(self.base)
- self.assertTrue((p / 'fileA').info.is_file())
- self.assertTrue((p / 'fileA').info.is_file(follow_symlinks=False))
- self.assertFalse((p / 'dirA').info.is_file())
- self.assertFalse((p / 'dirA').info.is_file(follow_symlinks=False))
- self.assertFalse((p / 'non-existing').info.is_file())
- self.assertFalse((p / 'non-existing').info.is_file(follow_symlinks=False))
- if self.can_symlink:
- self.assertTrue((p / 'linkA').info.is_file())
- self.assertFalse((p / 'linkA').info.is_file(follow_symlinks=False))
- self.assertFalse((p / 'linkB').info.is_file())
- self.assertFalse((p / 'linkB').info.is_file(follow_symlinks=False))
- self.assertFalse((p / 'brokenLink').info.is_file())
- self.assertFalse((p / 'brokenLink').info.is_file(follow_symlinks=False))
- self.assertFalse((p / 'brokenLinkLoop').info.is_file())
- self.assertFalse((p / 'brokenLinkLoop').info.is_file(follow_symlinks=False))
- self.assertFalse((p / 'fileA\udfff').info.is_file())
- self.assertFalse((p / 'fileA\udfff').info.is_file(follow_symlinks=False))
- self.assertFalse((p / 'fileA\x00').info.is_file())
- self.assertFalse((p / 'fileA\x00').info.is_file(follow_symlinks=False))
-
- def test_info_is_file_caching(self):
- p = self.cls(self.base)
- q = p / 'myfile'
- self.assertFalse(q.info.is_file())
- self.assertFalse(q.info.is_file(follow_symlinks=False))
- if isinstance(self.cls, _WritablePath):
- q.write_text('hullo')
- self.assertFalse(q.info.is_file())
- self.assertFalse(q.info.is_file(follow_symlinks=False))
-
- def test_info_is_symlink(self):
- p = self.cls(self.base)
- self.assertFalse((p / 'fileA').info.is_symlink())
- self.assertFalse((p / 'dirA').info.is_symlink())
- self.assertFalse((p / 'non-existing').info.is_symlink())
- if self.can_symlink:
- self.assertTrue((p / 'linkA').info.is_symlink())
- self.assertTrue((p / 'linkB').info.is_symlink())
- self.assertTrue((p / 'brokenLink').info.is_symlink())
- self.assertFalse((p / 'linkA\udfff').info.is_symlink())
- self.assertFalse((p / 'linkA\x00').info.is_symlink())
- self.assertTrue((p / 'brokenLinkLoop').info.is_symlink())
- self.assertFalse((p / 'fileA\udfff').info.is_symlink())
- self.assertFalse((p / 'fileA\x00').info.is_symlink())
-
class WritablePathTest(JoinablePathTest):
cls = DummyWritablePath
self.assertRaises(TypeError, (p / 'fileA').write_text, b'somebytes')
self.assertEqual((p / 'fileA').read_text(encoding='latin-1'), 'äbcdefg')
- def test_read_text_with_newlines(self):
- p = self.cls(self.base)
- # Check that `\n` character change nothing
- (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
- self.assertEqual((p / 'fileA').read_text(newline='\n'),
- 'abcde\r\nfghlk\n\rmnopq')
- # Check that `\r` character replaces `\n`
- (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
- self.assertEqual((p / 'fileA').read_text(newline='\r'),
- 'abcde\r\nfghlk\n\rmnopq')
- # Check that `\r\n` character replaces `\n`
- (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
- self.assertEqual((p / 'fileA').read_text(newline='\r\n'),
- 'abcde\r\nfghlk\n\rmnopq')
-
def test_write_text_with_newlines(self):
p = self.cls(self.base)
# Check that `\n` character change nothing
cls._files.clear()
cls._directories.clear()
- def test_walk_topdown(self):
- walker = self.walk_path.walk()
- entry = next(walker)
- entry[1].sort() # Ensure we visit SUB1 before SUB2
- self.assertEqual(entry, (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
- entry = next(walker)
- self.assertEqual(entry, (self.sub1_path, ["SUB11"], ["tmp2"]))
- entry = next(walker)
- self.assertEqual(entry, (self.sub11_path, [], []))
- entry = next(walker)
- entry[1].sort()
- entry[2].sort()
- self.assertEqual(entry, self.sub2_tree)
- with self.assertRaises(StopIteration):
- next(walker)
-
- def test_walk_prune(self):
- # Prune the search.
- all = []
- for root, dirs, files in self.walk_path.walk():
- all.append((root, dirs, files))
- if 'SUB1' in dirs:
- # Note that this also mutates the dirs we appended to all!
- dirs.remove('SUB1')
-
- self.assertEqual(len(all), 2)
- self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
-
- all[1][-1].sort()
- all[1][1].sort()
- self.assertEqual(all[1], self.sub2_tree)
-
- def test_walk_bottom_up(self):
- seen_testfn = seen_sub1 = seen_sub11 = seen_sub2 = False
- for path, dirnames, filenames in self.walk_path.walk(top_down=False):
- if path == self.walk_path:
- self.assertFalse(seen_testfn)
- self.assertTrue(seen_sub1)
- self.assertTrue(seen_sub2)
- self.assertEqual(sorted(dirnames), ["SUB1", "SUB2"])
- self.assertEqual(filenames, ["tmp1"])
- seen_testfn = True
- elif path == self.sub1_path:
- self.assertFalse(seen_testfn)
- self.assertFalse(seen_sub1)
- self.assertTrue(seen_sub11)
- self.assertEqual(dirnames, ["SUB11"])
- self.assertEqual(filenames, ["tmp2"])
- seen_sub1 = True
- elif path == self.sub11_path:
- self.assertFalse(seen_sub1)
- self.assertFalse(seen_sub11)
- self.assertEqual(dirnames, [])
- self.assertEqual(filenames, [])
- seen_sub11 = True
- elif path == self.sub2_path:
- self.assertFalse(seen_testfn)
- self.assertFalse(seen_sub2)
- self.assertEqual(sorted(dirnames), sorted(self.sub2_tree[1]))
- self.assertEqual(sorted(filenames), sorted(self.sub2_tree[2]))
- seen_sub2 = True
- else:
- raise AssertionError(f"Unexpected path: {path}")
- self.assertTrue(seen_testfn)
-
-
if __name__ == "__main__":
unittest.main()
--- /dev/null
+"""
+Tests for pathlib.types._ReadablePath
+"""
+
+import collections.abc
+import io
+import unittest
+
+from pathlib import Path
+from pathlib.types import PathInfo, _ReadablePath
+from pathlib._os import magic_open
+
+from test.test_pathlib.support.local_path import ReadableLocalPath, LocalPathGround
+from test.test_pathlib.support.zip_path import ReadableZipPath, ZipPathGround
+
+
+class ReadTestBase:
+ def setUp(self):
+ self.root = self.ground.setup()
+ self.ground.create_hierarchy(self.root)
+
+ def tearDown(self):
+ self.ground.teardown(self.root)
+
+ def test_is_readable(self):
+ self.assertIsInstance(self.root, _ReadablePath)
+
+ def test_open_r(self):
+ p = self.root / 'fileA'
+ with magic_open(p, 'r') as f:
+ self.assertIsInstance(f, io.TextIOBase)
+ self.assertEqual(f.read(), 'this is file A\n')
+
+ def test_open_rb(self):
+ p = self.root / 'fileA'
+ with magic_open(p, 'rb') as f:
+ self.assertEqual(f.read(), b'this is file A\n')
+
+ def test_read_bytes(self):
+ p = self.root / 'fileA'
+ self.assertEqual(p.read_bytes(), b'this is file A\n')
+
+ def test_read_text(self):
+ p = self.root / 'fileA'
+ self.assertEqual(p.read_text(), 'this is file A\n')
+ q = self.root / 'abc'
+ self.ground.create_file(q, b'\xe4bcdefg')
+ self.assertEqual(q.read_text(encoding='latin-1'), 'äbcdefg')
+ self.assertEqual(q.read_text(encoding='utf-8', errors='ignore'), 'bcdefg')
+
+ def test_read_text_with_newlines(self):
+ p = self.root / 'abc'
+ self.ground.create_file(p, b'abcde\r\nfghlk\n\rmnopq')
+ # Check that `\n` character change nothing
+ self.assertEqual(p.read_text(newline='\n'), 'abcde\r\nfghlk\n\rmnopq')
+ # Check that `\r` character replaces `\n`
+ self.assertEqual(p.read_text(newline='\r'), 'abcde\r\nfghlk\n\rmnopq')
+ # Check that `\r\n` character replaces `\n`
+ self.assertEqual(p.read_text(newline='\r\n'), 'abcde\r\nfghlk\n\rmnopq')
+
+ def test_iterdir(self):
+ expected = ['dirA', 'dirB', 'dirC', 'fileA']
+ if self.ground.can_symlink:
+ expected += ['linkA', 'linkB', 'brokenLink', 'brokenLinkLoop']
+ expected = {self.root.joinpath(name) for name in expected}
+ actual = set(self.root.iterdir())
+ self.assertEqual(actual, expected)
+
+ def test_iterdir_nodir(self):
+ p = self.root / 'fileA'
+ self.assertRaises(OSError, p.iterdir)
+
+ def test_iterdir_info(self):
+ for child in self.root.iterdir():
+ self.assertIsInstance(child.info, PathInfo)
+ self.assertTrue(child.info.exists(follow_symlinks=False))
+
+ def test_glob(self):
+ if not self.ground.can_symlink:
+ self.skipTest("requires symlinks")
+
+ p = self.root
+ sep = self.root.parser.sep
+ altsep = self.root.parser.altsep
+ def check(pattern, expected):
+ if altsep:
+ expected = {name.replace(altsep, sep) for name in expected}
+ expected = {p.joinpath(name) for name in expected}
+ actual = set(p.glob(pattern, recurse_symlinks=True))
+ self.assertEqual(actual, expected)
+
+ it = p.glob("fileA")
+ self.assertIsInstance(it, collections.abc.Iterator)
+ self.assertEqual(list(it), [p.joinpath("fileA")])
+ check("*A", ["dirA", "fileA", "linkA"])
+ check("*A", ['dirA', 'fileA', 'linkA'])
+ check("*B/*", ["dirB/fileB", "linkB/fileB"])
+ check("*B/*", ['dirB/fileB', 'linkB/fileB'])
+ check("brokenLink", ['brokenLink'])
+ check("brokenLinkLoop", ['brokenLinkLoop'])
+ check("**/", ["", "dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/", "linkB/"])
+ check("**/*/", ["dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/", "linkB/"])
+ check("*/", ["dirA/", "dirB/", "dirC/", "linkB/"])
+ check("*/dirD/**/", ["dirC/dirD/"])
+ check("*/dirD/**", ["dirC/dirD/", "dirC/dirD/fileD"])
+ check("dir*/**", ["dirA/", "dirA/linkC", "dirA/linkC/fileB", "dirB/", "dirB/fileB", "dirC/",
+ "dirC/fileC", "dirC/dirD", "dirC/dirD/fileD", "dirC/novel.txt"])
+ check("dir*/**/", ["dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/"])
+ check("dir*/**/..", ["dirA/..", "dirA/linkC/..", "dirB/..", "dirC/..", "dirC/dirD/.."])
+ check("dir*/*/**", ["dirA/linkC/", "dirA/linkC/fileB", "dirC/dirD/", "dirC/dirD/fileD"])
+ check("dir*/*/**/", ["dirA/linkC/", "dirC/dirD/"])
+ check("dir*/*/**/..", ["dirA/linkC/..", "dirC/dirD/.."])
+ check("dir*/*/..", ["dirC/dirD/..", "dirA/linkC/.."])
+ check("dir*/*/../dirD/**/", ["dirC/dirD/../dirD/"])
+ check("dir*/**/fileC", ["dirC/fileC"])
+ check("dir*/file*", ["dirB/fileB", "dirC/fileC"])
+ check("**/*/fileA", [])
+ check("fileB", [])
+ check("**/*/fileB", ["dirB/fileB", "dirA/linkC/fileB", "linkB/fileB"])
+ check("**/fileB", ["dirB/fileB", "dirA/linkC/fileB", "linkB/fileB"])
+ check("*/fileB", ["dirB/fileB", "linkB/fileB"])
+ check("*/fileB", ['dirB/fileB', 'linkB/fileB'])
+ check("**/file*",
+ ["fileA", "dirA/linkC/fileB", "dirB/fileB", "dirC/fileC", "dirC/dirD/fileD",
+ "linkB/fileB"])
+
+ def test_walk_top_down(self):
+ it = self.root.walk()
+
+ path, dirnames, filenames = next(it)
+ dirnames.sort()
+ filenames.sort()
+ self.assertEqual(path, self.root)
+ self.assertEqual(dirnames, ['dirA', 'dirB', 'dirC'])
+ self.assertEqual(filenames, ['brokenLink', 'brokenLinkLoop', 'fileA', 'linkA', 'linkB']
+ if self.ground.can_symlink else ['fileA'])
+
+ path, dirnames, filenames = next(it)
+ self.assertEqual(path, self.root / 'dirA')
+ self.assertEqual(dirnames, [])
+ self.assertEqual(filenames, ['linkC'] if self.ground.can_symlink else [])
+
+ path, dirnames, filenames = next(it)
+ self.assertEqual(path, self.root / 'dirB')
+ self.assertEqual(dirnames, [])
+ self.assertEqual(filenames, ['fileB'])
+
+ path, dirnames, filenames = next(it)
+ filenames.sort()
+ self.assertEqual(path, self.root / 'dirC')
+ self.assertEqual(dirnames, ['dirD'])
+ self.assertEqual(filenames, ['fileC', 'novel.txt'])
+
+ path, dirnames, filenames = next(it)
+ self.assertEqual(path, self.root / 'dirC' / 'dirD')
+ self.assertEqual(dirnames, [])
+ self.assertEqual(filenames, ['fileD'])
+
+ self.assertRaises(StopIteration, next, it)
+
+ def test_walk_prune(self):
+ expected = {self.root, self.root / 'dirA', self.root / 'dirC', self.root / 'dirC' / 'dirD'}
+ actual = set()
+ for path, dirnames, filenames in self.root.walk():
+ actual.add(path)
+ if path == self.root:
+ dirnames.remove('dirB')
+ self.assertEqual(actual, expected)
+
+ def test_walk_bottom_up(self):
+ seen_root = seen_dira = seen_dirb = seen_dirc = seen_dird = False
+ for path, dirnames, filenames in self.root.walk(top_down=False):
+ if path == self.root:
+ self.assertFalse(seen_root)
+ self.assertTrue(seen_dira)
+ self.assertTrue(seen_dirb)
+ self.assertTrue(seen_dirc)
+ self.assertEqual(sorted(dirnames), ['dirA', 'dirB', 'dirC'])
+ self.assertEqual(sorted(filenames),
+ ['brokenLink', 'brokenLinkLoop', 'fileA', 'linkA', 'linkB']
+ if self.ground.can_symlink else ['fileA'])
+ seen_root = True
+ elif path == self.root / 'dirA':
+ self.assertFalse(seen_root)
+ self.assertFalse(seen_dira)
+ self.assertEqual(dirnames, [])
+ self.assertEqual(filenames, ['linkC'] if self.ground.can_symlink else [])
+ seen_dira = True
+ elif path == self.root / 'dirB':
+ self.assertFalse(seen_root)
+ self.assertFalse(seen_dirb)
+ self.assertEqual(dirnames, [])
+ self.assertEqual(filenames, ['fileB'])
+ seen_dirb = True
+ elif path == self.root / 'dirC':
+ self.assertFalse(seen_root)
+ self.assertFalse(seen_dirc)
+ self.assertTrue(seen_dird)
+ self.assertEqual(dirnames, ['dirD'])
+ self.assertEqual(sorted(filenames), ['fileC', 'novel.txt'])
+ seen_dirc = True
+ elif path == self.root / 'dirC' / 'dirD':
+ self.assertFalse(seen_root)
+ self.assertFalse(seen_dirc)
+ self.assertFalse(seen_dird)
+ self.assertEqual(dirnames, [])
+ self.assertEqual(filenames, ['fileD'])
+ seen_dird = True
+ else:
+ raise AssertionError(f"Unexpected path: {path}")
+ self.assertTrue(seen_root)
+
+ def test_info_exists(self):
+ p = self.root
+ self.assertTrue(p.info.exists())
+ self.assertTrue((p / 'dirA').info.exists())
+ self.assertTrue((p / 'dirA').info.exists(follow_symlinks=False))
+ self.assertTrue((p / 'fileA').info.exists())
+ self.assertTrue((p / 'fileA').info.exists(follow_symlinks=False))
+ self.assertFalse((p / 'non-existing').info.exists())
+ self.assertFalse((p / 'non-existing').info.exists(follow_symlinks=False))
+ if self.ground.can_symlink:
+ self.assertTrue((p / 'linkA').info.exists())
+ self.assertTrue((p / 'linkA').info.exists(follow_symlinks=False))
+ self.assertTrue((p / 'linkB').info.exists())
+ self.assertTrue((p / 'linkB').info.exists(follow_symlinks=True))
+ self.assertFalse((p / 'brokenLink').info.exists())
+ self.assertTrue((p / 'brokenLink').info.exists(follow_symlinks=False))
+ self.assertFalse((p / 'brokenLinkLoop').info.exists())
+ self.assertTrue((p / 'brokenLinkLoop').info.exists(follow_symlinks=False))
+ self.assertFalse((p / 'fileA\udfff').info.exists())
+ self.assertFalse((p / 'fileA\udfff').info.exists(follow_symlinks=False))
+ self.assertFalse((p / 'fileA\x00').info.exists())
+ self.assertFalse((p / 'fileA\x00').info.exists(follow_symlinks=False))
+
+ def test_info_is_dir(self):
+ p = self.root
+ self.assertTrue((p / 'dirA').info.is_dir())
+ self.assertTrue((p / 'dirA').info.is_dir(follow_symlinks=False))
+ self.assertFalse((p / 'fileA').info.is_dir())
+ self.assertFalse((p / 'fileA').info.is_dir(follow_symlinks=False))
+ self.assertFalse((p / 'non-existing').info.is_dir())
+ self.assertFalse((p / 'non-existing').info.is_dir(follow_symlinks=False))
+ if self.ground.can_symlink:
+ self.assertFalse((p / 'linkA').info.is_dir())
+ self.assertFalse((p / 'linkA').info.is_dir(follow_symlinks=False))
+ self.assertTrue((p / 'linkB').info.is_dir())
+ self.assertFalse((p / 'linkB').info.is_dir(follow_symlinks=False))
+ self.assertFalse((p / 'brokenLink').info.is_dir())
+ self.assertFalse((p / 'brokenLink').info.is_dir(follow_symlinks=False))
+ self.assertFalse((p / 'brokenLinkLoop').info.is_dir())
+ self.assertFalse((p / 'brokenLinkLoop').info.is_dir(follow_symlinks=False))
+ self.assertFalse((p / 'dirA\udfff').info.is_dir())
+ self.assertFalse((p / 'dirA\udfff').info.is_dir(follow_symlinks=False))
+ self.assertFalse((p / 'dirA\x00').info.is_dir())
+ self.assertFalse((p / 'dirA\x00').info.is_dir(follow_symlinks=False))
+
+ def test_info_is_file(self):
+ p = self.root
+ self.assertTrue((p / 'fileA').info.is_file())
+ self.assertTrue((p / 'fileA').info.is_file(follow_symlinks=False))
+ self.assertFalse((p / 'dirA').info.is_file())
+ self.assertFalse((p / 'dirA').info.is_file(follow_symlinks=False))
+ self.assertFalse((p / 'non-existing').info.is_file())
+ self.assertFalse((p / 'non-existing').info.is_file(follow_symlinks=False))
+ if self.ground.can_symlink:
+ self.assertTrue((p / 'linkA').info.is_file())
+ self.assertFalse((p / 'linkA').info.is_file(follow_symlinks=False))
+ self.assertFalse((p / 'linkB').info.is_file())
+ self.assertFalse((p / 'linkB').info.is_file(follow_symlinks=False))
+ self.assertFalse((p / 'brokenLink').info.is_file())
+ self.assertFalse((p / 'brokenLink').info.is_file(follow_symlinks=False))
+ self.assertFalse((p / 'brokenLinkLoop').info.is_file())
+ self.assertFalse((p / 'brokenLinkLoop').info.is_file(follow_symlinks=False))
+ self.assertFalse((p / 'fileA\udfff').info.is_file())
+ self.assertFalse((p / 'fileA\udfff').info.is_file(follow_symlinks=False))
+ self.assertFalse((p / 'fileA\x00').info.is_file())
+ self.assertFalse((p / 'fileA\x00').info.is_file(follow_symlinks=False))
+
+ def test_info_is_symlink(self):
+ p = self.root
+ self.assertFalse((p / 'fileA').info.is_symlink())
+ self.assertFalse((p / 'dirA').info.is_symlink())
+ self.assertFalse((p / 'non-existing').info.is_symlink())
+ if self.ground.can_symlink:
+ self.assertTrue((p / 'linkA').info.is_symlink())
+ self.assertTrue((p / 'linkB').info.is_symlink())
+ self.assertTrue((p / 'brokenLink').info.is_symlink())
+ self.assertFalse((p / 'linkA\udfff').info.is_symlink())
+ self.assertFalse((p / 'linkA\x00').info.is_symlink())
+ self.assertTrue((p / 'brokenLinkLoop').info.is_symlink())
+ self.assertFalse((p / 'fileA\udfff').info.is_symlink())
+ self.assertFalse((p / 'fileA\x00').info.is_symlink())
+
+
+class ZipPathReadTest(ReadTestBase, unittest.TestCase):
+ ground = ZipPathGround(ReadableZipPath)
+
+
+class LocalPathReadTest(ReadTestBase, unittest.TestCase):
+ ground = LocalPathGround(ReadableLocalPath)
+
+
+class PathReadTest(ReadTestBase, unittest.TestCase):
+ ground = LocalPathGround(Path)
+
+
+if __name__ == "__main__":
+ unittest.main()