]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-145607: Ensure BIG_DATA has two compressed blocks in test_bz2 (#145730)
authorEmma Smith <emma@emmatyping.dev>
Tue, 10 Mar 2026 09:21:57 +0000 (02:21 -0700)
committerGitHub <noreply@github.com>
Tue, 10 Mar 2026 09:21:57 +0000 (11:21 +0200)
Lib/test/test_bz2.py

index 3b7897b8a88a45447463d650058d366ba4e6435e..d8e3b671ec229f9e90f5cc6b3435e7a3c2b22369 100644 (file)
@@ -66,18 +66,28 @@ class BaseTest(unittest.TestCase):
     EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
     BAD_DATA = b'this is not a valid bzip2 file'
 
-    # Some tests need more than one block of uncompressed data. Since one block
-    # is at least 100,000 bytes, we gather some data dynamically and compress it.
-    # Note that this assumes that compression works correctly, so we cannot
-    # simply use the bigger test data for all tests.
+    # Some tests need more than one block of data. The bz2 module does not
+    # support flushing a block during compression, so we must read in data until
+    # there are at least 2 blocks. Since different orderings of Python files may
+    # be compressed differently, we need to check the compression output for
+    # more than one bzip2 block header magic, a hex encoding of Pi
+    # (0x314159265359)
+    bz2_block_magic = bytes.fromhex('314159265359')
     test_size = 0
-    BIG_TEXT = bytearray(128*1024)
+    BIG_TEXT = b''
+    BIG_DATA = b''
+    compressor = BZ2Compressor(1)
     for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
         with open(fname, 'rb') as fh:
-            test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
-        if test_size > 128*1024:
+            data = fh.read()
+            BIG_DATA += compressor.compress(data)
+            BIG_TEXT += data
+        # TODO(emmatyping): if it is impossible for a block header to cross
+        # multiple outputs, we can just search the output of each compress call
+        # which should be more efficient
+        if BIG_DATA.count(bz2_block_magic) > 1:
+            BIG_DATA += compressor.flush()
             break
-    BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)
 
     def setUp(self):
         fd, self.filename = tempfile.mkstemp()