-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-############################################################################
-# ans.py: See README.anspy for details.
-############################################################################
-
-from __future__ import print_function
-import os
-import sys
-import signal
-import socket
-import select
-from datetime import datetime, timedelta
-import functools
-
-import dns, dns.message, dns.query
-from dns.rdatatype import *
-from dns.rdataclass import *
-from dns.rcode import *
-from dns.name import *
-
-############################################################################
-# set up the RRs to be returned in the next answer
-#
-# the message contains up to two pipe-separated ('|') fields.
-#
-# the first field of the message is a comma-separated list
-# of actions indicating what to put into the answer set
-# (e.g., a dname, a cname, another cname, etc)
-#
-# supported actions:
-# - cname (cname from the current name to a new one in the same domain)
-# - dname (dname to a new domain, plus a synthesized cname)
-# - xname ("external" cname, to a new name in a new domain)
-#
-# example: xname, dname, cname represents a CNAME to an external
-# domain which is then answered by a DNAME and synthesized
-# CNAME pointing to yet another domain, which is then answered
-# by a CNAME within the same domain, and finally an answer
-# to the query. each RR in the answer set has a corresponding
-# RRSIG. these signatures are not valid, but will exercise the
-# response parser.
-#
-# the second field is a comma-separated list of which RRs in the
-# answer set to include in the answer, in which order. if prepended
-# with 's', the number indicates which signature to include.
-#
-# examples: for the answer set "cname, cname, cname", an rr set
-# '1, s1, 2, s2, 3, s3, 4, s4' indicates that all four RRs should
-# be included in the answer, with siagntures, in the original
-# order, while 4, s4, 3, s3, 2, s2, 1, s1' indicates the order
-# should be reversed, 's3, s3, s3, s3' indicates that the third
-# RRSIG should be repeated four times and everything else should
-# be omitted, and so on.
-#
-# if there is no second field (i.e., no pipe symbol appears in
-# the line) , the default is to send all answers and signatures.
-# if a pipe symbol exists but the second field is empty, then
-# nothing is sent at all.
-############################################################################
-actions = []
-rrs = []
-
-
-def ctl_channel(msg):
- global actions, rrs
-
- msg = msg.splitlines().pop(0)
- print("received control message: %s" % msg)
-
- msg = msg.split(b"|")
- if len(msg) == 0:
- return
-
- actions = [x.strip() for x in msg[0].split(b",")]
- n = functools.reduce(
- lambda n, act: (n + (2 if act == b"dname" else 1)), [0] + actions
- )
-
- if len(msg) == 1:
- rrs = []
- for i in range(n):
- for b in [False, True]:
- rrs.append((i, b))
- return
-
- rlist = [x.strip() for x in msg[1].split(b",")]
- rrs = []
- for item in rlist:
- if item[0] == b"s"[0]:
- i = int(item[1:].strip()) - 1
- if i > n:
- print("invalid index %d" + (i + 1))
- continue
- rrs.append((int(item[1:]) - 1, True))
- else:
- i = int(item) - 1
- if i > n:
- print("invalid index %d" % (i + 1))
- continue
- rrs.append((i, False))
-
-
-############################################################################
-# Respond to a DNS query.
-############################################################################
-def create_response(msg):
- m = dns.message.from_wire(msg)
- qname = m.question[0].name.to_text()
- labels = qname.lower().split(".")
- wantsigs = True if m.ednsflags & dns.flags.DO else False
-
- # get qtype
- rrtype = m.question[0].rdtype
- typename = dns.rdatatype.to_text(rrtype)
-
- # for 'www.example.com.'...
- # - name is 'www'
- # - domain is 'example.com.'
- # - sld is 'example'
- # - tld is 'com.'
- name = labels.pop(0)
- domain = ".".join(labels)
- sld = labels.pop(0)
- tld = ".".join(labels)
-
- print("query: " + qname + "/" + typename)
- print("domain: " + domain)
-
- # default answers, depending on QTYPE.
- # currently only A, AAAA, TXT and NS are supported.
- ttl = 86400
- additionalA = "10.53.0.4"
- additionalAAAA = "fd92:7065:b8e:ffff::4"
- if typename == "A":
- final = "10.53.0.4"
- elif typename == "AAAA":
- final = "fd92:7065:b8e:ffff::4"
- elif typename == "TXT":
- final = "Some\ text\ here"
- elif typename == "NS":
- domain = qname
- final = "ns1.%s" % domain
- else:
- final = None
-
- # RRSIG rdata - won't validate but will exercise response parsing
- t = datetime.now()
- delta = timedelta(30)
- t1 = t - delta
- t2 = t + delta
- inception = t1.strftime("%Y%m%d000000")
- expiry = t2.strftime("%Y%m%d000000")
- sigdata = "OCXH2De0yE4NMTl9UykvOsJ4IBGs/ZIpff2rpaVJrVG7jQfmj50otBAp A0Zo7dpBU4ofv0N/F2Ar6LznCncIojkWptEJIAKA5tHegf/jY39arEpO cevbGp6DKxFhlkLXNcw7k9o7DSw14OaRmgAjXdTFbrl4AiAa0zAttFko Tso="
-
- # construct answer set.
- answers = []
- sigs = []
- curdom = domain
- curname = name
- i = 0
-
- for action in actions:
- if name != "test":
- continue
- if action == b"xname":
- owner = curname + "." + curdom
- newname = "cname%d" % i
- i += 1
- newdom = "domain%d.%s" % (i, tld)
- i += 1
- target = newname + "." + newdom
- print("add external CNAME %s to %s" % (owner, target))
- answers.append(dns.rrset.from_text(owner, ttl, IN, CNAME, target))
- rrsig = "CNAME 5 3 %d %s %s 12345 %s %s" % (
- ttl,
- expiry,
- inception,
- domain,
- sigdata,
- )
- print("add external RRISG(CNAME) %s to %s" % (owner, target))
- sigs.append(dns.rrset.from_text(owner, ttl, IN, RRSIG, rrsig))
- curname = newname
- curdom = newdom
- continue
-
- if action == b"cname":
- owner = curname + "." + curdom
- newname = "cname%d" % i
- target = newname + "." + curdom
- i += 1
- print("add CNAME %s to %s" % (owner, target))
- answers.append(dns.rrset.from_text(owner, ttl, IN, CNAME, target))
- rrsig = "CNAME 5 3 %d %s %s 12345 %s %s" % (
- ttl,
- expiry,
- inception,
- domain,
- sigdata,
- )
- print("add RRSIG(CNAME) %s to %s" % (owner, target))
- sigs.append(dns.rrset.from_text(owner, ttl, IN, RRSIG, rrsig))
- curname = newname
- continue
-
- if action == b"dname":
- owner = curdom
- newdom = "domain%d.%s" % (i, tld)
- i += 1
- print("add DNAME %s to %s" % (owner, newdom))
- answers.append(dns.rrset.from_text(owner, ttl, IN, DNAME, newdom))
- rrsig = "DNAME 5 3 %d %s %s 12345 %s %s" % (
- ttl,
- expiry,
- inception,
- domain,
- sigdata,
- )
- print("add RRSIG(DNAME) %s to %s" % (owner, newdom))
- sigs.append(dns.rrset.from_text(owner, ttl, IN, RRSIG, rrsig))
- owner = curname + "." + curdom
- target = curname + "." + newdom
- print("add synthesized CNAME %s to %s" % (owner, target))
- answers.append(dns.rrset.from_text(owner, ttl, IN, CNAME, target))
- rrsig = "CNAME 5 3 %d %s %s 12345 %s %s" % (
- ttl,
- expiry,
- inception,
- domain,
- sigdata,
- )
- print("add synthesized RRSIG(CNAME) %s to %s" % (owner, target))
- sigs.append(dns.rrset.from_text(owner, ttl, IN, RRSIG, rrsig))
- curdom = newdom
- continue
-
- # now add the final answer
- owner = curname + "." + curdom
- answers.append(dns.rrset.from_text(owner, ttl, IN, rrtype, final))
- rrsig = "%s 5 3 %d %s %s 12345 %s %s" % (
- typename,
- ttl,
- expiry,
- inception,
- domain,
- sigdata,
- )
- sigs.append(dns.rrset.from_text(owner, ttl, IN, RRSIG, rrsig))
-
- # prepare the response and convert to wire format
- r = dns.message.make_response(m)
-
- if name != "test":
- r.answer.append(answers[-1])
- if wantsigs:
- r.answer.append(sigs[-1])
- else:
- for i, sig in rrs:
- if sig and not wantsigs:
- continue
- elif sig:
- r.answer.append(sigs[i])
- else:
- r.answer.append(answers[i])
-
- if typename != "NS":
- r.authority.append(
- dns.rrset.from_text(domain, ttl, IN, "NS", ("ns1.%s" % domain))
- )
- r.additional.append(
- dns.rrset.from_text(("ns1.%s" % domain), 86400, IN, A, additionalA)
- )
- r.additional.append(
- dns.rrset.from_text(("ns1.%s" % domain), 86400, IN, AAAA, additionalAAAA)
- )
-
- r.flags |= dns.flags.AA
- r.use_edns()
- return r.to_wire()
-
-
-def sigterm(signum, frame):
- print("Shutting down now...")
- os.remove("ans.pid")
- running = False
- sys.exit(0)
-
-
-############################################################################
-# Main
-#
-# Set up responder and control channel, open the pid file, and start
-# the main loop, listening for queries on the query channel or commands
-# on the control channel and acting on them.
-############################################################################
-ip4 = "10.53.0.4"
-ip6 = "fd92:7065:b8e:ffff::4"
+"""
+Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+
+SPDX-License-Identifier: MPL-2.0
+
+This Source Code Form is subject to the terms of the Mozilla Public
+License, v. 2.0. If a copy of the MPL was not distributed with this
+file, you can obtain one at https://mozilla.org/MPL/2.0/.
+
+See the COPYRIGHT file distributed with this work for additional
+information regarding copyright ownership.
+"""
+
+from dataclasses import dataclass
+from enum import Enum
+from typing import AsyncGenerator, List, Optional, Tuple
+
+import abc
+import logging
+import re
+
+import dns.rcode
+import dns.rdata
+import dns.rdataclass
+import dns.rdatatype
+import dns.rrset
+
+from isctest.asyncserver import (
+ ControlCommand,
+ ControllableAsyncDnsServer,
+ DnsResponseSend,
+ DomainHandler,
+ QueryContext,
+ ResponseAction,
+)
try:
- port = int(os.environ["PORT"])
-except:
- port = 5300
+ RdataType = dns.rdatatype.RdataType
+except AttributeError: # dnspython < 2.0.0 compat
+ RdataType = int # type: ignore
+
+
+class ChainNameGenerator:
+ """
+ Convenience class generating sequential owner/target names used in chained
+ responses.
+
+ >>> name_generator = ChainNameGenerator()
+ >>> name_generator.current_name
+ <DNS name test.domain.nil.>
+ >>> name_generator.generate_next_name()
+ <DNS name cname0.domain.nil.>
+ >>> name_generator.generate_next_name()
+ <DNS name cname1.domain.nil.>
+ >>> name_generator.generate_next_sld()
+ <DNS name domain2.nil.>
+ >>> name_generator.generate_next_sld()
+ <DNS name domain3.nil.>
+ >>> name_generator.current_name
+ <DNS name cname1.domain3.nil.>
+ >>> name_generator.generate_next_name()
+ <DNS name cname4.domain3.nil.>
+ >>> name_generator.generate_next_name_in_next_sld()
+ <DNS name cname5.domain6.nil.>
+ >>> name_generator.generate_next_name_in_next_sld()
+ <DNS name cname7.domain8.nil.>
+ """
+
+ def __init__(self) -> None:
+ self._i = 0
+ self._current_label = dns.name.Name(["test"])
+ self._current_sld = dns.name.Name(["domain"])
+ self._tld = dns.name.Name(["nil", ""])
+
+ @property
+ def current_name(self) -> dns.name.Name:
+ return self._current_label.concatenate(self.current_domain)
+
+ @property
+ def current_domain(self) -> dns.name.Name:
+ return self._current_sld.concatenate(self._tld)
+
+ def generate_next_name(self) -> dns.name.Name:
+ self._current_label = dns.name.Name([f"cname{self._i}"])
+ self._i += 1
+ return self.current_name
+
+ def generate_next_sld(self) -> dns.name.Name:
+ self._current_sld = dns.name.Name([f"domain{self._i}"])
+ self._i += 1
+ return self.current_domain
+
+ def generate_next_name_in_next_sld(self) -> dns.name.Name:
+ self.generate_next_name()
+ self.generate_next_sld()
+ return self.current_name
+
+
+class RecordGenerator(abc.ABC):
+ """
+ An abstract class used as a base class for RRset generators (see the
+ description of "actions" in `ChainSetupCommand`) and as a convenience class
+ for creating RRsets in `ChainResponseHandler`.
+ """
+
+ @classmethod
+ def create_rrset(
+ cls, owner: dns.name.Name, rrtype: RdataType, rdata: str
+ ) -> dns.rrset.RRset:
+ return dns.rrset.from_text(owner, 86400, dns.rdataclass.IN, rrtype, rdata)
+
+ @classmethod
+ def create_rrset_signature(
+ cls, owner: dns.name.Name, rrtype: RdataType
+ ) -> dns.rrset.RRset:
+ covers = dns.rdatatype.to_text(rrtype)
+ ttl = "86400"
+ expiry = "20900101000000"
+ inception = "20250101000000"
+ domain = "domain.nil."
+ sigdata = "OCXH2De0yE4NMTl9UykvOsJ4IBGs/ZIpff2rpaVJrVG7jQfmj50otBAp "
+ sigdata += "A0Zo7dpBU4ofv0N/F2Ar6LznCncIojkWptEJIAKA5tHegf/jY39arEpO "
+ sigdata += "cevbGp6DKxFhlkLXNcw7k9o7DSw14OaRmgAjXdTFbrl4AiAa0zAttFko "
+ sigdata += "Tso="
+ rdata = f"{covers} 5 3 {ttl} {expiry} {inception} 12345 {domain} {sigdata}"
+ return cls.create_rrset(owner, dns.rdatatype.RRSIG, rdata)
+
+ def __init__(self, name_generator: ChainNameGenerator) -> None:
+ self._name_generator = name_generator
+
+ def get_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ """
+ Return the lists of records and their signatures that should be
+ generated in response to a given "action".
+
+ This method is a wrapper around `generate_rrsets()` that ensures all
+ derived classes obey their promises about the number of records they
+ generate.
+ """
+ responses, signatures = self.generate_rrsets()
+ assert len(responses) == self.response_count
+ assert len(signatures) == self.response_count
+ return responses, signatures
+
+ @property
+ @abc.abstractmethod
+ def response_count(self) -> int:
+ """
+ How many records this generator creates each time the "action"
+ associated with it is used. Every generated record needs to be
+ accompanied by its corresponding signature, so e.g. setting this to 1
+ causes `get_rrsets()` callers to expect it to return two RRset lists,
+ each containing one RRset.
+
+ This property could be derived from the size of the lists returned by
+ `generate_rrsets()`, but it is left as a separate value to enable early
+ detection of invalid "selector" indexes when the control commands are
+ first parsed.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ """
+ Return the lists of records and their signatures that should be
+ generated in response to a given "action".
+
+ This method must be defined by every derived class, but RecordGenerator
+ users should call `get_rrsets()` instead.
+ """
+ raise NotImplementedError
+
+
+class CnameRecordGenerator(RecordGenerator):
+
+ response_count = 1
+
+ def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ owner = self._name_generator.current_name
+ target = self._name_generator.generate_next_name().to_text()
+ response = self.create_rrset(owner, dns.rdatatype.CNAME, target)
+ signature = self.create_rrset_signature(owner, response.rdtype)
+ return [response], [signature]
+
+
+class DnameRecordGenerator(RecordGenerator):
+
+ response_count = 2
+
+ def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ dname_owner = self._name_generator.current_domain
+ cname_owner = self._name_generator.current_name
+ dname_target = self._name_generator.generate_next_sld().to_text()
+ cname_target = self._name_generator.current_name.to_text()
+ dname_response = self.create_rrset(
+ dname_owner, dns.rdatatype.DNAME, dname_target
+ )
+ cname_response = self.create_rrset(
+ cname_owner, dns.rdatatype.CNAME, cname_target
+ )
+ dname_signature = self.create_rrset_signature(
+ dname_owner, dname_response.rdtype
+ )
+ cname_signature = self.create_rrset_signature(
+ cname_owner, cname_response.rdtype
+ )
+ return [dname_response, cname_response], [dname_signature, cname_signature]
-try:
- ctrlport = int(os.environ["EXTRAPORT1"])
-except:
- ctrlport = 5300
-query4_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-query4_socket.bind((ip4, port))
+class XnameRecordGenerator(RecordGenerator):
-havev6 = True
-try:
- query6_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
- try:
- query6_socket.bind((ip6, port))
- except:
- query6_socket.close()
- havev6 = False
-except:
- havev6 = False
-
-ctrl_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-ctrl_socket.bind((ip4, ctrlport))
-ctrl_socket.listen(5)
-
-signal.signal(signal.SIGTERM, sigterm)
-
-f = open("ans.pid", "w")
-pid = os.getpid()
-print(pid, file=f)
-f.close()
-
-running = True
-
-print("Listening on %s port %d" % (ip4, port))
-if havev6:
- print("Listening on %s port %d" % (ip6, port))
-print("Control channel on %s port %d" % (ip4, ctrlport))
-print("Ctrl-c to quit")
-
-if havev6:
- input = [query4_socket, query6_socket, ctrl_socket]
-else:
- input = [query4_socket, ctrl_socket]
-
-while running:
- try:
- inputready, outputready, exceptready = select.select(input, [], [])
- except select.error as e:
- break
- except socket.error as e:
- break
- except KeyboardInterrupt:
- break
-
- for s in inputready:
- if s == ctrl_socket:
- # Handle control channel input
- conn, addr = s.accept()
- print("Control channel connected")
- while True:
- msg = conn.recv(65535)
- if not msg:
- break
- ctl_channel(msg)
- conn.close()
- if s == query4_socket or s == query6_socket:
- print("Query received on %s" % (ip4 if s == query4_socket else ip6))
- # Handle incoming queries
- msg = s.recvfrom(65535)
- rsp = create_response(msg[0])
- if rsp:
- s.sendto(rsp, msg[1])
- if not running:
- break
+ response_count = 1
+
+ def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ owner = self._name_generator.current_name
+ target = self._name_generator.generate_next_name_in_next_sld().to_text()
+ response = self.create_rrset(owner, dns.rdatatype.CNAME, target)
+ signature = self.create_rrset_signature(owner, response.rdtype)
+ return [response], [signature]
+
+
+class FinalRecordGenerator(RecordGenerator):
+
+ response_count = 1
+
+ def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ owner = self._name_generator.current_name
+ response = self.create_rrset(owner, dns.rdatatype.A, "10.53.0.4")
+ signature = self.create_rrset_signature(owner, response.rdtype)
+ return [response], [signature]
+
+
+class ChainAction(Enum):
+ """
+ Chained answer types that this server can send. `ChainSetupCommand` sets
+ up a collection of these for generating responses.
+ """
+
+ CNAME = CnameRecordGenerator
+ DNAME = DnameRecordGenerator
+ XNAME = XnameRecordGenerator
+ FINAL = FinalRecordGenerator
+
+
+@dataclass(frozen=True)
+class ChainSelector:
+ """
+ A "selector" for a specific RRset - one of all possible RRsets generated by
+ `ChainAction`s - to include in responses to queries.
+ """
+
+ response_index: int
+ response_signature: bool
+
+
+class ChainSetupCommand(ControlCommand):
+ """
+ Set up a chained response to return for subsequent queries.
+
+ The control query consists of two label sequences separated by a `_` label.
+
+ The first label sequence is a set of "actions"; these cause a set of
+ response RRsets to be generated. Valid labels in that sequence are:
+
+ - `cname`: CNAME from the current name to a new one in the same domain,
+ - `dname`: DNAME to a new domain, plus a synthesized CNAME,
+ - `xname`: "external" CNAME, to a new name in a new domain.
+
+ The final response to the client query (an A RRset) is automatically
+ appended to the ANSWER section of every response.
+
+ Example: `xname.dname.cname` represents a CNAME to an external domain which
+ is then answered by a DNAME and a synthesized CNAME pointing to yet another
+ domain, which is then answered by a CNAME within the same domain, and
+ finally an answer to the query.
+
+ Each of the generated RRsets is associated with a corresponding RRSIG.
+ These signatures are not valid, but are intended to exercise the response
+ parser.
+
+ The second label sequence is a set of "selectors"; these specify which
+ RRsets out of all the possible RRsets generated by "actions" to actually
+ include in the answer and in what order. The RRsets are indexed starting
+ from 1. If prepended with `s`, the number indicates which signature to
+ include.
+
+ Examples:
+
+ - `cname.cname.cname._.1.s1.2.s2.3.s3.4.s4` indicates that all four
+ RRsets (three CNAME RRsets + one A RRset with the final answer) should
+ be included in the answer, with their corresponding signatures, in the
+ original order,
+
+ - `cname.cname.cname._.4.s4.3.s3.2.s2.1.s1` causes the same RRsets to be
+ returned, but in reverse order,
+
+ - `cname.cname.cname._.s3.s3.s3.s3` causes the RRSIG RRset for the third
+ CNAME to be repeated four times in the response and everything else to
+ be omitted.
+ """
+
+ control_subdomain = "setup-chain"
+
+ def __init__(self) -> None:
+ self._current_handler: Optional[ChainResponseHandler] = None
+
+ def handle(
+ self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
+ ) -> Optional[str]:
+ try:
+ actions, selectors = self._parse_args(args)
+ except ValueError as exc:
+ qctx.response.set_rcode(dns.rcode.SERVFAIL)
+ logging.error("%s", exc)
+ return str(exc)
+
+ if self._current_handler:
+ server.uninstall_response_handler(self._current_handler)
+
+ answer_rrsets = self._prepare_answer(actions, selectors)
+
+ self._current_handler = ChainResponseHandler(answer_rrsets)
+ server.install_response_handler(self._current_handler)
+
+ return "chain response setup successful"
+
+ def _parse_args(
+ self, args: List[str]
+ ) -> Tuple[List[ChainAction], List[ChainSelector]]:
+ try:
+ delimiter = args.index("_")
+ except ValueError as exc:
+ raise ValueError("chain setup delimiter not found in QNAME") from exc
+
+ args_actions = args[:delimiter]
+ actions = self._parse_args_actions(args_actions)
+
+ args_selectors = args[delimiter + 1 :]
+ selectors = self._parse_args_selectors(args_selectors, actions)
+
+ return actions, selectors
+
+ def _parse_args_actions(self, args_actions: List[str]) -> List[ChainAction]:
+ actions = []
+
+ for action in args_actions + ["FINAL"]:
+ try:
+ actions.append(ChainAction[action.upper()])
+ except KeyError as exc:
+ raise ValueError(f"unsupported action '{action}'") from exc
+
+ return actions
+
+ def _parse_args_selectors(
+ self, args_selectors: List[str], actions: List[ChainAction]
+ ) -> List[ChainSelector]:
+ max_response_index = self._get_max_response_index(actions)
+ selectors = []
+
+ for selector in args_selectors:
+ match = re.match(r"^(?P<signature>s?)(?P<index>[0-9]+)$", selector)
+ if not match:
+ raise ValueError(f"invalid selector '{selector}'")
+ response_index = int(match.group("index"))
+ if response_index > max_response_index:
+ raise ValueError(
+ f"invalid response index {response_index} in '{selector}'"
+ )
+ response_signature = bool(match.group("signature"))
+ selectors.append(ChainSelector(response_index, response_signature))
+
+ return selectors
+
+ def _get_max_response_index(self, actions: List[ChainAction]) -> int:
+ rrset_generator_classes = [a.value for a in actions]
+ return sum(g.response_count for g in rrset_generator_classes)
+
+ def _prepare_answer(
+ self, actions: List[ChainAction], selectors: List[ChainSelector]
+ ) -> List[dns.rrset.RRset]:
+ all_responses, all_signatures = self._generate_rrsets(actions)
+ return self._select_rrsets(all_responses, all_signatures, selectors)
+
+ def _generate_rrsets(
+ self, actions: List[ChainAction]
+ ) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ all_responses = []
+ all_signatures = []
+ name_generator = ChainNameGenerator()
+
+ for action in actions:
+ rrset_generator_class = action.value
+ rrset_generator = rrset_generator_class(name_generator)
+ responses, signatures = rrset_generator.get_rrsets()
+ all_responses.extend(responses)
+ all_signatures.extend(signatures)
+
+ return all_responses, all_signatures
+
+ def _select_rrsets(
+ self,
+ all_responses: List[dns.rrset.RRset],
+ all_signatures: List[dns.rrset.RRset],
+ selectors: List[ChainSelector],
+ ) -> List[dns.rrset.RRset]:
+ rrsets = []
+
+ for selector in selectors:
+ index = selector.response_index - 1
+ source = all_signatures if selector.response_signature else all_responses
+ rrsets.append(source[index])
+
+ return rrsets
+
+
+class ChainResponseHandler(DomainHandler):
+ """
+ For trigger queries (`test.domain.nil`), return a chained response
+ previously prepared by `ChainSetupCommand`.
+
+ For any other query, return a non-chained response (a single A RRset).
+ """
+
+ domains = ["domain.nil."]
+
+ def __init__(self, answer_rrsets: List[dns.rrset.RRset]):
+ super().__init__()
+ self._answer_rrsets = answer_rrsets
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[ResponseAction, None]:
+ trigger_qname = dns.name.from_text("test.domain.nil.")
+ if qctx.qname == trigger_qname:
+ answer_rrsets = self._answer_rrsets
+ else:
+ answer_rrsets = self._non_chain_answer(qctx)
+
+ for rrset in answer_rrsets:
+ qctx.response.answer.append(rrset)
+ for rrset in self._authority_rrsets:
+ qctx.response.authority.append(rrset)
+ for rrset in self._additional_rrsets:
+ qctx.response.additional.append(rrset)
+
+ qctx.response.set_rcode(dns.rcode.NOERROR)
+ qctx.response.use_edns()
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+ def _non_chain_answer(self, qctx: QueryContext) -> List[dns.rrset.RRset]:
+ owner = qctx.qname
+ return [
+ RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"),
+ RecordGenerator.create_rrset_signature(owner, dns.rdatatype.A),
+ ]
+
+ @property
+ def _authority_rrsets(self) -> List[dns.rrset.RRset]:
+ owner = dns.name.from_text("domain.nil.")
+ return [
+ RecordGenerator.create_rrset(owner, dns.rdatatype.NS, "ns1.domain.nil."),
+ ]
+
+ @property
+ def _additional_rrsets(self) -> List[dns.rrset.RRset]:
+ owner = dns.name.from_text("ns1.domain.nil.")
+ return [
+ RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"),
+ RecordGenerator.create_rrset(
+ owner, dns.rdatatype.AAAA, "fd92:7065:b8e:ffff::4"
+ ),
+ ]
+
+
+def main() -> None:
+ server = ControllableAsyncDnsServer(commands=[ChainSetupCommand])
+ server.run()
+
+
+if __name__ == "__main__":
+ main()
RNDCCMD="$RNDC -c ../_common/rndc.conf -p ${CONTROLPORT} -s"
sendcmd() {
- send 10.53.0.4 "${EXTRAPORT1}"
+ SERVER="${1}"
+ COMMAND="${2}"
+ COMMAND_ARGS="${3}"
+ $DIG $DIGOPTS "@${SERVER}" "${COMMAND_ARGS}.${COMMAND}._control." TXT +time=5 +tries=1 +tcp >/dev/null 2>&1
}
status=0
echo_i "checking CNAME chains in various orders ($n)"
ret=0
$RNDCCMD 10.53.0.7 null --- start test$n - step 1 --- 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "cname,cname,cname|1,2,3,4,s1,s2,s3,s4" | sendcmd
+sendcmd 10.53.0.4 setup-chain "cname.cname.cname._.1.2.3.4.s1.s2.s3.s4"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.1.$n 2>&1
grep 'status: NOERROR' dig.out.1.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 2' dig.out.1.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 null --- start test$n - step 2 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "cname,cname,cname|1,1,2,2,3,4,s4,s3,s1" | sendcmd
+sendcmd 10.53.0.4 setup-chain "cname.cname.cname._.1.1.2.2.3.4.s4.s3.s1"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.2.$n 2>&1
grep 'status: NOERROR' dig.out.2.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 2' dig.out.2.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 null --- start test$n - step 3 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "cname,cname,cname|2,1,3,4,s3,s1,s2,s4" | sendcmd
+sendcmd 10.53.0.4 setup-chain "cname.cname.cname._.2.1.3.4.s3.s1.s2.s4"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.3.$n 2>&1
grep 'status: NOERROR' dig.out.3.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 2' dig.out.3.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 null --- start test$n - step 4 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "cname,cname,cname|4,3,2,1,s4,s3,s2,s1" | sendcmd
+sendcmd 10.53.0.4 setup-chain "cname.cname.cname._.4.3.2.1.s4.s3.s2.s1"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.4.$n 2>&1
grep 'status: NOERROR' dig.out.4.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 2' dig.out.4.$n >/dev/null 2>&1 || ret=1
-echo "cname,cname,cname|4,3,2,1,s4,s3,s2,s1" | sendcmd
+sendcmd 10.53.0.4 setup-chain "cname.cname.cname._.4.3.2.1.s4.s3.s2.s1"
$RNDCCMD 10.53.0.7 null --- start test$n - step 5 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.5.$n 2>&1
grep 'ANSWER: 2' dig.out.5.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 null --- start test$n - step 6 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "cname,cname,cname|4,3,3,3,s1,s1,1,3,4" | sendcmd
+sendcmd 10.53.0.4 setup-chain "cname.cname.cname._.4.3.3.3.s1.s1.1.3.4"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.6.$n 2>&1
grep 'status: NOERROR' dig.out.6.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 2' dig.out.6.$n >/dev/null 2>&1 || ret=1
echo_i "checking that only the initial CNAME is cached ($n)"
ret=0
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "cname,cname,cname|1,2,3,4,s1,s2,s3,s4" | sendcmd
+sendcmd 10.53.0.4 setup-chain "cname.cname.cname._.1.2.3.4.s1.s2.s3.s4"
$RNDCCMD 10.53.0.7 null --- start test$n --- 2>&1 | sed 's/^/ns7 /' | cat_i
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.1.$n 2>&1
sleep 1
ret=0
$RNDCCMD 10.53.0.7 null --- start test$n - step 1 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "dname,dname|5,4,3,2,1,s5,s4,s3,s2,s1" | sendcmd
+sendcmd 10.53.0.4 setup-chain "dname.dname._.5.4.3.2.1.s5.s4.s3.s2.s1"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.1.$n 2>&1
grep 'status: NOERROR' dig.out.1.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 3' dig.out.1.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 null --- start test$n - step 2 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "dname,dname|5,4,3,2,1,s5,s4,s3,s2,s1" | sendcmd
+sendcmd 10.53.0.4 setup-chain "dname.dname._.5.4.3.2.1.s5.s4.s3.s2.s1"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.2.$n 2>&1
grep 'status: NOERROR' dig.out.2.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 3' dig.out.2.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 null --- start test$n - step 3 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "dname,dname|2,3,s1,s2,s3,s4,1" | sendcmd
+sendcmd 10.53.0.4 setup-chain "dname.dname._.2.3.s1.s2.s3.s4.1"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.3.$n 2>&1
grep 'status: NOERROR' dig.out.3.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 3' dig.out.3.$n >/dev/null 2>&1 || ret=1
echo_i "checking external CNAME/DNAME chains in various orders ($n)"
ret=0
$RNDCCMD 10.53.0.7 null --- start test$n - step 1 --- 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "xname,dname|1,2,3,4,s1,s2,s3,s4" | sendcmd
+sendcmd 10.53.0.4 setup-chain "xname.dname._.1.2.3.4.s1.s2.s3.s4"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.1.$n 2>&1
grep 'status: NOERROR' dig.out.1.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 2' dig.out.1.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 null --- start test$n - step 2 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "xname,dname|s2,2,s1,1,4,s4,3" | sendcmd
+sendcmd 10.53.0.4 setup-chain "xname.dname._.s2.2.s1.1.4.s4.3"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.2.$n 2>&1
grep 'status: NOERROR' dig.out.2.$n >/dev/null 2>&1 || ret=1
grep 'ANSWER: 2' dig.out.2.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 null --- start test$n - step 3 --- 2>&1 | sed 's/^/ns7 /' | cat_i
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i
-echo "xname,dname|s2,2,2,2" | sendcmd
+sendcmd 10.53.0.4 setup-chain "xname.dname._.s2.2.2.2"
$DIG $DIGOPTS @10.53.0.7 test.domain.nil >dig.out.3.$n 2>&1
grep 'status: SERVFAIL' dig.out.3.$n >/dev/null 2>&1 || ret=1
$RNDCCMD 10.53.0.7 flush 2>&1 | sed 's/^/ns7 /' | cat_i