]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.13] gh-145607: Ensure BIG_DATA has two compressed blocks in test_bz2 (GH-145730...
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Wed, 11 Mar 2026 04:08:06 +0000 (05:08 +0100)
committerGitHub <noreply@github.com>
Wed, 11 Mar 2026 04:08:06 +0000 (21:08 -0700)
gh-145607: Ensure BIG_DATA has two compressed blocks in test_bz2 (GH-145730)
(cherry picked from commit 19676e5fc28bdee8325a062a31ddeee60960cf75)

Co-authored-by: Emma Smith <emma@emmatyping.dev>
Lib/test/test_bz2.py

index 7d786be1d25b1c7ce04d633cb921d92d51772eb6..b5cd202a613725e90a6f2d2d3398ea01369a8961 100644 (file)
@@ -66,18 +66,28 @@ class BaseTest(unittest.TestCase):
     EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
     BAD_DATA = b'this is not a valid bzip2 file'
 
-    # Some tests need more than one block of uncompressed data. Since one block
-    # is at least 100,000 bytes, we gather some data dynamically and compress it.
-    # Note that this assumes that compression works correctly, so we cannot
-    # simply use the bigger test data for all tests.
+    # Some tests need more than one block of data. The bz2 module does not
+    # support flushing a block during compression, so we must read in data until
+    # there are at least 2 blocks. Since different orderings of Python files may
+    # be compressed differently, we need to check the compression output for
+    # more than one bzip2 block header magic, a hex encoding of Pi
+    # (0x314159265359)
+    bz2_block_magic = bytes.fromhex('314159265359')
     test_size = 0
-    BIG_TEXT = bytearray(128*1024)
+    BIG_TEXT = b''
+    BIG_DATA = b''
+    compressor = BZ2Compressor(1)
     for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
         with open(fname, 'rb') as fh:
-            test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
-        if test_size > 128*1024:
+            data = fh.read()
+            BIG_DATA += compressor.compress(data)
+            BIG_TEXT += data
+        # TODO(emmatyping): if it is impossible for a block header to cross
+        # multiple outputs, we can just search the output of each compress call
+        # which should be more efficient
+        if BIG_DATA.count(bz2_block_magic) > 1:
+            BIG_DATA += compressor.flush()
             break
-    BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)
 
     def setUp(self):
         fd, self.filename = tempfile.mkstemp()