from isctest.asyncserver import (
AsyncDnsServer,
DnsResponseSend,
- DomainHandler,
IgnoreAllQueries,
QnameHandler,
QueryContext,
)
-def setup_delegation(qctx: QueryContext, owner: str) -> None:
- ns_name = f"ns.{owner}"
- ns_rrset = dns.rrset.from_text(owner, 300, qctx.qclass, dns.rdatatype.NS, ns_name)
- a_rrset = dns.rrset.from_text(
- ns_name, 300, qctx.qclass, dns.rdatatype.A, "10.53.0.3"
- )
- qctx.response.authority.append(ns_rrset)
- qctx.response.additional.append(a_rrset)
-
-
-class BadGoodCnameHandler(QnameHandler):
- qnames = [
- "badcname.example.net.",
- "goodcname.example.net.",
- ]
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- # Data for CNAME/DNAME filtering. We need to make one-level
- # delegation to avoid automatic acceptance for subdomain aliases
- setup_delegation(qctx, "example.net.")
- yield DnsResponseSend(qctx.response, authoritative=False)
-
-
-class Cname1Handler(QnameHandler):
- qnames = ["cname1.example.com."]
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- # Data for the "cname + other data / 1" test
- cname_rrset = dns.rrset.from_text(
- qctx.qname, 300, qctx.qclass, dns.rdatatype.CNAME, "cname1.example.com."
- )
- a_rrset = dns.rrset.from_text(
- qctx.qname, 300, qctx.qclass, dns.rdatatype.A, "1.2.3.4"
- )
- qctx.response.answer.append(cname_rrset)
- qctx.response.answer.append(a_rrset)
- yield DnsResponseSend(qctx.response, authoritative=False)
-
-
-class Cname2Handler(QnameHandler):
- qnames = ["cname2.example.com."]
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- # Data for the "cname + other data / 2" test: same RRs in opposite order
- a_rrset = dns.rrset.from_text(
- qctx.qname, 300, qctx.qclass, dns.rdatatype.A, "1.2.3.4"
- )
- cname_rrset = dns.rrset.from_text(
- qctx.qname, 300, qctx.qclass, dns.rdatatype.CNAME, "cname2.example.com."
- )
- qctx.response.answer.append(a_rrset)
- qctx.response.answer.append(cname_rrset)
- yield DnsResponseSend(qctx.response, authoritative=False)
-
-
-class ExampleHandler(QnameHandler):
- qnames = [
- "www.example.com.",
- "www.example.net.",
- "badcname.example.org.",
- "goodcname.example.org.",
- "foo.badcname.example.org.",
- "foo.goodcname.example.org.",
- ]
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- # Data for address/alias filtering.
- if qctx.qtype == dns.rdatatype.A:
- a_rrset = dns.rrset.from_text(
- qctx.qname, 300, qctx.qclass, qctx.qtype, "192.0.2.1"
- )
- qctx.response.answer.append(a_rrset)
- elif qctx.qtype == dns.rdatatype.AAAA:
- aaaa_rrset = dns.rrset.from_text(
- qctx.qname, 300, qctx.qclass, qctx.qtype, "2001:db8:beef::1"
- )
- qctx.response.answer.append(aaaa_rrset)
- yield DnsResponseSend(qctx.response, authoritative=True)
-
-
class FooInfoHandler(QnameHandler, IgnoreAllQueries):
qnames = ["foo.info."]
-class NoDataHandler(DomainHandler):
- domains = ["nodata.example.net."]
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- yield DnsResponseSend(qctx.response, authoritative=True)
-
-
-class NxdomainHandler(DomainHandler):
- domains = ["nxdomain.example.net."]
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- qctx.response.set_rcode(dns.rcode.NXDOMAIN)
- yield DnsResponseSend(qctx.response, authoritative=True)
-
-
-class SubHandler(DomainHandler):
- domains = ["sub.example.org."]
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- # Data for CNAME/DNAME filtering. The final answers are
- # expected to be accepted regardless of the filter setting.
- setup_delegation(qctx, "sub.example.org.")
- yield DnsResponseSend(qctx.response, authoritative=False)
-
-
class FallbackHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
- setup_delegation(qctx, "below.www.example.com.")
- yield DnsResponseSend(qctx.response, authoritative=False)
+ name = "below.www.example.com."
+ ns_name = f"ns.{name}"
+ ns_rrset = dns.rrset.from_text(
+ name, 300, qctx.qclass, dns.rdatatype.NS, ns_name
+ )
+ a_rrset = dns.rrset.from_text(
+ ns_name, 300, qctx.qclass, dns.rdatatype.A, "10.53.0.3"
+ )
+ qctx.response.authority.append(ns_rrset)
+ qctx.response.additional.append(a_rrset)
+ yield DnsResponseSend(qctx.response)
def main() -> None:
- server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)
- server.install_response_handlers(
- BadGoodCnameHandler(),
- Cname1Handler(),
- Cname2Handler(),
- ExampleHandler(),
- FooInfoHandler(),
- NoDataHandler(),
- NxdomainHandler(),
- SubHandler(),
- )
- server.install_response_handler(FallbackHandler())
+ server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR, default_aa=False)
+ server.install_response_handlers(FooInfoHandler(), FallbackHandler())
server.run()