import dns.edns
import dns.name
import dns.rcode
+import dns.rdataclass
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
AsyncDnsServer,
DnsResponseSend,
+ QnameQtypeHandler,
QueryContext,
ResponseHandler,
+ StaticResponseHandler,
)
return None
-class PrimeHandler(ResponseHandler):
- """
- Specifically handle priming query for "." NS (type 2)
- """
-
- def match(self, qctx: QueryContext) -> bool:
- return len(qctx.qname.labels) == 0 and qctx.qtype == dns.rdatatype.NS
-
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
-
- ns_rrset = dns.rrset.from_text(
- ".", dns.rdatatype.NS, qctx.qclass, "a.root-servers.nil."
- )
- a_rrset = dns.rrset.from_text(
- "a.root-servers.nil.", dns.rdatatype.A, qctx.qclass, "10.53.0.3"
- )
+def rrset(
+ qname: dns.name.Name | str,
+ rtype: dns.rdatatype.RdataType,
+ rdata: str,
+ ttl: int = 300,
+) -> dns.rrset.RRset:
+ return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata)
- response = qctx.prepare_new_response(with_zone_data=False)
- response.set_rcode(dns.rcode.NOERROR)
- response.answer.append(ns_rrset)
- response.additional.append(a_rrset)
- yield DnsResponseSend(response, authoritative=True)
+class RootNSHandler(QnameQtypeHandler, StaticResponseHandler):
+ qnames = ["."]
+ qtypes = [dns.rdatatype.NS]
+ answer = [rrset(".", dns.rdatatype.NS, "a.root-servers.nil.")]
+ additional = [rrset("a.root-servers.nil.", dns.rdatatype.A, "10.53.0.3")]
class CookieHandler(ResponseHandler):
def resend_server() -> AsyncDnsServer:
server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
server.install_response_handlers(
- PrimeHandler(),
+ RootNSHandler(),
CookieHandler(),
NoErrorHandler(),
)