]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Use new AsyncDnsServer features in cookie system test
authorŠtěpán Balážik <stepan@isc.org>
Wed, 12 Nov 2025 15:35:40 +0000 (16:35 +0100)
committerŠtěpán Balážik <stepan@isc.org>
Thu, 18 Dec 2025 12:13:59 +0000 (13:13 +0100)
Take advantage of `default_aa`, `default_rcode` and `keyring` arguments.

bin/tests/system/cookie/cookie_ans.py

index 5656faa164f27dcb034e5dce89c5daef683093c2..50b06f2c16151053c407dac86aa54f8a22f2c855 100644 (file)
@@ -12,8 +12,8 @@
 from typing import AsyncGenerator
 
 import dns.edns
-import dns.message
 import dns.name
+import dns.rcode
 import dns.rdatatype
 import dns.rrset
 import dns.tsigkeyring
@@ -37,16 +37,6 @@ KEYRING = dns.tsigkeyring.from_text(
 )
 
 
-def _reparse_with_keyring(qctx: QueryContext) -> None:
-    """
-    `isctest.asyncserver` doesn't support TSIG signing and validation properly
-    and hacks around it. However, here we need to be able to sign responses with
-    TSIG, so we reparse the query and recreate the response stub here.
-    """
-    qctx.query = dns.message.from_wire(qctx.query.to_wire(), keyring=KEYRING)
-    qctx.response = dns.message.make_response(qctx.query)
-
-
 def _first_label(qctx: QueryContext) -> str:
     return qctx.qname.labels[0].decode("ascii")
 
@@ -112,14 +102,13 @@ class NsHandler(_SpoofableHandler):
     async def get_responses(
         self, qctx: QueryContext
     ) -> AsyncGenerator[DnsResponseSend, None]:
-        _reparse_with_keyring(qctx)
         _add_cookie(qctx)
         qctx.response.answer.append(_ns(qctx))
         if self.evil_server:
             qctx.response.authority.append(_spoofed_a(qctx))
         else:
             qctx.response.authority.append(_legit_a(qctx))
-        yield DnsResponseSend(qctx.response, authoritative=True)
+        yield DnsResponseSend(qctx.response)
 
 
 class GlueHandler(_SpoofableHandler):
@@ -129,13 +118,12 @@ class GlueHandler(_SpoofableHandler):
     async def get_responses(
         self, qctx: QueryContext
     ) -> AsyncGenerator[DnsResponseSend, None]:
-        _reparse_with_keyring(qctx)
         _add_cookie(qctx)
         if self.evil_server:
             qctx.response.answer.append(_spoofed_a(qctx))
         else:
             qctx.response.answer.append(_legit_a(qctx))
-        yield DnsResponseSend(qctx.response, authoritative=True)
+        yield DnsResponseSend(qctx.response)
 
 
 class TcpAHandler(ResponseHandler):
@@ -145,11 +133,10 @@ class TcpAHandler(ResponseHandler):
     async def get_responses(
         self, qctx: QueryContext
     ) -> AsyncGenerator[DnsResponseSend, None]:
-        _reparse_with_keyring(qctx)
         if _first_label(qctx) != "nocookie":
             _add_cookie(qctx)
         qctx.response.answer.append(_legit_a(qctx))
-        yield DnsResponseSend(qctx.response, authoritative=True)
+        yield DnsResponseSend(qctx.response)
 
 
 class WithtsigUdpAHandler(ResponseHandler):
@@ -163,16 +150,15 @@ class WithtsigUdpAHandler(ResponseHandler):
     async def get_responses(
         self, qctx: QueryContext
     ) -> AsyncGenerator[DnsResponseSend, None]:
-        _reparse_with_keyring(qctx)
         qctx.response.answer.append(_legit_a(qctx))
         qctx.response.answer.append(_spoofed_a(qctx))
         qctx.response.use_tsig(keyring=KEYRING, keyname="fake")
-        yield DnsResponseSend(qctx.response, authoritative=True)
+        yield DnsResponseSend(qctx.response)
 
-        _reparse_with_keyring(qctx)
+        qctx.prepare_new_response()
         _add_cookie(qctx)
         qctx.response.answer.append(_legit_a(qctx))
-        yield DnsResponseSend(qctx.response, authoritative=True)
+        yield DnsResponseSend(qctx.response)
 
 
 class UdpAHandler(ResponseHandler):
@@ -182,31 +168,31 @@ class UdpAHandler(ResponseHandler):
     async def get_responses(
         self, qctx: QueryContext
     ) -> AsyncGenerator[DnsResponseSend, None]:
-        _reparse_with_keyring(qctx)
         qctx.response.answer.append(_legit_a(qctx))
         if _first_label(qctx) not in ("nocookie", "tcponly"):
             _add_cookie(qctx)
         else:
             qctx.response.answer.append(_spoofed_a(qctx))
 
-        yield DnsResponseSend(qctx.response, authoritative=True)
+        yield DnsResponseSend(qctx.response)
 
 
 class FallbackHandler(ResponseHandler):
     async def get_responses(
         self, qctx: QueryContext
     ) -> AsyncGenerator[DnsResponseSend, None]:
-        _reparse_with_keyring(qctx)
         _add_cookie(qctx)
         if qctx.qtype == dns.rdatatype.SOA:
             qctx.response.answer.append(_soa(qctx))
         else:
             qctx.response.authority.append(_soa(qctx))
-        yield DnsResponseSend(qctx.response, authoritative=True)
+        yield DnsResponseSend(qctx.response)
 
 
 def cookie_server(evil: bool) -> AsyncDnsServer:
-    server = AsyncDnsServer(keyring=None)
+    server = AsyncDnsServer(
+        keyring=KEYRING, default_aa=True, default_rcode=dns.rcode.NOERROR
+    )
     server.install_response_handlers(
         [
             NsHandler(evil),