from pathlib import Path
import re
+import dns.message
+import dns.rcode
+
+from .log import debug, info, LogFile, WatchLogFromStart, WatchLogFromHere
from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
from .run import perl
-from .log import info, LogFile, WatchLogFromStart, WatchLogFromHere
+from .query import udp
class NamedPorts(NamedTuple):
return response
+ def nsupdate(self, update_msg: dns.message.Message):
+ """
+ Issue a dynamic update to a server's zone.
+ """
+ # FUTURE update_msg is actually dns.update.UpdateMessage, but it not
+ # typed properly here in order to support use of this module with
+ # dnspython<2.0.0
+ zone = str(update_msg.zone[0].name) # type: ignore[attr-defined]
+ try:
+ response = udp(
+ update_msg,
+ self.ip,
+ self.ports.dns,
+ timeout=3,
+ expected_rcode=dns.rcode.NOERROR,
+ )
+ except dns.exception.Timeout as exc:
+ msg = f"update timeout for {zone}"
+ raise dns.exception.Timeout(msg) from exc
+ debug(f"update of zone {zone} to server {self.ip} successful")
+ return response
+
def watch_log_from_start(self) -> WatchLogFromStart:
"""
Return an instance of the `WatchLogFromStart` context manager for this
isctest.kasp.check_keytimes(keys, expected)
check_all(server, zone, policy, keys, [])
- # Update zone with nsupdate.
- def nsupdate(updates):
- message = dns.update.UpdateMessage(zone)
- for update in updates:
- if update[0] == "del":
- message.delete(update[1], update[2], update[3])
- else:
- assert update[0] == "add"
- message.add(update[1], update[2], update[3], update[4])
-
- try:
- response = isctest.query.udp(
- message, server.ip, server.ports.dns, timeout=3
- )
- assert response.rcode() == dns.rcode.NOERROR
- except dns.exception.Timeout:
- assert False, f"update timeout for {zone}"
-
- isctest.log.debug(f"update of zone {zone} to server {server.ip} successful")
-
def update_is_signed():
parts = update.split()
qname = parts[0]
server, zone, qname, qtype, rdata, keys, []
)
- updates = [
- ["del", f"a.{zone}.", "A", "10.0.0.1"],
- ["add", f"a.{zone}.", 300, "A", "10.0.0.101"],
- ["add", f"d.{zone}.", 300, "A", "10.0.0.4"],
- ]
- nsupdate(updates)
+ update_msg = dns.update.UpdateMessage(zone)
+ update_msg.delete(f"a.{zone}.", "A", "10.0.0.1")
+ update_msg.add(f"a.{zone}.", 300, "A", "10.0.0.101")
+ update_msg.add(f"d.{zone}.", 300, "A", "10.0.0.4")
+ server.nsupdate(update_msg)
expected_updates = [f"a.{zone}. A 10.0.0.101", f"d.{zone}. A 10.0.0.4"]
for update in expected_updates:
isctest.run.retry_with_timeout(update_is_signed, timeout=5)
- # Update zone with nsupdate (reverting the above change).
- updates = [
- ["add", f"a.{zone}.", 300, "A", "10.0.0.1"],
- ["del", f"a.{zone}.", "A", "10.0.0.101"],
- ["del", f"d.{zone}.", "A", "10.0.0.4"],
- ]
- nsupdate(updates)
+ # Update zone (reverting the above change).
+ update_msg = dns.update.UpdateMessage(zone)
+ update_msg.add(f"a.{zone}.", 300, "A", "10.0.0.1")
+ update_msg.delete(f"a.{zone}.", "A", "10.0.0.101")
+ update_msg.delete(f"d.{zone}.", "A", "10.0.0.4")
+ server.nsupdate(update_msg)
update = f"a.{zone}. A 10.0.0.1"
isctest.run.retry_with_timeout(update_is_signed, timeout=5)
return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8")
- def nsupdate(updates):
- message = dns.update.UpdateMessage(zone)
- for update in updates:
- if update[0] == 0:
- message.delete(update[1], update[2], update[3])
- else:
- message.add(update[1], update[2], update[3], update[4])
-
- try:
- response = isctest.query.udp(
- message, server.ip, server.ports.dns, timeout=3
- )
- assert response.rcode() == dns.rcode.NOERROR
- except dns.exception.Timeout:
- isctest.log.info(f"error: update timeout for {zone}")
-
zone = "multisigner-model2.kasp"
isctest.kasp.check_dnssec_verify(server, zone)
dnskey = newkeys[0].dnskey().split()
rdata = " ".join(dnskey[4:])
- updates = [[1, f"{dnskey[0]}", 3600, "DNSKEY", rdata]]
- nsupdate(updates)
+ update_msg = dns.update.UpdateMessage(zone)
+ update_msg.add(f"{dnskey[0]}", 3600, "DNSKEY", rdata)
+ server.nsupdate(update_msg)
isctest.kasp.check_dnssec_verify(server, zone)
# Remove ZSKs from the other providers for zone.
dnskey2 = extkeys[0].dnskey().split()
rdata2 = " ".join(dnskey2[4:])
- updates = [
- [0, f"{dnskey[0]}", "DNSKEY", rdata],
- [0, f"{dnskey2[0]}", "DNSKEY", rdata2],
- ]
- nsupdate(updates)
+ update_msg = dns.update.UpdateMessage(zone)
+ update_msg.delete(f"{dnskey[0]}", "DNSKEY", rdata)
+ update_msg.delete(f"{dnskey2[0]}", "DNSKEY", rdata2)
+ server.nsupdate(update_msg)
isctest.kasp.check_dnssec_verify(server, zone)