]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add dynamic update facility to NamedInstance
authorNicki Křížek <nicki@isc.org>
Mon, 26 May 2025 15:10:15 +0000 (17:10 +0200)
committerMatthijs Mekking <matthijs@isc.org>
Mon, 2 Jun 2025 09:21:06 +0000 (09:21 +0000)
Deduplicate the code for dynamic updates and increase code clarity by
using an actual dns.update.UpdateMessage rather than an undefined
intermediary format passed around as a list of arguments.

bin/tests/system/isctest/instance.py
bin/tests/system/kasp/tests_kasp.py
bin/tests/system/rollover/tests_rollover.py

index 5725b3e13219051bcf33ee636333343159547b5d..adbe2f8fb43645dc0fbb1e99d528fb7cd18e9ad4 100644 (file)
@@ -18,9 +18,13 @@ import os
 from pathlib import Path
 import re
 
+import dns.message
+import dns.rcode
+
+from .log import debug, info, LogFile, WatchLogFromStart, WatchLogFromHere
 from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
 from .run import perl
-from .log import info, LogFile, WatchLogFromStart, WatchLogFromHere
+from .query import udp
 
 
 class NamedPorts(NamedTuple):
@@ -157,6 +161,28 @@ class NamedInstance:
 
         return response
 
+    def nsupdate(self, update_msg: dns.message.Message):
+        """
+        Issue a dynamic update to a server's zone.
+        """
+        # FUTURE update_msg is actually dns.update.UpdateMessage, but it not
+        # typed properly here in order to support use of this module with
+        # dnspython<2.0.0
+        zone = str(update_msg.zone[0].name)  # type: ignore[attr-defined]
+        try:
+            response = udp(
+                update_msg,
+                self.ip,
+                self.ports.dns,
+                timeout=3,
+                expected_rcode=dns.rcode.NOERROR,
+            )
+        except dns.exception.Timeout as exc:
+            msg = f"update timeout for {zone}"
+            raise dns.exception.Timeout(msg) from exc
+        debug(f"update of zone {zone} to server {self.ip} successful")
+        return response
+
     def watch_log_from_start(self) -> WatchLogFromStart:
         """
         Return an instance of the `WatchLogFromStart` context manager for this
index 6cbb87080b228b5c198967103d7fce3393719535..542f7f496c7ba532501bc9a3d5c14ef28865cc4b 100644 (file)
@@ -938,26 +938,6 @@ def test_kasp_dynamic(servers):
     isctest.kasp.check_keytimes(keys, expected)
     check_all(server, zone, policy, keys, [])
 
-    # Update zone with nsupdate.
-    def nsupdate(updates):
-        message = dns.update.UpdateMessage(zone)
-        for update in updates:
-            if update[0] == "del":
-                message.delete(update[1], update[2], update[3])
-            else:
-                assert update[0] == "add"
-                message.add(update[1], update[2], update[3], update[4])
-
-        try:
-            response = isctest.query.udp(
-                message, server.ip, server.ports.dns, timeout=3
-            )
-            assert response.rcode() == dns.rcode.NOERROR
-        except dns.exception.Timeout:
-            assert False, f"update timeout for {zone}"
-
-        isctest.log.debug(f"update of zone {zone} to server {server.ip} successful")
-
     def update_is_signed():
         parts = update.split()
         qname = parts[0]
@@ -967,24 +947,22 @@ def test_kasp_dynamic(servers):
             server, zone, qname, qtype, rdata, keys, []
         )
 
-    updates = [
-        ["del", f"a.{zone}.", "A", "10.0.0.1"],
-        ["add", f"a.{zone}.", 300, "A", "10.0.0.101"],
-        ["add", f"d.{zone}.", 300, "A", "10.0.0.4"],
-    ]
-    nsupdate(updates)
+    update_msg = dns.update.UpdateMessage(zone)
+    update_msg.delete(f"a.{zone}.", "A", "10.0.0.1")
+    update_msg.add(f"a.{zone}.", 300, "A", "10.0.0.101")
+    update_msg.add(f"d.{zone}.", 300, "A", "10.0.0.4")
+    server.nsupdate(update_msg)
 
     expected_updates = [f"a.{zone}. A 10.0.0.101", f"d.{zone}. A 10.0.0.4"]
     for update in expected_updates:
         isctest.run.retry_with_timeout(update_is_signed, timeout=5)
 
-    # Update zone with nsupdate (reverting the above change).
-    updates = [
-        ["add", f"a.{zone}.", 300, "A", "10.0.0.1"],
-        ["del", f"a.{zone}.", "A", "10.0.0.101"],
-        ["del", f"d.{zone}.", "A", "10.0.0.4"],
-    ]
-    nsupdate(updates)
+    # Update zone (reverting the above change).
+    update_msg = dns.update.UpdateMessage(zone)
+    update_msg.add(f"a.{zone}.", 300, "A", "10.0.0.1")
+    update_msg.delete(f"a.{zone}.", "A", "10.0.0.101")
+    update_msg.delete(f"d.{zone}.", "A", "10.0.0.4")
+    server.nsupdate(update_msg)
 
     update = f"a.{zone}. A 10.0.0.1"
     isctest.run.retry_with_timeout(update_is_signed, timeout=5)
index 0eec2932bd352bc7cce1e810660317c02a95a7cb..a3bb20650ebfa4abc1f2080d91446475d8554f15 100644 (file)
@@ -263,22 +263,6 @@ def test_rollover_multisigner(servers):
 
         return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8")
 
-    def nsupdate(updates):
-        message = dns.update.UpdateMessage(zone)
-        for update in updates:
-            if update[0] == 0:
-                message.delete(update[1], update[2], update[3])
-            else:
-                message.add(update[1], update[2], update[3], update[4])
-
-        try:
-            response = isctest.query.udp(
-                message, server.ip, server.ports.dns, timeout=3
-            )
-            assert response.rcode() == dns.rcode.NOERROR
-        except dns.exception.Timeout:
-            isctest.log.info(f"error: update timeout for {zone}")
-
     zone = "multisigner-model2.kasp"
 
     isctest.kasp.check_dnssec_verify(server, zone)
@@ -322,8 +306,9 @@ def test_rollover_multisigner(servers):
     dnskey = newkeys[0].dnskey().split()
     rdata = " ".join(dnskey[4:])
 
-    updates = [[1, f"{dnskey[0]}", 3600, "DNSKEY", rdata]]
-    nsupdate(updates)
+    update_msg = dns.update.UpdateMessage(zone)
+    update_msg.add(f"{dnskey[0]}", 3600, "DNSKEY", rdata)
+    server.nsupdate(update_msg)
 
     isctest.kasp.check_dnssec_verify(server, zone)
 
@@ -336,11 +321,10 @@ def test_rollover_multisigner(servers):
     # Remove ZSKs from the other providers for zone.
     dnskey2 = extkeys[0].dnskey().split()
     rdata2 = " ".join(dnskey2[4:])
-    updates = [
-        [0, f"{dnskey[0]}", "DNSKEY", rdata],
-        [0, f"{dnskey2[0]}", "DNSKEY", rdata2],
-    ]
-    nsupdate(updates)
+    update_msg = dns.update.UpdateMessage(zone)
+    update_msg.delete(f"{dnskey[0]}", "DNSKEY", rdata)
+    update_msg.delete(f"{dnskey2[0]}", "DNSKEY", rdata2)
+    server.nsupdate(update_msg)
 
     isctest.kasp.check_dnssec_verify(server, zone)