+++ /dev/null
-#!/usr/bin/python3
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-from collections.abc import AsyncGenerator
-from dataclasses import dataclass
-from pathlib import Path
-
-import json
-
-from cryptography.hazmat.primitives import serialization
-
-import dns.dnssec
-import dns.flags
-import dns.message
-import dns.name
-import dns.rcode
-import dns.rdata
-import dns.rdataclass
-import dns.rdatatype
-import dns.rrset
-
-from isctest.asyncserver import (
- AsyncDnsServer,
- DnsResponseSend,
- QueryContext,
- ResponseHandler,
-)
-
-# P3: A signed zone P, chained to a configured trust anchor, that
-# (a) publishes a wildcard *.P A or AAAA record and
-# (b) delegates at least one separately-signed child C.P (own DS in P)
-# operated by a distinct principal.
-TTL = 300
-PARENT = "parent.hack."
-CHILD = f"child.{PARENT}"
-QUERY = f"q.{PARENT}"
-SERVICE = f"svc.{CHILD}"
-WILDCARD = f"*.{PARENT}"
-
-FORGED_A = "198.51.100.45"
-LEGIT_A = "192.0.2.113"
-
-
-@dataclass(frozen=True)
-class Key:
- zone: dns.name.Name
- private_key: object
- dnskey: dns.rdata.Rdata
- ds: dns.rdata.Rdata
-
-
-def name(text: str) -> dns.name.Name:
- return dns.name.from_text(text)
-
-
-def load_keys() -> dict[str, Key]:
- path = Path(__file__).resolve().parent / "keys.json"
- with path.open(encoding="utf-8") as keys_file:
- raw_keys = json.load(keys_file)
-
- keys = {}
- for zone, raw_key in raw_keys.items():
- private_key = serialization.load_pem_private_key(
- raw_key["private_pem"].encode("ascii"),
- password=None,
- )
- dnskey = dns.rdata.from_text(
- dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"]
- )
- ds = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DS, raw_key["ds"])
- keys[zone] = Key(name(zone), private_key, dnskey, ds)
-
- return keys
-
-
-def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset:
- return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas)
-
-
-def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset:
- return dns.rrset.from_rdata(name(owner), TTL, rdata)
-
-
-def add_signed(
- section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key
-) -> None:
- rrsig = dns.dnssec.sign(
- covered,
- signer.private_key,
- signer.zone,
- signer.dnskey,
- lifetime=86400,
- verify=True,
- )
- section.append(covered)
- section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig))
-
-
-def soa_rrset(zone: str) -> dns.rrset.RRset:
- return rrset(
- zone,
- dns.rdatatype.SOA,
- f"ns1.{zone} hostmaster.{zone} 1 3600 600 86400 300",
- )
-
-
-def add_dnskey(response: dns.message.Message, zone: str, key: Key) -> None:
- add_signed(response.answer, rrset_from_rdata(zone, key.dnskey), key)
-
-
-def add_ds(response: dns.message.Message, zone: str, child: Key, parent: Key) -> None:
- add_signed(response.answer, rrset_from_rdata(zone, child.ds), parent)
-
-
-def add_nodata(response: dns.message.Message, zone: str, key: Key) -> None:
- add_signed(response.authority, soa_rrset(zone), key)
-
-
-def wildcard_rrsig(owner: str, parent: Key) -> dns.rrset.RRset:
- wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A)
- rrsig = dns.dnssec.sign(
- wildcard,
- parent.private_key,
- parent.zone,
- parent.dnskey,
- lifetime=86400,
- verify=True,
- )
- return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig)
-
-
-def add_parent_mx_with_forged_additional(
- response: dns.message.Message, parent: Key
-) -> None:
- add_signed(
- response.answer,
- rrset(QUERY, dns.rdatatype.MX, f"10 {SERVICE}"),
- parent,
- )
- response.additional.append(rrset(SERVICE, dns.rdatatype.A, FORGED_A))
- response.additional.append(wildcard_rrsig(SERVICE, parent))
-
-
-def add_child_a(response: dns.message.Message, child: Key) -> None:
- add_signed(response.answer, rrset(SERVICE, dns.rdatatype.A, LEGIT_A), child)
-
-
-class QueryCParentWildcardHandler(ResponseHandler):
- def __init__(self, keys: dict[str, Key]) -> None:
- self.keys = keys
- self.parent = name(PARENT)
- self.child = name(CHILD)
- self.query = name(QUERY)
- self.service = name(SERVICE)
-
- def match(self, qctx: QueryContext) -> bool:
- return True
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- qctx.prepare_new_response(with_zone_data=False)
- qctx.response.flags |= dns.flags.AA
- qctx.response.set_rcode(dns.rcode.NOERROR)
-
- parent_key = self.keys[PARENT]
- child_key = self.keys[CHILD]
-
- if qctx.qname == self.parent and qctx.qtype == dns.rdatatype.DNSKEY:
- # Priming, DNSKEY RRset
- add_dnskey(qctx.response, PARENT, parent_key)
- elif qctx.qname == self.parent and qctx.qtype == dns.rdatatype.SOA:
- # Priming, SOA RRset
- add_signed(qctx.response.answer, soa_rrset(PARENT), parent_key)
- elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
- # Trigger query.
- add_parent_mx_with_forged_additional(qctx.response, parent_key)
- elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DS:
- # Chain of trust, DS of child.
- add_ds(qctx.response, CHILD, child_key, parent_key)
- elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DNSKEY:
- # Chain of trust, DNSKEY of child.
- add_dnskey(qctx.response, CHILD, child_key)
- elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.SOA:
- # SOA of child.
- add_signed(qctx.response.answer, soa_rrset(CHILD), child_key)
- elif qctx.qname == self.service and qctx.qtype == dns.rdatatype.A:
- # Zone data at child.
- add_child_a(qctx.response, child_key)
- elif qctx.qname.is_subdomain(self.child):
- # No data at child.
- add_nodata(qctx.response, CHILD, child_key)
- elif qctx.qname.is_subdomain(self.parent):
- # No data at parent.
- add_nodata(qctx.response, PARENT, parent_key)
- else:
- qctx.response.set_rcode(dns.rcode.NXDOMAIN)
-
- yield DnsResponseSend(qctx.response, authoritative=True)
-
-
-def main() -> None:
- keys = load_keys()
- server = AsyncDnsServer(default_aa=True)
- server.install_response_handlers(QueryCParentWildcardHandler(keys))
- server.run()
-
-
-if __name__ == "__main__":
- main()
+++ /dev/null
-// validating resolver
-
-options {
- query-source address 10.53.0.2;
- notify-source 10.53.0.2;
- transfer-source 10.53.0.2;
- port @PORT@;
- pid-file "named.pid";
- listen-on { 10.53.0.2; };
- listen-on-v6 { none; };
- recursion yes;
-
- // P1: named runs with dnssec-validation auto or yes
- dnssec-validation yes;
- // P2: minimal-responses is not set to the explicit value yes
- minimal-responses no;
-};
-
-controls {
- inet 10.53.0.2 port @CONTROLPORT@ allow { any; } keys { rndc_key; };
-};
-
-include "../../_common/rndc.key";
-
-zone "." {
- type hint;
- file "../../_common/root.hint";
-};
-
-zone "parent.hack" {
- type static-stub;
- server-addresses { 10.53.0.1; };
-};
-
-trust-anchors {
- parent.hack. static-key 257 3 13 "@PARENT_DNSKEY@";
-};
--- /dev/null
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from dnssec_wildcard.ans1 import common, f043, f045
+from isctest.asyncserver import AsyncDnsServer
+
+
+def main() -> None:
+ keys = common.load_keys()
+ server = AsyncDnsServer(default_aa=True)
+ server.install_response_handler(f043.F043Handler(keys))
+ server.install_response_handler(f045.F045Handler(keys))
+ server.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from dataclasses import dataclass
+from pathlib import Path
+
+import json
+
+from cryptography.hazmat.primitives import serialization
+
+import dns.dnssec
+import dns.message
+import dns.name
+import dns.rdata
+import dns.rdataclass
+import dns.rdatatype
+import dns.rrset
+
+TTL = 300
+
+
+@dataclass(frozen=True)
+class Key:
+ zone: dns.name.Name
+ private_key: object
+ dnskey: dns.rdata.Rdata
+ ds: dns.rdata.Rdata
+
+
+def name(text: str) -> dns.name.Name:
+ return dns.name.from_text(text)
+
+
+def load_keys() -> dict[str, Key]:
+ path = Path(".") / "keys.json"
+ with path.open(encoding="utf-8") as keys_file:
+ raw_keys = json.load(keys_file)
+
+ keys = {}
+ for zone, raw_key in raw_keys.items():
+ private_key = serialization.load_pem_private_key(
+ raw_key["private_pem"].encode("ascii"),
+ password=None,
+ )
+ dnskey = dns.rdata.from_text(
+ dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"]
+ )
+ ds = dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DS, raw_key["ds"])
+ keys[zone] = Key(name(zone), private_key, dnskey, ds)
+
+ return keys
+
+
+def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset:
+ return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas)
+
+
+def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset:
+ return dns.rrset.from_rdata(owner, TTL, rdata)
+
+
+def add_signed(
+ section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key
+) -> None:
+ rrsig = dns.dnssec.sign(
+ covered,
+ signer.private_key,
+ signer.zone,
+ signer.dnskey,
+ lifetime=86400,
+ verify=True,
+ )
+ section.append(covered)
+ section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig))
+
+
+def soa_rrset(zone) -> dns.rrset.RRset:
+ return rrset(
+ zone,
+ dns.rdatatype.SOA,
+ f"ns.{zone} hostmaster.{zone} 1 3600 600 86400 300",
+ )
+
+
+def add_dnskey(response: dns.message.Message, zone: str, key: Key) -> None:
+ add_signed(response.answer, rrset_from_rdata(zone, key.dnskey), key)
+
+
+def wildcard_rrsig(owner: str, a: str, key: Key) -> dns.rrset.RRset:
+ wildcard = rrset(key.zone, dns.rdatatype.A, a)
+ rrsig = dns.dnssec.sign(
+ wildcard,
+ key.private_key,
+ key.zone,
+ key.dnskey,
+ lifetime=86400,
+ verify=True,
+ )
+ return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig)
--- /dev/null
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from collections.abc import AsyncGenerator
+
+import dns.flags
+import dns.rcode
+import dns.rdatatype
+import dns.rrset
+
+from dnssec_wildcard.ans1.common import (
+ Key,
+ add_dnskey,
+ add_signed,
+ name,
+ rrset,
+ soa_rrset,
+ wildcard_rrsig,
+)
+from isctest.asyncserver import DnsResponseSend, DomainHandler, QueryContext
+
+F043_ZONE = "f043.test."
+F043_QUERY = f"svc.{F043_ZONE}"
+F043_VICTIM = f"victim.{F043_ZONE}"
+
+FORGED_A = "198.51.100.45"
+LEGIT_A = "192.0.2.113"
+
+
+def add_wildcard_a(section: list[dns.rrset.RRset], owner: str, key: Key) -> None:
+ section.append(rrset(owner, dns.rdatatype.A, FORGED_A))
+ section.append(wildcard_rrsig(owner, FORGED_A, key))
+
+
+class F043Handler(DomainHandler): # Additional from wildcard
+ domains = [F043_ZONE]
+
+ def __init__(self, keys: dict[str, Key]) -> None:
+ super().__init__()
+ self.keys = keys
+
+ if F043_ZONE not in keys:
+ return
+
+ self.key = keys[F043_ZONE]
+ self.zone = name(F043_ZONE)
+ self.query = name(F043_QUERY)
+ self.victim = name(F043_VICTIM)
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ qctx.prepare_new_response(with_zone_data=False)
+ qctx.response.flags |= dns.flags.AA
+ qctx.response.set_rcode(dns.rcode.NOERROR)
+
+ if qctx.qname == self.zone and qctx.qtype == dns.rdatatype.DNSKEY:
+ add_dnskey(qctx.response, self.zone, self.key)
+ elif qctx.qname == self.zone and qctx.qtype == dns.rdatatype.SOA:
+ add_signed(qctx.response.answer, soa_rrset(self.zone), self.key)
+ elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
+ add_signed(
+ qctx.response.answer,
+ rrset(F043_QUERY, dns.rdatatype.MX, f"10 {F043_VICTIM}"),
+ self.key,
+ )
+ add_wildcard_a(qctx.response.additional, F043_VICTIM, self.key)
+ elif qctx.qname == self.victim and qctx.qtype == dns.rdatatype.A:
+ add_signed(
+ qctx.response.answer,
+ rrset(F043_VICTIM, dns.rdatatype.A, LEGIT_A),
+ self.key,
+ )
+ else:
+ add_signed(qctx.response.authority, soa_rrset(self.zone), self.key)
+
+ yield DnsResponseSend(qctx.response, authoritative=True)
--- /dev/null
+#!/usr/bin/python3
+
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from collections.abc import AsyncGenerator
+
+import dns.flags
+import dns.message
+import dns.rcode
+import dns.rdatatype
+
+from dnssec_wildcard.ans1.common import (
+ Key,
+ add_dnskey,
+ add_signed,
+ name,
+ rrset,
+ rrset_from_rdata,
+ soa_rrset,
+ wildcard_rrsig,
+)
+from isctest.asyncserver import DnsResponseSend, DomainHandler, QueryContext
+
+TTL = 300
+FORGED_A = "198.51.100.45"
+LEGIT_A = "192.0.2.113"
+
+
+def add_ds(response: dns.message.Message, zone: str, child: Key, parent: Key) -> None:
+ add_signed(response.answer, rrset_from_rdata(zone, child.ds), parent)
+
+
+def add_nodata(response: dns.message.Message, zone: str, key: Key) -> None:
+ add_signed(response.authority, soa_rrset(zone), key)
+
+
+# A signed zone P, chained to a configured trust anchor, that
+# (a) publishes a wildcard *.P A or AAAA record and
+# (b) delegates at least one separately-signed child C.P (own DS in P)
+# operated by a distinct principal.
+F045_PARENT = "f045.test."
+F045_CHILD = f"child.{F045_PARENT}"
+F045_QUERY = f"q.{F045_PARENT}"
+F045_SERVICE = f"svc.{F045_CHILD}"
+
+
+def add_parent_mx_with_forged_additional(
+ response: dns.message.Message, parent: Key
+) -> None:
+ add_signed(
+ response.answer,
+ rrset(F045_QUERY, dns.rdatatype.MX, f"10 {F045_SERVICE}"),
+ parent,
+ )
+ response.additional.append(rrset(F045_SERVICE, dns.rdatatype.A, FORGED_A))
+ response.additional.append(wildcard_rrsig(F045_SERVICE, FORGED_A, parent))
+
+
+def add_child_a(response: dns.message.Message, child: Key) -> None:
+ add_signed(response.answer, rrset(F045_SERVICE, dns.rdatatype.A, LEGIT_A), child)
+
+
+class F045Handler(DomainHandler):
+ domains = [F045_PARENT, F045_CHILD]
+
+ def __init__(self, keys: dict[str, Key]) -> None:
+ super().__init__()
+ self.keys = keys
+
+ if F045_PARENT not in keys or F045_CHILD not in keys:
+ return
+
+ self.parent = name(F045_PARENT)
+ self.child = name(F045_CHILD)
+ self.query = name(F045_QUERY)
+ self.service = name(F045_SERVICE)
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ qctx.prepare_new_response(with_zone_data=False)
+ qctx.response.flags |= dns.flags.AA
+ qctx.response.set_rcode(dns.rcode.NOERROR)
+
+ parent_key = self.keys[F045_PARENT]
+ child_key = self.keys[F045_CHILD]
+
+ if qctx.qname == self.parent and qctx.qtype == dns.rdatatype.DNSKEY:
+ # Priming, DNSKEY RRset
+ add_dnskey(qctx.response, F045_PARENT, parent_key)
+ elif qctx.qname == self.parent and qctx.qtype == dns.rdatatype.SOA:
+ # Priming, SOA RRset
+ add_signed(qctx.response.answer, soa_rrset(F045_PARENT), parent_key)
+ elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
+ # Trigger query.
+ add_parent_mx_with_forged_additional(qctx.response, parent_key)
+ elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DS:
+ # Chain of trust, DS of child.
+ add_ds(qctx.response, F045_CHILD, child_key, parent_key)
+ elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.DNSKEY:
+ # Chain of trust, DNSKEY of child.
+ add_dnskey(qctx.response, F045_CHILD, child_key)
+ elif qctx.qname == self.child and qctx.qtype == dns.rdatatype.SOA:
+ # SOA of child.
+ add_signed(qctx.response.answer, soa_rrset(F045_CHILD), child_key)
+ elif qctx.qname == self.service and qctx.qtype == dns.rdatatype.A:
+ # Zone data at child.
+ add_child_a(qctx.response, child_key)
+ elif qctx.qname.is_subdomain(self.child):
+ # No data at child.
+ add_nodata(qctx.response, F045_CHILD, child_key)
+ elif qctx.qname.is_subdomain(self.parent):
+ # No data at parent.
+ add_nodata(qctx.response, F045_PARENT, parent_key)
+ else:
+ qctx.response.set_rcode(dns.rcode.NXDOMAIN)
+
+ yield DnsResponseSend(qctx.response, authoritative=True)
file "../../_common/root.hint";
};
+
+{% if PARENT_DNSKEY is defined and PARENT_DNSKEY|length %}
+zone "f045.test" {
+ type static-stub;
+ server-addresses { 10.53.0.1; };
+};
+
+trust-anchors {
+ f045.test. static-key 257 3 13 "@PARENT_DNSKEY@";
+};
+{% else %}
zone "f043.test" {
type static-stub;
server-addresses { 10.53.0.1; };
};
trust-anchors {
- f043.test. static-key 257 3 13 "@ZONE_DNSKEY@";
+ f043.test. static-key 257 3 13 "@TESTZONE_DNSKEY@";
};
+{% endif %}
import isctest
import isctest.mark
-ZONE = "f043.test."
-QUERY = f"svc.{ZONE}"
-VICTIM = f"victim.{ZONE}"
-FORGED_A = "198.51.100.90"
-LEGIT_A = "192.0.2.50"
+TESTZONE = "f043.test."
+QUERY = f"svc.{TESTZONE}"
+VICTIM = f"victim.{TESTZONE}"
+FORGED_A = "198.51.100.45"
+LEGIT_A = "192.0.2.113"
AUTH = "10.53.0.1"
RESOLVER = "10.53.0.2"
pytestmark = [
- isctest.mark.with_algorithm("ECDSAP256SHA256"),
+ isctest.mark.with_ecdsa_deterministic,
pytest.mark.extra_artifacts(
[
"ans*/ans.run",
]
-def _make_key():
+def _make_key(zone):
private_key = ec.generate_private_key(ec.SECP256R1())
dnskey = dns.dnssec.make_dnskey(
private_key.public_key(),
algorithm="ECDSAP256SHA256",
flags=257,
)
+ ds = dns.dnssec.make_ds(dns.name.from_text(zone), dnskey, "SHA256")
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
return {
"private_pem": private_pem,
"dnskey": dnskey.to_text(),
+ "ds": ds.to_text(),
}
def bootstrap():
- keys = {ZONE: _make_key()}
+ keys = {TESTZONE: _make_key(TESTZONE)}
Path("ans1/keys.json").write_text(json.dumps(keys, indent=2), encoding="ascii")
- zone_dnskey = "".join(keys[ZONE]["dnskey"].split()[3:])
- return {"ZONE_DNSKEY": zone_dnskey}
+ zone_dnskey = "".join(keys[TESTZONE]["dnskey"].split()[3:])
+ return {"TESTZONE_DNSKEY": zone_dnskey}
def _query(server, qname, qtype, cd=False):
carrier.additional,
VICTIM,
dns.rdatatype.A,
- ZONE,
+ TESTZONE,
labels=2,
)
def test_resolver_rejects_fromwildcard_additional_replay():
- soa = _query(RESOLVER, ZONE, "SOA")
+ soa = _query(RESOLVER, TESTZONE, "SOA")
isctest.check.noerror(soa)
isctest.check.adflag(soa)
isctest.check.adflag(response)
assert not _has_a(response, response.answer, VICTIM, FORGED_A), response.to_text()
assert _has_a(response, response.answer, VICTIM, LEGIT_A), response.to_text()
- _check_rrsig(response, response.answer, VICTIM, dns.rdatatype.A, ZONE)
+ _check_rrsig(response, response.answer, VICTIM, dns.rdatatype.A, TESTZONE)
from cryptography.hazmat.primitives.asymmetric import ec
import dns.dnssec
-import dns.flags
import dns.name
import dns.rdataclass
import dns.rdatatype
import isctest
import isctest.mark
-PARENT = "parent.hack."
+PARENT = "f045.test."
CHILD = f"child.{PARENT}"
QUERY = f"q.{PARENT}"
SERVICE = f"svc.{CHILD}"
RESOLVER = "10.53.0.2"
pytestmark = [
- isctest.mark.with_algorithm("ECDSAP256SHA256"),
+ isctest.mark.with_ecdsa_deterministic,
pytest.mark.extra_artifacts(
[
"ans*/ans.run",
assert _rrset(response, response.answer, QUERY, dns.rdatatype.MX)
_check_signed_rrset(response, response.answer, QUERY, dns.rdatatype.MX, PARENT)
- # The reply carries in ADDITIONAL. Note Labels=2, signer=parent.hack.
+ # The reply carries in ADDITIONAL. Note Labels=2, signer=f045.test.
assert _has_a(response, response.additional, SERVICE, FORGED_A), response.to_text()
_check_signed_rrset(
response,
+++ /dev/null
-#!/usr/bin/python3
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-
-from collections.abc import AsyncGenerator
-from dataclasses import dataclass
-from pathlib import Path
-
-import json
-
-from cryptography.hazmat.primitives import serialization
-
-import dns.dnssec
-import dns.flags
-import dns.message
-import dns.name
-import dns.rdata
-import dns.rdataclass
-import dns.rcode
-import dns.rdatatype
-import dns.rrset
-
-from isctest.asyncserver import (
- AsyncDnsServer,
- DnsResponseSend,
- QueryContext,
- ResponseHandler,
-)
-
-TTL = 300
-ZONE = "f043.test."
-QUERY = f"svc.{ZONE}"
-VICTIM = f"victim.{ZONE}"
-WILDCARD = f"*.{ZONE}"
-FORGED_A = "198.51.100.90"
-LEGIT_A = "192.0.2.50"
-
-
-@dataclass(frozen=True)
-class Key:
- zone: dns.name.Name
- private_key: object
- dnskey: dns.rdata.Rdata
-
-
-def name(text: str) -> dns.name.Name:
- return dns.name.from_text(text)
-
-
-def load_key() -> Key:
- path = Path(__file__).resolve().parent / "keys.json"
- with path.open(encoding="utf-8") as keys_file:
- raw_key = json.load(keys_file)[ZONE]
-
- private_key = serialization.load_pem_private_key(
- raw_key["private_pem"].encode("ascii"),
- password=None,
- )
- dnskey = dns.rdata.from_text(
- dns.rdataclass.IN, dns.rdatatype.DNSKEY, raw_key["dnskey"]
- )
- return Key(name(ZONE), private_key, dnskey)
-
-
-def rrset(owner: str, rdtype: dns.rdatatype.RdataType, *rdatas: str) -> dns.rrset.RRset:
- return dns.rrset.from_text(owner, TTL, dns.rdataclass.IN, rdtype, *rdatas)
-
-
-def rrset_from_rdata(owner: str, rdata: dns.rdata.Rdata) -> dns.rrset.RRset:
- return dns.rrset.from_rdata(name(owner), TTL, rdata)
-
-
-def add_signed(
- section: list[dns.rrset.RRset], covered: dns.rrset.RRset, signer: Key
-) -> None:
- rrsig = dns.dnssec.sign(
- covered,
- signer.private_key,
- signer.zone,
- signer.dnskey,
- lifetime=86400,
- verify=True,
- )
- section.append(covered)
- section.append(dns.rrset.from_rdata(covered.name, covered.ttl, rrsig))
-
-
-def soa_rrset() -> dns.rrset.RRset:
- return rrset(
- ZONE,
- dns.rdatatype.SOA,
- f"ns.{ZONE} hostmaster.{ZONE} 1 3600 600 86400 300",
- )
-
-
-def add_dnskey(response: dns.message.Message, key: Key) -> None:
- add_signed(response.answer, rrset_from_rdata(ZONE, key.dnskey), key)
-
-
-def wildcard_rrsig(owner: str, key: Key) -> dns.rrset.RRset:
- wildcard = rrset(WILDCARD, dns.rdatatype.A, FORGED_A)
- rrsig = dns.dnssec.sign(
- wildcard,
- key.private_key,
- key.zone,
- key.dnskey,
- lifetime=86400,
- verify=True,
- )
- return dns.rrset.from_rdata(name(owner), wildcard.ttl, rrsig)
-
-
-def add_wildcard_a(section: list[dns.rrset.RRset], owner: str, key: Key) -> None:
- section.append(rrset(owner, dns.rdatatype.A, FORGED_A))
- section.append(wildcard_rrsig(owner, key))
-
-
-class FromWildcardAdditionalHandler(ResponseHandler):
- def __init__(self, key: Key) -> None:
- self.key = key
- self.zone = name(ZONE)
- self.query = name(QUERY)
- self.victim = name(VICTIM)
-
- def match(self, qctx: QueryContext) -> bool:
- return qctx.qname.is_subdomain(self.zone)
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- qctx.prepare_new_response(with_zone_data=False)
- qctx.response.flags |= dns.flags.AA
- qctx.response.set_rcode(dns.rcode.NOERROR)
-
- if qctx.qname == self.zone and qctx.qtype == dns.rdatatype.DNSKEY:
- add_dnskey(qctx.response, self.key)
- elif qctx.qname == self.zone and qctx.qtype == dns.rdatatype.SOA:
- add_signed(qctx.response.answer, soa_rrset(), self.key)
- elif qctx.qname == self.query and qctx.qtype == dns.rdatatype.MX:
- add_signed(
- qctx.response.answer,
- rrset(QUERY, dns.rdatatype.MX, f"10 {VICTIM}"),
- self.key,
- )
- add_wildcard_a(qctx.response.additional, VICTIM, self.key)
- elif qctx.qname == self.victim and qctx.qtype == dns.rdatatype.A:
- add_signed(
- qctx.response.answer,
- rrset(VICTIM, dns.rdatatype.A, LEGIT_A),
- self.key,
- )
- else:
- add_signed(qctx.response.authority, soa_rrset(), self.key)
-
- yield DnsResponseSend(qctx.response, authoritative=True)
-
-
-def main() -> None:
- server = AsyncDnsServer(default_aa=True)
- server.install_response_handlers(FromWildcardAdditionalHandler(load_key()))
- server.run()
-
-
-if __name__ == "__main__":
- main()