]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Move shared test code into isctest.kasp module
authorNicki Křížek <nicki@isc.org>
Mon, 2 Jun 2025 15:20:06 +0000 (17:20 +0200)
committerNicki Křížek <nicki@isc.org>
Fri, 18 Jul 2025 11:37:58 +0000 (13:37 +0200)
Move key calculations and rollover step checks into the shared
isctest.kasp module. Deduplicate the key interval calculations.

bin/tests/system/isctest/kasp.py
bin/tests/system/rollover/tests_rollover.py

index 6a5e8ecde69451967f0aa4514a215e426f44731a..b846786c03c42ccbcd7a264537c77efaaaa2dbf8 100644 (file)
@@ -49,6 +49,55 @@ def _query(server, qname, qtype, tsig=None):
     return response
 
 
+def Ipub(config):
+    return (
+        config["dnskey-ttl"]
+        + config["zone-propagation-delay"]
+        + config["publish-safety"]
+    )
+
+
+def IpubC(config, rollover=True):
+    ttl1 = config["dnskey-ttl"] + config["publish-safety"]
+    ttl2 = timedelta(0)
+
+    if not rollover:
+        # If this is the first key, we also need to wait until the zone
+        # signatures are omnipresent. Use max-zone-ttl instead of
+        # dnskey-ttl, and no publish-safety (because we are looking at
+        # signatures here, not the public key).
+        ttl2 = config["max-zone-ttl"]
+
+    return config["zone-propagation-delay"] + max(ttl1, ttl2)
+
+
+def Iret(config, zsk=True, ksk=False, rollover=True):
+    sign_delay = timedelta(0)
+    safety_interval = timedelta(0)
+    if rollover:
+        sign_delay = config["signatures-validity"] - config["signatures-refresh"]
+        safety_interval = config["retire-safety"]
+
+    iretKSK = timedelta(0)
+    if ksk:
+        # KSK: Double-KSK Method: Iret = DprpP + TTLds
+        iretKSK = (
+            config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
+        )
+
+    iretZSK = timedelta(0)
+    if zsk:
+        # ZSK: Pre-Publication Method: Iret = Dsgn + Dprp + TTLsig
+        iretZSK = (
+            sign_delay
+            + config["zone-propagation-delay"]
+            + config["max-zone-ttl"]
+            + safety_interval
+        )
+
+    return max(iretKSK, iretZSK)
+
+
 @total_ordering
 class KeyTimingMetadata:
     """
@@ -174,12 +223,7 @@ class KeyProperties:
         ipub = timedelta(0)
 
         if self.key.get_metadata("Predecessor", must_exist=False) != "undefined":
-            # Ipub = Dprp + TTLkey
-            ipub = (
-                config["dnskey-ttl"]
-                + config["zone-propagation-delay"]
-                + config["publish-safety"]
-            )
+            ipub = Ipub(config)
 
         self.timing["Active"] = self.timing["Published"] + ipub
 
@@ -187,18 +231,8 @@ class KeyProperties:
         if not self.key.is_ksk():
             return
 
-        ttl1 = config["dnskey-ttl"] + config["publish-safety"]
-        ttl2 = timedelta(0)
-
-        if self.key.get_metadata("Predecessor", must_exist=False) == "undefined":
-            # If this is the first key, we also need to wait until the zone
-            # signatures are omnipresent. Use max-zone-ttl instead of
-            # dnskey-ttl, and no publish-safety (because we are looking at
-            # signatures here, not the public key).
-            ttl2 = config["max-zone-ttl"]
-
-        # IpubC = DprpC + TTLkey
-        ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2)
+        rollover = self.key.get_metadata("Predecessor", must_exist=False) != "undefined"
+        ipubc = IpubC(config, rollover)
 
         self.timing["PublishCDS"] = self.timing["Published"] + ipubc
 
@@ -211,26 +245,8 @@ class KeyProperties:
         if self.metadata["Lifetime"] == 0:
             return
 
-        sign_delay = config["signatures-validity"] - config["signatures-refresh"]
-        safety_interval = config["retire-safety"]
-
-        iretKSK = timedelta(0)
-        iretZSK = timedelta(0)
-        if self.key.is_ksk():
-            # Iret = DprpP + TTLds
-            iretKSK = (
-                config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
-            )
-        if self.key.is_zsk():
-            # Iret = Dsgn + Dprp + TTLsig
-            iretZSK = (
-                sign_delay
-                + config["zone-propagation-delay"]
-                + config["max-zone-ttl"]
-                + safety_interval
-            )
-
-        self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK)
+        iret = Iret(config, zsk=self.key.is_zsk(), ksk=self.key.is_ksk())
+        self.timing["Removed"] = self.timing["Retired"] + iret
 
     def set_expected_keytimes(self, config, offset=None, pregenerated=False):
         if self.key is None:
@@ -1149,6 +1165,88 @@ def check_subdomain(
     )
 
 
+def check_rollover_step(server, config, policy, step):
+    zone = step["zone"]
+    keyprops = step["keyprops"]
+    nextev = step["nextev"]
+    cdss = step.get("cdss", None)
+    keyrelationships = step.get("keyrelationships", None)
+    smooth = step.get("smooth", False)
+    ds_swap = step.get("ds-swap", True)
+    cds_delete = step.get("cds-delete", False)
+    check_keytimes_flag = step.get("check-keytimes", True)
+    zone_signed = step.get("zone-signed", True)
+
+    isctest.log.info(f"check rollover step {zone}")
+
+    if zone_signed:
+        check_dnssec_verify(server, zone)
+
+    ttl = int(config["dnskey-ttl"].total_seconds())
+    expected = policy_to_properties(ttl, keyprops)
+    keys = keydir_to_keylist(zone, server.identifier)
+    ksks = [k for k in keys if k.is_ksk()]
+    zsks = [k for k in keys if not k.is_ksk()]
+    check_keys(zone, keys, expected)
+
+    for kp in expected:
+        key = kp.key
+
+        # Set expected key timing metadata.
+        kp.set_expected_keytimes(config)
+
+        # Set rollover relationships.
+        if keyrelationships is not None:
+            prd = keyrelationships[0]
+            suc = keyrelationships[1]
+            expected[prd].metadata["Successor"] = expected[suc].key.tag
+            expected[suc].metadata["Predecessor"] = expected[prd].key.tag
+            check_keyrelationships(keys, expected)
+
+        # Policy changes may retire keys, set expected timing metadata.
+        if kp.metadata["GoalState"] == "hidden" and "Retired" not in kp.timing:
+            retired = kp.key.get_timing("Inactive")
+            kp.timing["Retired"] = retired
+            kp.timing["Removed"] = retired + Iret(
+                config, zsk=key.is_zsk(), ksk=key.is_ksk()
+            )
+
+        # Check that CDS publication/withdrawal is logged.
+        if "KSK" not in kp.metadata:
+            continue
+        if kp.metadata["KSK"] == "no":
+            continue
+
+        if ds_swap and kp.metadata["DSState"] == "rumoured":
+            assert cdss is not None
+            for algstr in ["CDNSKEY", "CDS (SHA-256)", "CDS (SHA-384)"]:
+                if algstr in cdss:
+                    check_cdslog(server, zone, key, algstr)
+                else:
+                    check_cdslog_prohibit(server, zone, key, algstr)
+
+            # The DS can be introduced. We ignore any parent registration delay,
+            # so set the DS publish time to now.
+            server.rndc(f"dnssec -checkds -key {key.tag} published {zone}")
+
+        if ds_swap and kp.metadata["DSState"] == "unretentive":
+            # The DS can be withdrawn. We ignore any parent registration
+            # delay, so set the DS withdraw time to now.
+            server.rndc(f"dnssec -checkds -key {key.tag} withdrawn {zone}")
+
+    if check_keytimes_flag:
+        check_keytimes(keys, expected)
+
+    check_dnssecstatus(server, zone, keys, policy=policy)
+    check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete)
+    check_subdomain(server, zone, ksks, zsks, smooth=smooth)
+
+    def check_next_key_event():
+        return next_key_event_equals(server, zone, nextev)
+
+    isctest.run.retry_with_timeout(check_next_key_event, timeout=5)
+
+
 def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None):
     """
     Test an RRset below the apex and verify it is updated and signed correctly.
index c7d4bed6ef7eeeadaeac94f162f7bf498e92ef00..ce81bf15cf8568a6902ff5b2ab786b4e917866db 100644 (file)
@@ -19,7 +19,7 @@ pytest.importorskip("dns", minversion="2.0.0")
 import dns.update
 
 import isctest
-from isctest.kasp import KeyTimingMetadata
+from isctest.kasp import KeyTimingMetadata, Ipub, IpubC, Iret
 
 pytestmark = pytest.mark.extra_artifacts(
     [
@@ -46,52 +46,6 @@ pytestmark = pytest.mark.extra_artifacts(
 )
 
 
-def Ipub(config):
-    return (
-        config["dnskey-ttl"]
-        + config["zone-propagation-delay"]
-        + config["publish-safety"]
-    )
-
-
-def IpubC(config, rollover=True):
-    if rollover:
-        ttl = config["dnskey-ttl"]
-        safety_interval = config["publish-safety"]
-    else:
-        ttl = config["max-zone-ttl"]
-        safety_interval = timedelta(0)
-
-    return ttl + config["zone-propagation-delay"] + safety_interval
-
-
-def Iret(config, zsk=True, ksk=False, rollover=True):
-    sign_delay = timedelta(0)
-    safety_interval = timedelta(0)
-    if rollover:
-        sign_delay = config["signatures-validity"] - config["signatures-refresh"]
-        safety_interval = config["retire-safety"]
-
-    iretKSK = timedelta(0)
-    if ksk:
-        # KSK: Double-KSK Method: Iret = DprpP + TTLds
-        iretKSK = (
-            config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
-        )
-
-    iretZSK = timedelta(0)
-    if zsk:
-        # ZSK: Pre-Publication Method: Iret = Dsgn + Dprp + TTLsig
-        iretZSK = (
-            sign_delay
-            + config["zone-propagation-delay"]
-            + config["max-zone-ttl"]
-            + safety_interval
-        )
-
-    return max(iretKSK, iretZSK)
-
-
 def test_rollover_manual(servers):
     server = servers["ns3"]
     policy = "manual-rollover"
@@ -378,88 +332,6 @@ def test_rollover_multisigner(servers):
     isctest.kasp.check_subdomain(server, zone, ksks, zsks)
 
 
-def check_rollover_step(server, config, policy, step):
-    zone = step["zone"]
-    keyprops = step["keyprops"]
-    nextev = step["nextev"]
-    cdss = step.get("cdss", None)
-    keyrelationships = step.get("keyrelationships", None)
-    smooth = step.get("smooth", False)
-    ds_swap = step.get("ds-swap", True)
-    cds_delete = step.get("cds-delete", False)
-    check_keytimes = step.get("check-keytimes", True)
-    zone_signed = step.get("zone-signed", True)
-
-    isctest.log.info(f"check rollover step {zone}")
-
-    if zone_signed:
-        isctest.kasp.check_dnssec_verify(server, zone)
-
-    ttl = int(config["dnskey-ttl"].total_seconds())
-    expected = isctest.kasp.policy_to_properties(ttl, keyprops)
-    keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
-    ksks = [k for k in keys if k.is_ksk()]
-    zsks = [k for k in keys if not k.is_ksk()]
-    isctest.kasp.check_keys(zone, keys, expected)
-
-    for kp in expected:
-        key = kp.key
-
-        # Set expected key timing metadata.
-        kp.set_expected_keytimes(config)
-
-        # Set rollover relationships.
-        if keyrelationships is not None:
-            prd = keyrelationships[0]
-            suc = keyrelationships[1]
-            expected[prd].metadata["Successor"] = expected[suc].key.tag
-            expected[suc].metadata["Predecessor"] = expected[prd].key.tag
-            isctest.kasp.check_keyrelationships(keys, expected)
-
-        # Policy changes may retire keys, set expected timing metadata.
-        if kp.metadata["GoalState"] == "hidden" and "Retired" not in kp.timing:
-            retired = kp.key.get_timing("Inactive")
-            kp.timing["Retired"] = retired
-            kp.timing["Removed"] = retired + Iret(
-                config, zsk=key.is_zsk(), ksk=key.is_ksk()
-            )
-
-        # Check that CDS publication/withdrawal is logged.
-        if "KSK" not in kp.metadata:
-            continue
-        if kp.metadata["KSK"] == "no":
-            continue
-
-        if ds_swap and kp.metadata["DSState"] == "rumoured":
-            assert cdss is not None
-            for algstr in ["CDNSKEY", "CDS (SHA-256)", "CDS (SHA-384)"]:
-                if algstr in cdss:
-                    isctest.kasp.check_cdslog(server, zone, key, algstr)
-                else:
-                    isctest.kasp.check_cdslog_prohibit(server, zone, key, algstr)
-
-            # The DS can be introduced. We ignore any parent registration delay,
-            # so set the DS publish time to now.
-            server.rndc(f"dnssec -checkds -key {key.tag} published {zone}")
-
-        if ds_swap and kp.metadata["DSState"] == "unretentive":
-            # The DS can be withdrawn. We ignore any parent registration
-            # delay, so set the DS withdraw time to now.
-            server.rndc(f"dnssec -checkds -key {key.tag} withdrawn {zone}")
-
-    if check_keytimes:
-        isctest.kasp.check_keytimes(keys, expected)
-
-    isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy)
-    isctest.kasp.check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete)
-    isctest.kasp.check_subdomain(server, zone, ksks, zsks, smooth=smooth)
-
-    def check_next_key_event():
-        return isctest.kasp.next_key_event_equals(server, zone, nextev)
-
-    isctest.run.retry_with_timeout(check_next_key_event, timeout=5)
-
-
 def test_rollover_enable_dnssec(servers):
     server = servers["ns3"]
     policy = "enable-dnssec"
@@ -546,7 +418,7 @@ def test_rollover_enable_dnssec(servers):
     ]
 
     for step in steps:
-        check_rollover_step(server, config, policy, step)
+        isctest.kasp.check_rollover_step(server, config, policy, step)
 
 
 def test_rollover_zsk_prepublication(servers):
@@ -685,7 +557,7 @@ def test_rollover_zsk_prepublication(servers):
     ]
 
     for step in steps:
-        check_rollover_step(server, config, policy, step)
+        isctest.kasp.check_rollover_step(server, config, policy, step)
 
 
 def test_rollover_ksk_doubleksk(servers):
@@ -835,7 +707,7 @@ def test_rollover_ksk_doubleksk(servers):
     ]
 
     for step in steps:
-        check_rollover_step(server, config, policy, step)
+        isctest.kasp.check_rollover_step(server, config, policy, step)
 
     # Test #2375: Scheduled rollovers are happening faster than they can finish.
     zone = "three-is-a-crowd.kasp"
@@ -1088,7 +960,7 @@ def test_rollover_csk_roll1(servers):
     ]
 
     for step in steps:
-        check_rollover_step(server, config, policy, step)
+        isctest.kasp.check_rollover_step(server, config, policy, step)
 
 
 def test_rollover_csk_roll2(servers):
@@ -1271,7 +1143,7 @@ def test_rollover_csk_roll2(servers):
     ]
 
     for step in steps:
-        check_rollover_step(server, config, policy, step)
+        isctest.kasp.check_rollover_step(server, config, policy, step)
 
 
 def test_rollover_policy_changes(servers, templates):
@@ -1440,7 +1312,7 @@ def test_rollover_policy_changes(servers, templates):
     steps.append(step)
 
     for step in steps:
-        check_rollover_step(server, step["config"], step["policy"], step)
+        isctest.kasp.check_rollover_step(server, step["config"], step["policy"], step)
 
     # Reconfigure, changing DNSSEC policies and other configuration options,
     # triggering algorithm rollovers and other dnssec-policy changes.
@@ -1811,4 +1683,4 @@ def test_rollover_policy_changes(servers, templates):
     steps = steps + algo_steps
 
     for step in steps:
-        check_rollover_step(server, step["config"], step["policy"], step)
+        isctest.kasp.check_rollover_step(server, step["config"], step["policy"], step)