response: dns.message.Message
authoritative: Optional[bool] = None
delay: float = 0.0
+ acknowledge_hand_rolled_response: bool = False
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
"""
Yield a potentially delayed response that is a dns.message.Message.
"""
assert isinstance(self.response, dns.message.Message)
+ if not (
+ _is_asyncserver_response(self.response)
+ or self.acknowledge_hand_rolled_response
+ ):
+ error = "The response you are trying to send was not created using "
+ error += "AsyncDnsServer's response preparation methods. "
+ error += "This will break features such as automatic AA flag "
+ error += "and RCODE handling. If you need a fresh copy of a "
+ error += "response, use `QueryContext.prepare_new_response` "
+ error += "instead of `dns.message.make_response`. "
+ error += "To acknowledge this and proceed anyway, set "
+ error += "`acknowledge_hand_rolled_response=True` in "
+ error += "DnsResponseSend's constructor."
+ raise RuntimeError(error)
+
if self.authoritative is not None:
if self.authoritative:
self.response.flags |= dns.flags.AA
pass
+_ASYNCSERVER_RESPONSE_MARKER = "__is_asyncserver_response__"
+
+
+def _make_asyncserver_response(query: dns.message.Message) -> dns.message.Message:
+ response = dns.message.make_response(query)
+ setattr(response, _ASYNCSERVER_RESPONSE_MARKER, True)
+ return response
+
+
+def _is_asyncserver_response(message: dns.message.Message) -> bool:
+ return getattr(message, _ASYNCSERVER_RESPONSE_MARKER, False)
+
+
class AsyncDnsServer(AsyncServer):
"""
DNS server which responds to queries based on zone data and/or custom
except dns.exception.DNSException as exc:
logging.error("Invalid query from %s (%s): %s", peer, wire.hex(), exc)
return
- response_stub = dns.message.make_response(query)
+ response_stub = _make_asyncserver_response(query)
qctx = QueryContext(query, response_stub, peer, protocol)
self._log_query(qctx, peer, protocol)
responses = self._prepare_responses(qctx)
ns_rrset = dns.rrset.from_text(zone_cut, 2, qctx.qclass, dns.rdatatype.NS, ns_name)
a_rrset = dns.rrset.from_text(ns_name, 2, qctx.qclass, dns.rdatatype.A, target_addr)
- response = dns.message.make_response(qctx.query)
+ response = qctx.prepare_new_response(with_zone_data=False)
response.set_rcode(dns.rcode.NOERROR)
response.authority.append(ns_rrset)
response.additional.append(a_rrset)