Add a new "dnssec_wildcard_additional" system test.
Co-Authored-By: Evan Hunt <each@isc.org>
--- /dev/null
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from collections.abc import AsyncGenerator
+from dataclasses import dataclass
+from pathlib import Path
+
+import json
+
+from cryptography.hazmat.primitives import serialization
+
+import dns.dnssec
+import dns.flags
+import dns.message
+import dns.name
+import dns.rdata
+import dns.rdataclass
+import dns.rcode
+import dns.rdatatype
+import dns.rrset
+
+from isctest.asyncserver import (
+ AsyncDnsServer,
+ DnsResponseSend,
+ QueryContext,
+ ResponseHandler,
+)
+
+TTL = 300
+ZONE = "f043.test."
+QUERY = f"svc.{ZONE}"
+VICTIM = f"victim.{ZONE}"
+WILDCARD = f"*.{ZONE}"
+FORGED_A = "198.51.100.90"
+LEGIT_A = "192.0.2.50"
+
+
+@dataclass(frozen=True)
+class Key:
+ zone: dns.name.Name
+ private_key: object
+ dnskey: dns.rdata.Rdata
+
+
+def name(text: str) -> dns.name.Name:
+ return dns.name.from_text(text)
+
+
+def load_key() -> Key:
+ path = Path(__file__).resolve().parent / "keys.json"
+ with path.open(encoding="utf-8") as keys_file:
+ raw_key = json.load(keys_file)[ZONE]
+
+ private_key = serialization.load_pem_private_key(
+ raw_key["private_pem"].encode("ascii"),
+ password=None,
+ )
+ dnskey = dns.rdata.from_text(
+ dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"]
+ )
+ return Key(name(ZONE), private_key, dnskey)
+
+
+def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset:
+ return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas)
+
+
+def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset:
+ return dns.rrset.from_rdata(name(owner), TTL, rdata)
+
+
+def add_signed(
+ section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key
+) -> None:
+ rrsig = dns.dnssec.sign(
+ covered,
+ signer.private_key,
+ signer.zone,
+ signer.dnskey,
+ lifetime=86400,
+ verify=True,
+ )
+ section.append(covered)
+ section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig))
+
+
+def soa_rrset() -> dns.rrset.RRset:
+ return rrset(
+ ZONE,
+ dns.rdatatype.SOA,
+ f"ns.{ZONE} hostmaster.{ZONE} 1 3600 600 86400 300",
+ )
+
+
+def add_dnskey(response: dns.message.Message, key: Key) -> None:
+ add_signed(response.answer, rrset_from_rdata(ZONE, key.dnskey), key)
+
+
+def wildcard_rrsig(owner: str, key: Key) -> dns.rrset.RRset:
+ wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A)
+ rrsig = dns.dnssec.sign(
+ wildcard,
+ key.private_key,
+ key.zone,
+ key.dnskey,
+ lifetime=86400,
+ verify=True,
+ )
+ return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig)
+
+
+def add_wildcard_a(section: list[dns.rrset.RRset], owner: str, key: Key) -> None:
+ section.append(rrset(owner, dns.rdatatype.A, FORGED_A))
+ section.append(wildcard_rrsig(owner, key))
+
+
+class FromWildcardAdditionalHandler(ResponseHandler):
+ def __init__(self, key: Key) -> None:
+ self.key = key
+ self.zone = name(ZONE)
+ self.query = name(QUERY)
+ self.victim = name(VICTIM)
+
+ def match(self, qctx: QueryContext) -> bool:
+ return qctx.qname.is_subdomain(self.zone)
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ qctx.prepare_new_response(with_zone_data=False)
+ qctx.response.flags |= dns.flags.AA
+ qctx.response.set_rcode(dns.rcode.NOERROR)
+
+ if qctx.qname == self.zone and qctx.qtype == dns.rdatatype.DNSKEY:
+ add_dnskey(qctx.response, self.key)
+ elif qctx.qname == self.zone and qctx.qtype == dns.rdatatype.SOA:
+ add_signed(qctx.response.answer, soa_rrset(), self.key)
+ elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
+ add_signed(
+ qctx.response.answer,
+ rrset(QUERY, dns.rdatatype.MX, f"10 {VICTIM}"),
+ self.key,
+ )
+ add_wildcard_a(qctx.response.additional, VICTIM, self.key)
+ elif qctx.qname == self.victim and qctx.qtype == dns.rdatatype.A:
+ add_signed(
+ qctx.response.answer,
+ rrset(VICTIM, dns.rdatatype.A, LEGIT_A),
+ self.key,
+ )
+ else:
+ add_signed(qctx.response.authority, soa_rrset(), self.key)
+
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+
+def main() -> None:
+ server = AsyncDnsServer(default_aa=True)
+ server.install_response_handlers(FromWildcardAdditionalHandler(load_key()))
+ server.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+// validating resolver
+
+options {
+ query-source address 10.53.0.2;
+ notify-source 10.53.0.2;
+ transfer-source 10.53.0.2;
+ port @PORT@;
+ pid-file "named.pid";
+ listen-on { 10.53.0.2; };
+ listen-on-v6 { none; };
+ recursion yes;
+ dnssec-validation yes;
+ minimal-responses no;
+};
+
+controls {
+ inet 10.53.0.2 port @CONTROLPORT@ allow { any; } keys { rndc_key; };
+};
+
+include "../../_common/rndc.key";
+
+zone "." {
+ type hint;
+ file "../../_common/root.hint";
+};
+
+zone "f043.test" {
+ type static-stub;
+ server-addresses { 10.53.0.1; };
+};
+
+trust-anchors {
+ f043.test. static-key 257 3 13 "@ZONE_DNSKEY@";
+};
--- /dev/null
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from pathlib import Path
+
+import json
+
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import ec
+
+import dns.dnssec
+import dns.name
+import dns.rdataclass
+import dns.rdatatype
+import pytest
+
+import isctest
+
+ZONE = "f043.test."
+QUERY = f"svc.{ZONE}"
+VICTIM = f"victim.{ZONE}"
+FORGED_A = "198.51.100.90"
+LEGIT_A = "192.0.2.50"
+AUTH = "10.53.0.1"
+RESOLVER = "10.53.0.2"
+
+pytestmark = pytest.mark.extra_artifacts(
+ [
+ "ans*/ans.run",
+ "ans*/keys.json",
+ ]
+)
+
+
+def _make_key():
+ private_key = ec.generate_private_key(ec.SECP256R1())
+ dnskey = dns.dnssec.make_dnskey(
+ private_key.public_key(),
+ algorithm="ECDSAP256SHA256",
+ flags=257,
+ )
+ private_pem = private_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.PKCS8,
+ encryption_algorithm=serialization.NoEncryption(),
+ ).decode("ascii")
+ return {
+ "private_pem": private_pem,
+ "dnskey": dnskey.to_text(),
+ }
+
+
+def bootstrap():
+ keys = {ZONE: _make_key()}
+ Path("ans1/keys.json").write_text(json.dumps(keys, indent=2), encoding="ascii")
+ zone_dnskey = "".join(keys[ZONE]["dnskey"].split()[3:])
+ return {"ZONE_DNSKEY": zone_dnskey}
+
+
+def _query(server, qname, qtype, cd=False):
+ query = isctest.query.create(qname, qtype, cd=cd)
+ return isctest.query.tcp(query, server)
+
+
+def _rrset(response, section, owner, rdtype, covers=None):
+ if covers is None:
+ return response.get_rrset(
+ section, dns.name.from_text(owner), dns.rdataclass.IN, rdtype
+ )
+ return response.get_rrset(
+ section,
+ dns.name.from_text(owner),
+ dns.rdataclass.IN,
+ rdtype,
+ covers=covers,
+ )
+
+
+def _has_a(response, section, owner, address):
+ rrset = _rrset(response, section, owner, dns.rdatatype.A)
+ return rrset is not None and any(rdata.address == address for rdata in rrset)
+
+
+def _check_rrsig(response, section, owner, rdtype, signer, labels=None):
+ rrsig = _rrset(response, section, owner, dns.rdatatype.RRSIG, covers=rdtype)
+ assert rrsig is not None, response.to_text()
+ assert rrsig[0].signer == dns.name.from_text(signer), response.to_text()
+ if labels is not None:
+ assert rrsig[0].labels == labels, response.to_text()
+
+
+def test_direct_fromwildcard_additional_fixture():
+ carrier = _query(AUTH, QUERY, "MX")
+ isctest.check.noerror(carrier)
+ assert _rrset(carrier, carrier.answer, QUERY, dns.rdatatype.MX)
+ assert _has_a(carrier, carrier.additional, VICTIM, FORGED_A), carrier.to_text()
+ _check_rrsig(
+ carrier,
+ carrier.additional,
+ VICTIM,
+ dns.rdatatype.A,
+ ZONE,
+ labels=2,
+ )
+
+
+def test_resolver_rejects_fromwildcard_additional_replay():
+ soa = _query(RESOLVER, ZONE, "SOA")
+ isctest.check.noerror(soa)
+ isctest.check.adflag(soa)
+
+ carrier = _query(RESOLVER, QUERY, "MX", cd=True)
+ isctest.check.noerror(carrier)
+
+ response = _query(RESOLVER, VICTIM, "A")
+ isctest.check.noerror(response)
+ isctest.check.adflag(response)
+ assert not _has_a(response, response.answer, VICTIM, FORGED_A), response.to_text()
+ assert _has_a(response, response.answer, VICTIM, LEGIT_A), response.to_text()
+ _check_rrsig(response, response.answer, VICTIM, dns.rdatatype.A, ZONE)