]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Merge DNSSEC wildcard tests
authorEvan Hunt <each@isc.org>
Mon, 15 Jun 2026 23:28:25 +0000 (16:28 -0700)
committerEvan Hunt <each@isc.org>
Wed, 17 Jun 2026 02:07:06 +0000 (19:07 -0700)
Merge the tests for #5966 (F-043) and #5972 (F-045), previously called
dnssec_wildcard_additional and dnssec_replayed_parent_wildcard, into a
single directory with two modules.

bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py [deleted file]
bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 [deleted file]
bin/tests/system/dnssec_wildcard/ans1/ans.py [new file with mode: 0644]
bin/tests/system/dnssec_wildcard/ans1/common.py [new file with mode: 0644]
bin/tests/system/dnssec_wildcard/ans1/f043.py [new file with mode: 0644]
bin/tests/system/dnssec_wildcard/ans1/f045.py [new file with mode: 0644]
bin/tests/system/dnssec_wildcard/ns2/named.conf.j2 [moved from bin/tests/system/dnssec_wildcard_additional/ns2/named.conf.j2 with 64% similarity]
bin/tests/system/dnssec_wildcard/tests_dnssec_wildcard_additional.py [moved from bin/tests/system/dnssec_wildcard_additional/tests_dnssec_wildcard_additional.py with 86% similarity]
bin/tests/system/dnssec_wildcard/tests_replayed_parent_wildcard.py [moved from bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py with 96% similarity]
bin/tests/system/dnssec_wildcard_additional/ans1/ans.py [deleted file]

diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py b/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py
deleted file mode 100644 (file)
index 0002e61..0000000
+++ /dev/null
@@ -1,220 +0,0 @@
-#!/usr/bin/python3
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0.  If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-from collections.abc import AsyncGenerator
-from dataclasses import dataclass
-from pathlib import Path
-
-import json
-
-from cryptography.hazmat.primitives import serialization
-
-import dns.dnssec
-import dns.flags
-import dns.message
-import dns.name
-import dns.rcode
-import dns.rdata
-import dns.rdataclass
-import dns.rdatatype
-import dns.rrset
-
-from isctest.asyncserver import (
-    AsyncDnsServer,
-    DnsResponseSend,
-    QueryContext,
-    ResponseHandler,
-)
-
-# P3: A signed zone P, chained to a configured trust anchor, that
-# (a) publishes a wildcard *.P A or AAAA record and
-# (b) delegates at least one separately-signed child C.P (own DS in P)
-#     operated by a distinct principal.
-TTL = 300
-PARENT = "parent.hack."
-CHILD = f"child.{PARENT}"
-QUERY = f"q.{PARENT}"
-SERVICE = f"svc.{CHILD}"
-WILDCARD = f"*.{PARENT}"
-
-FORGED_A = "198.51.100.45"
-LEGIT_A = "192.0.2.113"
-
-
-@dataclass(frozen=True)
-class Key:
-    zone: dns.name.Name
-    private_key: object
-    dnskey: dns.rdata.Rdata
-    ds: dns.rdata.Rdata
-
-
-def name(text: str) -> dns.name.Name:
-    return dns.name.from_text(text)
-
-
-def load_keys() -> dict[str, Key]:
-    path = Path(__file__).resolve().parent / "keys.json"
-    with path.open(encoding="utf-8") as keys_file:
-        raw_keys = json.load(keys_file)
-
-    keys = {}
-    for zone, raw_key in raw_keys.items():
-        private_key = serialization.load_pem_private_key(
-            raw_key["private_pem"].encode("ascii"),
-            password=None,
-        )
-        dnskey = dns.rdata.from_text(
-            dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"]
-        )
-        ds = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DS, raw_key["ds"])
-        keys[zone] = Key(name(zone), private_key, dnskey, ds)
-
-    return keys
-
-
-def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset:
-    return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas)
-
-
-def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset:
-    return dns.rrset.from_rdata(name(owner), TTL, rdata)
-
-
-def add_signed(
-    section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key
-) -> None:
-    rrsig = dns.dnssec.sign(
-        covered,
-        signer.private_key,
-        signer.zone,
-        signer.dnskey,
-        lifetime=86400,
-        verify=True,
-    )
-    section.append(covered)
-    section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig))
-
-
-def soa_rrset(zone: str) -> dns.rrset.RRset:
-    return rrset(
-        zone,
-        dns.rdatatype.SOA,
-        f"ns1.{zone} hostmaster.{zone} 1 3600 600 86400 300",
-    )
-
-
-def add_dnskey(response: dns.message.Message, zone: str, key: Key) -> None:
-    add_signed(response.answer, rrset_from_rdata(zone, key.dnskey), key)
-
-
-def add_ds(response: dns.message.Message, zone: str, child: Key, parent: Key) -> None:
-    add_signed(response.answer, rrset_from_rdata(zone, child.ds), parent)
-
-
-def add_nodata(response: dns.message.Message, zone: str, key: Key) -> None:
-    add_signed(response.authority, soa_rrset(zone), key)
-
-
-def wildcard_rrsig(owner: str, parent: Key) -> dns.rrset.RRset:
-    wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A)
-    rrsig = dns.dnssec.sign(
-        wildcard,
-        parent.private_key,
-        parent.zone,
-        parent.dnskey,
-        lifetime=86400,
-        verify=True,
-    )
-    return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig)
-
-
-def add_parent_mx_with_forged_additional(
-    response: dns.message.Message, parent: Key
-) -> None:
-    add_signed(
-        response.answer,
-        rrset(QUERY, dns.rdatatype.MX, f"10 {SERVICE}"),
-        parent,
-    )
-    response.additional.append(rrset(SERVICE, dns.rdatatype.A, FORGED_A))
-    response.additional.append(wildcard_rrsig(SERVICE, parent))
-
-
-def add_child_a(response: dns.message.Message, child: Key) -> None:
-    add_signed(response.answer, rrset(SERVICE, dns.rdatatype.A, LEGIT_A), child)
-
-
-class QueryCParentWildcardHandler(ResponseHandler):
-    def __init__(self, keys: dict[str, Key]) -> None:
-        self.keys = keys
-        self.parent = name(PARENT)
-        self.child = name(CHILD)
-        self.query = name(QUERY)
-        self.service = name(SERVICE)
-
-    def match(self, qctx: QueryContext) -> bool:
-        return True
-
-    async def get_responses(
-        self, qctx: QueryContext
-    ) -> AsyncGenerator[DnsResponseSend, None]:
-        qctx.prepare_new_response(with_zone_data=False)
-        qctx.response.flags |= dns.flags.AA
-        qctx.response.set_rcode(dns.rcode.NOERROR)
-
-        parent_key = self.keys[PARENT]
-        child_key = self.keys[CHILD]
-
-        if qctx.qname == self.parent and qctx.qtype == dns.rdatatype.DNSKEY:
-            # Priming, DNSKEY RRset
-            add_dnskey(qctx.response, PARENT, parent_key)
-        elif qctx.qname == self.parent and qctx.qtype == dns.rdatatype.SOA:
-            # Priming, SOA RRset
-            add_signed(qctx.response.answer, soa_rrset(PARENT), parent_key)
-        elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
-            # Trigger query.
-            add_parent_mx_with_forged_additional(qctx.response, parent_key)
-        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DS:
-            # Chain of trust, DS of child.
-            add_ds(qctx.response, CHILD, child_key, parent_key)
-        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DNSKEY:
-            # Chain of trust, DNSKEY of child.
-            add_dnskey(qctx.response, CHILD, child_key)
-        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.SOA:
-            # SOA of child.
-            add_signed(qctx.response.answer, soa_rrset(CHILD), child_key)
-        elif qctx.qname == self.service and qctx.qtype == dns.rdatatype.A:
-            # Zone data at child.
-            add_child_a(qctx.response, child_key)
-        elif qctx.qname.is_subdomain(self.child):
-            # No data at child.
-            add_nodata(qctx.response, CHILD, child_key)
-        elif qctx.qname.is_subdomain(self.parent):
-            # No data at parent.
-            add_nodata(qctx.response, PARENT, parent_key)
-        else:
-            qctx.response.set_rcode(dns.rcode.NXDOMAIN)
-
-        yield DnsResponseSend(qctx.response, authoritative=True)
-
-
-def main() -> None:
-    keys = load_keys()
-    server = AsyncDnsServer(default_aa=True)
-    server.install_response_handlers(QueryCParentWildcardHandler(keys))
-    server.run()
-
-
-if __name__ == "__main__":
-    main()
diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 b/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2
deleted file mode 100644 (file)
index 12726a9..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-// validating resolver
-
-options {
-       query-source address 10.53.0.2;
-       notify-source 10.53.0.2;
-       transfer-source 10.53.0.2;
-       port @PORT@;
-       pid-file "named.pid";
-       listen-on { 10.53.0.2; };
-       listen-on-v6 { none; };
-       recursion yes;
-
-       // P1: named runs with dnssec-validation auto or yes
-       dnssec-validation yes;
-       // P2: minimal-responses is not set to the explicit value yes
-       minimal-responses no;
-};
-
-controls {
-       inet 10.53.0.2 port @CONTROLPORT@ allow { any; } keys { rndc_key; };
-};
-
-include "../../_common/rndc.key";
-
-zone "." {
-       type hint;
-       file "../../_common/root.hint";
-};
-
-zone "parent.hack" {
-       type static-stub;
-       server-addresses { 10.53.0.1; };
-};
-
-trust-anchors {
-       parent.hack. static-key 257 3 13 "@PARENT_DNSKEY@";
-};
diff --git a/bin/tests/system/dnssec_wildcard/ans1/ans.py b/bin/tests/system/dnssec_wildcard/ans1/ans.py
new file mode 100644 (file)
index 0000000..1955c14
--- /dev/null
@@ -0,0 +1,20 @@
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from dnssec_wildcard.ans1 import common, f043, f045
+from isctest.asyncserver import AsyncDnsServer
+
+
+def main() -> None:
+    keys = common.load_keys()
+    server = AsyncDnsServer(default_aa=True)
+    server.install_response_handler(f043.F043Handler(keys))
+    server.install_response_handler(f045.F045Handler(keys))
+    server.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/bin/tests/system/dnssec_wildcard/ans1/common.py b/bin/tests/system/dnssec_wildcard/ans1/common.py
new file mode 100644 (file)
index 0000000..051d7df
--- /dev/null
@@ -0,0 +1,102 @@
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from dataclasses import dataclass
+from pathlib import Path
+
+import json
+
+from cryptography.hazmat.primitives import serialization
+
+import dns.dnssec
+import dns.message
+import dns.name
+import dns.rdata
+import dns.rdataclass
+import dns.rdatatype
+import dns.rrset
+
+TTL = 300
+
+
+@dataclass(frozen=True)
+class Key:
+    zone: dns.name.Name
+    private_key: object
+    dnskey: dns.rdata.Rdata
+    ds: dns.rdata.Rdata
+
+
+def name(text: str) -> dns.name.Name:
+    return dns.name.from_text(text)
+
+
+def load_keys() -> dict[str, Key]:
+    path = Path(".") / "keys.json"
+    with path.open(encoding="utf-8") as keys_file:
+        raw_keys = json.load(keys_file)
+
+    keys = {}
+    for zone, raw_key in raw_keys.items():
+        private_key = serialization.load_pem_private_key(
+            raw_key["private_pem"].encode("ascii"),
+            password=None,
+        )
+        dnskey = dns.rdata.from_text(
+            dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"]
+        )
+        ds = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DS, raw_key["ds"])
+        keys[zone] = Key(name(zone), private_key, dnskey, ds)
+
+    return keys
+
+
+def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset:
+    return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas)
+
+
+def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset:
+    return dns.rrset.from_rdata(owner, TTL, rdata)
+
+
+def add_signed(
+    section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key
+) -> None:
+    rrsig = dns.dnssec.sign(
+        covered,
+        signer.private_key,
+        signer.zone,
+        signer.dnskey,
+        lifetime=86400,
+        verify=True,
+    )
+    section.append(covered)
+    section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig))
+
+
+def soa_rrset(zone) -> dns.rrset.RRset:
+    return rrset(
+        zone,
+        dns.rdatatype.SOA,
+        f"ns.{zone} hostmaster.{zone} 1 3600 600 86400 300",
+    )
+
+
+def add_dnskey(response: dns.message.Message, zone: str, key: Key) -> None:
+    add_signed(response.answer, rrset_from_rdata(zone, key.dnskey), key)
+
+
+def wildcard_rrsig(owner: str, a: str, key: Key) -> dns.rrset.RRset:
+    wildcard = rrset(key.zone, dns.rdatatype.A, a)
+    rrsig = dns.dnssec.sign(
+        wildcard,
+        key.private_key,
+        key.zone,
+        key.dnskey,
+        lifetime=86400,
+        verify=True,
+    )
+    return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig)
diff --git a/bin/tests/system/dnssec_wildcard/ans1/f043.py b/bin/tests/system/dnssec_wildcard/ans1/f043.py
new file mode 100644 (file)
index 0000000..a2a99a9
--- /dev/null
@@ -0,0 +1,80 @@
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from collections.abc import AsyncGenerator
+
+import dns.flags
+import dns.rcode
+import dns.rdatatype
+import dns.rrset
+
+from dnssec_wildcard.ans1.common import (
+    Key,
+    add_dnskey,
+    add_signed,
+    name,
+    rrset,
+    soa_rrset,
+    wildcard_rrsig,
+)
+from isctest.asyncserver import DnsResponseSend, DomainHandler, QueryContext
+
+F043_ZONE = "f043.test."
+F043_QUERY = f"svc.{F043_ZONE}"
+F043_VICTIM = f"victim.{F043_ZONE}"
+
+FORGED_A = "198.51.100.45"
+LEGIT_A = "192.0.2.113"
+
+
+def add_wildcard_a(section: list[dns.rrset.RRset], owner: str, key: Key) -> None:
+    section.append(rrset(owner, dns.rdatatype.A, FORGED_A))
+    section.append(wildcard_rrsig(owner, FORGED_A, key))
+
+
+class F043Handler(DomainHandler):  # Additional from wildcard
+    domains = [F043_ZONE]
+
+    def __init__(self, keys: dict[str, Key]) -> None:
+        super().__init__()
+        self.keys = keys
+
+        if F043_ZONE not in keys:
+            return
+
+        self.key = keys[F043_ZONE]
+        self.zone = name(F043_ZONE)
+        self.query = name(F043_QUERY)
+        self.victim = name(F043_VICTIM)
+
+    async def get_responses(
+        self, qctx: QueryContext
+    ) -> AsyncGenerator[DnsResponseSend, None]:
+        qctx.prepare_new_response(with_zone_data=False)
+        qctx.response.flags |= dns.flags.AA
+        qctx.response.set_rcode(dns.rcode.NOERROR)
+
+        if qctx.qname == self.zone and qctx.qtype == dns.rdatatype.DNSKEY:
+            add_dnskey(qctx.response, self.zone, self.key)
+        elif qctx.qname == self.zone and qctx.qtype == dns.rdatatype.SOA:
+            add_signed(qctx.response.answer, soa_rrset(self.zone), self.key)
+        elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
+            add_signed(
+                qctx.response.answer,
+                rrset(F043_QUERY, dns.rdatatype.MX, f"10 {F043_VICTIM}"),
+                self.key,
+            )
+            add_wildcard_a(qctx.response.additional, F043_VICTIM, self.key)
+        elif qctx.qname == self.victim and qctx.qtype == dns.rdatatype.A:
+            add_signed(
+                qctx.response.answer,
+                rrset(F043_VICTIM, dns.rdatatype.A, LEGIT_A),
+                self.key,
+            )
+        else:
+            add_signed(qctx.response.authority, soa_rrset(self.zone), self.key)
+
+        yield DnsResponseSend(qctx.response, authoritative=True)
diff --git a/bin/tests/system/dnssec_wildcard/ans1/f045.py b/bin/tests/system/dnssec_wildcard/ans1/f045.py
new file mode 100644 (file)
index 0000000..74c2a4c
--- /dev/null
@@ -0,0 +1,120 @@
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from collections.abc import AsyncGenerator
+
+import dns.flags
+import dns.message
+import dns.rcode
+import dns.rdatatype
+
+from dnssec_wildcard.ans1.common import (
+    Key,
+    add_dnskey,
+    add_signed,
+    name,
+    rrset,
+    rrset_from_rdata,
+    soa_rrset,
+    wildcard_rrsig,
+)
+from isctest.asyncserver import DnsResponseSend, DomainHandler, QueryContext
+
+TTL = 300
+FORGED_A = "198.51.100.45"
+LEGIT_A = "192.0.2.113"
+
+
+def add_ds(response: dns.message.Message, zone: str, child: Key, parent: Key) -> None:
+    add_signed(response.answer, rrset_from_rdata(zone, child.ds), parent)
+
+
+def add_nodata(response: dns.message.Message, zone: str, key: Key) -> None:
+    add_signed(response.authority, soa_rrset(zone), key)
+
+
+# A signed zone P, chained to a configured trust anchor, that
+# (a) publishes a wildcard *.P A or AAAA record and
+# (b) delegates at least one separately-signed child C.P (own DS in P)
+#     operated by a distinct principal.
+F045_PARENT = "f045.test."
+F045_CHILD = f"child.{F045_PARENT}"
+F045_QUERY = f"q.{F045_PARENT}"
+F045_SERVICE = f"svc.{F045_CHILD}"
+
+
+def add_parent_mx_with_forged_additional(
+    response: dns.message.Message, parent: Key
+) -> None:
+    add_signed(
+        response.answer,
+        rrset(F045_QUERY, dns.rdatatype.MX, f"10 {F045_SERVICE}"),
+        parent,
+    )
+    response.additional.append(rrset(F045_SERVICE, dns.rdatatype.A, FORGED_A))
+    response.additional.append(wildcard_rrsig(F045_SERVICE, FORGED_A, parent))
+
+
+def add_child_a(response: dns.message.Message, child: Key) -> None:
+    add_signed(response.answer, rrset(F045_SERVICE, dns.rdatatype.A, LEGIT_A), child)
+
+
+class F045Handler(DomainHandler):
+    domains = [F045_PARENT, F045_CHILD]
+
+    def __init__(self, keys: dict[str, Key]) -> None:
+        super().__init__()
+        self.keys = keys
+
+        if F045_PARENT not in keys or F045_CHILD not in keys:
+            return
+
+        self.parent = name(F045_PARENT)
+        self.child = name(F045_CHILD)
+        self.query = name(F045_QUERY)
+        self.service = name(F045_SERVICE)
+
+    async def get_responses(
+        self, qctx: QueryContext
+    ) -> AsyncGenerator[DnsResponseSend, None]:
+        qctx.prepare_new_response(with_zone_data=False)
+        qctx.response.flags |= dns.flags.AA
+        qctx.response.set_rcode(dns.rcode.NOERROR)
+
+        parent_key = self.keys[F045_PARENT]
+        child_key = self.keys[F045_CHILD]
+
+        if qctx.qname == self.parent and qctx.qtype == dns.rdatatype.DNSKEY:
+            # Priming, DNSKEY RRset
+            add_dnskey(qctx.response, F045_PARENT, parent_key)
+        elif qctx.qname == self.parent and qctx.qtype == dns.rdatatype.SOA:
+            # Priming, SOA RRset
+            add_signed(qctx.response.answer, soa_rrset(F045_PARENT), parent_key)
+        elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
+            # Trigger query.
+            add_parent_mx_with_forged_additional(qctx.response, parent_key)
+        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DS:
+            # Chain of trust, DS of child.
+            add_ds(qctx.response, F045_CHILD, child_key, parent_key)
+        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DNSKEY:
+            # Chain of trust, DNSKEY of child.
+            add_dnskey(qctx.response, F045_CHILD, child_key)
+        elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.SOA:
+            # SOA of child.
+            add_signed(qctx.response.answer, soa_rrset(F045_CHILD), child_key)
+        elif qctx.qname == self.service and qctx.qtype == dns.rdatatype.A:
+            # Zone data at child.
+            add_child_a(qctx.response, child_key)
+        elif qctx.qname.is_subdomain(self.child):
+            # No data at child.
+            add_nodata(qctx.response, F045_CHILD, child_key)
+        elif qctx.qname.is_subdomain(self.parent):
+            # No data at parent.
+            add_nodata(qctx.response, F045_PARENT, parent_key)
+        else:
+            qctx.response.set_rcode(dns.rcode.NXDOMAIN)
+
+        yield DnsResponseSend(qctx.response, authoritative=True)
similarity index 64%
rename from bin/tests/system/dnssec_wildcard_additional/ns2/named.conf.j2
rename to bin/tests/system/dnssec_wildcard/ns2/named.conf.j2
index fa7da5c69048a84afd6a111bcb5c092e5b75a8df..890d1529835de933b7db6aca12b2c362f4a06b86 100644 (file)
@@ -24,11 +24,23 @@ zone "." {
        file "../../_common/root.hint";
 };
 
+
+{% if PARENT_DNSKEY is defined and PARENT_DNSKEY|length %}
+zone "f045.test" {
+        type static-stub;
+        server-addresses { 10.53.0.1; };
+};
+
+trust-anchors {
+        f045.test. static-key 257 3 13 "@PARENT_DNSKEY@";
+};
+{% else %}
 zone "f043.test" {
        type static-stub;
        server-addresses { 10.53.0.1; };
 };
 
 trust-anchors {
-       f043.test. static-key 257 3 13 "@ZONE_DNSKEY@";
+       f043.test. static-key 257 3 13 "@TESTZONE_DNSKEY@";
 };
+{% endif %}
similarity index 86%
rename from bin/tests/system/dnssec_wildcard_additional/tests_dnssec_wildcard_additional.py
rename to bin/tests/system/dnssec_wildcard/tests_dnssec_wildcard_additional.py
index 6ece46666d63326ea20fabae46ab707a51bcd158..7c24f671d98734c0a3a6f8e0ba15129a05f6a2b4 100644 (file)
@@ -20,16 +20,16 @@ import pytest
 import isctest
 import isctest.mark
 
-ZONE = "f043.test."
-QUERY = f"svc.{ZONE}"
-VICTIM = f"victim.{ZONE}"
-FORGED_A = "198.51.100.90"
-LEGIT_A = "192.0.2.50"
+TESTZONE = "f043.test."
+QUERY = f"svc.{TESTZONE}"
+VICTIM = f"victim.{TESTZONE}"
+FORGED_A = "198.51.100.45"
+LEGIT_A = "192.0.2.113"
 AUTH = "10.53.0.1"
 RESOLVER = "10.53.0.2"
 
 pytestmark = [
-    isctest.mark.with_algorithm("ECDSAP256SHA256"),
+    isctest.mark.with_ecdsa_deterministic,
     pytest.mark.extra_artifacts(
         [
             "ans*/ans.run",
@@ -39,13 +39,14 @@ pytestmark = [
 ]
 
 
-def _make_key():
+def _make_key(zone):
     private_key = ec.generate_private_key(ec.SECP256R1())
     dnskey = dns.dnssec.make_dnskey(
         private_key.public_key(),
         algorithm="ECDSAP256SHA256",
         flags=257,
     )
+    ds = dns.dnssec.make_ds(dns.name.from_text(zone), dnskey, "SHA256")
     private_pem = private_key.private_bytes(
         encoding=serialization.Encoding.PEM,
         format=serialization.PrivateFormat.PKCS8,
@@ -54,14 +55,15 @@ def _make_key():
     return {
         "private_pem": private_pem,
         "dnskey": dnskey.to_text(),
+        "ds": ds.to_text(),
     }
 
 
 def bootstrap():
-    keys = {ZONE: _make_key()}
+    keys = {TESTZONE: _make_key(TESTZONE)}
     Path("ans1/keys.json").write_text(json.dumps(keys, indent=2), encoding="ascii")
-    zone_dnskey = "".join(keys[ZONE]["dnskey"].split()[3:])
-    return {"ZONE_DNSKEY": zone_dnskey}
+    zone_dnskey = "".join(keys[TESTZONE]["dnskey"].split()[3:])
+    return {"TESTZONE_DNSKEY": zone_dnskey}
 
 
 def _query(server, qname, qtype, cd=False):
@@ -106,13 +108,13 @@ def test_direct_fromwildcard_additional_fixture():
         carrier.additional,
         VICTIM,
         dns.rdatatype.A,
-        ZONE,
+        TESTZONE,
         labels=2,
     )
 
 
 def test_resolver_rejects_fromwildcard_additional_replay():
-    soa = _query(RESOLVER, ZONE, "SOA")
+    soa = _query(RESOLVER, TESTZONE, "SOA")
     isctest.check.noerror(soa)
     isctest.check.adflag(soa)
 
@@ -124,4 +126,4 @@ def test_resolver_rejects_fromwildcard_additional_replay():
     isctest.check.adflag(response)
     assert not _has_a(response, response.answer, VICTIM, FORGED_A), response.to_text()
     assert _has_a(response, response.answer, VICTIM, LEGIT_A), response.to_text()
-    _check_rrsig(response, response.answer, VICTIM, dns.rdatatype.A, ZONE)
+    _check_rrsig(response, response.answer, VICTIM, dns.rdatatype.A, TESTZONE)
similarity index 96%
rename from bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py
rename to bin/tests/system/dnssec_wildcard/tests_replayed_parent_wildcard.py
index 135b21078ae56b81710fa7ec9bb6b8755882fefb..f96e7974649921324a128b6d52b96f6a1cf051f2 100644 (file)
@@ -19,7 +19,6 @@ from cryptography.hazmat.primitives import serialization
 from cryptography.hazmat.primitives.asymmetric import ec
 
 import dns.dnssec
-import dns.flags
 import dns.name
 import dns.rdataclass
 import dns.rdatatype
@@ -28,7 +27,7 @@ import pytest
 import isctest
 import isctest.mark
 
-PARENT = "parent.hack."
+PARENT = "f045.test."
 CHILD = f"child.{PARENT}"
 QUERY = f"q.{PARENT}"
 SERVICE = f"svc.{CHILD}"
@@ -38,7 +37,7 @@ AUTH = "10.53.0.1"
 RESOLVER = "10.53.0.2"
 
 pytestmark = [
-    isctest.mark.with_algorithm("ECDSAP256SHA256"),
+    isctest.mark.with_ecdsa_deterministic,
     pytest.mark.extra_artifacts(
         [
             "ans*/ans.run",
@@ -135,7 +134,7 @@ def test_malicious_replay():
     assert _rrset(response, response.answer, QUERY, dns.rdatatype.MX)
     _check_signed_rrset(response, response.answer, QUERY, dns.rdatatype.MX, PARENT)
 
-    # The reply carries in ADDITIONAL.  Note Labels=2, signer=parent.hack.
+    # The reply carries in ADDITIONAL.  Note Labels=2, signer=f045.test.
     assert _has_a(response, response.additional, SERVICE, FORGED_A), response.to_text()
     _check_signed_rrset(
         response,
diff --git a/bin/tests/system/dnssec_wildcard_additional/ans1/ans.py b/bin/tests/system/dnssec_wildcard_additional/ans1/ans.py
deleted file mode 100644 (file)
index 8c54872..0000000
+++ /dev/null
@@ -1,167 +0,0 @@
-#!/usr/bin/python3
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-
-from collections.abc import AsyncGenerator
-from dataclasses import dataclass
-from pathlib import Path
-
-import json
-
-from cryptography.hazmat.primitives import serialization
-
-import dns.dnssec
-import dns.flags
-import dns.message
-import dns.name
-import dns.rdata
-import dns.rdataclass
-import dns.rcode
-import dns.rdatatype
-import dns.rrset
-
-from isctest.asyncserver import (
-    AsyncDnsServer,
-    DnsResponseSend,
-    QueryContext,
-    ResponseHandler,
-)
-
-TTL = 300
-ZONE = "f043.test."
-QUERY = f"svc.{ZONE}"
-VICTIM = f"victim.{ZONE}"
-WILDCARD = f"*.{ZONE}"
-FORGED_A = "198.51.100.90"
-LEGIT_A = "192.0.2.50"
-
-
-@dataclass(frozen=True)
-class Key:
-    zone: dns.name.Name
-    private_key: object
-    dnskey: dns.rdata.Rdata
-
-
-def name(text: str) -> dns.name.Name:
-    return dns.name.from_text(text)
-
-
-def load_key() -> Key:
-    path = Path(__file__).resolve().parent / "keys.json"
-    with path.open(encoding="utf-8") as keys_file:
-        raw_key = json.load(keys_file)[ZONE]
-
-    private_key = serialization.load_pem_private_key(
-        raw_key["private_pem"].encode("ascii"),
-        password=None,
-    )
-    dnskey = dns.rdata.from_text(
-        dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"]
-    )
-    return Key(name(ZONE), private_key, dnskey)
-
-
-def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset:
-    return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas)
-
-
-def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset:
-    return dns.rrset.from_rdata(name(owner), TTL, rdata)
-
-
-def add_signed(
-    section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key
-) -> None:
-    rrsig = dns.dnssec.sign(
-        covered,
-        signer.private_key,
-        signer.zone,
-        signer.dnskey,
-        lifetime=86400,
-        verify=True,
-    )
-    section.append(covered)
-    section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig))
-
-
-def soa_rrset() -> dns.rrset.RRset:
-    return rrset(
-        ZONE,
-        dns.rdatatype.SOA,
-        f"ns.{ZONE} hostmaster.{ZONE} 1 3600 600 86400 300",
-    )
-
-
-def add_dnskey(response: dns.message.Message, key: Key) -> None:
-    add_signed(response.answer, rrset_from_rdata(ZONE, key.dnskey), key)
-
-
-def wildcard_rrsig(owner: str, key: Key) -> dns.rrset.RRset:
-    wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A)
-    rrsig = dns.dnssec.sign(
-        wildcard,
-        key.private_key,
-        key.zone,
-        key.dnskey,
-        lifetime=86400,
-        verify=True,
-    )
-    return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig)
-
-
-def add_wildcard_a(section: list[dns.rrset.RRset], owner: str, key: Key) -> None:
-    section.append(rrset(owner, dns.rdatatype.A, FORGED_A))
-    section.append(wildcard_rrsig(owner, key))
-
-
-class FromWildcardAdditionalHandler(ResponseHandler):
-    def __init__(self, key: Key) -> None:
-        self.key = key
-        self.zone = name(ZONE)
-        self.query = name(QUERY)
-        self.victim = name(VICTIM)
-
-    def match(self, qctx: QueryContext) -> bool:
-        return qctx.qname.is_subdomain(self.zone)
-
-    async def get_responses(
-        self, qctx: QueryContext
-    ) -> AsyncGenerator[DnsResponseSend, None]:
-        qctx.prepare_new_response(with_zone_data=False)
-        qctx.response.flags |= dns.flags.AA
-        qctx.response.set_rcode(dns.rcode.NOERROR)
-
-        if qctx.qname == self.zone and qctx.qtype == dns.rdatatype.DNSKEY:
-            add_dnskey(qctx.response, self.key)
-        elif qctx.qname == self.zone and qctx.qtype == dns.rdatatype.SOA:
-            add_signed(qctx.response.answer, soa_rrset(), self.key)
-        elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
-            add_signed(
-                qctx.response.answer,
-                rrset(QUERY, dns.rdatatype.MX, f"10 {VICTIM}"),
-                self.key,
-            )
-            add_wildcard_a(qctx.response.additional, VICTIM, self.key)
-        elif qctx.qname == self.victim and qctx.qtype == dns.rdatatype.A:
-            add_signed(
-                qctx.response.answer,
-                rrset(VICTIM, dns.rdatatype.A, LEGIT_A),
-                self.key,
-            )
-        else:
-            add_signed(qctx.response.authority, soa_rrset(), self.key)
-
-        yield DnsResponseSend(qctx.response, authoritative=True)
-
-
-def main() -> None:
-    server = AsyncDnsServer(default_aa=True)
-    server.install_response_handlers(FromWildcardAdditionalHandler(load_key()))
-    server.run()
-
-
-if __name__ == "__main__":
-    main()