by the AXFR-based regression test in this file.
"""
+from re import compile as Re
+
+import re
import time
import dns.rcode
import dns.rdataclass
import dns.rdataset
import dns.rdatatype
+import dns.tsigkeyring
import dns.update
+import pytest
import isctest
+pytestmark = pytest.mark.extra_artifacts(
+ [
+ "ans*/ans.run",
+ "ns*/*.bk",
+ "ns*/*.conf",
+ "ns*/*.db",
+ "ns*/*.db.jnl",
+ "ns*/*.db.signed",
+ "ns*/*.jnl",
+ "ns*/*.key",
+ "ns*/K*.key",
+ "ns*/K*.private",
+ "ns*/K*.state",
+ "ns*/dsset-*.",
+ "ns6/sigaxfr.bk",
+ "verylarge",
+ ]
+)
+
def _make_sig_rdata(text):
"""Create a SIG rdata from text.
crash-reproducing precondition.
"""
ptr_update = dns.update.UpdateMessage("in-addr.arpa.")
- ptr_update.add("1.0.0.127.in-addr.arpa.", 600, "PTR", "localhost.")
+ ptr_update.add("1.0.53.10.in-addr.arpa.", 600, "PTR", "localhost.")
response = isctest.query.tcp(
- ptr_update, ns6.ip, port=ns6.ports.dns, source="127.0.0.1"
+ ptr_update, ns6.ip, port=ns6.ports.dns, source="10.53.0.1"
)
assert response.rcode() == dns.rcode.NOERROR
sig_update = dns.update.UpdateMessage("in-addr.arpa.")
sig_update.add("1.0.0.127.in-addr.arpa.", rds)
- response = isctest.query.tcp(
- sig_update, ns6.ip, port=ns6.ports.dns, source="127.0.0.1"
- )
- assert response.rcode() == dns.rcode.REFUSED
+ with ns6.watch_log_from_here() as watcher:
+ response = isctest.query.tcp(
+ sig_update, ns6.ip, port=ns6.ports.dns, source="10.53.0.1"
+ )
+ assert response.rcode() == dns.rcode.REFUSED
+
+ watcher.wait_for_line(
+ "updating zone 'in-addr.arpa/IN': update failed: SIG updates are not allowed (REFUSED)"
+ )
# Confirm nothing of type SIG was stored.
- msg = isctest.query.create("1.0.0.127.in-addr.arpa.", "SIG")
+ msg = isctest.query.create("1.0.53.10.in-addr.arpa.", "SIG")
res = isctest.query.tcp(msg, ns6.ip, port=ns6.ports.dns)
stored = any(rrset.rdtype == dns.rdatatype.SIG for rrset in res.answer)
assert not stored, "SIG record was stored despite REFUSED response"
+def test_tcp_self_nxt_record(ns6):
+ """NXT (type 30) updates must be refused at the front door.
+
+ NXT is the legacy DNSSEC denial-of-existence type, obsolete since
+ RFC 3755 replaced it with NSEC. Accepting it via dynamic update
+ would let an authorised updater inject records that the signing
+ and cut-point logic has no provision for.
+ """
+ # A second owner under a source that also matches tcp-self.
+ source = "10.53.0.2"
+ owner = "2.0.53.10.in-addr.arpa."
+
+ ptr_update = dns.update.UpdateMessage("in-addr.arpa.")
+ ptr_update.add(owner, 600, "PTR", "localhost.")
+ response = isctest.query.tcp(ptr_update, ns6.ip, port=ns6.ports.dns, source=source)
+ assert response.rcode() == dns.rcode.NOERROR
+
+ nxt = _make_nxt_rdata()
+ rds = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.NXT)
+ rds.update_ttl(600)
+ rds.add(nxt)
+ nxt_update = dns.update.UpdateMessage("in-addr.arpa.")
+ nxt_update.add(owner, rds)
+
+ with ns6.watch_log_from_here() as watcher:
+ response = isctest.query.tcp(
+ nxt_update, ns6.ip, port=ns6.ports.dns, source="10.53.0.1"
+ )
+ assert response.rcode() == dns.rcode.REFUSED
+
+ watcher.wait_for_line(
+ "updating zone 'in-addr.arpa/IN': update failed: NXT updates are not allowed (REFUSED)"
+ )
+
+
def test_sig_covers_preserved_via_axfr(ns6):
"""Regression test for GL#5818 Finding 1, reached via AXFR.
)
-def test_tcp_self_nxt_record(ns6):
- """NXT (type 30) updates must be refused at the front door.
-
- NXT is the legacy DNSSEC denial-of-existence type, obsolete since
- RFC 3755 replaced it with NSEC. Accepting it via dynamic update
- would let an authorised updater inject records that the signing
- and cut-point logic has no provision for.
+def parse_named_conf_keys(conf_text):
"""
- # A second owner under a source that also matches tcp-self.
- source = "127.0.0.2"
- owner = "2.0.0.127.in-addr.arpa."
+ Extract TSIG keys from a BIND named.conf-style string.
+ Returns a dict suitable for dns.tsigkeyring.from_text().
+ """
+ key_re = Re(
+ r'key\s+"(?P<name>[^"]+)"\s*\{(?P<body>.*?)\};', re.DOTALL | re.IGNORECASE
+ )
+ field_re = Re(r"(?P<field>algorithm|secret)\s+(?P<value>[^;]+);", re.IGNORECASE)
- ptr_update = dns.update.UpdateMessage("in-addr.arpa.")
- ptr_update.add(owner, 600, "PTR", "localhost.")
- response = isctest.query.tcp(ptr_update, ns6.ip, port=ns6.ports.dns, source=source)
+ keys = {}
+
+ for match in key_re.finditer(conf_text):
+ name = match.group("name")
+ body = match.group("body")
+
+ fields = {}
+ for fmatch in field_re.finditer(body):
+ field = fmatch.group("field").lower()
+ value = fmatch.group("value").strip().strip('"')
+ fields[field] = value
+
+ if "secret" not in fields:
+ continue # skip incomplete entries
+
+ algorithm = fields.get("algorithm", "hmac-sha256")
+
+ # Ensure FQDN key name
+ key_name = name if name.endswith(".") else name + "."
+
+ keys[key_name] = (algorithm.lower(), fields["secret"])
+
+ return keys
+
+
+def keyring_from_file(keyfile):
+ with open(keyfile, "r", encoding="utf-8") as file:
+ data = file.read()
+
+ keys = parse_named_conf_keys(data)
+ return dns.tsigkeyring.from_text(keys)
+
+
+def test_prereq_sig_record(ns1):
+ keyring = keyring_from_file("ns1/ddns.key")
+
+ # First, create the node.
+ node_update = dns.update.UpdateMessage("example.nil.", keyring=keyring)
+ node_update.add("sig.example.nil.", 600, "A", "10.53.0.11")
+ response = isctest.query.tcp(
+ node_update, ns1.ip, port=ns1.ports.dns, source="10.53.0.1"
+ )
assert response.rcode() == dns.rcode.NOERROR
- nxt = _make_nxt_rdata()
- rds = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.NXT)
- rds.update_ttl(600)
- rds.add(nxt)
- nxt_update = dns.update.UpdateMessage("in-addr.arpa.")
- nxt_update.add(owner, rds)
+ # Now require a SIG record at the same node — this triggers the
+ # dns_db_findrdataset() call with type=SIG and covers=A.
+ sig = _make_sig_rdata("A 6 0 86400 20260331170000 20260318160000 21831 . 0000")
+ rds = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.SIG)
+ rds.update_ttl(0)
+ rds.add(sig)
- response = isctest.query.tcp(nxt_update, ns6.ip, port=ns6.ports.dns, source=source)
- assert response.rcode() == dns.rcode.REFUSED
+ # First attempt with no matching credentials.
+ sig_update = dns.update.UpdateMessage("example.nil.") # no key
+ sig_update.present("sig.example.nil.", rds)
+ sig_update.add("sig.example.nil.", 600, "TXT", "I require SIG")
+
+ with ns1.watch_log_from_here() as watcher:
+ response = isctest.query.tcp(
+ sig_update, ns1.ip, port=ns1.ports.dns, source="10.53.0.1"
+ )
+ assert response.rcode() == dns.rcode.REFUSED
+
+ watcher.wait_for_sequence(
+ [
+ "update-policy: using: signer= name=sig.example.nil addr=10.53.0.1 tcp=1 type=TXT target=",
+ "update-policy: trying: grant zonesub-key.example.nil zonesub TXT",
+ "update-policy: trying: grant ddns-key.example.nil subdomain example.nil ANY",
+ "update-policy: no match found",
+ "updating zone 'example.nil/IN': update failed: rejected by secure update (REFUSED)",
+ ]
+ )
+
+ # Second attempt with the right key.
+ sig_update = dns.update.UpdateMessage("example.nil.", keyring=keyring)
+ sig_update.present("sig.example.nil.", rds)
+ sig_update.add("sig.example.nil.", 600, "TXT", "I require SIG")
+
+ with ns1.watch_log_from_here() as watcher:
+ response = isctest.query.tcp(
+ sig_update, ns1.ip, port=ns1.ports.dns, source="10.53.0.1"
+ )
+ assert response.rcode() == dns.rcode.NXRRSET
+
+ watcher.wait_for_sequence(
+ [
+ "update-policy: using: signer=ddns-key.example.nil name=sig.example.nil addr=10.53.0.1 tcp=1 type=TXT target=",
+ "update-policy: trying: grant zonesub-key.example.nil zonesub TXT",
+ "update-policy: trying: grant ddns-key.example.nil subdomain example.nil ANY",
+ "update-policy: matched: grant ddns-key.example.nil subdomain example.nil ANY",
+ "updating zone 'example.nil/IN': update section prescan OK",
+ "updating zone 'example.nil/IN': update unsuccessful: sig.example.nil/SIG: 'RRset exists (value dependent)' prerequisite not satisfied (NXRRSET)",
+ "updating zone 'example.nil/IN': rolling back",
+ ]
+ )