From: Alessio Podda Date: Wed, 3 Jun 2026 14:27:18 +0000 (+0200) Subject: Reproducer for #5966 replay parent wildcard X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=1d4d3e09adddd58310870f7de9f796470fc78ec5;p=thirdparty%2Fbind9.git Reproducer for #5966 replay parent wildcard Add a new "dnssec_replayed_parent_wildcard" system test. Co-Authored-By: Matthijs Mekking --- diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py b/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py new file mode 100644 index 00000000000..0002e61aade --- /dev/null +++ b/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py @@ -0,0 +1,220 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +from collections.abc import AsyncGenerator +from dataclasses import dataclass +from pathlib import Path + +import json + +from cryptography.hazmat.primitives import serialization + +import dns.dnssec +import dns.flags +import dns.message +import dns.name +import dns.rcode +import dns.rdata +import dns.rdataclass +import dns.rdatatype +import dns.rrset + +from isctest.asyncserver import ( + AsyncDnsServer, + DnsResponseSend, + QueryContext, + ResponseHandler, +) + +# P3: A signed zone P, chained to a configured trust anchor, that +# (a) publishes a wildcard *.P A or AAAA record and +# (b) delegates at least one separately-signed child C.P (own DS in P) +# operated by a distinct principal. +TTL = 300 +PARENT = "parent.hack." +CHILD = f"child.{PARENT}" +QUERY = f"q.{PARENT}" +SERVICE = f"svc.{CHILD}" +WILDCARD = f"*.{PARENT}" + +FORGED_A = "198.51.100.45" +LEGIT_A = "192.0.2.113" + + +@dataclass(frozen=True) +class Key: + zone: dns.name.Name + private_key: object + dnskey: dns.rdata.Rdata + ds: dns.rdata.Rdata + + +def name(text: str) -> dns.name.Name: + return dns.name.from_text(text) + + +def load_keys() -> dict[str, Key]: + path = Path(__file__).resolve().parent / "keys.json" + with path.open(encoding="utf-8") as keys_file: + raw_keys = json.load(keys_file) + + keys = {} + for zone, raw_key in raw_keys.items(): + private_key = serialization.load_pem_private_key( + raw_key["private_pem"].encode("ascii"), + password=None, + ) + dnskey = dns.rdata.from_text( + dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"] + ) + ds = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DS, raw_key["ds"]) + keys[zone] = Key(name(zone), private_key, dnskey, ds) + + return keys + + +def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset: + return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas) + + +def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset: + return dns.rrset.from_rdata(name(owner), TTL, rdata) + + +def add_signed( + section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key +) -> None: + rrsig = dns.dnssec.sign( + covered, + signer.private_key, + signer.zone, + signer.dnskey, + lifetime=86400, + verify=True, + ) + section.append(covered) + section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig)) + + +def soa_rrset(zone: str) -> dns.rrset.RRset: + return rrset( + zone, + dns.rdatatype.SOA, + f"ns1.{zone} hostmaster.{zone} 1 3600 600 86400 300", + ) + + +def add_dnskey(response: dns.message.Message, zone: str, key: Key) -> None: + add_signed(response.answer, rrset_from_rdata(zone, key.dnskey), key) + + +def add_ds(response: dns.message.Message, zone: str, child: Key, parent: Key) -> None: + add_signed(response.answer, rrset_from_rdata(zone, child.ds), parent) + + +def add_nodata(response: dns.message.Message, zone: str, key: Key) -> None: + add_signed(response.authority, soa_rrset(zone), key) + + +def wildcard_rrsig(owner: str, parent: Key) -> dns.rrset.RRset: + wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A) + rrsig = dns.dnssec.sign( + wildcard, + parent.private_key, + parent.zone, + parent.dnskey, + lifetime=86400, + verify=True, + ) + return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig) + + +def add_parent_mx_with_forged_additional( + response: dns.message.Message, parent: Key +) -> None: + add_signed( + response.answer, + rrset(QUERY, dns.rdatatype.MX, f"10 {SERVICE}"), + parent, + ) + response.additional.append(rrset(SERVICE, dns.rdatatype.A, FORGED_A)) + response.additional.append(wildcard_rrsig(SERVICE, parent)) + + +def add_child_a(response: dns.message.Message, child: Key) -> None: + add_signed(response.answer, rrset(SERVICE, dns.rdatatype.A, LEGIT_A), child) + + +class QueryCParentWildcardHandler(ResponseHandler): + def __init__(self, keys: dict[str, Key]) -> None: + self.keys = keys + self.parent = name(PARENT) + self.child = name(CHILD) + self.query = name(QUERY) + self.service = name(SERVICE) + + def match(self, qctx: QueryContext) -> bool: + return True + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + qctx.prepare_new_response(with_zone_data=False) + qctx.response.flags |= dns.flags.AA + qctx.response.set_rcode(dns.rcode.NOERROR) + + parent_key = self.keys[PARENT] + child_key = self.keys[CHILD] + + if qctx.qname == self.parent and qctx.qtype == dns.rdatatype.DNSKEY: + # Priming, DNSKEY RRset + add_dnskey(qctx.response, PARENT, parent_key) + elif qctx.qname == self.parent and qctx.qtype == dns.rdatatype.SOA: + # Priming, SOA RRset + add_signed(qctx.response.answer, soa_rrset(PARENT), parent_key) + elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX: + # Trigger query. + add_parent_mx_with_forged_additional(qctx.response, parent_key) + elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DS: + # Chain of trust, DS of child. + add_ds(qctx.response, CHILD, child_key, parent_key) + elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DNSKEY: + # Chain of trust, DNSKEY of child. + add_dnskey(qctx.response, CHILD, child_key) + elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.SOA: + # SOA of child. + add_signed(qctx.response.answer, soa_rrset(CHILD), child_key) + elif qctx.qname == self.service and qctx.qtype == dns.rdatatype.A: + # Zone data at child. + add_child_a(qctx.response, child_key) + elif qctx.qname.is_subdomain(self.child): + # No data at child. + add_nodata(qctx.response, CHILD, child_key) + elif qctx.qname.is_subdomain(self.parent): + # No data at parent. + add_nodata(qctx.response, PARENT, parent_key) + else: + qctx.response.set_rcode(dns.rcode.NXDOMAIN) + + yield DnsResponseSend(qctx.response, authoritative=True) + + +def main() -> None: + keys = load_keys() + server = AsyncDnsServer(default_aa=True) + server.install_response_handlers(QueryCParentWildcardHandler(keys)) + server.run() + + +if __name__ == "__main__": + main() diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 b/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 new file mode 100644 index 00000000000..12726a9789d --- /dev/null +++ b/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 @@ -0,0 +1,37 @@ +// validating resolver + +options { + query-source address 10.53.0.2; + notify-source 10.53.0.2; + transfer-source 10.53.0.2; + port @PORT@; + pid-file "named.pid"; + listen-on { 10.53.0.2; }; + listen-on-v6 { none; }; + recursion yes; + + // P1: named runs with dnssec-validation auto or yes + dnssec-validation yes; + // P2: minimal-responses is not set to the explicit value yes + minimal-responses no; +}; + +controls { + inet 10.53.0.2 port @CONTROLPORT@ allow { any; } keys { rndc_key; }; +}; + +include "../../_common/rndc.key"; + +zone "." { + type hint; + file "../../_common/root.hint"; +}; + +zone "parent.hack" { + type static-stub; + server-addresses { 10.53.0.1; }; +}; + +trust-anchors { + parent.hack. static-key 257 3 13 "@PARENT_DNSKEY@"; +}; diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py b/bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py new file mode 100644 index 00000000000..0b89b8ceb23 --- /dev/null +++ b/bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py @@ -0,0 +1,171 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +from pathlib import Path + +import json + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec + +import dns.dnssec +import dns.flags +import dns.name +import dns.rdataclass +import dns.rdatatype +import pytest + +import isctest + +PARENT = "parent.hack." +CHILD = f"child.{PARENT}" +QUERY = f"q.{PARENT}" +SERVICE = f"svc.{CHILD}" +FORGED_A = "198.51.100.45" +LEGIT_A = "192.0.2.113" +AUTH = "10.53.0.1" +RESOLVER = "10.53.0.2" + +pytestmark = pytest.mark.extra_artifacts( + [ + "ans1/ans.run", + "ans1/keys.json", + ] +) + + +def _make_key(zone): + private_key = ec.generate_private_key(ec.SECP256R1()) + dnskey = dns.dnssec.make_dnskey( + private_key.public_key(), + algorithm="ECDSAP256SHA256", + flags=257, + ) + ds = dns.dnssec.make_ds(dns.name.from_text(zone), dnskey, "SHA256") + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode("ascii") + return { + "private_pem": private_pem, + "dnskey": dnskey.to_text(), + "ds": ds.to_text(), + } + + +def bootstrap(): + keys = {zone: _make_key(zone) for zone in [PARENT, CHILD]} + + Path("ans1/keys.json").write_text(json.dumps(keys, indent=2), encoding="ascii") + + parent_dnskey = "".join(keys[PARENT]["dnskey"].split()[3:]) + return {"PARENT_DNSKEY": parent_dnskey} + + +def _query(server, qname, qtype, cd=False): + query = isctest.query.create(qname, qtype, cd=cd) + return isctest.query.tcp(query, server) + + +def _rrset(response, section, owner, rdtype, covers=None): + if covers is None: + return response.get_rrset( + section, + dns.name.from_text(owner), + dns.rdataclass.IN, + rdtype, + ) + return response.get_rrset( + section, + dns.name.from_text(owner), + dns.rdataclass.IN, + rdtype, + covers=covers, + ) + + +def _has_a(response, section, owner, address): + rrset = _rrset(response, section, owner, dns.rdatatype.A) + return rrset is not None and any(rdata.address == address for rdata in rrset) + + +def _check_signed_rrset(response, section, owner, rdtype, signer, labels=None): + rrsig = _rrset( + response, + section, + owner, + dns.rdatatype.RRSIG, + covers=rdtype, + ) + assert rrsig is not None, response.to_text() + assert rrsig[0].signer == dns.name.from_text(signer), response.to_text() + if labels is not None: + assert rrsig[0].labels == labels, response.to_text() + + +def prime_parent_soa(): + response = _query(RESOLVER, PARENT, "SOA") + isctest.check.noerror(response) + isctest.check.adflag(response) + assert _rrset(response, response.answer, PARENT, dns.rdatatype.SOA) is not None + _check_signed_rrset(response, response.answer, PARENT, dns.rdatatype.SOA, PARENT) + + +def test_malicious_replay(): + # Trigger query. + response = _query(AUTH, QUERY, "MX") + isctest.check.noerror(response) + isctest.check.aaflag(response) + + assert _rrset(response, response.answer, QUERY, dns.rdatatype.MX) + _check_signed_rrset(response, response.answer, QUERY, dns.rdatatype.MX, PARENT) + + # The reply carries in ADDITIONAL. Note Labels=2, signer=parent.hack. + assert _has_a(response, response.additional, SERVICE, FORGED_A), response.to_text() + _check_signed_rrset( + response, + response.additional, + SERVICE, + dns.rdatatype.A, + signer=PARENT, + labels=2, + ) + + # Victim query — any later client, with CD=0: + child = _query(AUTH, SERVICE, "A") + isctest.check.noerror(child) + + assert _has_a(child, child.answer, SERVICE, LEGIT_A), child.to_text() + assert not _has_a(child, child.answer, SERVICE, FORGED_A), child.to_text() + _check_signed_rrset(child, child.answer, SERVICE, dns.rdatatype.A, CHILD) + + +def test_replayed_parent_wildcard(): + # Prime the parent's DNSKEY. + prime_parent_soa() + + # Trigger query — the on-path injection happens on the upstream side of + # this single fetch. Requires CD=1. + response = _query(RESOLVER, QUERY, "MX", cd=True) + isctest.check.noerror(response) + assert _rrset(response, response.answer, QUERY, dns.rdatatype.MX) + + # Victim query — any later client, with CD=0: + response = _query(RESOLVER, SERVICE, "A") + isctest.check.noerror(response) + isctest.check.adflag(response) + + assert not _has_a(response, response.answer, SERVICE, FORGED_A) + assert _has_a(response, response.answer, SERVICE, LEGIT_A), response.to_text() + _check_signed_rrset(response, response.answer, SERVICE, dns.rdatatype.A, CHILD)