import pathlib
import pickle
import string
-from test.support.script_helper import assert_python_ok
+import sys
import unittest
import zipfile
-from ._test_params import parameterize, Invoked
from ._functools import compose
+from ._itertools import Counter
+from ._test_params import parameterize, Invoked
+from ._func_timeout_compat import set_timeout
from test.support.os_helper import temp_dir
-# Poor man's technique to consume a (smallish) iterable.
-consume = tuple
-
-
-# from jaraco.itertools 5.0
class jaraco:
class itertools:
- class Counter:
- def __init__(self, i):
- self.count = 0
- self._orig_iter = iter(i)
+ Counter = Counter
- def __iter__(self):
- return self
- def __next__(self):
- result = next(self._orig_iter)
- self.count += 1
- return result
+consume = tuple
def add_dirs(zf):
u16 = path.joinpath("16.txt")
with u16.open('r', "utf-16") as strm:
data = strm.read()
- self.assertEqual(data, "This was utf-16")
+ assert data == "This was utf-16"
with u16.open(encoding="utf-16") as strm:
data = strm.read()
- self.assertEqual(data, "This was utf-16")
+ assert data == "This was utf-16"
def test_open_encoding_errors(self):
in_memory_file = io.BytesIO()
# encoding= as a positional argument for gh-101144.
data = u16.read_text("utf-8", errors="ignore")
- self.assertEqual(data, "invalid utf-8: .")
+ assert data == "invalid utf-8: ."
with u16.open("r", "utf-8", errors="surrogateescape") as f:
- self.assertEqual(f.read(), "invalid utf-8: \udcff\udcff.")
+ assert f.read() == "invalid utf-8: \udcff\udcff."
# encoding= both positional and keyword is an error; gh-101144.
with self.assertRaisesRegex(TypeError, "encoding"):
with self.assertRaises(UnicodeDecodeError):
f.read()
- def test_encoding_warnings(self):
+ @unittest.skipIf(
+ not getattr(sys.flags, 'warn_default_encoding', 0),
+ "Requires warn_default_encoding",
+ )
+ @pass_alpharep
+ def test_encoding_warnings(self, alpharep):
"""EncodingWarning must blame the read_text and open calls."""
- code = '''\
-import io, zipfile
-with zipfile.ZipFile(io.BytesIO(), "w") as zf:
- zf.filename = '<test_encoding_warnings in memory zip file>'
- zf.writestr("path/file.txt", b"Spanish Inquisition")
- root = zipfile.Path(zf)
- (path,) = root.iterdir()
- file_path = path.joinpath("file.txt")
- unused = file_path.read_text() # should warn
- file_path.open("r").close() # should warn
-'''
- proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
- warnings = proc.err.splitlines()
- self.assertEqual(len(warnings), 2, proc.err)
- self.assertRegex(warnings[0], rb"^<string>:8: EncodingWarning:")
- self.assertRegex(warnings[1], rb"^<string>:9: EncodingWarning:")
+ assert sys.flags.warn_default_encoding
+ root = zipfile.Path(alpharep)
+ with self.assertWarns(EncodingWarning) as wc:
+ root.joinpath("a.txt").read_text()
+ assert __file__ == wc.filename
+ with self.assertWarns(EncodingWarning) as wc:
+ root.joinpath("a.txt").open("r").close()
+ assert __file__ == wc.filename
def test_open_write(self):
"""
root = zipfile.Path(alpharep)
a, b, g = root.iterdir()
assert a.read_text(encoding="utf-8") == "content of a"
- a.read_text("utf-8") # No positional arg TypeError per gh-101144.
+ # Also check positional encoding arg (gh-101144).
+ assert a.read_text("utf-8") == "content of a"
assert a.read_bytes() == b"content of a"
@pass_alpharep
e = root / "b" / "d" / "e.txt"
assert e.read_text(encoding="utf-8") == "content of e"
- @pass_alpharep
- def test_traverse_simplediv(self, alpharep):
- """
- Disable the __future__.division when testing traversal.
- """
- code = compile(
- source="zipfile.Path(alpharep) / 'a'",
- filename="(test)",
- mode="eval",
- dont_inherit=True,
- )
- eval(code)
-
@pass_alpharep
def test_pathlike_construction(self, alpharep):
"""
# Check the file iterated all items
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
- # @func_timeout.func_set_timeout(3)
+ @set_timeout(3)
def test_implied_dirs_performance(self):
data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
zipfile.CompleteDirs._implied_dirs(data)
assert sub.name == "b"
assert sub.parent
+ @pass_alpharep
+ def test_match_and_glob(self, alpharep):
+ root = zipfile.Path(alpharep)
+ assert not root.match("*.txt")
+
+ assert list(root.glob("b/c.*")) == [zipfile.Path(alpharep, "b/c.txt")]
+
+ files = root.glob("**/*.txt")
+ assert all(each.match("*.txt") for each in files)
+
+ assert list(root.glob("**/*.txt")) == list(root.rglob("*.txt"))
+
+ def test_glob_empty(self):
+ root = zipfile.Path(zipfile.ZipFile(io.BytesIO(), 'w'))
+ with self.assertRaises(ValueError):
+ root.glob('')
+
+ @pass_alpharep
+ def test_eq_hash(self, alpharep):
+ root = zipfile.Path(alpharep)
+ assert root == zipfile.Path(alpharep)
+
+ assert root != (root / "a.txt")
+ assert (root / "a.txt") == (root / "a.txt")
+
+ root = zipfile.Path(alpharep)
+ assert root in {root}
+
+ @pass_alpharep
+ def test_is_symlink(self, alpharep):
+ """
+ See python/cpython#82102 for symlink support beyond this object.
+ """
+
+ root = zipfile.Path(alpharep)
+ assert not root.is_symlink()
+
+ @pass_alpharep
+ def test_relative_to(self, alpharep):
+ root = zipfile.Path(alpharep)
+ relative = root.joinpath("b", "c.txt").relative_to(root / "b")
+ assert str(relative) == "c.txt"
+
+ relative = root.joinpath("b", "d", "e.txt").relative_to(root / "b")
+ assert str(relative) == "d/e.txt"
+
@pass_alpharep
def test_inheritance(self, alpharep):
cls = type('PathChild', (zipfile.Path,), {})
restored_1 = pickle.loads(saved_1)
first, *rest = restored_1.iterdir()
assert first.read_text().startswith('content of ')
+
+ @pass_alpharep
+ def test_extract_orig_with_implied_dirs(self, alpharep):
+ """
+ A zip file wrapped in a Path should extract even with implied dirs.
+ """
+ source_path = self.zipfile_ondisk(alpharep)
+ zf = zipfile.ZipFile(source_path)
+ # wrap the zipfile for its side effect
+ zipfile.Path(zf)
+ zf.extractall(source_path.parent)
import itertools
import contextlib
import pathlib
+import re
+import fnmatch
__all__ = ['Path']
return _dedupe(_difference(as_dirs, names))
def namelist(self):
- names = super(CompleteDirs, self).namelist()
+ names = super().namelist()
return names + list(self._implied_dirs(names))
def _name_set(self):
dir_match = name not in names and dirname in names
return dirname if dir_match else name
+ def getinfo(self, name):
+ """
+ Supplement getinfo for implied dirs.
+ """
+ try:
+ return super().getinfo(name)
+ except KeyError:
+ if not name.endswith('/') or name not in self._name_set():
+ raise
+ return zipfile.ZipInfo(filename=name)
+
@classmethod
def make(cls, source):
"""
def namelist(self):
with contextlib.suppress(AttributeError):
return self.__names
- self.__names = super(FastLookup, self).namelist()
+ self.__names = super().namelist()
return self.__names
def _name_set(self):
with contextlib.suppress(AttributeError):
return self.__lookup
- self.__lookup = super(FastLookup, self)._name_set()
+ self.__lookup = super()._name_set()
return self.__lookup
self.root = FastLookup.make(root)
self.at = at
+ def __eq__(self, other):
+ """
+ >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
+ False
+ """
+ if self.__class__ is not other.__class__:
+ return NotImplemented
+ return (self.root, self.at) == (other.root, other.at)
+
+ def __hash__(self):
+ return hash((self.root, self.at))
+
def open(self, mode='r', *args, pwd=None, **kwargs):
"""
Open this entry as text or binary following the semantics
subs = map(self._next, self.root.namelist())
return filter(self._is_child, subs)
+ def match(self, path_pattern):
+ return pathlib.Path(self.at).match(path_pattern)
+
+ def is_symlink(self):
+ """
+ Return whether this path is a symlink. Always false (python/cpython#82102).
+ """
+ return False
+
+ def _descendants(self):
+ for child in self.iterdir():
+ yield child
+ if child.is_dir():
+ yield from child._descendants()
+
+ def glob(self, pattern):
+ if not pattern:
+ raise ValueError(f"Unacceptable pattern: {pattern!r}")
+
+ matches = re.compile(fnmatch.translate(pattern)).fullmatch
+ return (
+ child
+ for child in self._descendants()
+ if matches(str(child.relative_to(self)))
+ )
+
+ def rglob(self, pattern):
+ return self.glob(f'**/{pattern}')
+
+ def relative_to(self, other, *extra):
+ return posixpath.relpath(str(self), str(other.joinpath(*extra)))
+
def __str__(self):
return posixpath.join(self.root.filename, self.at)