From: Evan Hunt Date: Mon, 15 Jun 2026 23:28:25 +0000 (-0700) Subject: Merge DNSSEC wildcard tests X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=05691b53da2a67e6070ed2f796a4529c19f12c70;p=thirdparty%2Fbind9.git Merge DNSSEC wildcard tests Merge the tests for #5966 (F-043) and #5972 (F-045), previously called dnssec_wildcard_additional and dnssec_replayed_parent_wildcard, into a single directory with two modules. --- diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py b/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py deleted file mode 100644 index 0002e61aade..00000000000 --- a/bin/tests/system/dnssec_replayed_parent_wildcard/ans1/ans.py +++ /dev/null @@ -1,220 +0,0 @@ -#!/usr/bin/python3 - -# Copyright (C) Internet Systems Consortium, Inc. ("ISC") -# -# SPDX-License-Identifier: MPL-2.0 -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, you can obtain one at https://mozilla.org/MPL/2.0/. -# -# See the COPYRIGHT file distributed with this work for additional -# information regarding copyright ownership. - -from collections.abc import AsyncGenerator -from dataclasses import dataclass -from pathlib import Path - -import json - -from cryptography.hazmat.primitives import serialization - -import dns.dnssec -import dns.flags -import dns.message -import dns.name -import dns.rcode -import dns.rdata -import dns.rdataclass -import dns.rdatatype -import dns.rrset - -from isctest.asyncserver import ( - AsyncDnsServer, - DnsResponseSend, - QueryContext, - ResponseHandler, -) - -# P3: A signed zone P, chained to a configured trust anchor, that -# (a) publishes a wildcard *.P A or AAAA record and -# (b) delegates at least one separately-signed child C.P (own DS in P) -# operated by a distinct principal. -TTL = 300 -PARENT = "parent.hack." -CHILD = f"child.{PARENT}" -QUERY = f"q.{PARENT}" -SERVICE = f"svc.{CHILD}" -WILDCARD = f"*.{PARENT}" - -FORGED_A = "198.51.100.45" -LEGIT_A = "192.0.2.113" - - -@dataclass(frozen=True) -class Key: - zone: dns.name.Name - private_key: object - dnskey: dns.rdata.Rdata - ds: dns.rdata.Rdata - - -def name(text: str) -> dns.name.Name: - return dns.name.from_text(text) - - -def load_keys() -> dict[str, Key]: - path = Path(__file__).resolve().parent / "keys.json" - with path.open(encoding="utf-8") as keys_file: - raw_keys = json.load(keys_file) - - keys = {} - for zone, raw_key in raw_keys.items(): - private_key = serialization.load_pem_private_key( - raw_key["private_pem"].encode("ascii"), - password=None, - ) - dnskey = dns.rdata.from_text( - dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"] - ) - ds = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DS, raw_key["ds"]) - keys[zone] = Key(name(zone), private_key, dnskey, ds) - - return keys - - -def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset: - return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas) - - -def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset: - return dns.rrset.from_rdata(name(owner), TTL, rdata) - - -def add_signed( - section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key -) -> None: - rrsig = dns.dnssec.sign( - covered, - signer.private_key, - signer.zone, - signer.dnskey, - lifetime=86400, - verify=True, - ) - section.append(covered) - section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig)) - - -def soa_rrset(zone: str) -> dns.rrset.RRset: - return rrset( - zone, - dns.rdatatype.SOA, - f"ns1.{zone} hostmaster.{zone} 1 3600 600 86400 300", - ) - - -def add_dnskey(response: dns.message.Message, zone: str, key: Key) -> None: - add_signed(response.answer, rrset_from_rdata(zone, key.dnskey), key) - - -def add_ds(response: dns.message.Message, zone: str, child: Key, parent: Key) -> None: - add_signed(response.answer, rrset_from_rdata(zone, child.ds), parent) - - -def add_nodata(response: dns.message.Message, zone: str, key: Key) -> None: - add_signed(response.authority, soa_rrset(zone), key) - - -def wildcard_rrsig(owner: str, parent: Key) -> dns.rrset.RRset: - wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A) - rrsig = dns.dnssec.sign( - wildcard, - parent.private_key, - parent.zone, - parent.dnskey, - lifetime=86400, - verify=True, - ) - return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig) - - -def add_parent_mx_with_forged_additional( - response: dns.message.Message, parent: Key -) -> None: - add_signed( - response.answer, - rrset(QUERY, dns.rdatatype.MX, f"10 {SERVICE}"), - parent, - ) - response.additional.append(rrset(SERVICE, dns.rdatatype.A, FORGED_A)) - response.additional.append(wildcard_rrsig(SERVICE, parent)) - - -def add_child_a(response: dns.message.Message, child: Key) -> None: - add_signed(response.answer, rrset(SERVICE, dns.rdatatype.A, LEGIT_A), child) - - -class QueryCParentWildcardHandler(ResponseHandler): - def __init__(self, keys: dict[str, Key]) -> None: - self.keys = keys - self.parent = name(PARENT) - self.child = name(CHILD) - self.query = name(QUERY) - self.service = name(SERVICE) - - def match(self, qctx: QueryContext) -> bool: - return True - - async def get_responses( - self, qctx: QueryContext - ) -> AsyncGenerator[DnsResponseSend, None]: - qctx.prepare_new_response(with_zone_data=False) - qctx.response.flags |= dns.flags.AA - qctx.response.set_rcode(dns.rcode.NOERROR) - - parent_key = self.keys[PARENT] - child_key = self.keys[CHILD] - - if qctx.qname == self.parent and qctx.qtype == dns.rdatatype.DNSKEY: - # Priming, DNSKEY RRset - add_dnskey(qctx.response, PARENT, parent_key) - elif qctx.qname == self.parent and qctx.qtype == dns.rdatatype.SOA: - # Priming, SOA RRset - add_signed(qctx.response.answer, soa_rrset(PARENT), parent_key) - elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX: - # Trigger query. - add_parent_mx_with_forged_additional(qctx.response, parent_key) - elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DS: - # Chain of trust, DS of child. - add_ds(qctx.response, CHILD, child_key, parent_key) - elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DNSKEY: - # Chain of trust, DNSKEY of child. - add_dnskey(qctx.response, CHILD, child_key) - elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.SOA: - # SOA of child. - add_signed(qctx.response.answer, soa_rrset(CHILD), child_key) - elif qctx.qname == self.service and qctx.qtype == dns.rdatatype.A: - # Zone data at child. - add_child_a(qctx.response, child_key) - elif qctx.qname.is_subdomain(self.child): - # No data at child. - add_nodata(qctx.response, CHILD, child_key) - elif qctx.qname.is_subdomain(self.parent): - # No data at parent. - add_nodata(qctx.response, PARENT, parent_key) - else: - qctx.response.set_rcode(dns.rcode.NXDOMAIN) - - yield DnsResponseSend(qctx.response, authoritative=True) - - -def main() -> None: - keys = load_keys() - server = AsyncDnsServer(default_aa=True) - server.install_response_handlers(QueryCParentWildcardHandler(keys)) - server.run() - - -if __name__ == "__main__": - main() diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 b/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 deleted file mode 100644 index 12726a9789d..00000000000 --- a/bin/tests/system/dnssec_replayed_parent_wildcard/ns2/named.conf.j2 +++ /dev/null @@ -1,37 +0,0 @@ -// validating resolver - -options { - query-source address 10.53.0.2; - notify-source 10.53.0.2; - transfer-source 10.53.0.2; - port @PORT@; - pid-file "named.pid"; - listen-on { 10.53.0.2; }; - listen-on-v6 { none; }; - recursion yes; - - // P1: named runs with dnssec-validation auto or yes - dnssec-validation yes; - // P2: minimal-responses is not set to the explicit value yes - minimal-responses no; -}; - -controls { - inet 10.53.0.2 port @CONTROLPORT@ allow { any; } keys { rndc_key; }; -}; - -include "../../_common/rndc.key"; - -zone "." { - type hint; - file "../../_common/root.hint"; -}; - -zone "parent.hack" { - type static-stub; - server-addresses { 10.53.0.1; }; -}; - -trust-anchors { - parent.hack. static-key 257 3 13 "@PARENT_DNSKEY@"; -}; diff --git a/bin/tests/system/dnssec_wildcard/ans1/ans.py b/bin/tests/system/dnssec_wildcard/ans1/ans.py new file mode 100644 index 00000000000..1955c14aebe --- /dev/null +++ b/bin/tests/system/dnssec_wildcard/ans1/ans.py @@ -0,0 +1,20 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 + +from dnssec_wildcard.ans1 import common, f043, f045 +from isctest.asyncserver import AsyncDnsServer + + +def main() -> None: + keys = common.load_keys() + server = AsyncDnsServer(default_aa=True) + server.install_response_handler(f043.F043Handler(keys)) + server.install_response_handler(f045.F045Handler(keys)) + server.run() + + +if __name__ == "__main__": + main() diff --git a/bin/tests/system/dnssec_wildcard/ans1/common.py b/bin/tests/system/dnssec_wildcard/ans1/common.py new file mode 100644 index 00000000000..051d7df1501 --- /dev/null +++ b/bin/tests/system/dnssec_wildcard/ans1/common.py @@ -0,0 +1,102 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 + +from dataclasses import dataclass +from pathlib import Path + +import json + +from cryptography.hazmat.primitives import serialization + +import dns.dnssec +import dns.message +import dns.name +import dns.rdata +import dns.rdataclass +import dns.rdatatype +import dns.rrset + +TTL = 300 + + +@dataclass(frozen=True) +class Key: + zone: dns.name.Name + private_key: object + dnskey: dns.rdata.Rdata + ds: dns.rdata.Rdata + + +def name(text: str) -> dns.name.Name: + return dns.name.from_text(text) + + +def load_keys() -> dict[str, Key]: + path = Path(".") / "keys.json" + with path.open(encoding="utf-8") as keys_file: + raw_keys = json.load(keys_file) + + keys = {} + for zone, raw_key in raw_keys.items(): + private_key = serialization.load_pem_private_key( + raw_key["private_pem"].encode("ascii"), + password=None, + ) + dnskey = dns.rdata.from_text( + dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"] + ) + ds = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DS, raw_key["ds"]) + keys[zone] = Key(name(zone), private_key, dnskey, ds) + + return keys + + +def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset: + return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas) + + +def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset: + return dns.rrset.from_rdata(owner, TTL, rdata) + + +def add_signed( + section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key +) -> None: + rrsig = dns.dnssec.sign( + covered, + signer.private_key, + signer.zone, + signer.dnskey, + lifetime=86400, + verify=True, + ) + section.append(covered) + section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig)) + + +def soa_rrset(zone) -> dns.rrset.RRset: + return rrset( + zone, + dns.rdatatype.SOA, + f"ns.{zone} hostmaster.{zone} 1 3600 600 86400 300", + ) + + +def add_dnskey(response: dns.message.Message, zone: str, key: Key) -> None: + add_signed(response.answer, rrset_from_rdata(zone, key.dnskey), key) + + +def wildcard_rrsig(owner: str, a: str, key: Key) -> dns.rrset.RRset: + wildcard = rrset(key.zone, dns.rdatatype.A, a) + rrsig = dns.dnssec.sign( + wildcard, + key.private_key, + key.zone, + key.dnskey, + lifetime=86400, + verify=True, + ) + return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig) diff --git a/bin/tests/system/dnssec_wildcard/ans1/f043.py b/bin/tests/system/dnssec_wildcard/ans1/f043.py new file mode 100644 index 00000000000..a2a99a90d21 --- /dev/null +++ b/bin/tests/system/dnssec_wildcard/ans1/f043.py @@ -0,0 +1,80 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 + +from collections.abc import AsyncGenerator + +import dns.flags +import dns.rcode +import dns.rdatatype +import dns.rrset + +from dnssec_wildcard.ans1.common import ( + Key, + add_dnskey, + add_signed, + name, + rrset, + soa_rrset, + wildcard_rrsig, +) +from isctest.asyncserver import DnsResponseSend, DomainHandler, QueryContext + +F043_ZONE = "f043.test." +F043_QUERY = f"svc.{F043_ZONE}" +F043_VICTIM = f"victim.{F043_ZONE}" + +FORGED_A = "198.51.100.45" +LEGIT_A = "192.0.2.113" + + +def add_wildcard_a(section: list[dns.rrset.RRset], owner: str, key: Key) -> None: + section.append(rrset(owner, dns.rdatatype.A, FORGED_A)) + section.append(wildcard_rrsig(owner, FORGED_A, key)) + + +class F043Handler(DomainHandler): # Additional from wildcard + domains = [F043_ZONE] + + def __init__(self, keys: dict[str, Key]) -> None: + super().__init__() + self.keys = keys + + if F043_ZONE not in keys: + return + + self.key = keys[F043_ZONE] + self.zone = name(F043_ZONE) + self.query = name(F043_QUERY) + self.victim = name(F043_VICTIM) + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + qctx.prepare_new_response(with_zone_data=False) + qctx.response.flags |= dns.flags.AA + qctx.response.set_rcode(dns.rcode.NOERROR) + + if qctx.qname == self.zone and qctx.qtype == dns.rdatatype.DNSKEY: + add_dnskey(qctx.response, self.zone, self.key) + elif qctx.qname == self.zone and qctx.qtype == dns.rdatatype.SOA: + add_signed(qctx.response.answer, soa_rrset(self.zone), self.key) + elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX: + add_signed( + qctx.response.answer, + rrset(F043_QUERY, dns.rdatatype.MX, f"10 {F043_VICTIM}"), + self.key, + ) + add_wildcard_a(qctx.response.additional, F043_VICTIM, self.key) + elif qctx.qname == self.victim and qctx.qtype == dns.rdatatype.A: + add_signed( + qctx.response.answer, + rrset(F043_VICTIM, dns.rdatatype.A, LEGIT_A), + self.key, + ) + else: + add_signed(qctx.response.authority, soa_rrset(self.zone), self.key) + + yield DnsResponseSend(qctx.response, authoritative=True) diff --git a/bin/tests/system/dnssec_wildcard/ans1/f045.py b/bin/tests/system/dnssec_wildcard/ans1/f045.py new file mode 100644 index 00000000000..74c2a4c2329 --- /dev/null +++ b/bin/tests/system/dnssec_wildcard/ans1/f045.py @@ -0,0 +1,120 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 + +from collections.abc import AsyncGenerator + +import dns.flags +import dns.message +import dns.rcode +import dns.rdatatype + +from dnssec_wildcard.ans1.common import ( + Key, + add_dnskey, + add_signed, + name, + rrset, + rrset_from_rdata, + soa_rrset, + wildcard_rrsig, +) +from isctest.asyncserver import DnsResponseSend, DomainHandler, QueryContext + +TTL = 300 +FORGED_A = "198.51.100.45" +LEGIT_A = "192.0.2.113" + + +def add_ds(response: dns.message.Message, zone: str, child: Key, parent: Key) -> None: + add_signed(response.answer, rrset_from_rdata(zone, child.ds), parent) + + +def add_nodata(response: dns.message.Message, zone: str, key: Key) -> None: + add_signed(response.authority, soa_rrset(zone), key) + + +# A signed zone P, chained to a configured trust anchor, that +# (a) publishes a wildcard *.P A or AAAA record and +# (b) delegates at least one separately-signed child C.P (own DS in P) +# operated by a distinct principal. +F045_PARENT = "f045.test." +F045_CHILD = f"child.{F045_PARENT}" +F045_QUERY = f"q.{F045_PARENT}" +F045_SERVICE = f"svc.{F045_CHILD}" + + +def add_parent_mx_with_forged_additional( + response: dns.message.Message, parent: Key +) -> None: + add_signed( + response.answer, + rrset(F045_QUERY, dns.rdatatype.MX, f"10 {F045_SERVICE}"), + parent, + ) + response.additional.append(rrset(F045_SERVICE, dns.rdatatype.A, FORGED_A)) + response.additional.append(wildcard_rrsig(F045_SERVICE, FORGED_A, parent)) + + +def add_child_a(response: dns.message.Message, child: Key) -> None: + add_signed(response.answer, rrset(F045_SERVICE, dns.rdatatype.A, LEGIT_A), child) + + +class F045Handler(DomainHandler): + domains = [F045_PARENT, F045_CHILD] + + def __init__(self, keys: dict[str, Key]) -> None: + super().__init__() + self.keys = keys + + if F045_PARENT not in keys or F045_CHILD not in keys: + return + + self.parent = name(F045_PARENT) + self.child = name(F045_CHILD) + self.query = name(F045_QUERY) + self.service = name(F045_SERVICE) + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + qctx.prepare_new_response(with_zone_data=False) + qctx.response.flags |= dns.flags.AA + qctx.response.set_rcode(dns.rcode.NOERROR) + + parent_key = self.keys[F045_PARENT] + child_key = self.keys[F045_CHILD] + + if qctx.qname == self.parent and qctx.qtype == dns.rdatatype.DNSKEY: + # Priming, DNSKEY RRset + add_dnskey(qctx.response, F045_PARENT, parent_key) + elif qctx.qname == self.parent and qctx.qtype == dns.rdatatype.SOA: + # Priming, SOA RRset + add_signed(qctx.response.answer, soa_rrset(F045_PARENT), parent_key) + elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX: + # Trigger query. + add_parent_mx_with_forged_additional(qctx.response, parent_key) + elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DS: + # Chain of trust, DS of child. + add_ds(qctx.response, F045_CHILD, child_key, parent_key) + elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DNSKEY: + # Chain of trust, DNSKEY of child. + add_dnskey(qctx.response, F045_CHILD, child_key) + elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.SOA: + # SOA of child. + add_signed(qctx.response.answer, soa_rrset(F045_CHILD), child_key) + elif qctx.qname == self.service and qctx.qtype == dns.rdatatype.A: + # Zone data at child. + add_child_a(qctx.response, child_key) + elif qctx.qname.is_subdomain(self.child): + # No data at child. + add_nodata(qctx.response, F045_CHILD, child_key) + elif qctx.qname.is_subdomain(self.parent): + # No data at parent. + add_nodata(qctx.response, F045_PARENT, parent_key) + else: + qctx.response.set_rcode(dns.rcode.NXDOMAIN) + + yield DnsResponseSend(qctx.response, authoritative=True) diff --git a/bin/tests/system/dnssec_wildcard_additional/ns2/named.conf.j2 b/bin/tests/system/dnssec_wildcard/ns2/named.conf.j2 similarity index 64% rename from bin/tests/system/dnssec_wildcard_additional/ns2/named.conf.j2 rename to bin/tests/system/dnssec_wildcard/ns2/named.conf.j2 index fa7da5c6904..890d1529835 100644 --- a/bin/tests/system/dnssec_wildcard_additional/ns2/named.conf.j2 +++ b/bin/tests/system/dnssec_wildcard/ns2/named.conf.j2 @@ -24,11 +24,23 @@ zone "." { file "../../_common/root.hint"; }; + +{% if PARENT_DNSKEY is defined and PARENT_DNSKEY|length %} +zone "f045.test" { + type static-stub; + server-addresses { 10.53.0.1; }; +}; + +trust-anchors { + f045.test. static-key 257 3 13 "@PARENT_DNSKEY@"; +}; +{% else %} zone "f043.test" { type static-stub; server-addresses { 10.53.0.1; }; }; trust-anchors { - f043.test. static-key 257 3 13 "@ZONE_DNSKEY@"; + f043.test. static-key 257 3 13 "@TESTZONE_DNSKEY@"; }; +{% endif %} diff --git a/bin/tests/system/dnssec_wildcard_additional/tests_dnssec_wildcard_additional.py b/bin/tests/system/dnssec_wildcard/tests_dnssec_wildcard_additional.py similarity index 86% rename from bin/tests/system/dnssec_wildcard_additional/tests_dnssec_wildcard_additional.py rename to bin/tests/system/dnssec_wildcard/tests_dnssec_wildcard_additional.py index 6ece46666d6..7c24f671d98 100644 --- a/bin/tests/system/dnssec_wildcard_additional/tests_dnssec_wildcard_additional.py +++ b/bin/tests/system/dnssec_wildcard/tests_dnssec_wildcard_additional.py @@ -20,16 +20,16 @@ import pytest import isctest import isctest.mark -ZONE = "f043.test." -QUERY = f"svc.{ZONE}" -VICTIM = f"victim.{ZONE}" -FORGED_A = "198.51.100.90" -LEGIT_A = "192.0.2.50" +TESTZONE = "f043.test." +QUERY = f"svc.{TESTZONE}" +VICTIM = f"victim.{TESTZONE}" +FORGED_A = "198.51.100.45" +LEGIT_A = "192.0.2.113" AUTH = "10.53.0.1" RESOLVER = "10.53.0.2" pytestmark = [ - isctest.mark.with_algorithm("ECDSAP256SHA256"), + isctest.mark.with_ecdsa_deterministic, pytest.mark.extra_artifacts( [ "ans*/ans.run", @@ -39,13 +39,14 @@ pytestmark = [ ] -def _make_key(): +def _make_key(zone): private_key = ec.generate_private_key(ec.SECP256R1()) dnskey = dns.dnssec.make_dnskey( private_key.public_key(), algorithm="ECDSAP256SHA256", flags=257, ) + ds = dns.dnssec.make_ds(dns.name.from_text(zone), dnskey, "SHA256") private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, @@ -54,14 +55,15 @@ def _make_key(): return { "private_pem": private_pem, "dnskey": dnskey.to_text(), + "ds": ds.to_text(), } def bootstrap(): - keys = {ZONE: _make_key()} + keys = {TESTZONE: _make_key(TESTZONE)} Path("ans1/keys.json").write_text(json.dumps(keys, indent=2), encoding="ascii") - zone_dnskey = "".join(keys[ZONE]["dnskey"].split()[3:]) - return {"ZONE_DNSKEY": zone_dnskey} + zone_dnskey = "".join(keys[TESTZONE]["dnskey"].split()[3:]) + return {"TESTZONE_DNSKEY": zone_dnskey} def _query(server, qname, qtype, cd=False): @@ -106,13 +108,13 @@ def test_direct_fromwildcard_additional_fixture(): carrier.additional, VICTIM, dns.rdatatype.A, - ZONE, + TESTZONE, labels=2, ) def test_resolver_rejects_fromwildcard_additional_replay(): - soa = _query(RESOLVER, ZONE, "SOA") + soa = _query(RESOLVER, TESTZONE, "SOA") isctest.check.noerror(soa) isctest.check.adflag(soa) @@ -124,4 +126,4 @@ def test_resolver_rejects_fromwildcard_additional_replay(): isctest.check.adflag(response) assert not _has_a(response, response.answer, VICTIM, FORGED_A), response.to_text() assert _has_a(response, response.answer, VICTIM, LEGIT_A), response.to_text() - _check_rrsig(response, response.answer, VICTIM, dns.rdatatype.A, ZONE) + _check_rrsig(response, response.answer, VICTIM, dns.rdatatype.A, TESTZONE) diff --git a/bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py b/bin/tests/system/dnssec_wildcard/tests_replayed_parent_wildcard.py similarity index 96% rename from bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py rename to bin/tests/system/dnssec_wildcard/tests_replayed_parent_wildcard.py index 135b21078ae..f96e7974649 100644 --- a/bin/tests/system/dnssec_replayed_parent_wildcard/tests_replayed_parent_wildcard.py +++ b/bin/tests/system/dnssec_wildcard/tests_replayed_parent_wildcard.py @@ -19,7 +19,6 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec import dns.dnssec -import dns.flags import dns.name import dns.rdataclass import dns.rdatatype @@ -28,7 +27,7 @@ import pytest import isctest import isctest.mark -PARENT = "parent.hack." +PARENT = "f045.test." CHILD = f"child.{PARENT}" QUERY = f"q.{PARENT}" SERVICE = f"svc.{CHILD}" @@ -38,7 +37,7 @@ AUTH = "10.53.0.1" RESOLVER = "10.53.0.2" pytestmark = [ - isctest.mark.with_algorithm("ECDSAP256SHA256"), + isctest.mark.with_ecdsa_deterministic, pytest.mark.extra_artifacts( [ "ans*/ans.run", @@ -135,7 +134,7 @@ def test_malicious_replay(): assert _rrset(response, response.answer, QUERY, dns.rdatatype.MX) _check_signed_rrset(response, response.answer, QUERY, dns.rdatatype.MX, PARENT) - # The reply carries in ADDITIONAL. Note Labels=2, signer=parent.hack. + # The reply carries in ADDITIONAL. Note Labels=2, signer=f045.test. assert _has_a(response, response.additional, SERVICE, FORGED_A), response.to_text() _check_signed_rrset( response, diff --git a/bin/tests/system/dnssec_wildcard_additional/ans1/ans.py b/bin/tests/system/dnssec_wildcard_additional/ans1/ans.py deleted file mode 100644 index 8c54872c027..00000000000 --- a/bin/tests/system/dnssec_wildcard_additional/ans1/ans.py +++ /dev/null @@ -1,167 +0,0 @@ -#!/usr/bin/python3 - -# Copyright (C) Internet Systems Consortium, Inc. ("ISC") -# -# SPDX-License-Identifier: MPL-2.0 - -from collections.abc import AsyncGenerator -from dataclasses import dataclass -from pathlib import Path - -import json - -from cryptography.hazmat.primitives import serialization - -import dns.dnssec -import dns.flags -import dns.message -import dns.name -import dns.rdata -import dns.rdataclass -import dns.rcode -import dns.rdatatype -import dns.rrset - -from isctest.asyncserver import ( - AsyncDnsServer, - DnsResponseSend, - QueryContext, - ResponseHandler, -) - -TTL = 300 -ZONE = "f043.test." -QUERY = f"svc.{ZONE}" -VICTIM = f"victim.{ZONE}" -WILDCARD = f"*.{ZONE}" -FORGED_A = "198.51.100.90" -LEGIT_A = "192.0.2.50" - - -@dataclass(frozen=True) -class Key: - zone: dns.name.Name - private_key: object - dnskey: dns.rdata.Rdata - - -def name(text: str) -> dns.name.Name: - return dns.name.from_text(text) - - -def load_key() -> Key: - path = Path(__file__).resolve().parent / "keys.json" - with path.open(encoding="utf-8") as keys_file: - raw_key = json.load(keys_file)[ZONE] - - private_key = serialization.load_pem_private_key( - raw_key["private_pem"].encode("ascii"), - password=None, - ) - dnskey = dns.rdata.from_text( - dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"] - ) - return Key(name(ZONE), private_key, dnskey) - - -def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset: - return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas) - - -def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset: - return dns.rrset.from_rdata(name(owner), TTL, rdata) - - -def add_signed( - section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key -) -> None: - rrsig = dns.dnssec.sign( - covered, - signer.private_key, - signer.zone, - signer.dnskey, - lifetime=86400, - verify=True, - ) - section.append(covered) - section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig)) - - -def soa_rrset() -> dns.rrset.RRset: - return rrset( - ZONE, - dns.rdatatype.SOA, - f"ns.{ZONE} hostmaster.{ZONE} 1 3600 600 86400 300", - ) - - -def add_dnskey(response: dns.message.Message, key: Key) -> None: - add_signed(response.answer, rrset_from_rdata(ZONE, key.dnskey), key) - - -def wildcard_rrsig(owner: str, key: Key) -> dns.rrset.RRset: - wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A) - rrsig = dns.dnssec.sign( - wildcard, - key.private_key, - key.zone, - key.dnskey, - lifetime=86400, - verify=True, - ) - return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig) - - -def add_wildcard_a(section: list[dns.rrset.RRset], owner: str, key: Key) -> None: - section.append(rrset(owner, dns.rdatatype.A, FORGED_A)) - section.append(wildcard_rrsig(owner, key)) - - -class FromWildcardAdditionalHandler(ResponseHandler): - def __init__(self, key: Key) -> None: - self.key = key - self.zone = name(ZONE) - self.query = name(QUERY) - self.victim = name(VICTIM) - - def match(self, qctx: QueryContext) -> bool: - return qctx.qname.is_subdomain(self.zone) - - async def get_responses( - self, qctx: QueryContext - ) -> AsyncGenerator[DnsResponseSend, None]: - qctx.prepare_new_response(with_zone_data=False) - qctx.response.flags |= dns.flags.AA - qctx.response.set_rcode(dns.rcode.NOERROR) - - if qctx.qname == self.zone and qctx.qtype == dns.rdatatype.DNSKEY: - add_dnskey(qctx.response, self.key) - elif qctx.qname == self.zone and qctx.qtype == dns.rdatatype.SOA: - add_signed(qctx.response.answer, soa_rrset(), self.key) - elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX: - add_signed( - qctx.response.answer, - rrset(QUERY, dns.rdatatype.MX, f"10 {VICTIM}"), - self.key, - ) - add_wildcard_a(qctx.response.additional, VICTIM, self.key) - elif qctx.qname == self.victim and qctx.qtype == dns.rdatatype.A: - add_signed( - qctx.response.answer, - rrset(VICTIM, dns.rdatatype.A, LEGIT_A), - self.key, - ) - else: - add_signed(qctx.response.authority, soa_rrset(), self.key) - - yield DnsResponseSend(qctx.response, authoritative=True) - - -def main() -> None: - server = AsyncDnsServer(default_aa=True) - server.install_response_handlers(FromWildcardAdditionalHandler(load_key())) - server.run() - - -if __name__ == "__main__": - main()