return length
- def read(self, size=-1):
- self._check_not_closed()
+ def _check_read(self, caller):
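+ # Shared mode check used by read(), read1(), peek(), readinto() and readinto1().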
if self.mode != READ:
import errno
- raise OSError(errno.EBADF, "read() on write-only GzipFile object")
+ msg = f"{caller}() on write-only GzipFile object"
+ raise OSError(errno.EBADF, msg)
+
+ def read(self, size=-1):
+ self._check_not_closed()
+ self._check_read("read")
return self._buffer.read(size)

def read1(self, size=-1):
"""Implements BufferedIOBase.read1()

Reads up to a buffer's worth of data if size is negative."""
self._check_not_closed()
- if self.mode != READ:
- import errno
- raise OSError(errno.EBADF, "read1() on write-only GzipFile object")
+ self._check_read("read1")
if size < 0:
size = io.DEFAULT_BUFFER_SIZE
return self._buffer.read1(size)
+ def readinto(self, b):
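+ # Read decompressed bytes directly into the caller-supplied buffer b;
+ # returns the number of bytes read.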
+ self._check_not_closed()
+ self._check_read("readinto")
+ return self._buffer.readinto(b)
+
+ def readinto1(self, b):
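+ # Like readinto(), but performs at most one read on the underlying stream.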
+ self._check_not_closed()
+ self._check_read("readinto1")
+ return self._buffer.readinto1(b)
+
def peek(self, n):
self._check_not_closed()
- if self.mode != READ:
- import errno
- raise OSError(errno.EBADF, "peek() on write-only GzipFile object")
+ self._check_read("peek")
return self._buffer.peek(n)
@property
self.assertEqual(f.tell(), nread)
self.assertEqual(b''.join(blocks), data1 * 50)
+ def test_readinto(self):
+ # 10MB of incompressible data to ensure multiple reads
+ large_data = os.urandom(10 * 2**20)
+ with gzip.GzipFile(self.filename, 'wb') as f:
+ f.write(large_data)
+
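+ # Preallocate a buffer for the full payload; readinto() fills it in place.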
+ buf = bytearray(len(large_data))
+ with gzip.GzipFile(self.filename, 'r') as f:
+ nbytes = f.readinto(buf)
+ self.assertEqual(nbytes, len(large_data))
+ self.assertEqual(buf, large_data)
+
+ def test_readinto1(self):
+ # 10MB of incompressible data to ensure multiple reads
+ large_data = os.urandom(10 * 2**20)
+ with gzip.GzipFile(self.filename, 'wb') as f:
+ f.write(large_data)
+
+ nread = 0
+ buf = bytearray(len(large_data))
+ memview = memoryview(buf) # Simplifies slicing
+ with gzip.GzipFile(self.filename, 'r') as f:
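+ # readinto1() returns after at most one underlying read, so filling
+ # the whole buffer takes several calls.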
+ for count in range(200):
+ nbytes = f.readinto1(memview[nread:])
+ if not nbytes:
+ break
+ nread += nbytes
+ self.assertEqual(f.tell(), nread)
+ self.assertEqual(buf, large_data)
+ # readinto1() should require multiple iterations to fill the buffer
+ self.assertGreater(count, 1)
+
@bigmemtest(size=_4G, memuse=1)
def test_read_large(self, size):
# Read chunk size over UINT_MAX should be supported, despite zlib's