From: Matthijs Mekking Date: Fri, 10 Oct 2025 16:02:58 +0000 (+0200) Subject: Update multisigner system test to set primary X-Git-Tag: v9.21.16~12^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fdf8a171c5058df7beb3500cad6bdcdbe731410c;p=thirdparty%2Fbind9.git Update multisigner system test to set primary When testing multi-signer as bump-in-the-wire (upcoming test), we want to be able to do dynamically updates to a hidden primary. Update the test functions such that we can set a specific primary server. --- diff --git a/bin/tests/system/multisigner/tests_multisigner.py b/bin/tests/system/multisigner/tests_multisigner.py index 86610e80dea..2e9e5cd588b 100644 --- a/bin/tests/system/multisigner/tests_multisigner.py +++ b/bin/tests/system/multisigner/tests_multisigner.py @@ -111,11 +111,14 @@ def check_no_dnssec_in_journal(server, zone): assert not match, f"{match.group(1)} record found in journal" -def check_add_zsk(server, zone, keys, expected, extra_keys, extra): +def check_add_zsk(server, zone, keys, expected, extra_keys, extra, primary=None): + if primary is None: + primary = server + isctest.log.info("add dnskey record:") isctest.log.info( - f"- zone {zone} {server.identifier}: update zone with ZSK from other providers" + f"- zone {zone} {primary.identifier}: update zone with ZSK from other providers" ) update_msg = dns.update.UpdateMessage(zone) @@ -123,7 +126,7 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra): dnskey = zsk.dnskey().split() rdata = " ".join(dnskey[4:]) update_msg.add(f"{zone}.", TTL, "DNSKEY", rdata) - server.nsupdate(update_msg) + primary.nsupdate(update_msg) # Check the new DNSKEY RRset. isctest.log.info( @@ -148,11 +151,14 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra): server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}") -def check_remove_zsk(server, zone, keys, expected, extra_keys, extra): - isctest.log.info("remove dnskey record:") +def _check_remove_zsk_fail( + server, zone, keys, expected, extra_keys, extra, primary=None +): + if primary is None: + primary = server isctest.log.info( - f"- zone {zone} {server.identifier}: try to remove own ZSK (should fail)" + f"- zone {zone} {primary.identifier}: try to remove own ZSK (should fail)" ) zsks = [k for k in keys if not k.is_ksk()] @@ -160,15 +166,15 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra): rdata = " ".join(dnskey[4:]) update_msg = dns.update.UpdateMessage(zone) update_msg.delete(f"{zone}.", "DNSKEY", rdata) - with server.watch_log_from_here() as watcher: - server.nsupdate(update_msg) + with primary.watch_log_from_here() as watcher: + primary.nsupdate(update_msg) watcher.wait_for_line( f"updating zone '{zone}/IN': attempt to delete in use DNSKEY ignored" ) # Both ZSKs should still be published. isctest.log.info( - f"- zone {zone} {server.identifier}: check DNSKEY RRset after update remove" + f"- zone {zone} {server.identifier}: check DNSKEY RRset after ignored remove" ) check_dnssec(server, zone, keys + extra_keys, expected + extra) @@ -181,9 +187,23 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra): isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run") check_dnssec(server, zone, keys + extra_keys, expected + extra) + +def check_remove_zsk( + server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False +): + isctest.log.info("remove dnskey record:") + + if primary is None: + primary = server + + if check_fail: + _check_remove_zsk_fail( + server, zone, keys, expected, extra_keys, extra, primary=primary + ) + # Remove actual ZSK. isctest.log.info( - f"- zone {zone} {server.identifier}: remove ZSK from other providers" + f"- zone {zone} {primary.identifier}: remove ZSK from other providers" ) update_msg = dns.update.UpdateMessage(zone) @@ -191,7 +211,7 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra): dnskey = zsk.dnskey().split() rdata = " ".join(dnskey[4:]) update_msg.delete(f"{zone}.", "DNSKEY", rdata) - server.nsupdate(update_msg) + primary.nsupdate(update_msg) # We should have only the KSK and ZSK from server. isctest.log.info( @@ -209,11 +229,14 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra): check_dnssec(server, zone, keys, expected) -def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra): +def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra, primary=None): + if primary is None: + primary = server + isctest.log.info("add cdnskey record:") isctest.log.info( - f"- zone {zone} {server.identifier}: update zone with CDNSKEY from other providers" + f"- zone {zone} {primary.identifier}: update zone with CDNSKEY from other providers" ) # Retrieve CDNSKEY records from the other providers. @@ -222,7 +245,7 @@ def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra): dnskey = ksk.dnskey().split() rdata = " ".join(dnskey[4:]) update_msg.add(f"{zone}.", TTL, "CDNSKEY", rdata) - server.nsupdate(update_msg) + primary.nsupdate(update_msg) # Now there should be two CDNSKEY records. isctest.log.info( @@ -240,11 +263,14 @@ def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra): check_dnssec(server, zone, keys + extra_keys, expected + extra) -def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra): - isctest.log.info("remove cdnskey record:") +def _check_remove_cdnskey_fail( + server, zone, keys, expected, extra_keys, extra, primary=None +): + if primary is None: + primary = server isctest.log.info( - f"- zone {zone} {server.identifier}: try to remove own CDNSKEY (should fail)" + f"- zone {zone} {primary.identifier}: try to remove own CDNSKEY (should fail)" ) ksks = [k for k in keys if not k.is_ksk()] @@ -252,15 +278,15 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra): rdata = " ".join(dnskey[4:]) update_msg = dns.update.UpdateMessage(zone) update_msg.delete(f"{zone}.", "CDNSKEY", rdata) - with server.watch_log_from_here() as watcher: - server.nsupdate(update_msg) + with primary.watch_log_from_here() as watcher: + primary.nsupdate(update_msg) watcher.wait_for_line( f"updating zone '{zone}/IN': attempt to delete in use CDNSKEY ignored" ) # Both CDNSKEY records should still be published. isctest.log.info( - f"- zone {zone} {server.identifier}: check CDNSKEY RRset after update remove" + f"- zone {zone} {server.identifier}: check CDNSKEY RRset after ignored remove" ) check_dnssec(server, zone, keys + extra_keys, expected + extra) @@ -273,9 +299,23 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra): isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run") check_dnssec(server, zone, keys + extra_keys, expected + extra) + +def check_remove_cdnskey( + server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False +): + isctest.log.info("remove cdnskey record:") + + if primary is None: + primary = server + + if check_fail: + _check_remove_cdnskey_fail( + server, zone, keys, expected, extra_keys, extra, primary=primary + ) + # Remove actual CDNSKEY. isctest.log.info( - f"- zone {zone} {server.identifier}: remove CDNSKEY from other providers" + f"- zone {zone} {primary.identifier}: remove CDNSKEY from other providers" ) update_msg = dns.update.UpdateMessage(zone) @@ -283,7 +323,7 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra): dnskey = ksk.dnskey().split() rdata = " ".join(dnskey[4:]) update_msg.delete(f"{zone}.", "CDNSKEY", rdata) - server.nsupdate(update_msg) + primary.nsupdate(update_msg) # Now there should be one CDNSKEY record again. isctest.log.info( @@ -301,11 +341,14 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra): check_dnssec(server, zone, keys, expected) -def check_add_cds(server, zone, keys, expected, extra_keys, extra): +def check_add_cds(server, zone, keys, expected, extra_keys, extra, primary=None): isctest.log.info("add cds record:") + if primary is None: + primary = server + isctest.log.info( - f"- zone {zone} {server.identifier}: update zone with CDS from other providers" + f"- zone {zone} {primary.identifier}: update zone with CDS from other providers" ) # Retrieve CDS records from the other providers. @@ -314,7 +357,7 @@ def check_add_cds(server, zone, keys, expected, extra_keys, extra): ds = dsfromkey(ksk) rdata = " ".join(ds[4:]) update_msg.add(f"{zone}.", TTL, "CDS", rdata) - server.nsupdate(update_msg) + primary.nsupdate(update_msg) # Now there should be two CDS records. isctest.log.info( @@ -332,11 +375,14 @@ def check_add_cds(server, zone, keys, expected, extra_keys, extra): check_dnssec(server, zone, keys + extra_keys, expected + extra) -def check_remove_cds(server, zone, keys, expected, extra_keys, extra): - isctest.log.info("remove cds record:") +def _check_remove_cds_fail( + server, zone, keys, expected, extra_keys, extra, primary=None +): + if primary is None: + primary = server isctest.log.info( - f"- zone {zone} {server.identifier}: try to remove own CDS (should fail)" + f"- zone {zone} {primary.identifier}: try to remove own CDS (should fail)" ) ksks = [k for k in keys if not k.is_ksk()] @@ -344,15 +390,15 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra): rdata = " ".join(ds[4:]) update_msg = dns.update.UpdateMessage(zone) update_msg.delete(f"{zone}.", "CDS", rdata) - with server.watch_log_from_here() as watcher: - server.nsupdate(update_msg) + with primary.watch_log_from_here() as watcher: + primary.nsupdate(update_msg) watcher.wait_for_line( f"updating zone '{zone}/IN': attempt to delete in use CDS ignored" ) # Both CDS records should still be published. isctest.log.info( - f"- zone {zone} {server.identifier}: check CDS RRset after update remove" + f"- zone {zone} {server.identifier}: check CDS RRset after ignored remove" ) check_dnssec(server, zone, keys + extra_keys, expected + extra) @@ -365,9 +411,23 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra): isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run") check_dnssec(server, zone, keys + extra_keys, expected + extra) + +def check_remove_cds( + server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False +): + isctest.log.info("remove cds record:") + + if primary is None: + primary = server + + if check_fail: + _check_remove_cds_fail( + server, zone, keys, expected, extra_keys, extra, primary=primary + ) + # Remove actual CDS. isctest.log.info( - f"- zone {zone} {server.identifier}: remove CDS from other providers" + f"- zone {zone} {primary.identifier}: remove CDS from other providers" ) update_msg = dns.update.UpdateMessage(zone) @@ -375,7 +435,7 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra): ds = dsfromkey(ksk) rdata = " ".join(ds[4:]) update_msg.delete(f"{zone}.", "CDS", rdata) - server.nsupdate(update_msg) + primary.nsupdate(update_msg) # Now there should be one CDS record again. isctest.log.info( @@ -430,8 +490,8 @@ def test_multisigner(ns3, ns4): check_no_dnssec_in_journal(ns4, zone) # Remove DNSKEY from RRset. - check_remove_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra) - check_remove_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra) + check_remove_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra, check_fail=True) + check_remove_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra, check_fail=True) check_no_dnssec_in_journal(ns4, zone) # Add CDNSKEY RRset. @@ -445,8 +505,12 @@ def test_multisigner(ns3, ns4): check_no_dnssec_in_journal(ns4, zone) # Remove CDNSKEY RRset. - check_remove_cdnskey(ns3, zone, keys3, expected3, [ksks4[0]], extra) - check_remove_cdnskey(ns4, zone, keys4, expected4, [ksks3[0]], extra) + check_remove_cdnskey( + ns3, zone, keys3, expected3, [ksks4[0]], extra, check_fail=True + ) + check_remove_cdnskey( + ns4, zone, keys4, expected4, [ksks3[0]], extra, check_fail=True + ) check_no_dnssec_in_journal(ns4, zone) # Update CDS RRset. @@ -455,6 +519,6 @@ def test_multisigner(ns3, ns4): check_no_dnssec_in_journal(ns4, zone) # Remove CDS RRset. - check_remove_cds(ns3, zone, keys3, expected3, [ksks4[0]], extra) - check_remove_cds(ns4, zone, keys4, expected4, [ksks3[0]], extra) + check_remove_cds(ns3, zone, keys3, expected3, [ksks4[0]], extra, check_fail=True) + check_remove_cds(ns4, zone, keys4, expected4, [ksks3[0]], extra, check_fail=True) check_no_dnssec_in_journal(ns4, zone)