--- /dev/null
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+"""
+GL#5818 Finding 1 regression support — AsyncDnsServer primary.
+
+Serves a minimal zone "sigaxfr.nil." whose AXFR carries two SIG records
+at the same owner with different covered types (A and MX) and different
+TTLs (600 and 1200). A buggy secondary running dns_diff_load() with
+rdata_covers() that only recognises RRSIG will file both rdatas under
+typepair (SIG, 0) with the first tuple's TTL; a fixed secondary keeps
+them under (SIG, A) and (SIG, MX) with their distinct TTLs.
+"""
+
+from collections.abc import AsyncGenerator
+
+import dns.name
+import dns.rcode
+import dns.rdata
+import dns.rdataclass
+import dns.rdatatype
+import dns.rrset
+
+from isctest.asyncserver import (
+ AsyncDnsServer,
+ DnsResponseSend,
+ DomainHandler,
+ QueryContext,
+ ResponseAction,
+)
+
+ZONE = dns.name.from_text("sigaxfr.nil.")
+NS_NAME = dns.name.from_text("ns.sigaxfr.nil.")
+HOST = dns.name.from_text("host.sigaxfr.nil.")
+
+SOA_TEXT = "ns.sigaxfr.nil. hostmaster.sigaxfr.nil. 1 3600 1200 604800 3600"
+
+
+def _make_sig_rdata(covered_text):
+ """Produce a legacy SIG (24) rdata via RRSIG (46) round-trip."""
+ rrsig = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.RRSIG, covered_text)
+ wire = rrsig.to_digestable()
+ return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.SIG, wire, 0, len(wire))
+
+
+class SigAxfrServer(DomainHandler):
+ """Serve SOA and AXFR for sigaxfr.nil.; other qtypes get NOERROR/NODATA."""
+
+ domains = ["sigaxfr.nil."]
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[ResponseAction, None]:
+ soa_rrset = dns.rrset.from_text(
+ ZONE, 3600, dns.rdataclass.IN, dns.rdatatype.SOA, SOA_TEXT
+ )
+
+ if qctx.qtype == dns.rdatatype.SOA:
+ resp = qctx.response
+ resp.answer.append(soa_rrset)
+ yield DnsResponseSend(resp)
+ return
+
+ if qctx.qtype != dns.rdatatype.AXFR:
+ # Other types: empty NOERROR response.
+ yield DnsResponseSend(qctx.response)
+ return
+
+ # AXFR: opening SOA, NS, NS's A, two SIG RRs at the same owner
+ # with distinct covered types and TTLs, closing SOA.
+ resp = qctx.response
+ resp.answer.append(soa_rrset)
+
+ ns_rrset = dns.rrset.from_text(
+ ZONE, 3600, dns.rdataclass.IN, dns.rdatatype.NS, str(NS_NAME)
+ )
+ resp.answer.append(ns_rrset)
+
+ a_rrset = dns.rrset.from_text(
+ NS_NAME, 3600, dns.rdataclass.IN, dns.rdatatype.A, "10.53.0.11"
+ )
+ resp.answer.append(a_rrset)
+
+ sig_a = _make_sig_rdata("A 6 2 600 20260331170000 20260318160000 21831 . 0000")
+ sig_a_rrset = dns.rrset.RRset(HOST, dns.rdataclass.IN, dns.rdatatype.SIG)
+ sig_a_rrset.add(sig_a, ttl=600)
+ resp.answer.append(sig_a_rrset)
+
+ sig_mx = _make_sig_rdata(
+ "MX 6 2 1200 20260331170000 20260318160000 21831 . 0000"
+ )
+ sig_mx_rrset = dns.rrset.RRset(HOST, dns.rdataclass.IN, dns.rdatatype.SIG)
+ sig_mx_rrset.add(sig_mx, ttl=1200)
+ resp.answer.append(sig_mx_rrset)
+
+ # Closing SOA terminates the AXFR.
+ resp.answer.append(soa_rrset)
+
+ yield DnsResponseSend(resp)
+
+
+def main() -> None:
+ server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
+ server.install_response_handler(SigAxfrServer())
+ server.run()
+
+
+if __name__ == "__main__":
+ main()
by the AXFR-based regression test in this file.
"""
+import time
+
import dns.rcode
import dns.rdata
import dns.rdataclass
assert not stored, "SIG record was stored despite REFUSED response"
+def test_sig_covers_preserved_via_axfr(ns6):
+ """Regression test for GL#5818 Finding 1, reached via AXFR.
+
+ ans11 serves an AXFR for sigaxfr.nil. containing two SIG rdatas at
+ the same owner with different covered types (A, MX) and different
+ TTLs (600, 1200). ns6 pulls the zone via dns_diff_load(), which
+ calls diff.c rdata_covers(); before the fix that helper returned 0
+ for SIG, so both tuples were grouped and filed under typepair
+ (SIG, 0) with the first TTL (600) — the MX-covering record's TTL
+ (1200) was silently dropped. With the fix the records land in
+ distinct typepairs and both TTLs survive.
+
+ rndc dumpdb is used to inspect the secondary's stored state
+ directly; the wire-level response can merge same-(owner,type,class)
+ RRs and mask the difference.
+ """
+ zone = "sigaxfr.nil"
+ owner = f"host.{zone}."
+ dump_path = ns6.directory / "named_dump.db"
+
+ # ns6 may have tried to SOA-poll ans11 before it was listening; force
+ # a fresh refresh attempt and wait for the transfer to complete.
+ with ns6.watch_log_from_here() as watcher:
+ ns6.rndc(f"refresh {zone}")
+ watcher.wait_for_line(f"zone {zone}/IN: transferred serial 1")
+
+ # Remove any stale dump and ask named for a fresh one.
+ if dump_path.exists():
+ dump_path.unlink()
+ ns6.rndc("dumpdb -zones")
+
+ # rndc dumpdb is asynchronous; wait for the file and for its
+ # trailing "Dump complete" marker.
+ deadline_marker = "; Dump complete"
+ for _ in range(50):
+ if dump_path.exists():
+ text = dump_path.read_text()
+ if deadline_marker in text:
+ break
+ time.sleep(0.1)
+ else:
+ raise AssertionError(f"{dump_path} never contained {deadline_marker!r}")
+
+ # Collect every SIG line for the owner from the dump. Format is:
+ # <owner>. <ttl> IN SIG <covered> <alg> <labels> ...
+ sig_lines = []
+ for line in text.splitlines():
+ fields = line.split()
+ if len(fields) < 6:
+ continue
+ if not fields[0].lower().startswith("host.sigaxfr.nil"):
+ continue
+ if fields[2] != "IN" or fields[3] != "SIG":
+ continue
+ sig_lines.append(fields)
+
+ assert (
+ len(sig_lines) == 2
+ ), f"expected 2 SIG records at {owner}, got {len(sig_lines)}: {sig_lines}"
+
+ ttl_by_covers = {fields[4]: int(fields[1]) for fields in sig_lines}
+ assert ttl_by_covers == {"A": 600, "MX": 1200}, (
+ f"SIG records lost their covers/TTL binding: {ttl_by_covers}. With "
+ "the Finding 1 bug both records are filed under typepair (SIG, 0) "
+ "and share the first-seen TTL (600)."
+ )
+
+
def test_tcp_self_nxt_record(ns6):
"""NXT (type 30) updates must be refused at the front door.