)
return value
+ def get_signing_state(self, offline_ksk=False, zsk_missing=False) -> (bool, bool):
+ """
+ This returns the signing state derived from the key states, KRRSIGState
+ and ZRRSIGState.
+
+ If 'offline_ksk' is set to True, we determine the signing state from
+ the timing metadata. If 'zsigning' is True, ensure the current time is
+ between the Active and Retired timing metadata.
+
+ If 'zsk_missing' is set to True, it means the ZSK private key file is
+ missing, and the KSK should take over signing the RRset, and the
+ expected zone signing state (zsigning) is reversed.
+ """
+ # Fetch key timing metadata.
+ now = KeyTimingMetadata.now()
+ activate = self.get_timing("Activate")
+ inactive = self.get_timing("Inactive", must_exist=False)
+
+ active = now >= activate
+ retired = inactive is not None and inactive <= now
+ signing = active and not retired
+
+ # Fetch key state metadata.
+ krrsigstate = self.get_metadata("KRRSIGState", must_exist=False)
+ ksigning = krrsigstate in ["rumoured", "omnipresent"]
+ zrrsigstate = self.get_metadata("ZRRSIGState", must_exist=False)
+ zsigning = zrrsigstate in ["rumoured", "omnipresent"]
+
+ if ksigning:
+ assert self.is_ksk()
+ if zsigning:
+ assert self.is_zsk()
+
+ # If the ZSK private key file is missing, revers the zone signing state.
+ if zsk_missing:
+ zsigning = not zsigning
+
+ # If testing offline KSK, retrieve the signing state from the key timing
+ # metadata.
+ if offline_ksk and signing and self.is_zsk():
+ assert zsigning
+ if offline_ksk and signing and self.is_ksk():
+ ksigning = signing
+
+ return ksigning, zsigning
+
def ttl(self) -> int:
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
assert f"key: {key.tag}" in response
-def _check_signatures(signatures, covers, fqdn, keys):
- now = KeyTimingMetadata.now()
+def _check_signatures(
+ signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False
+):
numsigs = 0
zrrsig = True
if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]:
krrsig = not zrrsig
for key in keys:
- activate = key.get_timing("Activate")
- inactive = key.get_timing("Inactive", must_exist=False)
+ ksigning, zsigning = key.get_signing_state(
+ offline_ksk=offline_ksk, zsk_missing=zsk_missing
+ )
- active = now >= activate
- retired = inactive is not None and inactive <= now
- signing = active and not retired
alg = key.get_metadata("Algorithm")
rtype = dns.rdatatype.to_text(covers)
expect = rf"IN RRSIG {rtype} {alg} (\d) (\d+) (\d+) (\d+) {key.tag} {fqdn}"
- if not signing:
- for rrsig in signatures:
- assert re.search(expect, rrsig) is None
- continue
-
- if zrrsig and key.is_zsk():
+ if zrrsig and zsigning:
has_rrsig = False
for rrsig in signatures:
if re.search(expect, rrsig) is not None:
assert has_rrsig, f"Expected signature but not found: {expect}"
numsigs += 1
- if zrrsig and not key.is_zsk():
+ if zrrsig and not zsigning:
for rrsig in signatures:
assert re.search(expect, rrsig) is None
- if krrsig and key.is_ksk():
+ if krrsig and ksigning:
has_rrsig = False
for rrsig in signatures:
if re.search(expect, rrsig) is not None:
assert has_rrsig, f"Expected signature but not found: {expect}"
numsigs += 1
- if krrsig and not key.is_ksk():
+ if krrsig and not ksigning:
for rrsig in signatures:
assert re.search(expect, rrsig) is None
return numsigs
-def check_signatures(rrset, covers, fqdn, ksks, zsks):
+def check_signatures(
+ rrset, covers, fqdn, ksks, zsks, offline_ksk=False, zsk_missing=False
+):
# Check if signatures with covering type are signed with the right keys.
# The right keys are the ones that expect a signature and have the
# correct role.
rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
signatures.append(rrsig)
- numsigs += _check_signatures(signatures, covers, fqdn, ksks)
- numsigs += _check_signatures(signatures, covers, fqdn, zsks)
+ numsigs += _check_signatures(
+ signatures, covers, fqdn, ksks, offline_ksk=offline_ksk, zsk_missing=zsk_missing
+ )
+ numsigs += _check_signatures(
+ signatures, covers, fqdn, zsks, offline_ksk=offline_ksk, zsk_missing=zsk_missing
+ )
assert numsigs == len(signatures)
return rrs, rrsigs
-def check_apex(server, zone, ksks, zsks, tsig=None):
+def check_apex(
+ server, zone, ksks, zsks, offline_ksk=False, zsk_missing=False, tsig=None
+):
# Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
# are signed correctly and with the appropriate keys.
fqdn = f"{zone}."
# test dnskey query
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
check_dnskeys(dnskeys, ksks, zsks)
- check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
+ check_signatures(
+ rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
+ )
# test soa query
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
assert len(soa) == 1
assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
- check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
+ check_signatures(
+ rrsigs,
+ dns.rdatatype.SOA,
+ fqdn,
+ ksks,
+ zsks,
+ offline_ksk=offline_ksk,
+ zsk_missing=zsk_missing,
+ )
# test cdnskey query
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
if len(cdnskeys) > 0:
assert len(rrsigs) > 0
- check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
+ check_signatures(
+ rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
+ )
# test cds query
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
check_cds(cds, ksks)
if len(cds) > 0:
assert len(rrsigs) > 0
- check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
+ check_signatures(
+ rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, offline_ksk=offline_ksk
+ )
-def check_subdomain(server, zone, ksks, zsks, tsig=None):
+def check_subdomain(server, zone, ksks, zsks, offline_ksk=False, tsig=None):
# Test an RRset below the apex and verify it is signed correctly.
fqdn = f"{zone}."
qname = f"a.{zone}."
else:
assert match in rrset.to_text()
- check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
+ check_signatures(rrsigs, qtype, fqdn, ksks, zsks, offline_ksk=offline_ksk)
def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None):
# - check keys
check_keys(overlapping_zsks, lifetime, with_state=True)
# - check apex
- isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks)
+ isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
# - check subdomain
- isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks)
+ isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
def test_ksr_lastbundle(servers):
# - check keys
check_keys(zsks, lifetime, offset=offset, with_state=True)
# - check apex
- isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+ isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
- isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+ isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
# check that last bundle warning is logged
warning = "last bundle in skr, please import new skr file"
# - check keys
check_keys(zsks, lifetime, offset=offset, with_state=True)
# - check apex
- isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+ isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
- isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+ isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
# check that no last bundle warning is logged
warning = "last bundle in skr, please import new skr file"
# - check keys
check_keys(zsks, lifetime, with_state=True)
# - check apex
- isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+ isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
- isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+ isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
def test_ksr_twotone(servers):
lifetime = timedelta(days=31 * 5)
check_keys(zsks_altalg, lifetime, alg, size, with_state=True)
# - check apex
- isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+ isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
- isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+ isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
def test_ksr_kskroll(servers):
# - check keys
check_keys(zsks, None, with_state=True)
# - check apex
- isctest.kasp.check_apex(ns1, zone, ksks, zsks)
+ isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
- isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
+ isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)