]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add common parts of resolver test custom servers
authorŠtěpán Balážik <stepan@isc.org>
Fri, 26 Dec 2025 00:06:28 +0000 (01:06 +0100)
committerŠtěpán Balážik <stepan@isc.org>
Sat, 24 Jan 2026 12:04:09 +0000 (13:04 +0100)
These will be shared by all the ans*/ans.py files.

bin/tests/system/resolver/resolver_ans.py [new file with mode: 0644]

diff --git a/bin/tests/system/resolver/resolver_ans.py b/bin/tests/system/resolver/resolver_ans.py
new file mode 100644 (file)
index 0000000..29d17fc
--- /dev/null
@@ -0,0 +1,146 @@
+"""
+Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+
+SPDX-License-Identifier: MPL-2.0
+
+This Source Code Form is subject to the terms of the Mozilla Public
+License, v. 2.0.  If a copy of the MPL was not distributed with this
+file, you can obtain one at https://mozilla.org/MPL/2.0/.
+
+See the COPYRIGHT file distributed with this work for additional
+information regarding copyright ownership.
+"""
+
+from typing import AsyncGenerator, List, NamedTuple, Union
+
+import abc
+
+import dns.name
+import dns.rdataclass
+import dns.rdatatype
+import dns.rrset
+
+from isctest.asyncserver import (
+    DnsResponseSend,
+    DomainHandler,
+    QnameHandler,
+    QueryContext,
+)
+
+
+def rrset(
+    qname: Union[dns.name.Name, str],
+    rtype: dns.rdatatype.RdataType,
+    rdata: str,
+    ttl: int = 300,
+) -> dns.rrset.RRset:
+    return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata)
+
+
+def rrset_from_list(
+    qname: Union[dns.name.Name, str],
+    rtype: dns.rdatatype.RdataType,
+    rdata_list: List[str],
+    ttl: int = 300,
+) -> dns.rrset.RRset:
+    return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list)
+
+
+def soa_rrset(qname: Union[dns.name.Name, str]) -> dns.rrset.RRset:
+    return rrset(qname, dns.rdatatype.SOA, ". . 0 0 0 0 0")
+
+
+class DelegationRRsets(NamedTuple):
+    ns_rrset: dns.rrset.RRset
+    a_rrset: dns.rrset.RRset
+
+
+def delegation_rrsets(
+    owner: Union[dns.name.Name, str],
+    server_number: int,
+    ns_name: Union[dns.name.Name, str, None] = None,
+) -> DelegationRRsets:
+    if ns_name is None:
+        ns_name = f"ns.{owner}"
+    ns_rrset = rrset(owner, dns.rdatatype.NS, f"{ns_name}")
+    a_rrset = rrset(ns_name, dns.rdatatype.A, f"10.53.0.{server_number}")
+    return DelegationRRsets(ns_rrset, a_rrset)
+
+
+def setup_delegation(
+    qctx: QueryContext, owner: Union[dns.name.Name, str], server_number: int
+) -> None:
+    delegation = delegation_rrsets(owner, server_number)
+    qctx.response.authority.append(delegation.ns_rrset)
+    qctx.response.additional.append(delegation.a_rrset)
+
+
+class DelegationHandler(DomainHandler):
+    @property
+    @abc.abstractmethod
+    def server_number(self) -> int:
+        raise NotImplementedError
+
+    async def get_responses(
+        self, qctx: QueryContext
+    ) -> AsyncGenerator[DnsResponseSend, None]:
+        setup_delegation(qctx, self.matched_domain, self.server_number)
+        yield DnsResponseSend(qctx.response, authoritative=False)
+
+
+class Gl6412AHandler(QnameHandler):
+    qnames = ["a.gl6412.", "a.a.gl6412."]
+
+    async def get_responses(
+        self, qctx: QueryContext
+    ) -> AsyncGenerator[DnsResponseSend, None]:
+        qctx.response.authority.append(soa_rrset(qctx.qname))
+        yield DnsResponseSend(qctx.response)
+
+
+class Gl6412Handler(QnameHandler):
+    qnames = ["gl6412."]
+
+    async def get_responses(
+        self, qctx: QueryContext
+    ) -> AsyncGenerator[DnsResponseSend, None]:
+        if qctx.qtype == dns.rdatatype.SOA:
+            qctx.response.answer.append(soa_rrset(qctx.qname))
+        elif qctx.qtype == dns.rdatatype.NS:
+            # XXX: The delegation is broken here; dot is missing from NS target names.
+            # I don't know if this is intentional, but for now we are chasing behavior parity.
+            ns2_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns2{qctx.qname}")
+            ns3_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns3{qctx.qname}")
+            qctx.response.answer.append(ns2_rrset)
+            qctx.response.answer.append(ns3_rrset)
+        else:
+            qctx.response.authority.append(soa_rrset(qctx.qname))
+        yield DnsResponseSend(qctx.response)
+
+
+class Gl6412Ns2Handler(QnameHandler):
+    qnames = ["ns2.gl6412."]
+
+    async def get_responses(
+        self, qctx: QueryContext
+    ) -> AsyncGenerator[DnsResponseSend, None]:
+        if qctx.qtype == dns.rdatatype.A:
+            a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.2")
+            qctx.response.answer.append(a_rrset)
+        else:
+            qctx.response.authority.append(soa_rrset(qctx.qname))
+        yield DnsResponseSend(qctx.response)
+
+
+class Gl6412Ns3Handler(QnameHandler):
+    qnames = ["ns3.gl6412."]
+
+    async def get_responses(
+        self, qctx: QueryContext
+    ) -> AsyncGenerator[DnsResponseSend, None]:
+        if qctx.qtype == dns.rdatatype.A:
+            a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3")
+            qctx.response.answer.append(a_rrset)
+        else:
+            qctx.response.authority.append(soa_rrset(qctx.qname))
+        yield DnsResponseSend(qctx.response)