from typing import AsyncGenerator
import dns.edns
-import dns.message
import dns.name
+import dns.rcode
import dns.rdatatype
import dns.rrset
import dns.tsigkeyring
)
-def _reparse_with_keyring(qctx: QueryContext) -> None:
- """
- `isctest.asyncserver` doesn't support TSIG signing and validation properly
- and hacks around it. However, here we need to be able to sign responses with
- TSIG, so we reparse the query and recreate the response stub here.
- """
- qctx.query = dns.message.from_wire(qctx.query.to_wire(), keyring=KEYRING)
- qctx.response = dns.message.make_response(qctx.query)
-
-
def _first_label(qctx: QueryContext) -> str:
return qctx.qname.labels[0].decode("ascii")
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
- _reparse_with_keyring(qctx)
_add_cookie(qctx)
qctx.response.answer.append(_ns(qctx))
if self.evil_server:
qctx.response.authority.append(_spoofed_a(qctx))
else:
qctx.response.authority.append(_legit_a(qctx))
- yield DnsResponseSend(qctx.response, authoritative=True)
+ yield DnsResponseSend(qctx.response)
class GlueHandler(_SpoofableHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
- _reparse_with_keyring(qctx)
_add_cookie(qctx)
if self.evil_server:
qctx.response.answer.append(_spoofed_a(qctx))
else:
qctx.response.answer.append(_legit_a(qctx))
- yield DnsResponseSend(qctx.response, authoritative=True)
+ yield DnsResponseSend(qctx.response)
class TcpAHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
- _reparse_with_keyring(qctx)
if _first_label(qctx) != "nocookie":
_add_cookie(qctx)
qctx.response.answer.append(_legit_a(qctx))
- yield DnsResponseSend(qctx.response, authoritative=True)
+ yield DnsResponseSend(qctx.response)
class WithtsigUdpAHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
- _reparse_with_keyring(qctx)
qctx.response.answer.append(_legit_a(qctx))
qctx.response.answer.append(_spoofed_a(qctx))
qctx.response.use_tsig(keyring=KEYRING, keyname="fake")
- yield DnsResponseSend(qctx.response, authoritative=True)
+ yield DnsResponseSend(qctx.response)
- _reparse_with_keyring(qctx)
+ qctx.prepare_new_response()
_add_cookie(qctx)
qctx.response.answer.append(_legit_a(qctx))
- yield DnsResponseSend(qctx.response, authoritative=True)
+ yield DnsResponseSend(qctx.response)
class UdpAHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
- _reparse_with_keyring(qctx)
qctx.response.answer.append(_legit_a(qctx))
if _first_label(qctx) not in ("nocookie", "tcponly"):
_add_cookie(qctx)
else:
qctx.response.answer.append(_spoofed_a(qctx))
- yield DnsResponseSend(qctx.response, authoritative=True)
+ yield DnsResponseSend(qctx.response)
class FallbackHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
- _reparse_with_keyring(qctx)
_add_cookie(qctx)
if qctx.qtype == dns.rdatatype.SOA:
qctx.response.answer.append(_soa(qctx))
else:
qctx.response.authority.append(_soa(qctx))
- yield DnsResponseSend(qctx.response, authoritative=True)
+ yield DnsResponseSend(qctx.response)
def cookie_server(evil: bool) -> AsyncDnsServer:
- server = AsyncDnsServer(keyring=None)
+ server = AsyncDnsServer(
+ keyring=KEYRING, default_aa=True, default_rcode=dns.rcode.NOERROR
+ )
server.install_response_handlers(
[
NsHandler(evil),