]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
bpo-45509: Check gzip headers for corrupted fields (GH-29028)
authorRuben Vorderman <r.h.p.vorderman@lumc.nl>
Wed, 13 May 2026 10:20:33 +0000 (12:20 +0200)
committerGitHub <noreply@github.com>
Wed, 13 May 2026 10:20:33 +0000 (10:20 +0000)
Check the header checksum it the HCRC field is present.

Lib/gzip.py
Lib/test/test_gzip.py
Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst [new file with mode: 0644]

index 971063aa24f8712c6a68770fa28f9da68f204499..a89ebf806c857255882146450fe049876c3532e0 100644 (file)
@@ -484,40 +484,63 @@ def _read_exact(fp, n):
     return data
 
 
+def _read_until_null(fp, append_to):
+    '''Read until the first encountered null byte in fp.
+       Append to given byte array object'''
+    while True:
+        s = fp.read(1)
+        append_to += s
+        if not s or s == b'\000':
+            break
+
+
 def _read_gzip_header(fp):
     '''Read a gzip header from `fp` and progress to the end of the header.
 
     Returns last mtime if header was present or None otherwise.
     '''
     magic = fp.read(2)
-    if magic == b'':
+    if not magic:
         return None
 
     if magic != b'\037\213':
         raise BadGzipFile('Not a gzipped file (%r)' % magic)
-
-    (method, flag, last_mtime) = struct.unpack("<BBIxx", _read_exact(fp, 8))
+    base_header = _read_exact(fp, 8)
+    (method, flag, last_mtime) = struct.unpack("<BBIxx", base_header)
     if method != 8:
         raise BadGzipFile('Unknown compression method')
 
-    if flag & FEXTRA:
-        # Read & discard the extra field, if present
-        extra_len, = struct.unpack("<H", _read_exact(fp, 2))
-        _read_exact(fp, extra_len)
-    if flag & FNAME:
+    # Most common cases are no flags (gzip.compress, zlib.compress) or only
+    # FNAME set (GzipFile, gzip command line application). Exit early
+    # in those cases.
+    if not flag:
+        return last_mtime
+    if flag == FNAME:
         # Read and discard a null-terminated string containing the filename
         while True:
             s = fp.read(1)
             if not s or s==b'\000':
                 break
+        return last_mtime
+
+    # Processing for more complex flags. Save header parts for FHCRC checking.
+    header = bytearray(magic + base_header)
+    if flag & FEXTRA:
+        extra_len_bytes = _read_exact(fp, 2)
+        extra_len, = struct.unpack("<H", extra_len_bytes)
+        header += extra_len_bytes
+        header += _read_exact(fp, extra_len)
+    if flag & FNAME:
+        _read_until_null(fp, append_to=header)
     if flag & FCOMMENT:
-        # Read and discard a null-terminated string containing a comment
-        while True:
-            s = fp.read(1)
-            if not s or s==b'\000':
-                break
+        _read_until_null(fp, append_to=header)
     if flag & FHCRC:
-        _read_exact(fp, 2)     # Read & discard the 16-bit header CRC
+        # Header CRC is the last 16 bits of a crc32.
+        header_crc, = struct.unpack("<H", _read_exact(fp, 2))
+        true_crc = zlib.crc32(header) & 0xFFFF
+        if header_crc != true_crc:
+            raise BadGzipFile(f"Corrupted gzip header. Checksums do not "
+                               f"match: {true_crc:04x} != {header_crc:04x}")
     return last_mtime
 
 
index 442d30fc970fa94d164f8956dd8056ce44e8386c..b3b7c8f87e4f9f9af82b6a5bc1b003ef7b966b8b 100644 (file)
@@ -795,6 +795,35 @@ class TestGzip(BaseTest):
         compressed_data = gzip.compress(data1)
         self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])
 
+    def test_truncated_header(self):
+        truncated_headers = [
+            b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00",             # Missing OS byte
+            b"\x1f\x8b\x08\x02\x00\x00\x00\x00\x00\xff",         # FHRC, but no checksum
+            b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff",         # FEXTRA, but no xlen
+            b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\xaa\x00", # FEXTRA, xlen, but no data
+            b"\x1f\x8b\x08\x08\x00\x00\x00\x00\x00\xff",         # FNAME but no fname
+            b"\x1f\x8b\x08\x10\x00\x00\x00\x00\x00\xff",         # FCOMMENT, but no fcomment
+        ]
+        for header in truncated_headers:
+            with self.subTest(header=header):
+                with self.assertRaises(EOFError):
+                    gzip.decompress(header)
+
+    def test_corrupted_gzip_header(self):
+        header = (b"\x1f\x8b\x08\x1f\x00\x00\x00\x00\x00\xff"  # All flags set
+                  b"\x05\x00"  # Xlen = 5
+                  b"extra"
+                  b"name\x00"
+                  b"comment\x00")
+        true_crc = zlib.crc32(header) & 0xFFFF
+        corrupted_crc = true_crc ^ 0xFFFF
+        corrupted_header = header + corrupted_crc.to_bytes(2, "little")
+        with self.assertRaises(gzip.BadGzipFile) as err:
+            gzip.decompress(corrupted_header)
+        self.assertEqual(str(err.exception),
+                         f"Corrupted gzip header. Checksums do not "
+                         f"match: {true_crc:04x} != {corrupted_crc:04x}")
+
     def test_read_truncated(self):
         data = data1*50
         # Drop the CRC (4 bytes) and file size (4 bytes).
diff --git a/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst b/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst
new file mode 100644 (file)
index 0000000..80c38c0
--- /dev/null
@@ -0,0 +1 @@
+Gzip headers are now checked for corrupted NAME, COMMENT and HCRC fields.