]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Update multisigner system test to set primary
authorMatthijs Mekking <matthijs@isc.org>
Fri, 10 Oct 2025 16:02:58 +0000 (18:02 +0200)
committerMatthijs Mekking <matthijs@isc.org>
Fri, 28 Nov 2025 14:30:31 +0000 (14:30 +0000)
When testing multi-signer as bump-in-the-wire (upcoming test), we want
to be able to do dynamically updates to a hidden primary. Update the
test functions such that we can set a specific primary server.

bin/tests/system/multisigner/tests_multisigner.py

index 86610e80dea8337f85692a8746bbb04d686a0847..2e9e5cd588b0a45d1dfebecd54bc5c26134b4af2 100644 (file)
@@ -111,11 +111,14 @@ def check_no_dnssec_in_journal(server, zone):
     assert not match, f"{match.group(1)} record found in journal"
 
 
-def check_add_zsk(server, zone, keys, expected, extra_keys, extra):
+def check_add_zsk(server, zone, keys, expected, extra_keys, extra, primary=None):
+    if primary is None:
+        primary = server
+
     isctest.log.info("add dnskey record:")
 
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: update zone with ZSK from other providers"
+        f"- zone {zone} {primary.identifier}: update zone with ZSK from other providers"
     )
 
     update_msg = dns.update.UpdateMessage(zone)
@@ -123,7 +126,7 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra):
         dnskey = zsk.dnskey().split()
         rdata = " ".join(dnskey[4:])
         update_msg.add(f"{zone}.", TTL, "DNSKEY", rdata)
-    server.nsupdate(update_msg)
+    primary.nsupdate(update_msg)
 
     # Check the new DNSKEY RRset.
     isctest.log.info(
@@ -148,11 +151,14 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra):
     server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
 
 
-def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
-    isctest.log.info("remove dnskey record:")
+def _check_remove_zsk_fail(
+    server, zone, keys, expected, extra_keys, extra, primary=None
+):
+    if primary is None:
+        primary = server
 
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: try to remove own ZSK (should fail)"
+        f"- zone {zone} {primary.identifier}: try to remove own ZSK (should fail)"
     )
 
     zsks = [k for k in keys if not k.is_ksk()]
@@ -160,15 +166,15 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
     rdata = " ".join(dnskey[4:])
     update_msg = dns.update.UpdateMessage(zone)
     update_msg.delete(f"{zone}.", "DNSKEY", rdata)
-    with server.watch_log_from_here() as watcher:
-        server.nsupdate(update_msg)
+    with primary.watch_log_from_here() as watcher:
+        primary.nsupdate(update_msg)
         watcher.wait_for_line(
             f"updating zone '{zone}/IN': attempt to delete in use DNSKEY ignored"
         )
 
     # Both ZSKs should still be published.
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: check DNSKEY RRset after update remove"
+        f"- zone {zone} {server.identifier}: check DNSKEY RRset after ignored remove"
     )
     check_dnssec(server, zone, keys + extra_keys, expected + extra)
 
@@ -181,9 +187,23 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
     isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
     check_dnssec(server, zone, keys + extra_keys, expected + extra)
 
+
+def check_remove_zsk(
+    server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
+):
+    isctest.log.info("remove dnskey record:")
+
+    if primary is None:
+        primary = server
+
+    if check_fail:
+        _check_remove_zsk_fail(
+            server, zone, keys, expected, extra_keys, extra, primary=primary
+        )
+
     # Remove actual ZSK.
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: remove ZSK from other providers"
+        f"- zone {zone} {primary.identifier}: remove ZSK from other providers"
     )
 
     update_msg = dns.update.UpdateMessage(zone)
@@ -191,7 +211,7 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
         dnskey = zsk.dnskey().split()
         rdata = " ".join(dnskey[4:])
         update_msg.delete(f"{zone}.", "DNSKEY", rdata)
-    server.nsupdate(update_msg)
+    primary.nsupdate(update_msg)
 
     # We should have only the KSK and ZSK from server.
     isctest.log.info(
@@ -209,11 +229,14 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
     check_dnssec(server, zone, keys, expected)
 
 
-def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra):
+def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra, primary=None):
+    if primary is None:
+        primary = server
+
     isctest.log.info("add cdnskey record:")
 
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: update zone with CDNSKEY from other providers"
+        f"- zone {zone} {primary.identifier}: update zone with CDNSKEY from other providers"
     )
 
     # Retrieve CDNSKEY records from the other providers.
@@ -222,7 +245,7 @@ def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra):
         dnskey = ksk.dnskey().split()
         rdata = " ".join(dnskey[4:])
         update_msg.add(f"{zone}.", TTL, "CDNSKEY", rdata)
-    server.nsupdate(update_msg)
+    primary.nsupdate(update_msg)
 
     # Now there should be two CDNSKEY records.
     isctest.log.info(
@@ -240,11 +263,14 @@ def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra):
     check_dnssec(server, zone, keys + extra_keys, expected + extra)
 
 
-def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
-    isctest.log.info("remove cdnskey record:")
+def _check_remove_cdnskey_fail(
+    server, zone, keys, expected, extra_keys, extra, primary=None
+):
+    if primary is None:
+        primary = server
 
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: try to remove own CDNSKEY (should fail)"
+        f"- zone {zone} {primary.identifier}: try to remove own CDNSKEY (should fail)"
     )
 
     ksks = [k for k in keys if not k.is_ksk()]
@@ -252,15 +278,15 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
     rdata = " ".join(dnskey[4:])
     update_msg = dns.update.UpdateMessage(zone)
     update_msg.delete(f"{zone}.", "CDNSKEY", rdata)
-    with server.watch_log_from_here() as watcher:
-        server.nsupdate(update_msg)
+    with primary.watch_log_from_here() as watcher:
+        primary.nsupdate(update_msg)
         watcher.wait_for_line(
             f"updating zone '{zone}/IN': attempt to delete in use CDNSKEY ignored"
         )
 
     # Both CDNSKEY records should still be published.
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: check CDNSKEY RRset after update remove"
+        f"- zone {zone} {server.identifier}: check CDNSKEY RRset after ignored remove"
     )
     check_dnssec(server, zone, keys + extra_keys, expected + extra)
 
@@ -273,9 +299,23 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
     isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
     check_dnssec(server, zone, keys + extra_keys, expected + extra)
 
+
+def check_remove_cdnskey(
+    server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
+):
+    isctest.log.info("remove cdnskey record:")
+
+    if primary is None:
+        primary = server
+
+    if check_fail:
+        _check_remove_cdnskey_fail(
+            server, zone, keys, expected, extra_keys, extra, primary=primary
+        )
+
     # Remove actual CDNSKEY.
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: remove CDNSKEY from other providers"
+        f"- zone {zone} {primary.identifier}: remove CDNSKEY from other providers"
     )
 
     update_msg = dns.update.UpdateMessage(zone)
@@ -283,7 +323,7 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
         dnskey = ksk.dnskey().split()
         rdata = " ".join(dnskey[4:])
         update_msg.delete(f"{zone}.", "CDNSKEY", rdata)
-    server.nsupdate(update_msg)
+    primary.nsupdate(update_msg)
 
     # Now there should be one CDNSKEY record again.
     isctest.log.info(
@@ -301,11 +341,14 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
     check_dnssec(server, zone, keys, expected)
 
 
-def check_add_cds(server, zone, keys, expected, extra_keys, extra):
+def check_add_cds(server, zone, keys, expected, extra_keys, extra, primary=None):
     isctest.log.info("add cds record:")
 
+    if primary is None:
+        primary = server
+
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: update zone with CDS from other providers"
+        f"- zone {zone} {primary.identifier}: update zone with CDS from other providers"
     )
 
     # Retrieve CDS records from the other providers.
@@ -314,7 +357,7 @@ def check_add_cds(server, zone, keys, expected, extra_keys, extra):
         ds = dsfromkey(ksk)
         rdata = " ".join(ds[4:])
         update_msg.add(f"{zone}.", TTL, "CDS", rdata)
-    server.nsupdate(update_msg)
+    primary.nsupdate(update_msg)
 
     # Now there should be two CDS records.
     isctest.log.info(
@@ -332,11 +375,14 @@ def check_add_cds(server, zone, keys, expected, extra_keys, extra):
     check_dnssec(server, zone, keys + extra_keys, expected + extra)
 
 
-def check_remove_cds(server, zone, keys, expected, extra_keys, extra):
-    isctest.log.info("remove cds record:")
+def _check_remove_cds_fail(
+    server, zone, keys, expected, extra_keys, extra, primary=None
+):
+    if primary is None:
+        primary = server
 
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: try to remove own CDS (should fail)"
+        f"- zone {zone} {primary.identifier}: try to remove own CDS (should fail)"
     )
 
     ksks = [k for k in keys if not k.is_ksk()]
@@ -344,15 +390,15 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra):
     rdata = " ".join(ds[4:])
     update_msg = dns.update.UpdateMessage(zone)
     update_msg.delete(f"{zone}.", "CDS", rdata)
-    with server.watch_log_from_here() as watcher:
-        server.nsupdate(update_msg)
+    with primary.watch_log_from_here() as watcher:
+        primary.nsupdate(update_msg)
         watcher.wait_for_line(
             f"updating zone '{zone}/IN': attempt to delete in use CDS ignored"
         )
 
     # Both CDS records should still be published.
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: check CDS RRset after update remove"
+        f"- zone {zone} {server.identifier}: check CDS RRset after ignored remove"
     )
     check_dnssec(server, zone, keys + extra_keys, expected + extra)
 
@@ -365,9 +411,23 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra):
     isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
     check_dnssec(server, zone, keys + extra_keys, expected + extra)
 
+
+def check_remove_cds(
+    server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
+):
+    isctest.log.info("remove cds record:")
+
+    if primary is None:
+        primary = server
+
+    if check_fail:
+        _check_remove_cds_fail(
+            server, zone, keys, expected, extra_keys, extra, primary=primary
+        )
+
     # Remove actual CDS.
     isctest.log.info(
-        f"- zone {zone} {server.identifier}: remove CDS from other providers"
+        f"- zone {zone} {primary.identifier}: remove CDS from other providers"
     )
 
     update_msg = dns.update.UpdateMessage(zone)
@@ -375,7 +435,7 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra):
         ds = dsfromkey(ksk)
         rdata = " ".join(ds[4:])
         update_msg.delete(f"{zone}.", "CDS", rdata)
-    server.nsupdate(update_msg)
+    primary.nsupdate(update_msg)
 
     # Now there should be one CDS record again.
     isctest.log.info(
@@ -430,8 +490,8 @@ def test_multisigner(ns3, ns4):
     check_no_dnssec_in_journal(ns4, zone)
 
     # Remove DNSKEY from RRset.
-    check_remove_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra)
-    check_remove_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra)
+    check_remove_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra, check_fail=True)
+    check_remove_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra, check_fail=True)
     check_no_dnssec_in_journal(ns4, zone)
 
     # Add CDNSKEY RRset.
@@ -445,8 +505,12 @@ def test_multisigner(ns3, ns4):
     check_no_dnssec_in_journal(ns4, zone)
 
     # Remove CDNSKEY RRset.
-    check_remove_cdnskey(ns3, zone, keys3, expected3, [ksks4[0]], extra)
-    check_remove_cdnskey(ns4, zone, keys4, expected4, [ksks3[0]], extra)
+    check_remove_cdnskey(
+        ns3, zone, keys3, expected3, [ksks4[0]], extra, check_fail=True
+    )
+    check_remove_cdnskey(
+        ns4, zone, keys4, expected4, [ksks3[0]], extra, check_fail=True
+    )
     check_no_dnssec_in_journal(ns4, zone)
 
     # Update CDS RRset.
@@ -455,6 +519,6 @@ def test_multisigner(ns3, ns4):
     check_no_dnssec_in_journal(ns4, zone)
 
     # Remove CDS RRset.
-    check_remove_cds(ns3, zone, keys3, expected3, [ksks4[0]], extra)
-    check_remove_cds(ns4, zone, keys4, expected4, [ksks3[0]], extra)
+    check_remove_cds(ns3, zone, keys3, expected3, [ksks4[0]], extra, check_fail=True)
+    check_remove_cds(ns4, zone, keys4, expected4, [ksks3[0]], extra, check_fail=True)
     check_no_dnssec_in_journal(ns4, zone)