]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Don't depend on keys being sorted
authorMark Andrews <marka@isc.org>
Tue, 29 Apr 2025 06:59:15 +0000 (16:59 +1000)
committerMark Andrews <marka@isc.org>
Thu, 1 May 2025 23:44:34 +0000 (09:44 +1000)
Extract each section of the bundle and check that the expected
records are there.  The old code was assuming that the records in
each section where in a particular order which didn't happen in
practice.

bin/tests/system/ksr/tests_ksr.py

index ae020086f9b113dc6e0f3c8f7a5107824ae49b1d..15ccc02b28a68fd681a72c94a5ed97687159d8d6 100644 (file)
@@ -212,6 +212,54 @@ def check_keys(
         num += 1
 
 
+def check_key_bundle(bundle_keys, bundle_lines, cdnskey=False):
+    count = 0
+    for key in bundle_keys:
+        found = False
+        for line in bundle_lines:
+            if key.dnskey_equals(line, cdnskey):
+                found = True
+                count += 1
+        assert found
+
+    assert count == len(bundle_keys)
+    assert count == len(bundle_lines)
+
+
+def check_cds_bundle(bundle_keys, bundle_lines, expected_cds):
+    count = 0
+    for key in bundle_keys:
+        found = False
+        # the cds of this ksk must be in the ksr
+        for line in bundle_lines:
+            for alg in expected_cds:
+                if key.cds_equals(line, alg.strip()):
+                    found = True
+                    count += 1
+        assert found
+
+    assert count == len(expected_cds) * len(bundle_keys)
+    assert count == len(bundle_lines)
+
+
+def check_rrsig_bundle(bundle_keys, bundle_lines, zone, rrtype, sigend, sigstart):
+    count = 0
+    for key in bundle_keys:
+        found = False
+        alg = key.get_metadata("Algorithm")
+        expect = f"{zone}. 3600 IN RRSIG {rrtype} {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
+        # there must be a signature of this ksk
+        for line in bundle_lines:
+            rrsig = " ".join(line.split())
+            if expect in rrsig:
+                found = True
+                count += 1
+        assert found
+
+    assert count == len(bundle_keys)
+    assert count == len(bundle_lines)
+
+
 def check_keysigningrequest(out, zsks, start, end):
     lines = out.split("\n")
     line_no = 0
@@ -222,8 +270,10 @@ def check_keysigningrequest(out, zsks, start, end):
         # expect bundle header
         assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no]
         line_no += 1
+        bundle_keys = []
+        bundle_lines = []
         # expect zsks
-        for key in sorted(zsks):
+        for key in zsks:
             published = key.get_timing("Publish")
             if between(published, inception, next_bundle):
                 next_bundle = published
@@ -237,10 +287,14 @@ def check_keysigningrequest(out, zsks, start, end):
             if removed is not None and inception >= removed:
                 continue
 
-            # this zsk must be in the ksr
-            assert key.dnskey_equals(lines[line_no])
+            # collect keys that should be in this bundle
+            # collect lines that should be in this bundle
+            bundle_keys.append(key)
+            bundle_lines.append(lines[line_no])
             line_no += 1
 
+        check_key_bundle(bundle_keys, bundle_lines)
+
         inception = next_bundle
 
     # ksr footer
@@ -298,7 +352,9 @@ def check_signedkeyresponse(
         line_no += 1
 
         # expect ksks
-        for key in sorted(ksks):
+        bundle_keys = []
+        bundle_lines = []
+        for key in ksks:
             published = key.get_timing("Publish")
             if between(published, inception, next_bundle):
                 next_bundle = published
@@ -313,12 +369,18 @@ def check_signedkeyresponse(
             if between(removed, inception, next_bundle):
                 next_bundle = removed
 
-            # this ksk must be in the ksr
-            assert key.dnskey_equals(lines[line_no])
+            # collect keys that should be in this bundle
+            # collect lines that should be in this bundle
+            bundle_keys.append(key)
+            bundle_lines.append(lines[line_no])
             line_no += 1
 
+        check_key_bundle(bundle_keys, bundle_lines)
+
         # expect zsks
-        for key in sorted(zsks):
+        bundle_keys = []
+        bundle_lines = []
+        for key in zsks:
             published = key.get_timing("Publish")
             if between(published, inception, next_bundle):
                 next_bundle = published
@@ -332,12 +394,18 @@ def check_signedkeyresponse(
             if removed is not None and inception >= removed:
                 continue
 
-            # this zsk must be in the ksr
-            assert key.dnskey_equals(lines[line_no])
+            # collect keys that should be in this bundle
+            # collect lines that should be in this bundle
+            bundle_keys.append(key)
+            bundle_lines.append(lines[line_no])
             line_no += 1
 
+        check_key_bundle(bundle_keys, bundle_lines)
+
         # expect rrsig(dnskey)
-        for key in sorted(ksks):
+        bundle_keys = []
+        bundle_lines = []
+        for key in ksks:
             active = key.get_timing("Activate")
             inactive = key.get_timing("Inactive", must_exist=False)
             if active > inception:
@@ -345,17 +413,20 @@ def check_signedkeyresponse(
             if inactive is not None and inception >= inactive:
                 continue
 
-            # there must be a signature of this ksk
-            alg = key.get_metadata("Algorithm")
-            expect = f"{zone}. 3600 IN RRSIG DNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
-            rrsig = " ".join(lines[line_no].split())
-            assert expect in rrsig
+            # collect keys that should be in this bundle
+            # collect lines that should be in this bundle
+            bundle_keys.append(key)
+            bundle_lines.append(lines[line_no])
             line_no += 1
 
+        check_rrsig_bundle(bundle_keys, bundle_lines, zone, "DNSKEY", sigend, sigstart)
+
         # expect cdnskey
         have_cdnskey = False
         if cdnskey:
-            for key in sorted(ksks):
+            bundle_keys = []
+            bundle_lines = []
+            for key in ksks:
                 published = key.get_timing("SyncPublish")
                 if between(published, inception, next_bundle):
                     next_bundle = published
@@ -369,14 +440,20 @@ def check_signedkeyresponse(
                 if removed is not None and inception >= removed:
                     continue
 
-                # the cdnskey of this ksk must be in the ksr
-                assert key.dnskey_equals(lines[line_no], cdnskey=True)
+                # collect keys that should be in this bundle
+                # collect lines that should be in this bundle
+                bundle_keys.append(key)
+                bundle_lines.append(lines[line_no])
                 line_no += 1
                 have_cdnskey = True
 
+            check_key_bundle(bundle_keys, bundle_lines, cdnskey=True)
+
         if have_cdnskey:
             # expect rrsig(cdnskey)
-            for key in sorted(ksks):
+            bundle_keys = []
+            bundle_lines = []
+            for key in ksks:
                 active = key.get_timing("Activate")
                 inactive = key.get_timing("Inactive", must_exist=False)
                 if active > inception:
@@ -384,17 +461,23 @@ def check_signedkeyresponse(
                 if inactive is not None and inception >= inactive:
                     continue
 
-                # there must be a signature of this ksk
-                alg = key.get_metadata("Algorithm")
-                expect = f"{zone}. 3600 IN RRSIG CDNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
-                rrsig = " ".join(lines[line_no].split())
-                assert expect in rrsig
+                # collect keys that should be in this bundle
+                # collect lines that should be in this bundle
+                bundle_keys.append(key)
+                bundle_lines.append(lines[line_no])
                 line_no += 1
 
+            check_rrsig_bundle(
+                bundle_keys, bundle_lines, zone, "CDNSKEY", sigend, sigstart
+            )
+
         # expect cds
         have_cds = False
         if cds != "":
-            for key in sorted(ksks):
+            bundle_keys = []
+            bundle_lines = []
+            expected_cds = cds.split(",")
+            for key in ksks:
                 published = key.get_timing("SyncPublish")
                 if between(published, inception, next_bundle):
                     next_bundle = published
@@ -408,16 +491,22 @@ def check_signedkeyresponse(
                 if removed is not None and inception >= removed:
                     continue
 
-                # the cds of this ksk must be in the ksr
-                expected_cds = cds.split(",")
-                for alg in expected_cds:
-                    assert key.cds_equals(lines[line_no], alg.strip())
+                # collect keys that should be in this bundle
+                # collect lines that should be in this bundle
+                bundle_keys.append(key)
+                # pylint: disable=unused-variable
+                for _arg in expected_cds:
+                    bundle_lines.append(lines[line_no])
                     line_no += 1
                     have_cds = True
 
+            check_cds_bundle(bundle_keys, bundle_lines, expected_cds)
+
         if have_cds:
             # expect rrsig(cds)
-            for key in sorted(ksks):
+            bundle_keys = []
+            bundle_lines = []
+            for key in ksks:
                 active = key.get_timing("Activate")
                 inactive = key.get_timing("Inactive", must_exist=False)
                 if active > inception:
@@ -425,13 +514,14 @@ def check_signedkeyresponse(
                 if inactive is not None and inception >= inactive:
                     continue
 
-                # there must be a signature of this ksk
-                alg = key.get_metadata("Algorithm")
-                expect = f"{zone}. 3600 IN RRSIG CDS {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
-                rrsig = " ".join(lines[line_no].split())
-                assert expect in rrsig
+                # collect keys that should be in this bundle
+                # collect lines that should be in this bundle
+                bundle_keys.append(key)
+                bundle_lines.append(lines[line_no])
                 line_no += 1
 
+            check_rrsig_bundle(bundle_keys, bundle_lines, zone, "CDS", sigend, sigstart)
+
         inception = next_bundle
 
     # skr footer