def _check_signatures(
- signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False, smooth=False
+ signatures,
+ covers,
+ fqdn,
+ keys,
+ offline_ksk=False,
+ zsk_missing=False,
+ smooth=False,
):
numsigs = 0
zrrsig = True
assert numsigs == len(signatures)
-def _check_dnskeys(dnskeys, keys, cdnskey=False):
+def _check_dnskeys(dnskeys, keys, cdnskey=False, manual_mode=False):
now = KeyTimingMetadata.now()
numkeys = 0
delete_md = f"Sync{delete_md}"
for key in keys:
- publish = key.get_timing(publish_md, must_exist=False)
- delete = key.get_timing(delete_md, must_exist=False)
- published = publish is not None and now >= publish
- removed = delete is not None and delete <= now
+ if manual_mode:
+ # State transitions may be blocked, preset key timings may not
+ # be accurate. Use the state values to determine whether the
+ # CDS must be published or not.
+ if cdnskey:
+ md = key.get_metadata("DSState")
+ else:
+ md = key.get_metadata("DNSKEYState")
+ published = md in ["omnipresent", "rumoured"]
+ removed = not published
+ else:
+ publish = key.get_timing(publish_md, must_exist=False)
+ delete = key.get_timing(delete_md, must_exist=False)
+ published = publish is not None and now >= publish
+ removed = delete is not None and delete <= now
if not published or removed:
for dnskey in dnskeys:
return numkeys
-def check_dnskeys(rrset, ksks, zsks, cdnskey=False):
+def check_dnskeys(rrset, ksks, zsks, cdnskey=False, manual_mode=False):
# Check if the correct DNSKEY records are published. If the current time
# is between the timing metadata 'publish' and 'delete', the key must have
# a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY
dnskey = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
dnskeys.append(dnskey)
- numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey)
+ numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey, manual_mode=manual_mode)
if not cdnskey:
- numkeys += _check_dnskeys(dnskeys, zsks)
+ numkeys += _check_dnskeys(dnskeys, zsks, manual_mode=manual_mode)
assert numkeys == len(dnskeys)
-def check_cds(cdss, keys, alg):
+def check_cds(cdss, keys, alg, manual_mode=False):
# Check if the correct CDS records are published. If the current time
# is between the timing metadata 'publish' and 'delete', the key must have
# a CDS record published.
for key in keys:
assert key.is_ksk()
- publish = key.get_timing("SyncPublish")
- delete = key.get_timing("SyncDelete", must_exist=False)
- published = now >= publish
- removed = delete is not None and delete <= now
+ if manual_mode:
+ # State transitions may be blocked, preset key timings may not
+ # be accurate. Use the state values to determine whether the
+ # CDS must be published or not.
+ md = key.get_metadata("DSState")
+ published = md in ["omnipresent", "rumoured"]
+ removed = not published
+ else:
+ publish = key.get_timing("SyncPublish")
+ delete = key.get_timing("SyncDelete", must_exist=False)
+ published = now >= publish
+ removed = delete is not None and delete <= now
+
if not published or removed:
for cds in cdss:
assert not key.cds_equals(cds, alg)
zsks,
cdss=None,
cds_delete=False,
+ manual_mode=False,
offline_ksk=False,
zsk_missing=False,
tsig=None,
# test dnskey query
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
- check_dnskeys(dnskeys, ksks, zsks)
+ check_dnskeys(dnskeys, ksks, zsks, manual_mode=manual_mode)
check_signatures(
rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
)
check_cdsdelete(cdnskeys, "0 3 0 AA==")
else:
if "CDNSKEY" in cdss:
- check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
+ check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True, manual_mode=manual_mode)
else:
assert len(cdnskeys) == 0
for alg in ["SHA-256", "SHA-384"]:
if f"CDS ({alg})" in cdss:
- numcds += check_cds(cdsrrs, ksks, alg)
+ numcds += check_cds(cdsrrs, ksks, alg, manual_mode=manual_mode)
else:
check_cds_prohibit(cdsrrs, ksks, alg)
cds_delete = step.get("cds-delete", False)
check_keytimes_flag = step.get("check-keytimes", True)
zone_signed = step.get("zone-signed", True)
+ manual_mode = step.get("manual-mode", False)
isctest.log.info(f"check rollover step {zone}")
check_keytimes(keys, expected)
check_dnssecstatus(server, zone, keys, policy=policy)
- check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete)
+ check_apex(
+ server,
+ zone,
+ ksks,
+ zsks,
+ cdss=cdss,
+ cds_delete=cds_delete,
+ manual_mode=manual_mode,
+ )
check_subdomain(server, zone, ksks, zsks, smooth=smooth)
def check_next_key_event():