]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Update kasp check_signatures for dnssec-policy
authorMatthijs Mekking <matthijs@isc.org>
Mon, 17 Mar 2025 10:52:18 +0000 (11:52 +0100)
committerMatthijs Mekking <matthijs@isc.org>
Wed, 23 Apr 2025 15:22:04 +0000 (15:22 +0000)
The check_signatures code was initially created to be suitable for
the ksr system test, to test the Offline KSK feature. For that, a
key is expected to be signing if the current time is between
the timing metadata Active and Retired.

With dnssec-policy, the key timing metadata is indicative, the key
states determine the actual signing behavior.

Update the check_signatures function so that by default the signing
is derived from the key states (ksigning and zsigning). Add an
argument 'offline_ksk', if set the make sure that the zsigning is set
if the current time is between the Active and Retired timing metadata,
and for ksigning we just use the timing metadata (as the key is offline,
we cannot check the key states).

Another (upcoming) test case is where key files are missing. When the
ZSK private key file is missing, the KSK takes over. Add an argument
'zsk_missing', when set to True the expected zone signing (zsigning)
is reversed.

bin/tests/system/isctest/kasp.py
bin/tests/system/ksr/tests_ksr.py

index 0ea1c774ea6e92c061149eb5db85fc2686b31cc4..5b39b9fffdd43356943df3d47c2f28843ca7c109 100644 (file)
@@ -332,6 +332,52 @@ class Key:
             )
         return value
 
+    def get_signing_state(self, offline_ksk=False, zsk_missing=False) -> (bool, bool):
+        """
+        This returns the signing state derived from the key states, KRRSIGState
+        and ZRRSIGState.
+
+        If 'offline_ksk' is set to True, we determine the signing state from
+        the timing metadata. If 'zsigning' is True, ensure the current time is
+        between the Active and Retired timing metadata.
+
+        If 'zsk_missing' is set to True, it means the ZSK private key file is
+        missing, and the KSK should take over signing the RRset, and the
+        expected zone signing state (zsigning) is reversed.
+        """
+        # Fetch key timing metadata.
+        now = KeyTimingMetadata.now()
+        activate = self.get_timing("Activate")
+        inactive = self.get_timing("Inactive", must_exist=False)
+
+        active = now >= activate
+        retired = inactive is not None and inactive <= now
+        signing = active and not retired
+
+        # Fetch key state metadata.
+        krrsigstate = self.get_metadata("KRRSIGState", must_exist=False)
+        ksigning = krrsigstate in ["rumoured", "omnipresent"]
+        zrrsigstate = self.get_metadata("ZRRSIGState", must_exist=False)
+        zsigning = zrrsigstate in ["rumoured", "omnipresent"]
+
+        if ksigning:
+            assert self.is_ksk()
+        if zsigning:
+            assert self.is_zsk()
+
+        # If the ZSK private key file is missing, revers the zone signing state.
+        if zsk_missing:
+            zsigning = not zsigning
+
+        # If testing offline KSK, retrieve the signing state from the key timing
+        # metadata.
+        if offline_ksk and signing and self.is_zsk():
+            assert zsigning
+        if offline_ksk and signing and self.is_ksk():
+            ksigning = signing
+
+        return ksigning, zsigning
+
     def ttl(self) -> int:
         with open(self.keyfile, "r", encoding="utf-8") as file:
             for line in file:
@@ -772,8 +818,9 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None):
         assert f"key: {key.tag}" in response
 
 
-def _check_signatures(signatures, covers, fqdn, keys):
-    now = KeyTimingMetadata.now()
+def _check_signatures(
+    signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False
+):
     numsigs = 0
     zrrsig = True
     if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]:
@@ -781,23 +828,16 @@ def _check_signatures(signatures, covers, fqdn, keys):
     krrsig = not zrrsig
 
     for key in keys:
-        activate = key.get_timing("Activate")
-        inactive = key.get_timing("Inactive", must_exist=False)
+        ksigning, zsigning = key.get_signing_state(
+            offline_ksk=offline_ksk, zsk_missing=zsk_missing
+        )
 
-        active = now >= activate
-        retired = inactive is not None and inactive <= now
-        signing = active and not retired
         alg = key.get_metadata("Algorithm")
         rtype = dns.rdatatype.to_text(covers)
 
         expect = rf"IN RRSIG {rtype} {alg} (\d) (\d+) (\d+) (\d+) {key.tag} {fqdn}"
 
-        if not signing:
-            for rrsig in signatures:
-                assert re.search(expect, rrsig) is None
-            continue
-
-        if zrrsig and key.is_zsk():
+        if zrrsig and zsigning:
             has_rrsig = False
             for rrsig in signatures:
                 if re.search(expect, rrsig) is not None:
@@ -806,11 +846,11 @@ def _check_signatures(signatures, covers, fqdn, keys):
             assert has_rrsig, f"Expected signature but not found: {expect}"
             numsigs += 1
 
-        if zrrsig and not key.is_zsk():
+        if zrrsig and not zsigning:
             for rrsig in signatures:
                 assert re.search(expect, rrsig) is None
 
-        if krrsig and key.is_ksk():
+        if krrsig and ksigning:
             has_rrsig = False
             for rrsig in signatures:
                 if re.search(expect, rrsig) is not None:
@@ -819,14 +859,16 @@ def _check_signatures(signatures, covers, fqdn, keys):
             assert has_rrsig, f"Expected signature but not found: {expect}"
             numsigs += 1
 
-        if krrsig and not key.is_ksk():
+        if krrsig and not ksigning:
             for rrsig in signatures:
                 assert re.search(expect, rrsig) is None
 
     return numsigs
 
 
-def check_signatures(rrset, covers, fqdn, ksks, zsks):
+def check_signatures(
+    rrset, covers, fqdn, ksks, zsks, offline_ksk=False, zsk_missing=False
+):
     # Check if signatures with covering type are signed with the right keys.
     # The right keys are the ones that expect a signature and have the
     # correct role.
@@ -840,8 +882,12 @@ def check_signatures(rrset, covers, fqdn, ksks, zsks):
             rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
             signatures.append(rrsig)
 
-    numsigs += _check_signatures(signatures, covers, fqdn, ksks)
-    numsigs += _check_signatures(signatures, covers, fqdn, zsks)
+    numsigs += _check_signatures(
+        signatures, covers, fqdn, ksks, offline_ksk=offline_ksk, zsk_missing=zsk_missing
+    )
+    numsigs += _check_signatures(
+        signatures, covers, fqdn, zsks, offline_ksk=offline_ksk, zsk_missing=zsk_missing
+    )
 
     assert numsigs == len(signatures)
 
@@ -965,7 +1011,9 @@ def _query_rrset(server, fqdn, qtype, tsig=None):
     return rrs, rrsigs
 
 
-def check_apex(server, zone, ksks, zsks, tsig=None):
+def check_apex(
+    server, zone, ksks, zsks, offline_ksk=False, zsk_missing=False, tsig=None
+):
     # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
     # are signed correctly and with the appropriate keys.
     fqdn = f"{zone}."
@@ -973,30 +1021,44 @@ def check_apex(server, zone, ksks, zsks, tsig=None):
     # test dnskey query
     dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
     check_dnskeys(dnskeys, ksks, zsks)
-    check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
+    check_signatures(
+        rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
+    )
 
     # test soa query
     soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
     assert len(soa) == 1
     assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
-    check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
+    check_signatures(
+        rrsigs,
+        dns.rdatatype.SOA,
+        fqdn,
+        ksks,
+        zsks,
+        offline_ksk=offline_ksk,
+        zsk_missing=zsk_missing,
+    )
 
     # test cdnskey query
     cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
     check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
     if len(cdnskeys) > 0:
         assert len(rrsigs) > 0
-        check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
+        check_signatures(
+            rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
+        )
 
     # test cds query
     cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
     check_cds(cds, ksks)
     if len(cds) > 0:
         assert len(rrsigs) > 0
-        check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
+        check_signatures(
+            rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, offline_ksk=offline_ksk
+        )
 
 
-def check_subdomain(server, zone, ksks, zsks, tsig=None):
+def check_subdomain(server, zone, ksks, zsks, offline_ksk=False, tsig=None):
     # Test an RRset below the apex and verify it is signed correctly.
     fqdn = f"{zone}."
     qname = f"a.{zone}."
@@ -1014,7 +1076,7 @@ def check_subdomain(server, zone, ksks, zsks, tsig=None):
         else:
             assert match in rrset.to_text()
 
-    check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
+    check_signatures(rrsigs, qtype, fqdn, ksks, zsks, offline_ksk=offline_ksk)
 
 
 def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None):
index 5512f34fa26ecdbe31e8215c04cceee69f332bd0..ae020086f9b113dc6e0f3c8f7a5107824ae49b1d 100644 (file)
@@ -673,9 +673,9 @@ def test_ksr_common(servers):
     # - check keys
     check_keys(overlapping_zsks, lifetime, with_state=True)
     # - check apex
-    isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks)
+    isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
     # - check subdomain
-    isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks)
+    isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
 
 
 def test_ksr_lastbundle(servers):
@@ -748,9 +748,9 @@ def test_ksr_lastbundle(servers):
     # - check keys
     check_keys(zsks, lifetime, offset=offset, with_state=True)
     # - check apex
-    isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+    isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
     # - check subdomain
-    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
 
     # check that last bundle warning is logged
     warning = "last bundle in skr, please import new skr file"
@@ -828,9 +828,9 @@ def test_ksr_inthemiddle(servers):
     # - check keys
     check_keys(zsks, lifetime, offset=offset, with_state=True)
     # - check apex
-    isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+    isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
     # - check subdomain
-    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
 
     # check that no last bundle warning is logged
     warning = "last bundle in skr, please import new skr file"
@@ -1023,9 +1023,9 @@ def test_ksr_unlimited(servers):
     # - check keys
     check_keys(zsks, lifetime, with_state=True)
     # - check apex
-    isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+    isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
     # - check subdomain
-    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
 
 
 def test_ksr_twotone(servers):
@@ -1141,9 +1141,9 @@ def test_ksr_twotone(servers):
     lifetime = timedelta(days=31 * 5)
     check_keys(zsks_altalg, lifetime, alg, size, with_state=True)
     # - check apex
-    isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+    isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
     # - check subdomain
-    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
 
 
 def test_ksr_kskroll(servers):
@@ -1215,6 +1215,6 @@ def test_ksr_kskroll(servers):
     # - check keys
     check_keys(zsks, None, with_state=True)
     # - check apex
-    isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+    isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
     # - check subdomain
-    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+    isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)