From: Alessio Podda Date: Fri, 5 Jun 2026 10:36:12 +0000 (+0200) Subject: Reproducer for #5972 DNS_R_FROMWILDCARD accepted X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=4466bbcb4a94b3f86be01dd844caff7e1b73ad6e;p=thirdparty%2Fbind9.git Reproducer for #5972 DNS_R_FROMWILDCARD accepted Add a new "dnssec_wildcard_additional" system test. Co-Authored-By: Evan Hunt --- diff --git a/bin/tests/system/dnssec_wildcard_additional/ans1/ans.py b/bin/tests/system/dnssec_wildcard_additional/ans1/ans.py new file mode 100644 index 00000000000..8c54872c027 --- /dev/null +++ b/bin/tests/system/dnssec_wildcard_additional/ans1/ans.py @@ -0,0 +1,167 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 + +from collections.abc import AsyncGenerator +from dataclasses import dataclass +from pathlib import Path + +import json + +from cryptography.hazmat.primitives import serialization + +import dns.dnssec +import dns.flags +import dns.message +import dns.name +import dns.rdata +import dns.rdataclass +import dns.rcode +import dns.rdatatype +import dns.rrset + +from isctest.asyncserver import ( + AsyncDnsServer, + DnsResponseSend, + QueryContext, + ResponseHandler, +) + +TTL = 300 +ZONE = "f043.test." +QUERY = f"svc.{ZONE}" +VICTIM = f"victim.{ZONE}" +WILDCARD = f"*.{ZONE}" +FORGED_A = "198.51.100.90" +LEGIT_A = "192.0.2.50" + + +@dataclass(frozen=True) +class Key: + zone: dns.name.Name + private_key: object + dnskey: dns.rdata.Rdata + + +def name(text: str) -> dns.name.Name: + return dns.name.from_text(text) + + +def load_key() -> Key: + path = Path(__file__).resolve().parent / "keys.json" + with path.open(encoding="utf-8") as keys_file: + raw_key = json.load(keys_file)[ZONE] + + private_key = serialization.load_pem_private_key( + raw_key["private_pem"].encode("ascii"), + password=None, + ) + dnskey = dns.rdata.from_text( + dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"] + ) + return Key(name(ZONE), private_key, dnskey) + + +def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset: + return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas) + + +def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset: + return dns.rrset.from_rdata(name(owner), TTL, rdata) + + +def add_signed( + section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key +) -> None: + rrsig = dns.dnssec.sign( + covered, + signer.private_key, + signer.zone, + signer.dnskey, + lifetime=86400, + verify=True, + ) + section.append(covered) + section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig)) + + +def soa_rrset() -> dns.rrset.RRset: + return rrset( + ZONE, + dns.rdatatype.SOA, + f"ns.{ZONE} hostmaster.{ZONE} 1 3600 600 86400 300", + ) + + +def add_dnskey(response: dns.message.Message, key: Key) -> None: + add_signed(response.answer, rrset_from_rdata(ZONE, key.dnskey), key) + + +def wildcard_rrsig(owner: str, key: Key) -> dns.rrset.RRset: + wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A) + rrsig = dns.dnssec.sign( + wildcard, + key.private_key, + key.zone, + key.dnskey, + lifetime=86400, + verify=True, + ) + return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig) + + +def add_wildcard_a(section: list[dns.rrset.RRset], owner: str, key: Key) -> None: + section.append(rrset(owner, dns.rdatatype.A, FORGED_A)) + section.append(wildcard_rrsig(owner, key)) + + +class FromWildcardAdditionalHandler(ResponseHandler): + def __init__(self, key: Key) -> None: + self.key = key + self.zone = name(ZONE) + self.query = name(QUERY) + self.victim = name(VICTIM) + + def match(self, qctx: QueryContext) -> bool: + return qctx.qname.is_subdomain(self.zone) + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + qctx.prepare_new_response(with_zone_data=False) + qctx.response.flags |= dns.flags.AA + qctx.response.set_rcode(dns.rcode.NOERROR) + + if qctx.qname == self.zone and qctx.qtype == dns.rdatatype.DNSKEY: + add_dnskey(qctx.response, self.key) + elif qctx.qname == self.zone and qctx.qtype == dns.rdatatype.SOA: + add_signed(qctx.response.answer, soa_rrset(), self.key) + elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX: + add_signed( + qctx.response.answer, + rrset(QUERY, dns.rdatatype.MX, f"10 {VICTIM}"), + self.key, + ) + add_wildcard_a(qctx.response.additional, VICTIM, self.key) + elif qctx.qname == self.victim and qctx.qtype == dns.rdatatype.A: + add_signed( + qctx.response.answer, + rrset(VICTIM, dns.rdatatype.A, LEGIT_A), + self.key, + ) + else: + add_signed(qctx.response.authority, soa_rrset(), self.key) + + yield DnsResponseSend(qctx.response, authoritative=True) + + +def main() -> None: + server = AsyncDnsServer(default_aa=True) + server.install_response_handlers(FromWildcardAdditionalHandler(load_key())) + server.run() + + +if __name__ == "__main__": + main() diff --git a/bin/tests/system/dnssec_wildcard_additional/ns2/named.conf.j2 b/bin/tests/system/dnssec_wildcard_additional/ns2/named.conf.j2 new file mode 100644 index 00000000000..fa7da5c6904 --- /dev/null +++ b/bin/tests/system/dnssec_wildcard_additional/ns2/named.conf.j2 @@ -0,0 +1,34 @@ +// validating resolver + +options { + query-source address 10.53.0.2; + notify-source 10.53.0.2; + transfer-source 10.53.0.2; + port @PORT@; + pid-file "named.pid"; + listen-on { 10.53.0.2; }; + listen-on-v6 { none; }; + recursion yes; + dnssec-validation yes; + minimal-responses no; +}; + +controls { + inet 10.53.0.2 port @CONTROLPORT@ allow { any; } keys { rndc_key; }; +}; + +include "../../_common/rndc.key"; + +zone "." { + type hint; + file "../../_common/root.hint"; +}; + +zone "f043.test" { + type static-stub; + server-addresses { 10.53.0.1; }; +}; + +trust-anchors { + f043.test. static-key 257 3 13 "@ZONE_DNSKEY@"; +}; diff --git a/bin/tests/system/dnssec_wildcard_additional/tests_dnssec_wildcard_additional.py b/bin/tests/system/dnssec_wildcard_additional/tests_dnssec_wildcard_additional.py new file mode 100644 index 00000000000..eb38cef1545 --- /dev/null +++ b/bin/tests/system/dnssec_wildcard_additional/tests_dnssec_wildcard_additional.py @@ -0,0 +1,123 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 + +from pathlib import Path + +import json + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec + +import dns.dnssec +import dns.name +import dns.rdataclass +import dns.rdatatype +import pytest + +import isctest + +ZONE = "f043.test." +QUERY = f"svc.{ZONE}" +VICTIM = f"victim.{ZONE}" +FORGED_A = "198.51.100.90" +LEGIT_A = "192.0.2.50" +AUTH = "10.53.0.1" +RESOLVER = "10.53.0.2" + +pytestmark = pytest.mark.extra_artifacts( + [ + "ans*/ans.run", + "ans*/keys.json", + ] +) + + +def _make_key(): + private_key = ec.generate_private_key(ec.SECP256R1()) + dnskey = dns.dnssec.make_dnskey( + private_key.public_key(), + algorithm="ECDSAP256SHA256", + flags=257, + ) + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode("ascii") + return { + "private_pem": private_pem, + "dnskey": dnskey.to_text(), + } + + +def bootstrap(): + keys = {ZONE: _make_key()} + Path("ans1/keys.json").write_text(json.dumps(keys, indent=2), encoding="ascii") + zone_dnskey = "".join(keys[ZONE]["dnskey"].split()[3:]) + return {"ZONE_DNSKEY": zone_dnskey} + + +def _query(server, qname, qtype, cd=False): + query = isctest.query.create(qname, qtype, cd=cd) + return isctest.query.tcp(query, server) + + +def _rrset(response, section, owner, rdtype, covers=None): + if covers is None: + return response.get_rrset( + section, dns.name.from_text(owner), dns.rdataclass.IN, rdtype + ) + return response.get_rrset( + section, + dns.name.from_text(owner), + dns.rdataclass.IN, + rdtype, + covers=covers, + ) + + +def _has_a(response, section, owner, address): + rrset = _rrset(response, section, owner, dns.rdatatype.A) + return rrset is not None and any(rdata.address == address for rdata in rrset) + + +def _check_rrsig(response, section, owner, rdtype, signer, labels=None): + rrsig = _rrset(response, section, owner, dns.rdatatype.RRSIG, covers=rdtype) + assert rrsig is not None, response.to_text() + assert rrsig[0].signer == dns.name.from_text(signer), response.to_text() + if labels is not None: + assert rrsig[0].labels == labels, response.to_text() + + +def test_direct_fromwildcard_additional_fixture(): + carrier = _query(AUTH, QUERY, "MX") + isctest.check.noerror(carrier) + assert _rrset(carrier, carrier.answer, QUERY, dns.rdatatype.MX) + assert _has_a(carrier, carrier.additional, VICTIM, FORGED_A), carrier.to_text() + _check_rrsig( + carrier, + carrier.additional, + VICTIM, + dns.rdatatype.A, + ZONE, + labels=2, + ) + + +def test_resolver_rejects_fromwildcard_additional_replay(): + soa = _query(RESOLVER, ZONE, "SOA") + isctest.check.noerror(soa) + isctest.check.adflag(soa) + + carrier = _query(RESOLVER, QUERY, "MX", cd=True) + isctest.check.noerror(carrier) + + response = _query(RESOLVER, VICTIM, "A") + isctest.check.noerror(response) + isctest.check.adflag(response) + assert not _has_a(response, response.answer, VICTIM, FORGED_A), response.to_text() + assert _has_a(response, response.answer, VICTIM, LEGIT_A), response.to_text() + _check_rrsig(response, response.answer, VICTIM, dns.rdatatype.A, ZONE)