def check_remove_zsk(
- server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
+ server,
+ zone,
+ keys,
+ expected,
+ extra_keys,
+ extra,
+ primary=None,
+ check_fail=False,
+ update_any=False,
):
isctest.log.info("remove dnskey record:")
if check_fail:
_check_remove_zsk_fail(
- server, zone, keys, expected, extra_keys, extra, primary=primary
+ server,
+ zone,
+ keys,
+ expected,
+ extra_keys,
+ extra,
+ primary=primary,
)
- # Remove actual ZSK.
- isctest.log.info(
- f"- zone {zone} {primary.identifier}: remove ZSK from other providers"
- )
-
- update_msg = dns.update.UpdateMessage(zone)
- for zsk in extra_keys:
- dnskey = str(zsk.dnskey).split()
- rdata = " ".join(dnskey[4:])
- update_msg.delete(f"{zone}.", "DNSKEY", rdata)
- primary.nsupdate(update_msg)
+ if update_any:
+ # Remove ZSK with update ANY.
+ isctest.log.info(
+ f"- zone {zone} {primary.identifier}: remove DNSKEY RRset with update ANY (expect ours)"
+ )
+ update_msg = dns.update.UpdateMessage(zone)
+ update_msg.delete(f"{zone}.", "DNSKEY")
+ primary.nsupdate(update_msg)
+ else:
+ # Remove actual ZSK.
+ update_msg = dns.update.UpdateMessage(zone)
+ for zsk in extra_keys:
+ dnskey = str(zsk.dnskey).split()
+ rdata = " ".join(dnskey[4:])
+ update_msg.delete(f"{zone}.", "DNSKEY", rdata)
+ primary.nsupdate(update_msg)
wait_for_serial(primary, server, zone)
def check_remove_cdnskey(
- server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
+ server,
+ zone,
+ keys,
+ expected,
+ extra_keys,
+ extra,
+ primary=None,
+ check_fail=False,
+ update_any=False,
):
isctest.log.info("remove cdnskey record:")
if check_fail:
_check_remove_cdnskey_fail(
- server, zone, keys, expected, extra_keys, extra, primary=primary
+ server,
+ zone,
+ keys,
+ expected,
+ extra_keys,
+ extra,
+ primary=primary,
)
- # Remove actual CDNSKEY.
- isctest.log.info(
- f"- zone {zone} {primary.identifier}: remove CDNSKEY from other providers"
- )
+ if update_any:
+ # Remove CDNSKEY with update ANY.
+ isctest.log.info(
+ f"- zone {zone} {primary.identifier}: remove CDNSKEY RRset with update ANY (expect ours)"
+ )
+ update_msg = dns.update.UpdateMessage(zone)
+ update_msg.delete(f"{zone}.", "CDNSKEY")
+ primary.nsupdate(update_msg)
+ else:
+ # Remove actual CDNSKEY.
+ isctest.log.info(
+ f"- zone {zone} {primary.identifier}: remove CDNSKEY from other providers"
+ )
- update_msg = dns.update.UpdateMessage(zone)
- for ksk in extra_keys:
- dnskey = str(ksk.dnskey).split()
- rdata = " ".join(dnskey[4:])
- update_msg.delete(f"{zone}.", "CDNSKEY", rdata)
- primary.nsupdate(update_msg)
+ update_msg = dns.update.UpdateMessage(zone)
+ for ksk in extra_keys:
+ dnskey = str(ksk.dnskey).split()
+ rdata = " ".join(dnskey[4:])
+ update_msg.delete(f"{zone}.", "CDNSKEY", rdata)
+ primary.nsupdate(update_msg)
wait_for_serial(primary, server, zone)
def check_remove_cds(
- server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
+ server,
+ zone,
+ keys,
+ expected,
+ extra_keys,
+ extra,
+ primary=None,
+ check_fail=False,
+ update_any=False,
):
isctest.log.info("remove cds record:")
server, zone, keys, expected, extra_keys, extra, primary=primary
)
- # Remove actual CDS.
- isctest.log.info(
- f"- zone {zone} {primary.identifier}: remove CDS from other providers"
- )
+ if update_any:
+ # Remove CDS with update ANY.
+ isctest.log.info(
+ f"- zone {zone} {primary.identifier}: remove CDS RRset with update ANY (expect ours)"
+ )
+ update_msg = dns.update.UpdateMessage(zone)
+ update_msg.delete(f"{zone}.", "CDS")
+ primary.nsupdate(update_msg)
+ else:
+ # Remove actual CDS.
+ isctest.log.info(
+ f"- zone {zone} {primary.identifier}: remove CDS from other providers"
+ )
- update_msg = dns.update.UpdateMessage(zone)
- for ksk in extra_keys:
- ds = dsfromkey(ksk)
- rdata = " ".join(ds[4:])
- update_msg.delete(f"{zone}.", "CDS", rdata)
- primary.nsupdate(update_msg)
+ update_msg = dns.update.UpdateMessage(zone)
+ for ksk in extra_keys:
+ ds = dsfromkey(ksk)
+ rdata = " ".join(ds[4:])
+ update_msg.delete(f"{zone}.", "CDS", rdata)
+ primary.nsupdate(update_msg)
wait_for_serial(primary, server, zone)
check_no_dnssec_in_journal(ns4, zone)
+def test_multisigner_update_any(ns2, ns3, ns4, default_algorithm):
+ zone = "model2.update-any"
+ keyprops = [
+ f"ksk 0 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
+ f"zsk 0 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
+ ]
+
+ # First make sure the zone is properly signed.
+ isctest.log.info(f"basic DNSSEC tests for {zone}")
+ isctest.kasp.wait_keymgr_done(ns3, zone)
+ isctest.kasp.wait_keymgr_done(ns4, zone)
+
+ with ns3.watch_log_from_start() as watcher:
+ watcher.wait_for_line(
+ f"zone {zone}/IN: dsyncfetch: send NOTIFY(CDS) query to scanner.update-any"
+ )
+
+ with ns4.watch_log_from_start() as watcher:
+ watcher.wait_for_line(
+ f"zone {zone}/IN (signed): dsyncfetch: send NOTIFY(CDS) query to scanner.update-any"
+ )
+
+ with ns2.watch_log_from_start() as watcher:
+ # Receiving NOTIFY(CDS) has not been implemented yet. Until
+ # then, notifies for child zones towards the parent result in
+ # not authoritative (unless child and parent are served by the
+ # same name server).
+ watcher.wait_for_line(f"received notify for zone '{zone}': NOTAUTH")
+
+ keys3 = isctest.kasp.keydir_to_keylist(zone, ns3.identifier)
+ ksks3 = [k for k in keys3 if k.is_ksk()]
+ zsks3 = [k for k in keys3 if not k.is_ksk()]
+ expected3 = isctest.kasp.policy_to_properties(ttl=TTL, keys=keyprops)
+
+ check_dnssec(ns3, zone, keys3, expected3)
+
+ keys4 = isctest.kasp.keydir_to_keylist(zone, ns4.identifier)
+ ksks4 = [k for k in keys4 if k.is_ksk()]
+ zsks4 = [k for k in keys4 if not k.is_ksk()]
+ expected4 = isctest.kasp.policy_to_properties(ttl=TTL, keys=keyprops)
+
+ check_dnssec(ns4, zone, keys4, expected4)
+
+ # Add DNSKEY to RRset.
+ newprops = [f"zsk unlimited {default_algorithm.number} {default_algorithm.bits}"]
+ extra = isctest.kasp.policy_to_properties(ttl=TTL, keys=newprops)
+ extra[0].private = False
+ extra[0].legacy = True
+
+ check_add_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra)
+ check_add_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra)
+ check_no_dnssec_in_journal(ns4, zone)
+
+ # Remove DNSKEY from RRset.
+ check_remove_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra, update_any=True)
+ check_remove_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra, update_any=True)
+ check_no_dnssec_in_journal(ns4, zone)
+
+ # Add CDNSKEY RRset.
+ newprops = [f"ksk unlimited {default_algorithm.number} {default_algorithm.bits}"]
+ extra = isctest.kasp.policy_to_properties(ttl=TTL, keys=newprops)
+ extra[0].private = False
+ extra[0].legacy = True
+
+ check_add_cdnskey(ns3, zone, keys3, expected3, [ksks4[0]], extra)
+ check_add_cdnskey(ns4, zone, keys4, expected4, [ksks3[0]], extra)
+ check_no_dnssec_in_journal(ns4, zone)
+
+ # Remove CDNSKEY RRset.
+ check_remove_cdnskey(
+ ns3, zone, keys3, expected3, [ksks4[0]], extra, update_any=True
+ )
+ check_remove_cdnskey(
+ ns4, zone, keys4, expected4, [ksks3[0]], extra, update_any=True
+ )
+ check_no_dnssec_in_journal(ns4, zone)
+
+ # Update CDS RRset.
+ check_add_cds(ns3, zone, keys3, expected3, [ksks4[0]], extra)
+ check_add_cds(ns4, zone, keys4, expected4, [ksks3[0]], extra)
+ check_no_dnssec_in_journal(ns4, zone)
+
+ # Remove CDS RRset.
+ check_remove_cds(ns3, zone, keys3, expected3, [ksks4[0]], extra, update_any=True)
+ check_remove_cds(ns4, zone, keys4, expected4, [ksks3[0]], extra, update_any=True)
+ check_no_dnssec_in_journal(ns4, zone)
+
+
def test_multisigner_bad_dsync(ns3, ns4):
zone = "model2.bad-dsync"