soa: Optional[dns.rrset.RRset] = None
node: Optional[dns.node.Node] = None
answer: Optional[dns.rdataset.Rdataset] = None
+ alias: Optional[dns.name.Name] = None
@property
def qname(self) -> dns.name.Name:
return self.query.question[0].name
+ @property
+ def current_qname(self) -> dns.name.Name:
+ return self.alias or self.qname
+
@property
def qclass(self) -> RdataClass:
return self.query.question[0].rdclass
if self._nxdomain_response(qctx):
return
+ if self._cname_response(qctx):
+ return
+
if self._nodata_response(qctx):
return
self._noerror_response(qctx)
def _refused_response(self, qctx: QueryContext) -> bool:
- qctx.zone = self._zone_tree.find_best_zone(qctx.qname)
- if qctx.zone:
+ zone = self._zone_tree.find_best_zone(qctx.current_qname)
+ if zone:
+ qctx.zone = zone
return False
- qctx.response.set_rcode(dns.rcode.REFUSED)
+ if not qctx.response.answer:
+ qctx.response.set_rcode(dns.rcode.REFUSED)
return True
def _delegation_response(self, qctx: QueryContext) -> bool:
assert qctx.zone
- name = qctx.qname
+ name = qctx.current_qname
delegation = None
while name != qctx.zone.origin:
qctx.soa = qctx.zone.find_rrset(qctx.zone.origin, dns.rdatatype.SOA)
assert qctx.soa
- qctx.node = qctx.zone.get_node(qctx.qname)
+ qctx.node = qctx.zone.get_node(qctx.current_qname)
if qctx.node or not any(
- n for n in qctx.zone.nodes if n.is_subdomain(qctx.qname)
+ n for n in qctx.zone.nodes if n.is_subdomain(qctx.current_qname)
):
return False
qctx.response.authority.append(qctx.soa)
return True
+ def _cname_response(self, qctx: QueryContext) -> bool:
+ assert qctx.node
+
+ cname = qctx.node.get_rdataset(qctx.qclass, dns.rdatatype.CNAME)
+ if not cname:
+ return False
+
+ cname_rrset = dns.rrset.RRset(qctx.current_qname, qctx.qclass, cname.rdtype)
+ cname_rrset.update(cname)
+ qctx.response.answer.append(cname_rrset)
+
+ qctx.alias = cname[0].target
+ self._prepare_response_from_zone_data(qctx)
+ return True
+
def _nodata_response(self, qctx: QueryContext) -> bool:
assert qctx.node
assert qctx.soa
return False
qctx.response.set_rcode(dns.rcode.NOERROR)
- qctx.response.authority.append(qctx.soa)
+ if not qctx.response.answer:
+ qctx.response.authority.append(qctx.soa)
return True
def _noerror_response(self, qctx: QueryContext) -> None:
assert qctx.answer
- answer_rrset = dns.rrset.RRset(qctx.qname, qctx.qclass, qctx.qtype)
+ answer_rrset = dns.rrset.RRset(qctx.current_qname, qctx.qclass, qctx.qtype)
answer_rrset.update(qctx.answer)
qctx.response.set_rcode(dns.rcode.NOERROR)