EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
BAD_DATA = b'this is not a valid bzip2 file'
- # Some tests need more than one block of uncompressed data. Since one block
- # is at least 100,000 bytes, we gather some data dynamically and compress it.
- # Note that this assumes that compression works correctly, so we cannot
- # simply use the bigger test data for all tests.
+ # Some tests need more than one block of data. The bz2 module does not
+ # support flushing a block during compression, so we must read in data until
+ # there are at least 2 blocks. Since different orderings of Python files may
+ # be compressed differently, we need to check the compression output for
+ # more than one bzip2 block header magic, a hex encoding of Pi
+ # (0x314159265359)
+ bz2_block_magic = bytes.fromhex('314159265359')
test_size = 0
- BIG_TEXT = bytearray(128*1024)
+ BIG_TEXT = b''
+ BIG_DATA = b''
+ compressor = BZ2Compressor(1)
for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
with open(fname, 'rb') as fh:
- test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
- if test_size > 128*1024:
+ data = fh.read()
+ BIG_DATA += compressor.compress(data)
+ BIG_TEXT += data
+ # TODO(emmatyping): if it is impossible for a block header to cross
+ # multiple outputs, we can just search the output of each compress call
+ # which should be more efficient
+ if BIG_DATA.count(bz2_block_magic) > 1:
+ BIG_DATA += compressor.flush()
break
- BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)
def setUp(self):
fd, self.filename = tempfile.mkstemp()