from test.support import import_helper
from test.support import os_helper
-from zipfile import ZipFile, ZipInfo, ZIP_STORED, ZIP_DEFLATED
+from zipfile import ZipFile, ZipInfo, ZIP_STORED, ZIP_DEFLATED, ZIP_ZSTANDARD
import zipimport
import linecache
# occur in that case (builtin modules are always found first),
# so we'll simply skip it then. Bug #765456.
#
- if "zlib" in sys.builtin_module_names:
- self.skipTest('zlib is a builtin module')
- if "zlib" in sys.modules:
- del sys.modules["zlib"]
- files = {"zlib.py": test_src}
+ if self.compression == ZIP_DEFLATED:
+ mod_name = "zlib"
+ if zipimport._zlib_decompress: # validate attr name
+ # reset the cached import to avoid test order dependencies
+ zipimport._zlib_decompress = None # reset cache
+ elif self.compression == ZIP_ZSTANDARD:
+ mod_name = "_zstd"
+ if zipimport._zstd_decompressor_class: # validate attr name
+ # reset the cached import to avoid test order dependencies
+ zipimport._zstd_decompressor_class = None
+ else:
+ mod_name = "zlib" # the ZIP_STORED case below
+
+ if mod_name in sys.builtin_module_names:
+ self.skipTest(f"{mod_name} is a builtin module")
+ if mod_name in sys.modules:
+ del sys.modules[mod_name]
+ files = {f"{mod_name}.py": test_src}
try:
- self.doTest(".py", files, "zlib")
+ self.doTest(".py", files, mod_name)
except ImportError:
- if self.compression != ZIP_DEFLATED:
- self.fail("expected test to not raise ImportError")
- else:
if self.compression != ZIP_STORED:
- self.fail("expected test to raise ImportError")
+ # Expected - fake compression module can't decompress
+ pass
+ else:
+ self.fail("expected test to not raise ImportError for uncompressed")
+ else:
+ if self.compression == ZIP_STORED:
+ # Expected - no compression needed, so fake module works
+ pass
+ else:
+ self.fail("expected test to raise ImportError for compressed zip with fake compression module")
def testPy(self):
files = {TESTMOD + ".py": test_src}
@support.requires_zlib()
-class CompressedZipImportTestCase(UncompressedZipImportTestCase):
+class DeflateCompressedZipImportTestCase(UncompressedZipImportTestCase):
compression = ZIP_DEFLATED
+@support.requires_zstd()
+class ZStdCompressedZipImportTestCase(UncompressedZipImportTestCase):
+ compression = ZIP_ZSTANDARD
+
+
class BadFileZipImportTestCase(unittest.TestCase):
def assertZipFailure(self, filename):
self.assertRaises(zipimport.ZipImportError,
)
_importing_zlib = False
+_zlib_decompress = None
# Return the zlib.decompress function object, or NULL if zlib couldn't
# be imported. The function is cached when found, so subsequent calls
# don't import zlib again.
-def _get_decompress_func():
+def _get_zlib_decompress_func():
+ global _zlib_decompress
+ if _zlib_decompress:
+ return _zlib_decompress
+
global _importing_zlib
if _importing_zlib:
# Someone has a zlib.py[co] in their Zip file
_importing_zlib = True
try:
- from zlib import decompress
+ from zlib import decompress as _zlib_decompress
except Exception:
_bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
raise ZipImportError("can't decompress data; zlib not available")
_importing_zlib = False
_bootstrap._verbose_message('zipimport: zlib available')
- return decompress
+ return _zlib_decompress
+
+
+_importing_zstd = False
+_zstd_decompressor_class = None
+
+# Return the _zstd.ZstdDecompressor function object, or NULL if _zstd couldn't
+# be imported. The result is cached when found.
+def _get_zstd_decompressor_class():
+ global _zstd_decompressor_class
+ if _zstd_decompressor_class:
+ return _zstd_decompressor_class
+
+ global _importing_zstd
+ if _importing_zstd:
+ # Someone has a _zstd.py[co] in their Zip file
+ # let's avoid a stack overflow.
+ _bootstrap._verbose_message("zipimport: zstd UNAVAILABLE")
+ raise ZipImportError("can't decompress data; zstd not available")
+
+ _importing_zstd = True
+ try:
+ from _zstd import ZstdDecompressor as _zstd_decompressor_class
+ except Exception:
+ _bootstrap._verbose_message("zipimport: zstd UNAVAILABLE")
+ raise ZipImportError("can't decompress data; zstd not available")
+ finally:
+ _importing_zstd = False
+
+ _bootstrap._verbose_message("zipimport: zstd available")
+ return _zstd_decompressor_class
+
+
+def _zstd_decompress(data):
+ # A simple version of compression.zstd.decompress() as we cannot import
+ # that here as the stdlib itself could be being zipimported.
+ results = []
+ while True:
+ decomp = _get_zstd_decompressor_class()()
+ results.append(decomp.decompress(data))
+ if not decomp.eof:
+ raise ZipImportError("zipimport: zstd compressed data ended before "
+ "the end-of-stream marker")
+ data = decomp.unused_data
+ if not data:
+ break
+ return b"".join(results)
+
# Given a path to a Zip file and a toc_entry, return the (uncompressed) data.
def _get_data(archive, toc_entry):
if len(raw_data) != data_size:
raise OSError("zipimport: can't read data")
- if compress == 0:
- # data is not compressed
- return raw_data
-
- # Decompress with zlib
- try:
- decompress = _get_decompress_func()
- except Exception:
- raise ZipImportError("can't decompress data; zlib not available")
- return decompress(raw_data, -15)
+ match compress:
+ case 0: # stored
+ return raw_data
+ case 8: # deflate aka zlib
+ try:
+ decompress = _get_zlib_decompress_func()
+ except Exception:
+ raise ZipImportError("can't decompress data; zlib not available")
+ return decompress(raw_data, -15)
+ case 93: # zstd
+ try:
+ return _zstd_decompress(raw_data)
+ except Exception:
+ raise ZipImportError("could not decompress zstd data")
+ # bz2 and lzma could be added, but are largely obsolete.
+ case _:
+ raise ZipImportError(f"zipimport: unsupported compression {compress}")
# Lenient date/time comparison function. The precision of the mtime