From: Nicki Křížek Date: Mon, 2 Jun 2025 15:20:06 +0000 (+0200) Subject: Move shared test code into isctest.kasp module X-Git-Tag: v9.21.11~38^2~16 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b4107103549868c5eeb228278cfe6d010d6d5e8c;p=thirdparty%2Fbind9.git Move shared test code into isctest.kasp module Move key calculations and rollover step checks into the shared isctest.kasp module. Deduplicate the key interval calculations. --- diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 6a5e8ecde69..b846786c03c 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -49,6 +49,55 @@ def _query(server, qname, qtype, tsig=None): return response +def Ipub(config): + return ( + config["dnskey-ttl"] + + config["zone-propagation-delay"] + + config["publish-safety"] + ) + + +def IpubC(config, rollover=True): + ttl1 = config["dnskey-ttl"] + config["publish-safety"] + ttl2 = timedelta(0) + + if not rollover: + # If this is the first key, we also need to wait until the zone + # signatures are omnipresent. Use max-zone-ttl instead of + # dnskey-ttl, and no publish-safety (because we are looking at + # signatures here, not the public key). + ttl2 = config["max-zone-ttl"] + + return config["zone-propagation-delay"] + max(ttl1, ttl2) + + +def Iret(config, zsk=True, ksk=False, rollover=True): + sign_delay = timedelta(0) + safety_interval = timedelta(0) + if rollover: + sign_delay = config["signatures-validity"] - config["signatures-refresh"] + safety_interval = config["retire-safety"] + + iretKSK = timedelta(0) + if ksk: + # KSK: Double-KSK Method: Iret = DprpP + TTLds + iretKSK = ( + config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval + ) + + iretZSK = timedelta(0) + if zsk: + # ZSK: Pre-Publication Method: Iret = Dsgn + Dprp + TTLsig + iretZSK = ( + sign_delay + + config["zone-propagation-delay"] + + config["max-zone-ttl"] + + safety_interval + ) + + return max(iretKSK, iretZSK) + + @total_ordering class KeyTimingMetadata: """ @@ -174,12 +223,7 @@ class KeyProperties: ipub = timedelta(0) if self.key.get_metadata("Predecessor", must_exist=False) != "undefined": - # Ipub = Dprp + TTLkey - ipub = ( - config["dnskey-ttl"] - + config["zone-propagation-delay"] - + config["publish-safety"] - ) + ipub = Ipub(config) self.timing["Active"] = self.timing["Published"] + ipub @@ -187,18 +231,8 @@ class KeyProperties: if not self.key.is_ksk(): return - ttl1 = config["dnskey-ttl"] + config["publish-safety"] - ttl2 = timedelta(0) - - if self.key.get_metadata("Predecessor", must_exist=False) == "undefined": - # If this is the first key, we also need to wait until the zone - # signatures are omnipresent. Use max-zone-ttl instead of - # dnskey-ttl, and no publish-safety (because we are looking at - # signatures here, not the public key). - ttl2 = config["max-zone-ttl"] - - # IpubC = DprpC + TTLkey - ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2) + rollover = self.key.get_metadata("Predecessor", must_exist=False) != "undefined" + ipubc = IpubC(config, rollover) self.timing["PublishCDS"] = self.timing["Published"] + ipubc @@ -211,26 +245,8 @@ class KeyProperties: if self.metadata["Lifetime"] == 0: return - sign_delay = config["signatures-validity"] - config["signatures-refresh"] - safety_interval = config["retire-safety"] - - iretKSK = timedelta(0) - iretZSK = timedelta(0) - if self.key.is_ksk(): - # Iret = DprpP + TTLds - iretKSK = ( - config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval - ) - if self.key.is_zsk(): - # Iret = Dsgn + Dprp + TTLsig - iretZSK = ( - sign_delay - + config["zone-propagation-delay"] - + config["max-zone-ttl"] - + safety_interval - ) - - self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK) + iret = Iret(config, zsk=self.key.is_zsk(), ksk=self.key.is_ksk()) + self.timing["Removed"] = self.timing["Retired"] + iret def set_expected_keytimes(self, config, offset=None, pregenerated=False): if self.key is None: @@ -1149,6 +1165,88 @@ def check_subdomain( ) +def check_rollover_step(server, config, policy, step): + zone = step["zone"] + keyprops = step["keyprops"] + nextev = step["nextev"] + cdss = step.get("cdss", None) + keyrelationships = step.get("keyrelationships", None) + smooth = step.get("smooth", False) + ds_swap = step.get("ds-swap", True) + cds_delete = step.get("cds-delete", False) + check_keytimes_flag = step.get("check-keytimes", True) + zone_signed = step.get("zone-signed", True) + + isctest.log.info(f"check rollover step {zone}") + + if zone_signed: + check_dnssec_verify(server, zone) + + ttl = int(config["dnskey-ttl"].total_seconds()) + expected = policy_to_properties(ttl, keyprops) + keys = keydir_to_keylist(zone, server.identifier) + ksks = [k for k in keys if k.is_ksk()] + zsks = [k for k in keys if not k.is_ksk()] + check_keys(zone, keys, expected) + + for kp in expected: + key = kp.key + + # Set expected key timing metadata. + kp.set_expected_keytimes(config) + + # Set rollover relationships. + if keyrelationships is not None: + prd = keyrelationships[0] + suc = keyrelationships[1] + expected[prd].metadata["Successor"] = expected[suc].key.tag + expected[suc].metadata["Predecessor"] = expected[prd].key.tag + check_keyrelationships(keys, expected) + + # Policy changes may retire keys, set expected timing metadata. + if kp.metadata["GoalState"] == "hidden" and "Retired" not in kp.timing: + retired = kp.key.get_timing("Inactive") + kp.timing["Retired"] = retired + kp.timing["Removed"] = retired + Iret( + config, zsk=key.is_zsk(), ksk=key.is_ksk() + ) + + # Check that CDS publication/withdrawal is logged. + if "KSK" not in kp.metadata: + continue + if kp.metadata["KSK"] == "no": + continue + + if ds_swap and kp.metadata["DSState"] == "rumoured": + assert cdss is not None + for algstr in ["CDNSKEY", "CDS (SHA-256)", "CDS (SHA-384)"]: + if algstr in cdss: + check_cdslog(server, zone, key, algstr) + else: + check_cdslog_prohibit(server, zone, key, algstr) + + # The DS can be introduced. We ignore any parent registration delay, + # so set the DS publish time to now. + server.rndc(f"dnssec -checkds -key {key.tag} published {zone}") + + if ds_swap and kp.metadata["DSState"] == "unretentive": + # The DS can be withdrawn. We ignore any parent registration + # delay, so set the DS withdraw time to now. + server.rndc(f"dnssec -checkds -key {key.tag} withdrawn {zone}") + + if check_keytimes_flag: + check_keytimes(keys, expected) + + check_dnssecstatus(server, zone, keys, policy=policy) + check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete) + check_subdomain(server, zone, ksks, zsks, smooth=smooth) + + def check_next_key_event(): + return next_key_event_equals(server, zone, nextev) + + isctest.run.retry_with_timeout(check_next_key_event, timeout=5) + + def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None): """ Test an RRset below the apex and verify it is updated and signed correctly. diff --git a/bin/tests/system/rollover/tests_rollover.py b/bin/tests/system/rollover/tests_rollover.py index c7d4bed6ef7..ce81bf15cf8 100644 --- a/bin/tests/system/rollover/tests_rollover.py +++ b/bin/tests/system/rollover/tests_rollover.py @@ -19,7 +19,7 @@ pytest.importorskip("dns", minversion="2.0.0") import dns.update import isctest -from isctest.kasp import KeyTimingMetadata +from isctest.kasp import KeyTimingMetadata, Ipub, IpubC, Iret pytestmark = pytest.mark.extra_artifacts( [ @@ -46,52 +46,6 @@ pytestmark = pytest.mark.extra_artifacts( ) -def Ipub(config): - return ( - config["dnskey-ttl"] - + config["zone-propagation-delay"] - + config["publish-safety"] - ) - - -def IpubC(config, rollover=True): - if rollover: - ttl = config["dnskey-ttl"] - safety_interval = config["publish-safety"] - else: - ttl = config["max-zone-ttl"] - safety_interval = timedelta(0) - - return ttl + config["zone-propagation-delay"] + safety_interval - - -def Iret(config, zsk=True, ksk=False, rollover=True): - sign_delay = timedelta(0) - safety_interval = timedelta(0) - if rollover: - sign_delay = config["signatures-validity"] - config["signatures-refresh"] - safety_interval = config["retire-safety"] - - iretKSK = timedelta(0) - if ksk: - # KSK: Double-KSK Method: Iret = DprpP + TTLds - iretKSK = ( - config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval - ) - - iretZSK = timedelta(0) - if zsk: - # ZSK: Pre-Publication Method: Iret = Dsgn + Dprp + TTLsig - iretZSK = ( - sign_delay - + config["zone-propagation-delay"] - + config["max-zone-ttl"] - + safety_interval - ) - - return max(iretKSK, iretZSK) - - def test_rollover_manual(servers): server = servers["ns3"] policy = "manual-rollover" @@ -378,88 +332,6 @@ def test_rollover_multisigner(servers): isctest.kasp.check_subdomain(server, zone, ksks, zsks) -def check_rollover_step(server, config, policy, step): - zone = step["zone"] - keyprops = step["keyprops"] - nextev = step["nextev"] - cdss = step.get("cdss", None) - keyrelationships = step.get("keyrelationships", None) - smooth = step.get("smooth", False) - ds_swap = step.get("ds-swap", True) - cds_delete = step.get("cds-delete", False) - check_keytimes = step.get("check-keytimes", True) - zone_signed = step.get("zone-signed", True) - - isctest.log.info(f"check rollover step {zone}") - - if zone_signed: - isctest.kasp.check_dnssec_verify(server, zone) - - ttl = int(config["dnskey-ttl"].total_seconds()) - expected = isctest.kasp.policy_to_properties(ttl, keyprops) - keys = isctest.kasp.keydir_to_keylist(zone, server.identifier) - ksks = [k for k in keys if k.is_ksk()] - zsks = [k for k in keys if not k.is_ksk()] - isctest.kasp.check_keys(zone, keys, expected) - - for kp in expected: - key = kp.key - - # Set expected key timing metadata. - kp.set_expected_keytimes(config) - - # Set rollover relationships. - if keyrelationships is not None: - prd = keyrelationships[0] - suc = keyrelationships[1] - expected[prd].metadata["Successor"] = expected[suc].key.tag - expected[suc].metadata["Predecessor"] = expected[prd].key.tag - isctest.kasp.check_keyrelationships(keys, expected) - - # Policy changes may retire keys, set expected timing metadata. - if kp.metadata["GoalState"] == "hidden" and "Retired" not in kp.timing: - retired = kp.key.get_timing("Inactive") - kp.timing["Retired"] = retired - kp.timing["Removed"] = retired + Iret( - config, zsk=key.is_zsk(), ksk=key.is_ksk() - ) - - # Check that CDS publication/withdrawal is logged. - if "KSK" not in kp.metadata: - continue - if kp.metadata["KSK"] == "no": - continue - - if ds_swap and kp.metadata["DSState"] == "rumoured": - assert cdss is not None - for algstr in ["CDNSKEY", "CDS (SHA-256)", "CDS (SHA-384)"]: - if algstr in cdss: - isctest.kasp.check_cdslog(server, zone, key, algstr) - else: - isctest.kasp.check_cdslog_prohibit(server, zone, key, algstr) - - # The DS can be introduced. We ignore any parent registration delay, - # so set the DS publish time to now. - server.rndc(f"dnssec -checkds -key {key.tag} published {zone}") - - if ds_swap and kp.metadata["DSState"] == "unretentive": - # The DS can be withdrawn. We ignore any parent registration - # delay, so set the DS withdraw time to now. - server.rndc(f"dnssec -checkds -key {key.tag} withdrawn {zone}") - - if check_keytimes: - isctest.kasp.check_keytimes(keys, expected) - - isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy) - isctest.kasp.check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete) - isctest.kasp.check_subdomain(server, zone, ksks, zsks, smooth=smooth) - - def check_next_key_event(): - return isctest.kasp.next_key_event_equals(server, zone, nextev) - - isctest.run.retry_with_timeout(check_next_key_event, timeout=5) - - def test_rollover_enable_dnssec(servers): server = servers["ns3"] policy = "enable-dnssec" @@ -546,7 +418,7 @@ def test_rollover_enable_dnssec(servers): ] for step in steps: - check_rollover_step(server, config, policy, step) + isctest.kasp.check_rollover_step(server, config, policy, step) def test_rollover_zsk_prepublication(servers): @@ -685,7 +557,7 @@ def test_rollover_zsk_prepublication(servers): ] for step in steps: - check_rollover_step(server, config, policy, step) + isctest.kasp.check_rollover_step(server, config, policy, step) def test_rollover_ksk_doubleksk(servers): @@ -835,7 +707,7 @@ def test_rollover_ksk_doubleksk(servers): ] for step in steps: - check_rollover_step(server, config, policy, step) + isctest.kasp.check_rollover_step(server, config, policy, step) # Test #2375: Scheduled rollovers are happening faster than they can finish. zone = "three-is-a-crowd.kasp" @@ -1088,7 +960,7 @@ def test_rollover_csk_roll1(servers): ] for step in steps: - check_rollover_step(server, config, policy, step) + isctest.kasp.check_rollover_step(server, config, policy, step) def test_rollover_csk_roll2(servers): @@ -1271,7 +1143,7 @@ def test_rollover_csk_roll2(servers): ] for step in steps: - check_rollover_step(server, config, policy, step) + isctest.kasp.check_rollover_step(server, config, policy, step) def test_rollover_policy_changes(servers, templates): @@ -1440,7 +1312,7 @@ def test_rollover_policy_changes(servers, templates): steps.append(step) for step in steps: - check_rollover_step(server, step["config"], step["policy"], step) + isctest.kasp.check_rollover_step(server, step["config"], step["policy"], step) # Reconfigure, changing DNSSEC policies and other configuration options, # triggering algorithm rollovers and other dnssec-policy changes. @@ -1811,4 +1683,4 @@ def test_rollover_policy_changes(servers, templates): steps = steps + algo_steps for step in steps: - check_rollover_step(server, step["config"], step["policy"], step) + isctest.kasp.check_rollover_step(server, step["config"], step["policy"], step)