import dns.edns
import dns.name
import dns.rcode
+import dns.rdataclass
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
AsyncDnsServer,
DnsResponseSend,
+ QnameHandler,
QueryContext,
ResponseHandler,
+ StaticResponseHandler,
)
return None
-class PrimeHandler(ResponseHandler):
- """
- Specifically handle priming query for "." NS (type 2)
- """
-
- def match(self, qctx: QueryContext) -> bool:
- return len(qctx.qname.labels) == 0 and qctx.qtype == dns.rdatatype.NS
+def rrset(
+ qname: dns.name.Name | str,
+ rtype: dns.rdatatype.RdataType,
+ rdata: str,
+ ttl: int = 300,
+) -> dns.rrset.RRset:
+ return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata)
- async def get_responses(
- self, qctx: QueryContext
- ) -> AsyncGenerator[DnsResponseSend, None]:
- ns_rrset = dns.rrset.from_text(
- ".", dns.rdatatype.NS, qctx.qclass, "a.root-servers.nil."
- )
- a_rrset = dns.rrset.from_text(
- "a.root-servers.nil.", dns.rdatatype.A, qctx.qclass, "10.53.0.3"
- )
+class RootNSHandler(QnameHandler, StaticResponseHandler):
+ qnames = ["."]
+ answer = [
+ rrset(".", dns.rdatatype.NS, "a.root-servers.nil."),
+ ]
+ additional = [
+ rrset("a.root-servers.nil.", dns.rdatatype.A, "10.53.0.3"),
+ ]
- response = qctx.prepare_new_response(with_zone_data=False)
- response.set_rcode(dns.rcode.NOERROR)
- response.answer.append(ns_rrset)
- response.additional.append(a_rrset)
- yield DnsResponseSend(response, authoritative=True)
+class ExampleNSHandler(QnameHandler, StaticResponseHandler):
+ qnames = ["example."]
+ answer = [
+ rrset("example.", dns.rdatatype.NS, "ns.example."),
+ ]
+ additional = [
+ rrset("ns.example.", dns.rdatatype.A, "10.53.0.3"),
+ ]
class CookieHandler(ResponseHandler):
def resend_server() -> AsyncDnsServer:
server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
server.install_response_handlers(
- PrimeHandler(),
+ RootNSHandler(),
+ ExampleNSHandler(),
CookieHandler(),
NoErrorHandler(),
)
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
+from re import compile as Re
+
import dns.message
import isctest
+# This test verifies the query pattern when the upstream behaves badly.
+# In this scenario, the upstream server (ans3) always responds with a
+# BADCOOKIE error for queries within the "example" zone, even on TCP.
+# The resolver (ns4), should not resend the same queries over and over
+# again, up to the max-query-count threshold. Instead, the expected
+# pattern is:
+# 1. Priming query, getting the NS for .
+# 2. Getting the NS for example.
+# 3. Trying to resolve test.example.
+# 4. Trying again, but now with the server cookie.
+# 5. Trying again, now over TCP.
+#
+# This means we expect 5 recursion queries trying to resolve test.example.
def test_resend_loop_badcookie(ns4):
- expected_log = "exceeded max queries resolving 'test.example/A'"
+ sending_packet = Re("sending packet from 10.53.0.4#[0-9]+ to 10.53.0.3#[0-9]+")
+ received_packet = Re("received packet from 10.53.0.3#[0-9]+ to 10.53.0.4#[0-9]+")
+
+ log_sequence = [
+ # 1. Priming query, getting the NS for .
+ sending_packet,
+ Re("COOKIE: [0-9a-z]{16}$"),
+ Re(".\\s+IN\\s+NS"),
+ # 2. Getting the NS for example.
+ sending_packet,
+ Re("COOKIE: [0-9a-z]{16}$"),
+ Re("example.\\s+IN\\s+NS"),
+ # 3. Trying to resolve test.example.
+ sending_packet,
+ Re("COOKIE: [0-9a-z]{16}$"),
+ Re("test.example.\\s+IN\\s+A"),
+ # Get the first BADCOOKIE error.
+ "UDP response",
+ received_packet,
+ "BADCOOKIE",
+ Re("COOKIE: [0-9a-z]{16}1122334455667788"),
+ Re("test.example.\\s+IN\\s+A"),
+ # 4. Trying again, but now with the server cookie.
+ sending_packet,
+ Re("test.example.\\s+IN\\s+A"),
+ # Get BADCOOKIE error again.
+ "UDP response",
+ received_packet,
+ "BADCOOKIE",
+ Re("COOKIE: [0-9a-z]{16}1122334455667788"),
+ Re("test.example.\\s+IN\\s+A"),
+ # 5. Trying again, now over TCP.
+ sending_packet,
+ Re("test.example.\\s+IN\\s+A"),
+ # Fails and give up.
+ "TCP response",
+ received_packet,
+ "BADCOOKIE",
+ Re("COOKIE: [0-9a-z]{16}1122334455667788"),
+ Re("test.example.\\s+IN\\s+A"),
+ ]
msg = dns.message.make_query("test.example", "A")
with ns4.watch_log_from_here() as watcher:
res = isctest.query.udp(msg, ns4.ip)
- watcher.wait_for_line(expected_log)
+ watcher.wait_for_sequence(log_sequence)
+
+ assert len(ns4.log.grep(sending_packet)) == 5
isctest.check.servfail(res)