From: Štěpán Balážik Date: Fri, 26 Dec 2025 00:06:28 +0000 (+0100) Subject: Add common parts of resolver test custom servers X-Git-Tag: v9.21.18~11^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=23d9055617f1911ca2e600d31688e34e48e0993c;p=thirdparty%2Fbind9.git Add common parts of resolver test custom servers These will be shared by all the ans*/ans.py files. --- diff --git a/bin/tests/system/resolver/resolver_ans.py b/bin/tests/system/resolver/resolver_ans.py new file mode 100644 index 00000000000..29d17fc164f --- /dev/null +++ b/bin/tests/system/resolver/resolver_ans.py @@ -0,0 +1,146 @@ +""" +Copyright (C) Internet Systems Consortium, Inc. ("ISC") + +SPDX-License-Identifier: MPL-2.0 + +This Source Code Form is subject to the terms of the Mozilla Public +License, v. 2.0. If a copy of the MPL was not distributed with this +file, you can obtain one at https://mozilla.org/MPL/2.0/. + +See the COPYRIGHT file distributed with this work for additional +information regarding copyright ownership. +""" + +from typing import AsyncGenerator, List, NamedTuple, Union + +import abc + +import dns.name +import dns.rdataclass +import dns.rdatatype +import dns.rrset + +from isctest.asyncserver import ( + DnsResponseSend, + DomainHandler, + QnameHandler, + QueryContext, +) + + +def rrset( + qname: Union[dns.name.Name, str], + rtype: dns.rdatatype.RdataType, + rdata: str, + ttl: int = 300, +) -> dns.rrset.RRset: + return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata) + + +def rrset_from_list( + qname: Union[dns.name.Name, str], + rtype: dns.rdatatype.RdataType, + rdata_list: List[str], + ttl: int = 300, +) -> dns.rrset.RRset: + return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list) + + +def soa_rrset(qname: Union[dns.name.Name, str]) -> dns.rrset.RRset: + return rrset(qname, dns.rdatatype.SOA, ". . 0 0 0 0 0") + + +class DelegationRRsets(NamedTuple): + ns_rrset: dns.rrset.RRset + a_rrset: dns.rrset.RRset + + +def delegation_rrsets( + owner: Union[dns.name.Name, str], + server_number: int, + ns_name: Union[dns.name.Name, str, None] = None, +) -> DelegationRRsets: + if ns_name is None: + ns_name = f"ns.{owner}" + ns_rrset = rrset(owner, dns.rdatatype.NS, f"{ns_name}") + a_rrset = rrset(ns_name, dns.rdatatype.A, f"10.53.0.{server_number}") + return DelegationRRsets(ns_rrset, a_rrset) + + +def setup_delegation( + qctx: QueryContext, owner: Union[dns.name.Name, str], server_number: int +) -> None: + delegation = delegation_rrsets(owner, server_number) + qctx.response.authority.append(delegation.ns_rrset) + qctx.response.additional.append(delegation.a_rrset) + + +class DelegationHandler(DomainHandler): + @property + @abc.abstractmethod + def server_number(self) -> int: + raise NotImplementedError + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + setup_delegation(qctx, self.matched_domain, self.server_number) + yield DnsResponseSend(qctx.response, authoritative=False) + + +class Gl6412AHandler(QnameHandler): + qnames = ["a.gl6412.", "a.a.gl6412."] + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + qctx.response.authority.append(soa_rrset(qctx.qname)) + yield DnsResponseSend(qctx.response) + + +class Gl6412Handler(QnameHandler): + qnames = ["gl6412."] + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + if qctx.qtype == dns.rdatatype.SOA: + qctx.response.answer.append(soa_rrset(qctx.qname)) + elif qctx.qtype == dns.rdatatype.NS: + # XXX: The delegation is broken here; dot is missing from NS target names. + # I don't know if this is intentional, but for now we are chasing behavior parity. + ns2_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns2{qctx.qname}") + ns3_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns3{qctx.qname}") + qctx.response.answer.append(ns2_rrset) + qctx.response.answer.append(ns3_rrset) + else: + qctx.response.authority.append(soa_rrset(qctx.qname)) + yield DnsResponseSend(qctx.response) + + +class Gl6412Ns2Handler(QnameHandler): + qnames = ["ns2.gl6412."] + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + if qctx.qtype == dns.rdatatype.A: + a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.2") + qctx.response.answer.append(a_rrset) + else: + qctx.response.authority.append(soa_rrset(qctx.qname)) + yield DnsResponseSend(qctx.response) + + +class Gl6412Ns3Handler(QnameHandler): + qnames = ["ns3.gl6412."] + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[DnsResponseSend, None]: + if qctx.qtype == dns.rdatatype.A: + a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3") + qctx.response.answer.append(a_rrset) + else: + qctx.response.authority.append(soa_rrset(qctx.qname)) + yield DnsResponseSend(qctx.response)