raise ValueError("Invalid mode: {!r}".format(mode))
if mode and 'b' not in mode:
mode += 'b'
- if fileobj is None:
- fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
- if filename is None:
- filename = getattr(fileobj, 'name', '')
- if not isinstance(filename, (str, bytes)):
- filename = ''
- else:
- filename = os.fspath(filename)
- origmode = mode
- if mode is None:
- mode = getattr(fileobj, 'mode', 'rb')
-
-
- if mode.startswith('r'):
- self.mode = READ
- raw = _GzipReader(fileobj)
- self._buffer = io.BufferedReader(raw)
- self.name = filename
-
- elif mode.startswith(('w', 'a', 'x')):
- if origmode is None:
- import warnings
- warnings.warn(
- "GzipFile was opened for writing, but this will "
- "change in future Python releases. "
- "Specify the mode argument for opening it for writing.",
- FutureWarning, 2)
- self.mode = WRITE
- self._init_write(filename)
- self.compress = zlib.compressobj(compresslevel,
- zlib.DEFLATED,
- -zlib.MAX_WBITS,
- zlib.DEF_MEM_LEVEL,
- 0)
- self._write_mtime = mtime
- self._buffer_size = _WRITE_BUFFER_SIZE
- self._buffer = io.BufferedWriter(_WriteBufferStream(self),
- buffer_size=self._buffer_size)
- else:
- raise ValueError("Invalid mode: {!r}".format(mode))
- self.fileobj = fileobj
+ try:
+ if fileobj is None:
+ fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
+ if filename is None:
+ filename = getattr(fileobj, 'name', '')
+ if not isinstance(filename, (str, bytes)):
+ filename = ''
+ else:
+ filename = os.fspath(filename)
+ origmode = mode
+ if mode is None:
+ mode = getattr(fileobj, 'mode', 'rb')
+
+
+ if mode.startswith('r'):
+ self.mode = READ
+ raw = _GzipReader(fileobj)
+ self._buffer = io.BufferedReader(raw)
+ self.name = filename
+
+ elif mode.startswith(('w', 'a', 'x')):
+ if origmode is None:
+ import warnings
+ warnings.warn(
+ "GzipFile was opened for writing, but this will "
+ "change in future Python releases. "
+ "Specify the mode argument for opening it for writing.",
+ FutureWarning, 2)
+ self.mode = WRITE
+ self._init_write(filename)
+ self.compress = zlib.compressobj(compresslevel,
+ zlib.DEFLATED,
+ -zlib.MAX_WBITS,
+ zlib.DEF_MEM_LEVEL,
+ 0)
+ self._write_mtime = mtime
+ self._buffer_size = _WRITE_BUFFER_SIZE
+ self._buffer = io.BufferedWriter(_WriteBufferStream(self),
+ buffer_size=self._buffer_size)
+ else:
+ raise ValueError("Invalid mode: {!r}".format(mode))
+
+ self.fileobj = fileobj
- if self.mode == WRITE:
- self._write_gzip_header(compresslevel)
+ if self.mode == WRITE:
+ self._write_gzip_header(compresslevel)
+ except:
+ # Avoid a ResourceWarning if the write fails,
+ # eg read-only file or KeyboardInterrupt
+ self._close()
+ raise
@property
def mtime(self):
elif self.mode == READ:
self._buffer.close()
finally:
- self.fileobj = None
- myfileobj = self.myfileobj
- if myfileobj:
- self.myfileobj = None
- myfileobj.close()
+ self._close()
+
+ def _close(self):
+ self.fileobj = None
+ myfileobj = self.myfileobj
+ if myfileobj is not None:
+ self.myfileobj = None
+ myfileobj.close()
def flush(self,zlib_mode=zlib.Z_SYNC_FLUSH):
self._check_not_closed()
raise exctype
f = BadFile()
- with self.assertRaises(exctype):
- tar = tarfile.open(tmpname, self.mode, fileobj=f,
- format=tarfile.PAX_FORMAT,
- pax_headers={'non': 'empty'})
+ with (
+ warnings_helper.check_no_resource_warning(self),
+ self.assertRaises(exctype),
+ ):
+ tarfile.open(tmpname, self.mode, fileobj=f,
+ format=tarfile.PAX_FORMAT,
+ pax_headers={'non': 'empty'})
self.assertFalse(f.closed)