+import _pyio
import array
import contextlib
import importlib.util
b"zzz", zipfile._Extra.strip(b"zzz", (self.ZIP64_EXTRA,)))
+class StatIO(_pyio.BytesIO):
+ """Buffer which remembers the number of bytes that were read."""
+
+ def __init__(self):
+ super().__init__()
+ self.bytes_read = 0
+
+ def read(self, size=-1):
+ bs = super().read(size)
+ self.bytes_read += len(bs)
+ return bs
+
+
+class StoredZipExtFileRandomReadTest(unittest.TestCase):
+ """Tests whether an uncompressed, unencrypted zip entry can be randomly
+ seek and read without reading redundant bytes."""
+ def test_stored_seek_and_read(self):
+
+ sio = StatIO()
+ # 20000 bytes
+ txt = b'0123456789' * 2000
+
+ # The seek length must be greater than ZipExtFile.MIN_READ_SIZE
+ # as `ZipExtFile._read2()` reads in blocks of this size and we
+ # need to seek out of the buffered data
+ read_buffer_size = zipfile.ZipExtFile.MIN_READ_SIZE
+ self.assertGreaterEqual(10002, read_buffer_size) # for forward seek test
+ self.assertGreaterEqual(5003, read_buffer_size) # for backward seek test
+ # The read length must be less than MIN_READ_SIZE, since we assume that
+ # only 1 block is read in the test.
+ read_length = 100
+ self.assertGreaterEqual(read_buffer_size, read_length) # for read() calls
+
+ with zipfile.ZipFile(sio, "w", compression=zipfile.ZIP_STORED) as zipf:
+ zipf.writestr("foo.txt", txt)
+
+ # check random seek and read on a file
+ with zipfile.ZipFile(sio, "r") as zipf:
+ with zipf.open("foo.txt", "r") as fp:
+ # Test this optimized read hasn't rewound and read from the
+ # start of the file (as in the case of the unoptimized path)
+
+ # forward seek
+ old_count = sio.bytes_read
+ forward_seek_len = 10002
+ current_pos = 0
+ fp.seek(forward_seek_len, os.SEEK_CUR)
+ current_pos += forward_seek_len
+ self.assertEqual(fp.tell(), current_pos)
+ self.assertEqual(fp._left, fp._compress_left)
+ arr = fp.read(read_length)
+ current_pos += read_length
+ self.assertEqual(fp.tell(), current_pos)
+ self.assertEqual(arr, txt[current_pos - read_length:current_pos])
+ self.assertEqual(fp._left, fp._compress_left)
+ read_count = sio.bytes_read - old_count
+ self.assertLessEqual(read_count, read_buffer_size)
+
+ # backward seek
+ old_count = sio.bytes_read
+ backward_seek_len = 5003
+ fp.seek(-backward_seek_len, os.SEEK_CUR)
+ current_pos -= backward_seek_len
+ self.assertEqual(fp.tell(), current_pos)
+ self.assertEqual(fp._left, fp._compress_left)
+ arr = fp.read(read_length)
+ current_pos += read_length
+ self.assertEqual(fp.tell(), current_pos)
+ self.assertEqual(arr, txt[current_pos - read_length:current_pos])
+ self.assertEqual(fp._left, fp._compress_left)
+ read_count = sio.bytes_read - old_count
+ self.assertLessEqual(read_count, read_buffer_size)
+
+ # eof flags test
+ fp.seek(0, os.SEEK_END)
+ fp.seek(12345, os.SEEK_SET)
+ current_pos = 12345
+ arr = fp.read(read_length)
+ current_pos += read_length
+ self.assertEqual(arr, txt[current_pos - read_length:current_pos])
+
+
if __name__ == "__main__":
unittest.main()