"""
def __init__(self, name, mode, comptype, fileobj, bufsize,
- compresslevel, preset):
+ compresslevel, preset, mtime):
"""Construct a _Stream object.
"""
self._extfileobj = True
self.exception = zlib.error
self._init_read_gz()
else:
- self._init_write_gz(compresslevel)
+ self._init_write_gz(compresslevel, mtime)
elif comptype == "bz2":
try:
if hasattr(self, "closed") and not self.closed:
self.close()
- def _init_write_gz(self, compresslevel):
+ def _init_write_gz(self, compresslevel, mtime):
"""Initialize for writing with gzip compression.
"""
self.cmp = self.zlib.compressobj(compresslevel,
-self.zlib.MAX_WBITS,
self.zlib.DEF_MEM_LEVEL,
0)
- timestamp = struct.pack("<L", int(time.time()))
+ if mtime is None:
+ mtime = int(time.time())
+ timestamp = struct.pack("<L", mtime)
self.__write(b"\037\213\010\010" + timestamp + b"\002\377")
if self.name.endswith(".gz"):
self.name = self.name[:-3]
def __init__(self, name=None, mode="r", fileobj=None, format=None,
tarinfo=None, dereference=None, ignore_zeros=None, encoding=None,
errors="surrogateescape", pax_headers=None, debug=None,
- errorlevel=None, copybufsize=None, stream=False):
+ errorlevel=None, copybufsize=None, stream=False, mtime=None):
"""Open an (uncompressed) tar archive 'name'. 'mode' is either 'r' to
read from an existing archive, 'a' to append data to an existing
file or 'w' to create a new file overwriting an existing one. 'mode'
compresslevel = kwargs.pop("compresslevel", 6)
preset = kwargs.pop("preset", None)
+ mtime = kwargs.pop("mtime", None)
stream = _Stream(name, filemode, comptype, fileobj, bufsize,
- compresslevel, preset)
+ compresslevel, preset, mtime)
try:
t = cls(name, filemode, stream, **kwargs)
except:
raise CompressionError("gzip module is not available") from None
try:
- fileobj = GzipFile(name, mode + "b", compresslevel, fileobj)
+ mtime = kwargs.pop("mtime", None)
+ fileobj = GzipFile(name, mode + "b", compresslevel, fileobj, mtime=mtime)
except OSError as e:
if fileobj is not None and mode == 'r':
raise ReadError("not a gzip file") from e
import re
import warnings
import stat
+import time
import unittest
import unittest.mock
payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
assert os.path.dirname(tmpname) not in payload
+ def test_create_with_mtime(self):
+ tarfile.open(tmpname, self.mode, mtime=0).close()
+ with self.open(tmpname, 'r') as fobj:
+ fobj.read()
+ self.assertEqual(fobj.mtime, 0)
+
+ def test_create_without_mtime(self):
+ before = int(time.time())
+ tarfile.open(tmpname, self.mode).close()
+ after = int(time.time())
+ with self.open(tmpname, 'r') as fobj:
+ fobj.read()
+ self.assertTrue(before <= fobj.mtime <= after)
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
decompressor = bz2.BZ2Decompressor if bz2 else None
with tarfile.open(tmpname, 'r:gz', compresslevel=1) as tobj:
pass
+ def test_create_with_mtime(self):
+ tarfile.open(tmpname, self.mode, mtime=0).close()
+ with self.open(tmpname, 'rb') as fobj:
+ fobj.read()
+ self.assertEqual(fobj.mtime, 0)
+
+ def test_create_without_mtime(self):
+ before = int(time.time())
+ tarfile.open(tmpname, self.mode).close()
+ after = int(time.time())
+ with self.open(tmpname, 'r') as fobj:
+ fobj.read()
+ self.assertTrue(before <= fobj.mtime <= after)
class Bz2CreateTest(Bz2Test, CreateTest):