return super().to_wire(*args, **kwargs)
+class _NoKeyringType:
+ pass
+
+
class AsyncDnsServer(AsyncServer):
"""
DNS server which responds to queries based on zone data and/or custom
/,
default_rcode: dns.rcode.Rcode = dns.rcode.REFUSED,
default_aa: bool = True,
+ keyring: Union[
+ Dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
+ ] = _NoKeyringType(),
acknowledge_manual_dname_handling: bool = False,
- acknowledge_tsig_dnspython_hacks: bool = False,
) -> None:
super().__init__(self._handle_udp, self._handle_tcp, "ans.pid")
self._response_handlers: List[ResponseHandler] = []
self._default_rcode = default_rcode
self._default_aa = default_aa
+ self._keyring = keyring
self._acknowledge_manual_dname_handling = acknowledge_manual_dname_handling
- self._acknowledge_tsig_dnspython_hacks = acknowledge_tsig_dnspython_hacks
self._load_zones()
Yield wire data to send as a response over the established transport.
"""
try:
- query = dns.message.from_wire(wire)
- except dns.message.UnknownTSIGKey:
- self._abort_if_tsig_signed_query_received_unless_acknowledged()
- query = _DnsMessageWithTsigDisabled.from_wire(wire)
+ query = self._parse_message(wire)
except dns.exception.DNSException as exc:
logging.error("Invalid query from %s (%s): %s", peer, wire.hex(), exc)
return
response_length = struct.pack("!H", len(response))
yield response_length + response
- def _abort_if_tsig_signed_query_received_unless_acknowledged(self) -> None:
- if self._acknowledge_tsig_dnspython_hacks:
- return
-
- error = "TSIG-signed query received; "
- error += "due to a bug in dnspython, this requires some hacking around; "
- error += "you may experience unexpected behavior when dealing with TSIG; "
- error += "TSIG validation is disabled, so any TSIG handling must be done "
- error += "manually; pass `acknowledge_tsig_dnspython_hacks=True` to the "
- error += "AsyncDnsServer constructor to acknowledge this and continue."
-
- raise ValueError(error)
+ def _parse_message(self, wire: bytes) -> dns.message.Message:
+ try:
+ if isinstance(self._keyring, _NoKeyringType):
+ keyring = None
+ else:
+ keyring = self._keyring
+ return dns.message.from_wire(wire, keyring=keyring)
+ except dns.message.UnknownTSIGKey as exc:
+ if isinstance(self._keyring, _NoKeyringType):
+ error = "TSIG-signed query received but no `keyring` was provided; "
+ error += "either provide a keyring (in which case the server will "
+ error += "ignore any TSIG-invalid queries), or set `keyring=None` "
+ error += "explicitly to disable TSIG validation altogether. "
+ error += "This requires some hacking around a dnspython bug, "
+ error += "so there may be unexpected side effects."
+ raise ValueError(error) from exc
+ if self._keyring is None:
+ return _DnsMessageWithTsigDisabled.from_wire(wire)
+ raise
async def _prepare_responses(
self, qctx: QueryContext