]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Reproducer for #5966 replay parent wildcard
authorAlessio Podda <alessio@isc.org>
Wed, 3 Jun 2026 14:27:18 +0000 (16:27 +0200)
committerEvan Hunt <each@isc.org>
Wed, 17 Jun 2026 02:07:06 +0000 (19:07 -0700)
Add a new "dnssec_replayed_parent_wildcard" system test.

Co-Authored-By: Matthijs Mekking <matthijs@isc.org>
bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py [new file with mode: 0644]
bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 [new file with mode: 0644]
bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py [new file with mode: 0644]

diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py b/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py
new file mode 100644 (file)
index 0000000..0002e61
--- /dev/null
@@ -0,0 +1,220 @@
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0.  If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+from collections.abc import AsyncGenerator
+from dataclasses import dataclass
+from pathlib import Path
+
+import json
+
+from cryptography.hazmat.primitives import serialization
+
+import dns.dnssec
+import dns.flags
+import dns.message
+import dns.name
+import dns.rcode
+import dns.rdata
+import dns.rdataclass
+import dns.rdatatype
+import dns.rrset
+
+from isctest.asyncserver import (
+    AsyncDnsServer,
+    DnsResponseSend,
+    QueryContext,
+    ResponseHandler,
+)
+
+# P3: A signed zone P, chained to a configured trust anchor, that
+# (a) publishes a wildcard *.P A or AAAA record and
+# (b) delegates at least one separately-signed child C.P (own DS in P)
+#     operated by a distinct principal.
+TTL = 300
+PARENT = "parent.hack."
+CHILD = f"child.{PARENT}"
+QUERY = f"q.{PARENT}"
+SERVICE = f"svc.{CHILD}"
+WILDCARD = f"*.{PARENT}"
+
+FORGED_A = "198.51.100.45"
+LEGIT_A = "192.0.2.113"
+
+
+@dataclass(frozen=True)
+class Key:
+    zone: dns.name.Name
+    private_key: object
+    dnskey: dns.rdata.Rdata
+    ds: dns.rdata.Rdata
+
+
+def name(text: str) -> dns.name.Name:
+    return dns.name.from_text(text)
+
+
+def load_keys() -> dict[str, Key]:
+    path = Path(__file__).resolve().parent / "keys.json"
+    with path.open(encoding="utf-8") as keys_file:
+        raw_keys = json.load(keys_file)
+
+    keys = {}
+    for zone, raw_key in raw_keys.items():
+        private_key = serialization.load_pem_private_key(
+            raw_key["private_pem"].encode("ascii"),
+            password=None,
+        )
+        dnskey = dns.rdata.from_text(
+            dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"]
+        )
+        ds = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DS, raw_key["ds"])
+        keys[zone] = Key(name(zone), private_key, dnskey, ds)
+
+    return keys
+
+
+def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset:
+    return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas)
+
+
+def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset:
+    return dns.rrset.from_rdata(name(owner), TTL, rdata)
+
+
+def add_signed(
+    section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key
+) -> None:
+    rrsig = dns.dnssec.sign(
+        covered,
+        signer.private_key,
+        signer.zone,
+        signer.dnskey,
+        lifetime=86400,
+        verify=True,
+    )
+    section.append(covered)
+    section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig))
+
+
+def soa_rrset(zone: str) -> dns.rrset.RRset:
+    return rrset(
+        zone,
+        dns.rdatatype.SOA,
+        f"ns1.{zone} hostmaster.{zone} 1 3600 600 86400 300",
+    )
+
+
+def add_dnskey(response: dns.message.Message, zone: str, key: Key) -> None:
+    add_signed(response.answer, rrset_from_rdata(zone, key.dnskey), key)
+
+
+def add_ds(response: dns.message.Message, zone: str, child: Key, parent: Key) -> None:
+    add_signed(response.answer, rrset_from_rdata(zone, child.ds), parent)
+
+
+def add_nodata(response: dns.message.Message, zone: str, key: Key) -> None:
+    add_signed(response.authority, soa_rrset(zone), key)
+
+
+def wildcard_rrsig(owner: str, parent: Key) -> dns.rrset.RRset:
+    wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A)
+    rrsig = dns.dnssec.sign(
+        wildcard,
+        parent.private_key,
+        parent.zone,
+        parent.dnskey,
+        lifetime=86400,
+        verify=True,
+    )
+    return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig)
+
+
+def add_parent_mx_with_forged_additional(
+    response: dns.message.Message, parent: Key
+) -> None:
+    add_signed(
+        response.answer,
+        rrset(QUERY, dns.rdatatype.MX, f"10 {SERVICE}"),
+        parent,
+    )
+    response.additional.append(rrset(SERVICE, dns.rdatatype.A, FORGED_A))
+    response.additional.append(wildcard_rrsig(SERVICE, parent))
+
+
+def add_child_a(response: dns.message.Message, child: Key) -> None:
+    add_signed(response.answer, rrset(SERVICE, dns.rdatatype.A, LEGIT_A), child)
+
+
+class QueryCParentWildcardHandler(ResponseHandler):
+    def __init__(self, keys: dict[str, Key]) -> None:
+        self.keys = keys
+        self.parent = name(PARENT)
+        self.child = name(CHILD)
+        self.query = name(QUERY)
+        self.service = name(SERVICE)
+
+    def match(self, qctx: QueryContext) -> bool:
+        return True
+
+    async def get_responses(
+        self, qctx: QueryContext
+    ) -> AsyncGenerator[DnsResponseSend, None]:
+        qctx.prepare_new_response(with_zone_data=False)
+        qctx.response.flags |= dns.flags.AA
+        qctx.response.set_rcode(dns.rcode.NOERROR)
+
+        parent_key = self.keys[PARENT]
+        child_key = self.keys[CHILD]
+
+        if qctx.qname == self.parent and qctx.qtype == dns.rdatatype.DNSKEY:
+            # Priming, DNSKEY RRset
+            add_dnskey(qctx.response, PARENT, parent_key)
+        elif qctx.qname == self.parent and qctx.qtype == dns.rdatatype.SOA:
+            # Priming, SOA RRset
+            add_signed(qctx.response.answer, soa_rrset(PARENT), parent_key)
+        elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
+            # Trigger query.
+            add_parent_mx_with_forged_additional(qctx.response, parent_key)
+        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DS:
+            # Chain of trust, DS of child.
+            add_ds(qctx.response, CHILD, child_key, parent_key)
+        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DNSKEY:
+            # Chain of trust, DNSKEY of child.
+            add_dnskey(qctx.response, CHILD, child_key)
+        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.SOA:
+            # SOA of child.
+            add_signed(qctx.response.answer, soa_rrset(CHILD), child_key)
+        elif qctx.qname == self.service and qctx.qtype == dns.rdatatype.A:
+            # Zone data at child.
+            add_child_a(qctx.response, child_key)
+        elif qctx.qname.is_subdomain(self.child):
+            # No data at child.
+            add_nodata(qctx.response, CHILD, child_key)
+        elif qctx.qname.is_subdomain(self.parent):
+            # No data at parent.
+            add_nodata(qctx.response, PARENT, parent_key)
+        else:
+            qctx.response.set_rcode(dns.rcode.NXDOMAIN)
+
+        yield DnsResponseSend(qctx.response, authoritative=True)
+
+
+def main() -> None:
+    keys = load_keys()
+    server = AsyncDnsServer(default_aa=True)
+    server.install_response_handlers(QueryCParentWildcardHandler(keys))
+    server.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 b/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2
new file mode 100644 (file)
index 0000000..12726a9
--- /dev/null
@@ -0,0 +1,37 @@
+// validating resolver
+
+options {
+       query-source address 10.53.0.2;
+       notify-source 10.53.0.2;
+       transfer-source 10.53.0.2;
+       port @PORT@;
+       pid-file "named.pid";
+       listen-on { 10.53.0.2; };
+       listen-on-v6 { none; };
+       recursion yes;
+
+       // P1: named runs with dnssec-validation auto or yes
+       dnssec-validation yes;
+       // P2: minimal-responses is not set to the explicit value yes
+       minimal-responses no;
+};
+
+controls {
+       inet 10.53.0.2 port @CONTROLPORT@ allow { any; } keys { rndc_key; };
+};
+
+include "../../_common/rndc.key";
+
+zone "." {
+       type hint;
+       file "../../_common/root.hint";
+};
+
+zone "parent.hack" {
+       type static-stub;
+       server-addresses { 10.53.0.1; };
+};
+
+trust-anchors {
+       parent.hack. static-key 257 3 13 "@PARENT_DNSKEY@";
+};
diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py b/bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py
new file mode 100644 (file)
index 0000000..0b89b8c
--- /dev/null
@@ -0,0 +1,171 @@
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0.  If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+from pathlib import Path
+
+import json
+
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import ec
+
+import dns.dnssec
+import dns.flags
+import dns.name
+import dns.rdataclass
+import dns.rdatatype
+import pytest
+
+import isctest
+
+PARENT = "parent.hack."
+CHILD = f"child.{PARENT}"
+QUERY = f"q.{PARENT}"
+SERVICE = f"svc.{CHILD}"
+FORGED_A = "198.51.100.45"
+LEGIT_A = "192.0.2.113"
+AUTH = "10.53.0.1"
+RESOLVER = "10.53.0.2"
+
+pytestmark = pytest.mark.extra_artifacts(
+    [
+        "ans1/ans.run",
+        "ans1/keys.json",
+    ]
+)
+
+
+def _make_key(zone):
+    private_key = ec.generate_private_key(ec.SECP256R1())
+    dnskey = dns.dnssec.make_dnskey(
+        private_key.public_key(),
+        algorithm="ECDSAP256SHA256",
+        flags=257,
+    )
+    ds = dns.dnssec.make_ds(dns.name.from_text(zone), dnskey, "SHA256")
+    private_pem = private_key.private_bytes(
+        encoding=serialization.Encoding.PEM,
+        format=serialization.PrivateFormat.PKCS8,
+        encryption_algorithm=serialization.NoEncryption(),
+    ).decode("ascii")
+    return {
+        "private_pem": private_pem,
+        "dnskey": dnskey.to_text(),
+        "ds": ds.to_text(),
+    }
+
+
+def bootstrap():
+    keys = {zone: _make_key(zone) for zone in [PARENT, CHILD]}
+
+    Path("ans1/keys.json").write_text(json.dumps(keys, indent=2), encoding="ascii")
+
+    parent_dnskey = "".join(keys[PARENT]["dnskey"].split()[3:])
+    return {"PARENT_DNSKEY": parent_dnskey}
+
+
+def _query(server, qname, qtype, cd=False):
+    query = isctest.query.create(qname, qtype, cd=cd)
+    return isctest.query.tcp(query, server)
+
+
+def _rrset(response, section, owner, rdtype, covers=None):
+    if covers is None:
+        return response.get_rrset(
+            section,
+            dns.name.from_text(owner),
+            dns.rdataclass.IN,
+            rdtype,
+        )
+    return response.get_rrset(
+        section,
+        dns.name.from_text(owner),
+        dns.rdataclass.IN,
+        rdtype,
+        covers=covers,
+    )
+
+
+def _has_a(response, section, owner, address):
+    rrset = _rrset(response, section, owner, dns.rdatatype.A)
+    return rrset is not None and any(rdata.address == address for rdata in rrset)
+
+
+def _check_signed_rrset(response, section, owner, rdtype, signer, labels=None):
+    rrsig = _rrset(
+        response,
+        section,
+        owner,
+        dns.rdatatype.RRSIG,
+        covers=rdtype,
+    )
+    assert rrsig is not None, response.to_text()
+    assert rrsig[0].signer == dns.name.from_text(signer), response.to_text()
+    if labels is not None:
+        assert rrsig[0].labels == labels, response.to_text()
+
+
+def prime_parent_soa():
+    response = _query(RESOLVER, PARENT, "SOA")
+    isctest.check.noerror(response)
+    isctest.check.adflag(response)
+    assert _rrset(response, response.answer, PARENT, dns.rdatatype.SOA) is not None
+    _check_signed_rrset(response, response.answer, PARENT, dns.rdatatype.SOA, PARENT)
+
+
+def test_malicious_replay():
+    # Trigger query.
+    response = _query(AUTH, QUERY, "MX")
+    isctest.check.noerror(response)
+    isctest.check.aaflag(response)
+
+    assert _rrset(response, response.answer, QUERY, dns.rdatatype.MX)
+    _check_signed_rrset(response, response.answer, QUERY, dns.rdatatype.MX, PARENT)
+
+    # The reply carries in ADDITIONAL.  Note Labels=2, signer=parent.hack.
+    assert _has_a(response, response.additional, SERVICE, FORGED_A), response.to_text()
+    _check_signed_rrset(
+        response,
+        response.additional,
+        SERVICE,
+        dns.rdatatype.A,
+        signer=PARENT,
+        labels=2,
+    )
+
+    # Victim query — any later client, with CD=0:
+    child = _query(AUTH, SERVICE, "A")
+    isctest.check.noerror(child)
+
+    assert _has_a(child, child.answer, SERVICE, LEGIT_A), child.to_text()
+    assert not _has_a(child, child.answer, SERVICE, FORGED_A), child.to_text()
+    _check_signed_rrset(child, child.answer, SERVICE, dns.rdatatype.A, CHILD)
+
+
+def test_replayed_parent_wildcard():
+    # Prime the parent's DNSKEY.
+    prime_parent_soa()
+
+    # Trigger query — the on-path injection happens on the upstream side of
+    # this single fetch.  Requires CD=1.
+    response = _query(RESOLVER, QUERY, "MX", cd=True)
+    isctest.check.noerror(response)
+    assert _rrset(response, response.answer, QUERY, dns.rdatatype.MX)
+
+    # Victim query — any later client, with CD=0:
+    response = _query(RESOLVER, SERVICE, "A")
+    isctest.check.noerror(response)
+    isctest.check.adflag(response)
+
+    assert not _has_a(response, response.answer, SERVICE, FORGED_A)
+    assert _has_a(response, response.answer, SERVICE, LEGIT_A), response.to_text()
+    _check_signed_rrset(response, response.answer, SERVICE, dns.rdatatype.A, CHILD)