# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from __future__ import print_function
-import os
-import sys
-import signal
-import socket
-import select
-from datetime import datetime, timedelta
-import time
-import functools
-
-import dns, dns.message, dns.query, dns.flags
-from dns.rdatatype import *
-from dns.rdataclass import *
-from dns.rcode import *
-from dns.name import *
-
-
-# Log query to file
-def logquery(type, qname):
- with open("qlog", "a") as f:
- f.write("%s %s\n", type, qname)
-
-
-############################################################################
-# Respond to a DNS query.
-# If there are EDNS options present return FORMERR copying the OPT record.
-# Otherwise:
-# SOA gets a unsigned response.
-# NS gets a unsigned response.
-# A gets a unsigned response.
-# All other types get a unsigned NODATA response.
-############################################################################
-def create_response(msg):
- m = dns.message.from_wire(msg)
- qname = m.question[0].name.to_text()
- rrtype = m.question[0].rdtype
- typename = dns.rdatatype.to_text(rrtype)
-
- with open("query.log", "a") as f:
- f.write("%s %s\n" % (typename, qname))
- print("%s %s" % (typename, qname), end=" ")
-
- if m.edns != -1 and len(m.options) != 0:
- r = dns.message.make_response(m)
- r.use_edns(
- edns=m.edns, ednsflags=m.ednsflags, payload=m.payload, options=m.options
- )
- r.set_rcode(FORMERR)
- else:
- r = dns.message.make_response(m)
- r.set_rcode(NOERROR)
- if rrtype == A:
- r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
- elif rrtype == NS:
- r.answer.append(dns.rrset.from_text(qname, 1, IN, NS, "."))
- elif rrtype == SOA:
- r.answer.append(dns.rrset.from_text(qname, 1, IN, SOA, ". . 0 0 0 0 0"))
+from typing import AsyncGenerator
+
+import dns.rcode
+import dns.rdatatype
+
+from isctest.asyncserver import (
+ AsyncDnsServer,
+ DnsResponseSend,
+ QueryContext,
+ ResponseHandler,
+)
+
+from resolver_ans import rrset, soa_rrset
+
+
+class EdnsWithOptionsFormerrHandler(ResponseHandler):
+ def match(self, qctx: QueryContext) -> bool:
+ return qctx.query.edns > -1 and qctx.query.options
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ qctx.response.set_rcode(dns.rcode.FORMERR)
+ # The test requires that the server echoes back the client cookie
+ qctx.response.opt = qctx.query.opt
+ yield DnsResponseSend(qctx.response, authoritative=False)
+
+
+class FallbackHandler(ResponseHandler):
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ if qctx.qtype == dns.rdatatype.A:
+ a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.10")
+ qctx.response.answer.append(a_rrset)
+ elif qctx.qtype == dns.rdatatype.NS:
+ ns_rrset = rrset(qctx.qname, dns.rdatatype.NS, ".")
+ qctx.response.answer.append(ns_rrset)
+ elif qctx.qtype == dns.rdatatype.SOA:
+ qctx.response.answer.append(soa_rrset(qctx.qname))
else:
- r.authority.append(dns.rrset.from_text(qname, 1, IN, SOA, ". . 0 0 0 0 0"))
- r.flags |= dns.flags.AA
- return r
-
-
-def sigterm(signum, frame):
- print("Shutting down now...")
- os.remove("ans.pid")
- running = False
- sys.exit(0)
-
-
-############################################################################
-# Main
-#
-# Set up responder and control channel, open the pid file, and start
-# the main loop, listening for queries on the query channel or commands
-# on the control channel and acting on them.
-############################################################################
-ip4 = "10.53.0.10"
-ip6 = "fd92:7065:b8e:ffff::10"
-
-try:
- port = int(os.environ["PORT"])
-except:
- port = 5300
-
-query4_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-query4_socket.bind((ip4, port))
-havev6 = True
-try:
- query6_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
- try:
- query6_socket.bind((ip6, port))
- except:
- query6_socket.close()
- havev6 = False
-except:
- havev6 = False
-signal.signal(signal.SIGTERM, sigterm)
-
-f = open("ans.pid", "w")
-pid = os.getpid()
-print(pid, file=f)
-f.close()
+ qctx.response.authority.append(soa_rrset(qctx.qname))
-running = True
+ yield DnsResponseSend(qctx.response, authoritative=True)
-print("Listening on %s port %d" % (ip4, port))
-if havev6:
- print("Listening on %s port %d" % (ip6, port))
-print("Ctrl-c to quit")
-if havev6:
- input = [query4_socket, query6_socket]
-else:
- input = [query4_socket]
+def main() -> None:
+ server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)
+ server.install_response_handlers(
+ EdnsWithOptionsFormerrHandler(),
+ FallbackHandler(),
+ )
+ server.run()
-while running:
- try:
- inputready, outputready, exceptready = select.select(input, [], [])
- except select.error as e:
- break
- except socket.error as e:
- break
- except KeyboardInterrupt:
- break
- for s in inputready:
- if s == query4_socket or s == query6_socket:
- print(
- "Query received on %s" % (ip4 if s == query4_socket else ip6), end=" "
- )
- # Handle incoming queries
- msg = s.recvfrom(65535)
- rsp = create_response(msg[0])
- if rsp:
- print(dns.rcode.to_text(rsp.rcode()))
- s.sendto(rsp.to_wire(), msg[1])
- else:
- print("NO RESPONSE")
- if not running:
- break
+if __name__ == "__main__":
+ main()