]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add manual-mode parameter to isctest.kasp
authorMatthijs Mekking <matthijs@isc.org>
Thu, 24 Jul 2025 09:09:01 +0000 (11:09 +0200)
committerMatthijs Mekking <matthijs@isc.org>
Thu, 21 Aug 2025 14:09:55 +0000 (16:09 +0200)
Key state transitions may be blocked by manual-mode, meaning key
timing metadata may not be respected and can be inaccurate. For these
tests use the state values to determine whether the DNSKEY/CDS/CDNSKEY
RRset must be published or not.

bin/tests/system/isctest/kasp.py

index 3419ffefec4137c2cc90d937e858171c6bd72c2d..03dedd9aa96e110bb9eb572d038c4e5b210998ec 100644 (file)
@@ -829,7 +829,13 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None):
 
 
 def _check_signatures(
-    signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False, smooth=False
+    signatures,
+    covers,
+    fqdn,
+    keys,
+    offline_ksk=False,
+    zsk_missing=False,
+    smooth=False,
 ):
     numsigs = 0
     zrrsig = True
@@ -917,7 +923,7 @@ def check_signatures(
     assert numsigs == len(signatures)
 
 
-def _check_dnskeys(dnskeys, keys, cdnskey=False):
+def _check_dnskeys(dnskeys, keys, cdnskey=False, manual_mode=False):
     now = KeyTimingMetadata.now()
     numkeys = 0
 
@@ -928,10 +934,21 @@ def _check_dnskeys(dnskeys, keys, cdnskey=False):
         delete_md = f"Sync{delete_md}"
 
     for key in keys:
-        publish = key.get_timing(publish_md, must_exist=False)
-        delete = key.get_timing(delete_md, must_exist=False)
-        published = publish is not None and now >= publish
-        removed = delete is not None and delete <= now
+        if manual_mode:
+            # State transitions may be blocked, preset key timings may not
+            # be accurate. Use the state values to determine whether the
+            # CDS must be published or not.
+            if cdnskey:
+                md = key.get_metadata("DSState")
+            else:
+                md = key.get_metadata("DNSKEYState")
+            published = md in ["omnipresent", "rumoured"]
+            removed = not published
+        else:
+            publish = key.get_timing(publish_md, must_exist=False)
+            delete = key.get_timing(delete_md, must_exist=False)
+            published = publish is not None and now >= publish
+            removed = delete is not None and delete <= now
 
         if not published or removed:
             for dnskey in dnskeys:
@@ -954,7 +971,7 @@ def _check_dnskeys(dnskeys, keys, cdnskey=False):
     return numkeys
 
 
-def check_dnskeys(rrset, ksks, zsks, cdnskey=False):
+def check_dnskeys(rrset, ksks, zsks, cdnskey=False, manual_mode=False):
     # Check if the correct DNSKEY records are published. If the current time
     # is between the timing metadata 'publish' and 'delete', the key must have
     # a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY
@@ -969,14 +986,14 @@ def check_dnskeys(rrset, ksks, zsks, cdnskey=False):
             dnskey = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
             dnskeys.append(dnskey)
 
-    numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey)
+    numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey, manual_mode=manual_mode)
     if not cdnskey:
-        numkeys += _check_dnskeys(dnskeys, zsks)
+        numkeys += _check_dnskeys(dnskeys, zsks, manual_mode=manual_mode)
 
     assert numkeys == len(dnskeys)
 
 
-def check_cds(cdss, keys, alg):
+def check_cds(cdss, keys, alg, manual_mode=False):
     # Check if the correct CDS records are published. If the current time
     # is between the timing metadata 'publish' and 'delete', the key must have
     # a CDS record published.
@@ -986,10 +1003,19 @@ def check_cds(cdss, keys, alg):
     for key in keys:
         assert key.is_ksk()
 
-        publish = key.get_timing("SyncPublish")
-        delete = key.get_timing("SyncDelete", must_exist=False)
-        published = now >= publish
-        removed = delete is not None and delete <= now
+        if manual_mode:
+            # State transitions may be blocked, preset key timings may not
+            # be accurate. Use the state values to determine whether the
+            # CDS must be published or not.
+            md = key.get_metadata("DSState")
+            published = md in ["omnipresent", "rumoured"]
+            removed = not published
+        else:
+            publish = key.get_timing("SyncPublish")
+            delete = key.get_timing("SyncDelete", must_exist=False)
+            published = now >= publish
+            removed = delete is not None and delete <= now
+
         if not published or removed:
             for cds in cdss:
                 assert not key.cds_equals(cds, alg)
@@ -1069,6 +1095,7 @@ def check_apex(
     zsks,
     cdss=None,
     cds_delete=False,
+    manual_mode=False,
     offline_ksk=False,
     zsk_missing=False,
     tsig=None,
@@ -1082,7 +1109,7 @@ def check_apex(
 
     # test dnskey query
     dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
-    check_dnskeys(dnskeys, ksks, zsks)
+    check_dnskeys(dnskeys, ksks, zsks, manual_mode=manual_mode)
     check_signatures(
         rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
     )
@@ -1108,7 +1135,7 @@ def check_apex(
         check_cdsdelete(cdnskeys, "0 3 0 AA==")
     else:
         if "CDNSKEY" in cdss:
-            check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
+            check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True, manual_mode=manual_mode)
         else:
             assert len(cdnskeys) == 0
 
@@ -1136,7 +1163,7 @@ def check_apex(
 
         for alg in ["SHA-256", "SHA-384"]:
             if f"CDS ({alg})" in cdss:
-                numcds += check_cds(cdsrrs, ksks, alg)
+                numcds += check_cds(cdsrrs, ksks, alg, manual_mode=manual_mode)
             else:
                 check_cds_prohibit(cdsrrs, ksks, alg)
 
@@ -1185,6 +1212,7 @@ def check_rollover_step(server, config, policy, step):
     cds_delete = step.get("cds-delete", False)
     check_keytimes_flag = step.get("check-keytimes", True)
     zone_signed = step.get("zone-signed", True)
+    manual_mode = step.get("manual-mode", False)
 
     isctest.log.info(f"check rollover step {zone}")
 
@@ -1247,7 +1275,15 @@ def check_rollover_step(server, config, policy, step):
         check_keytimes(keys, expected)
 
     check_dnssecstatus(server, zone, keys, policy=policy)
-    check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete)
+    check_apex(
+        server,
+        zone,
+        ksks,
+        zsks,
+        cdss=cdss,
+        cds_delete=cds_delete,
+        manual_mode=manual_mode,
+    )
     check_subdomain(server, zone, ksks, zsks, smooth=smooth)
 
     def check_next_key_event():