if not zipfile.is_zipfile(filename):
raise ReadError("%s is not a zip file" % filename)
- zip = zipfile.ZipFile(filename)
- try:
- for info in zip.infolist():
- name = info.filename
-
- # don't extract absolute paths or ones with .. in them
- if name.startswith('/') or '..' in name:
- continue
-
- targetpath = os.path.join(extract_dir, *name.split('/'))
- if not targetpath:
- continue
-
- _ensure_directory(targetpath)
- if not name.endswith('/'):
- # file
- with zip.open(name, 'r') as source, \
- open(targetpath, 'wb') as target:
- copyfileobj(source, target)
- finally:
- zip.close()
+ with zipfile.ZipFile(filename) as zip:
+ zip._ignore_invalid_names = True
+ zip.extractall(extract_dir)
def _unpack_tarfile(filename, extract_dir, *, filter=None):
"""Unpack tar/tar.gz/tar.bz2/tar.xz/tar.zst `filename` to `extract_dir`
def check_unpack_archive(self, format, **kwargs):
self.check_unpack_archive_with_converter(
format, lambda path: path, **kwargs)
- self.check_unpack_archive_with_converter(
- format, FakePath, **kwargs)
self.check_unpack_archive_with_converter(format, FakePath, **kwargs)
def check_unpack_archive_with_converter(self, format, converter, **kwargs):
with self.assertRaises(TypeError):
self.check_unpack_archive('zip', filter='data')
+ def test_unpack_archive_zip_badpaths(self):
+ srcdir = self.mkdtemp()
+ zipname = os.path.join(srcdir, 'test.zip')
+ abspath = os.path.join(srcdir, 'abspath')
+ with zipfile.ZipFile(zipname, 'w') as zf:
+ zf.writestr(abspath, 'badfile')
+ zf.writestr(os.sep + abspath, 'badfile')
+ zf.writestr('/abspath', 'badfile')
+ zf.writestr('C:/abspath', 'badfile')
+ zf.writestr('D:\\abspath', 'badfile')
+ zf.writestr('E:abspath', 'badfile')
+ zf.writestr('F:/G:/abspath', 'badfile')
+ zf.writestr('//server/share/abspath', 'badfile')
+ zf.writestr('\\\\server2\\share\\abspath', 'badfile')
+ zf.writestr('../relpath', 'badfile')
+ zf.writestr(os.pardir + os.sep + 'relpath2', 'badfile')
+ zf.writestr('good/file', 'goodfile')
+ zf.writestr('good..file', 'goodfile')
+
+ dstdir = os.path.join(self.mkdtemp(), 'dst')
+ unpack_archive(zipname, dstdir)
+ self.assertTrue(os.path.isfile(os.path.join(dstdir, 'good', 'file')))
+ self.assertTrue(os.path.isfile(os.path.join(dstdir, 'good..file')))
+ self.assertFalse(os.path.exists(abspath))
+ self.assertFalse(os.path.exists(os.path.join(dstdir, 'abspath')))
+ self.assertFalse(os.path.exists(os.path.join(dstdir, 'G_')))
+ self.assertFalse(os.path.exists(os.path.join(dstdir, 'server')))
+ if os.name != 'nt':
+ self.assertTrue(os.path.isfile(os.path.join(dstdir, 'C:', 'abspath')))
+ self.assertTrue(os.path.isfile(os.path.join(dstdir, 'D:\\abspath')))
+ self.assertTrue(os.path.isfile(os.path.join(dstdir, 'E:abspath')))
+ self.assertTrue(os.path.isfile(os.path.join(dstdir, 'F:', 'G:', 'abspath')))
+ self.assertTrue(os.path.isfile(os.path.join(dstdir, '\\\\server2\\share\\abspath')))
+ if os.pardir == '..':
+ self.assertFalse(os.path.exists(os.path.join(dstdir, '..', 'relpath')))
+ self.assertFalse(os.path.exists(os.path.join(dstdir, 'relpath')))
+ else:
+ self.assertTrue(os.path.isfile(os.path.join(dstdir, '..', 'relpath')))
+ self.assertFalse(os.path.exists(os.path.join(dstdir, os.pardir, 'relpath2')))
+ self.assertFalse(os.path.exists(os.path.join(dstdir, 'relpath2')))
+
+ dstdir2 = os.path.join(self.mkdtemp(), 'dst')
+ os.mkdir(dstdir2)
+ with os_helper.change_cwd(dstdir2):
+ unpack_archive(zipname, '')
+ self.assertTrue(os.path.isfile(os.path.join('good', 'file')))
+ self.assertTrue(os.path.isfile('good..file'))
+ self.assertFalse(os.path.exists(abspath))
+ self.assertFalse(os.path.exists('abspath'))
+ self.assertFalse(os.path.exists('C_'))
+ self.assertFalse(os.path.exists('server'))
+ if os.name != 'nt':
+ self.assertTrue(os.path.isfile(os.path.join('C:', 'abspath')))
+ self.assertTrue(os.path.isfile('D:\\abspath'))
+ self.assertTrue(os.path.isfile('E:abspath'))
+ self.assertTrue(os.path.isfile(os.path.join('F:', 'G:', 'abspath')))
+ self.assertTrue(os.path.isfile('\\\\server2\\share\\abspath'))
+ if os.pardir == '..':
+ self.assertFalse(os.path.exists(os.path.join('..', 'relpath')))
+ self.assertFalse(os.path.exists('relpath'))
+ else:
+ self.assertTrue(os.path.isfile(os.path.join('..', 'relpath')))
+ self.assertFalse(os.path.exists(os.path.join(os.pardir, 'relpath2')))
+ self.assertFalse(os.path.exists('relpath2'))
+
def test_unpack_registry(self):
formats = get_unpack_formats()
fp = None # Set here since __del__ checks it
_windows_illegal_name_trans_table = None
+ _ignore_invalid_names = False
def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True,
compresslevel=None, *, strict_timestamps=True, metadata_encoding=None):
# build the destination pathname, replacing
# forward slashes to platform specific separators.
- arcname = member.filename.replace('/', os.path.sep)
-
- if os.path.altsep:
+ arcname = member.filename
+ if os.path.sep != '/':
+ arcname = arcname.replace('/', os.path.sep)
+ if os.path.altsep and os.path.altsep != '/':
arcname = arcname.replace(os.path.altsep, os.path.sep)
# interpret absolute pathname as relative, remove drive letter or
# UNC path, redundant separators, "." and ".." components.
- arcname = os.path.splitdrive(arcname)[1]
+ drive, root, arcname = os.path.splitroot(arcname)
+ if self._ignore_invalid_names and (drive or root):
+ return None
+ if self._ignore_invalid_names and os.path.pardir in arcname.split(os.path.sep):
+ return None
invalid_path_parts = ('', os.path.curdir, os.path.pardir)
arcname = os.path.sep.join(x for x in arcname.split(os.path.sep)
if x not in invalid_path_parts)
if os.path.sep == '\\':
# filter illegal characters on Windows
- arcname = self._sanitize_windows_name(arcname, os.path.sep)
+ arcname2 = self._sanitize_windows_name(arcname, os.path.sep)
+ if self._ignore_invalid_names and arcname2 != arcname:
+ return None
+ arcname = arcname2
if not arcname and not member.is_dir():
+ if self._ignore_invalid_names:
+ return None
raise ValueError("Empty filename.")
targetpath = os.path.join(targetpath, arcname)