From: Štěpán Balážik Date: Wed, 12 Nov 2025 15:35:40 +0000 (+0100) Subject: Use new AsyncDnsServer features in cookie system test X-Git-Tag: v9.21.17~25^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f5f84a649b89d095b508950512fcab8f84e168ff;p=thirdparty%2Fbind9.git Use new AsyncDnsServer features in cookie system test Take advantage of `default_aa`, `default_rcode` and `keyring` arguments. --- diff --git a/bin/tests/system/cookie/cookie_ans.py b/bin/tests/system/cookie/cookie_ans.py index 5656faa164f..50b06f2c161 100644 --- a/bin/tests/system/cookie/cookie_ans.py +++ b/bin/tests/system/cookie/cookie_ans.py @@ -12,8 +12,8 @@ from typing import AsyncGenerator import dns.edns -import dns.message import dns.name +import dns.rcode import dns.rdatatype import dns.rrset import dns.tsigkeyring @@ -37,16 +37,6 @@ KEYRING = dns.tsigkeyring.from_text( ) -def _reparse_with_keyring(qctx: QueryContext) -> None: - """ - `isctest.asyncserver` doesn't support TSIG signing and validation properly - and hacks around it. However, here we need to be able to sign responses with - TSIG, so we reparse the query and recreate the response stub here. - """ - qctx.query = dns.message.from_wire(qctx.query.to_wire(), keyring=KEYRING) - qctx.response = dns.message.make_response(qctx.query) - - def _first_label(qctx: QueryContext) -> str: return qctx.qname.labels[0].decode("ascii") @@ -112,14 +102,13 @@ class NsHandler(_SpoofableHandler): async def get_responses( self, qctx: QueryContext ) -> AsyncGenerator[DnsResponseSend, None]: - _reparse_with_keyring(qctx) _add_cookie(qctx) qctx.response.answer.append(_ns(qctx)) if self.evil_server: qctx.response.authority.append(_spoofed_a(qctx)) else: qctx.response.authority.append(_legit_a(qctx)) - yield DnsResponseSend(qctx.response, authoritative=True) + yield DnsResponseSend(qctx.response) class GlueHandler(_SpoofableHandler): @@ -129,13 +118,12 @@ class GlueHandler(_SpoofableHandler): async def get_responses( self, qctx: QueryContext ) -> AsyncGenerator[DnsResponseSend, None]: - _reparse_with_keyring(qctx) _add_cookie(qctx) if self.evil_server: qctx.response.answer.append(_spoofed_a(qctx)) else: qctx.response.answer.append(_legit_a(qctx)) - yield DnsResponseSend(qctx.response, authoritative=True) + yield DnsResponseSend(qctx.response) class TcpAHandler(ResponseHandler): @@ -145,11 +133,10 @@ class TcpAHandler(ResponseHandler): async def get_responses( self, qctx: QueryContext ) -> AsyncGenerator[DnsResponseSend, None]: - _reparse_with_keyring(qctx) if _first_label(qctx) != "nocookie": _add_cookie(qctx) qctx.response.answer.append(_legit_a(qctx)) - yield DnsResponseSend(qctx.response, authoritative=True) + yield DnsResponseSend(qctx.response) class WithtsigUdpAHandler(ResponseHandler): @@ -163,16 +150,15 @@ class WithtsigUdpAHandler(ResponseHandler): async def get_responses( self, qctx: QueryContext ) -> AsyncGenerator[DnsResponseSend, None]: - _reparse_with_keyring(qctx) qctx.response.answer.append(_legit_a(qctx)) qctx.response.answer.append(_spoofed_a(qctx)) qctx.response.use_tsig(keyring=KEYRING, keyname="fake") - yield DnsResponseSend(qctx.response, authoritative=True) + yield DnsResponseSend(qctx.response) - _reparse_with_keyring(qctx) + qctx.prepare_new_response() _add_cookie(qctx) qctx.response.answer.append(_legit_a(qctx)) - yield DnsResponseSend(qctx.response, authoritative=True) + yield DnsResponseSend(qctx.response) class UdpAHandler(ResponseHandler): @@ -182,31 +168,31 @@ class UdpAHandler(ResponseHandler): async def get_responses( self, qctx: QueryContext ) -> AsyncGenerator[DnsResponseSend, None]: - _reparse_with_keyring(qctx) qctx.response.answer.append(_legit_a(qctx)) if _first_label(qctx) not in ("nocookie", "tcponly"): _add_cookie(qctx) else: qctx.response.answer.append(_spoofed_a(qctx)) - yield DnsResponseSend(qctx.response, authoritative=True) + yield DnsResponseSend(qctx.response) class FallbackHandler(ResponseHandler): async def get_responses( self, qctx: QueryContext ) -> AsyncGenerator[DnsResponseSend, None]: - _reparse_with_keyring(qctx) _add_cookie(qctx) if qctx.qtype == dns.rdatatype.SOA: qctx.response.answer.append(_soa(qctx)) else: qctx.response.authority.append(_soa(qctx)) - yield DnsResponseSend(qctx.response, authoritative=True) + yield DnsResponseSend(qctx.response) def cookie_server(evil: bool) -> AsyncDnsServer: - server = AsyncDnsServer(keyring=None) + server = AsyncDnsServer( + keyring=KEYRING, default_aa=True, default_rcode=dns.rcode.NOERROR + ) server.install_response_handlers( [ NsHandler(evil),