# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from __future__ import print_function
-import os
-import sys
-import signal
-import socket
-import select
-from datetime import datetime, timedelta
-import time
-import functools
+from cookie_ans import cookie_server
-import dns
-import dns.edns
-import dns.flags
-import dns.message
-import dns.query
-import dns.tsig
-import dns.tsigkeyring
-import dns.version
-from dns.edns import *
-from dns.name import *
-from dns.rcode import *
-from dns.rdataclass import *
-from dns.rdatatype import *
-from dns.tsig import *
+def main() -> None:
+ cookie_server(evil=False).run()
-# Log query to file
-def logquery(type, qname):
- with open("qlog", "a") as f:
- f.write("%s %s\n", type, qname)
-
-
-# DNS 2.0 keyring specifies the algorithm
-try:
- keyring = dns.tsigkeyring.from_text(
- {
- "foo": {os.getenv("DEFAULT_HMAC", "hmac-sha256"), "aaaaaaaaaaaa"},
- "fake": {os.getenv("DEFAULT_HMAC", "hmac-sha256"), "aaaaaaaaaaaa"},
- }
- )
-except:
- keyring = dns.tsigkeyring.from_text({"foo": "aaaaaaaaaaaa", "fake": "aaaaaaaaaaaa"})
-
-dopass2 = False
-
-
-############################################################################
-#
-# This server will serve valid and spoofed answers. A spoofed answer will
-# have the address 10.53.0.10 included.
-#
-# When receiving a query over UDP:
-#
-# A query to "nocookie"/A will result in a spoofed answer with no cookie set.
-# A query to "tcponly"/A will result in a spoofed answer with no cookie set.
-# A query to "withtsig"/A will result in two responses, the first is a spoofed
-# answer that is TSIG signed, the second is a valid answer with a cookie set.
-# A query to anything else will result in a valid answer with a cookie set.
-#
-# When receiving a query over TCP:
-#
-# A query to "nocookie"/A will result in a valid answer with no cookie set.
-# A query to anything else will result in a valid answer with a cookie set.
-#
-############################################################################
-def create_response(msg, tcp, first, ns10):
- global dopass2
- m = dns.message.from_wire(msg, keyring=keyring)
- qname = m.question[0].name.to_text()
- lqname = qname.lower()
- labels = lqname.split(".")
- rrtype = m.question[0].rdtype
- typename = dns.rdatatype.to_text(rrtype)
-
- with open("query.log", "a") as f:
- f.write("%s %s\n" % (typename, qname))
- print("%s %s" % (typename, qname), end=" ")
-
- r = dns.message.make_response(m)
- r.set_rcode(NOERROR)
- if rrtype == A:
- # exempt potential nameserver A records.
- if labels[0] == "ns" and ns10:
- r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
- else:
- r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.9"))
- if not tcp and labels[0] == "nocookie":
- r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
- if not tcp and labels[0] == "tcponly":
- r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
- if first and not tcp and labels[0] == "withtsig":
- r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
- dopass2 = True
- elif rrtype == NS:
- length = len(labels)
- if length == 2:
- r.answer.append(dns.rrset.from_text(qname, 1, IN, NS, "ns." + qname))
- if ns10:
- r.additional.append(
- dns.rrset.from_text("ns." + qname, 1, IN, A, "10.53.0.10")
- )
- else:
- r.additional.append(
- dns.rrset.from_text("ns." + qname, 1, IN, A, "10.53.0.9")
- )
- else:
- tld = ".".join(labels[length - 2 :])
- r.authority.append(dns.rrset.from_text(tld, 2, IN, SOA, ". . 0 0 0 0 2"))
- elif rrtype == SOA:
- r.answer.append(dns.rrset.from_text(qname, 2, IN, SOA, ". . 0 0 0 0 2"))
- else:
- r.authority.append(dns.rrset.from_text(qname, 2, IN, SOA, ". . 0 0 0 0 2"))
- # Add a server cookie to the response
- if labels[0] != "nocookie" or rrtype != A:
- for o in m.options:
- if o.otype == 10: # Use 10 instead of COOKIE
- if first and labels[0] == "withtsig" and not tcp and rrtype == A:
- r.use_tsig(
- keyring=keyring,
- keyname=dns.name.from_text("fake"),
- algorithm=HMAC_SHA256,
- )
- elif labels[0] != "tcponly" or tcp or rrtype != A:
- cookie = o
- try:
- if len(o.server) == 0:
- cookie.server = o.client
- except AttributeError: # dnspython<2.7.0 compat
- if len(o.data) == 8:
- cookie.data = o.data + o.data
- else:
- cookie.data = o.data
- r.use_edns(options=[cookie])
- r.flags |= dns.flags.AA
- return r
-
-
-def sigterm(signum, frame):
- print("Shutting down now...")
- os.remove("ans.pid")
- running = False
- sys.exit(0)
-
-
-############################################################################
-# Main
-#
-# Set up responder and control channel, open the pid file, and start
-# the main loop, listening for queries on the query channel or commands
-# on the control channel and acting on them.
-############################################################################
-ip4_addr1 = "10.53.0.9"
-ip4_addr2 = "10.53.0.10"
-ip6_addr1 = "fd92:7065:b8e:ffff::9"
-ip6_addr2 = "fd92:7065:b8e:ffff::10"
-
-try:
- port = int(os.environ["PORT"])
-except:
- port = 5300
-
-query4_udp1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-query4_udp1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-query4_udp1.bind((ip4_addr1, port))
-query4_tcp1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-query4_tcp1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-query4_tcp1.bind((ip4_addr1, port))
-query4_tcp1.listen(1)
-query4_tcp1.settimeout(1)
-
-query4_udp2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-query4_udp2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-query4_udp2.bind((ip4_addr2, port))
-query4_tcp2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-query4_tcp2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-query4_tcp2.bind((ip4_addr2, port))
-query4_tcp2.listen(1)
-query4_tcp2.settimeout(1)
-
-havev6 = True
-query6_udp1 = None
-query6_udp2 = None
-query6_tcp1 = None
-query6_tcp2 = None
-try:
- query6_udp1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
- query6_udp1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- query6_udp1.bind((ip6_addr1, port))
- query6_tcp1 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- query6_tcp1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- query6_tcp1.bind((ip6_addr1, port))
- query6_tcp1.listen(1)
- query6_tcp1.settimeout(1)
-
- query6_udp2 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
- query6_udp2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- query6_udp2.bind((ip6_addr2, port))
- query6_tcp2 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- query6_tcp2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- query6_tcp2.bind((ip6_addr2, port))
- query6_tcp2.listen(1)
- query6_tcp2.settimeout(1)
-except:
- if query6_udp1 != None:
- query6_udp1.close()
- if query6_tcp1 != None:
- query6_tcp1.close()
- if query6_udp2 != None:
- query6_udp2.close()
- if query6_tcp2 != None:
- query6_tcp2.close()
- havev6 = False
-
-signal.signal(signal.SIGTERM, sigterm)
-
-f = open("ans.pid", "w")
-pid = os.getpid()
-print(pid, file=f)
-f.close()
-
-running = True
-
-print("Using DNS version %s" % dns.version.version)
-print("Listening on %s port %d" % (ip4_addr1, port))
-print("Listening on %s port %d" % (ip4_addr2, port))
-if havev6:
- print("Listening on %s port %d" % (ip6_addr1, port))
- print("Listening on %s port %d" % (ip6_addr2, port))
-print("Ctrl-c to quit")
-
-if havev6:
- input = [
- query4_udp1,
- query6_udp1,
- query4_tcp1,
- query6_tcp1,
- query4_udp2,
- query6_udp2,
- query4_tcp2,
- query6_tcp2,
- ]
-else:
- input = [query4_udp1, query4_tcp1, query4_udp2, query4_tcp2]
-
-while running:
- try:
- inputready, outputready, exceptready = select.select(input, [], [])
- except select.error as e:
- break
- except socket.error as e:
- break
- except KeyboardInterrupt:
- break
-
- for s in inputready:
- ns10 = False
- if s == query4_udp1 or s == query6_udp1 or s == query4_udp2 or s == query6_udp2:
- if s == query4_udp1 or s == query6_udp1:
- print(
- "UDP Query received on %s"
- % (ip4_addr1 if s == query4_udp1 else ip6_addr1),
- end=" ",
- )
- if s == query4_udp2 or s == query6_udp2:
- print(
- "UDP Query received on %s"
- % (ip4_addr2 if s == query4_udp2 else ip6_addr2),
- end=" ",
- )
- ns10 = True
- # Handle incoming queries
- msg = s.recvfrom(65535)
- dopass2 = False
- rsp = create_response(msg[0], False, True, ns10)
- print(dns.rcode.to_text(rsp.rcode()))
- s.sendto(rsp.to_wire(), msg[1])
- if dopass2:
- print("Sending second UDP response without TSIG", end=" ")
- rsp = create_response(msg[0], False, False, ns10)
- s.sendto(rsp.to_wire(), msg[1])
- print(dns.rcode.to_text(rsp.rcode()))
-
- if s == query4_tcp1 or s == query6_tcp1 or s == query4_tcp2 or s == query6_tcp2:
- try:
- (cs, _) = s.accept()
- if s == query4_tcp1 or s == query6_tcp1:
- print(
- "TCP Query received on %s"
- % (ip4_addr1 if s == query4_tcp1 else ip6_addr1),
- end=" ",
- )
- if s == query4_tcp2 or s == query6_tcp2:
- print(
- "TCP Query received on %s"
- % (ip4_addr2 if s == query4_tcp2 else ip6_addr2),
- end=" ",
- )
- ns10 = True
- # get TCP message length
- buf = cs.recv(2)
- length = struct.unpack(">H", buf[:2])[0]
- # grep DNS message
- msg = cs.recv(length)
- rsp = create_response(msg, True, True, ns10)
- print(dns.rcode.to_text(rsp.rcode()))
- wire = rsp.to_wire()
- cs.send(struct.pack(">H", len(wire)))
- cs.send(wire)
- cs.close()
- except s.timeout:
- pass
- if not running:
- break
+if __name__ == "__main__":
+ main()
--- /dev/null
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+from typing import AsyncGenerator
+
+import dns
+import dns.tsigkeyring
+
+from isctest.asyncserver import (
+ AsyncDnsServer,
+ ResponseHandler,
+ DnsResponseSend,
+ DnsProtocol,
+ QueryContext,
+)
+
+from isctest.name import prepend_label
+from isctest.vars.algorithms import ALG_VARS
+
+KEYRING = dns.tsigkeyring.from_text(
+ {
+ "foo": (ALG_VARS["DEFAULT_HMAC"], "aaaaaaaaaaaa"),
+ "fake": (ALG_VARS["DEFAULT_HMAC"], "aaaaaaaaaaaa"),
+ }
+)
+
+
+def _reparse_with_keyring(qctx: QueryContext) -> None:
+ """
+ `isctest.asyncserver` doesn't support TSIG signing and validation properly
+ and hacks around it. However, here we need to be able to sign responses with
+ TSIG, so we reparse the query and recreate the response stub here.
+ """
+ qctx.query = dns.message.from_wire(qctx.query.to_wire(), keyring=KEYRING)
+ qctx.response = dns.message.make_response(qctx.query)
+
+
+def _first_label(qctx: QueryContext) -> str:
+ return qctx.qname.labels[0].decode("ascii")
+
+
+def _add_cookie(qctx: QueryContext) -> None:
+ for o in qctx.query.options:
+ if o.otype == dns.edns.OptionType.COOKIE:
+ cookie = o
+ try:
+ if len(cookie.server) == 0:
+ cookie.server = cookie.client
+ except AttributeError: # dnspython<2.7.0 compat
+ if len(o.data) == 8:
+ cookie.data *= 2
+
+ qctx.response.use_edns(options=[cookie])
+ return
+
+
+def _tld(qctx: QueryContext) -> dns.name.Name:
+ return dns.name.Name(qctx.qname.labels[-2:])
+
+
+def _soa(qctx: QueryContext) -> dns.rrset.RRset:
+ return dns.rrset.from_text(
+ _tld(qctx), 2, dns.rdataclass.IN, dns.rdatatype.SOA, ". . 0 0 0 0 2"
+ )
+
+
+def _ns_name(qctx: QueryContext) -> dns.name.Name:
+ return prepend_label("ns", _tld(qctx))
+
+
+def _ns(qctx: QueryContext) -> dns.rrset.RRset:
+ return dns.rrset.from_text(
+ qctx.qname,
+ 1,
+ dns.rdataclass.IN,
+ dns.rdatatype.NS,
+ _ns_name(qctx).to_text(),
+ )
+
+
+def _legit_a(qctx: QueryContext) -> dns.rrset.RRset:
+ return dns.rrset.from_text(
+ qctx.qname, 1, dns.rdataclass.IN, dns.rdatatype.A, "10.53.0.9"
+ )
+
+
+def _spoofed_a(qctx: QueryContext) -> dns.rrset.RRset:
+ return dns.rrset.from_text(
+ qctx.qname, 1, dns.rdataclass.IN, dns.rdatatype.A, "10.53.0.10"
+ )
+
+
+class _SpoofableHandler(ResponseHandler):
+ def __init__(self, evil_server: bool) -> None:
+ self.evil_server = evil_server
+
+
+class NsHandler(_SpoofableHandler):
+ def match(self, qctx: QueryContext) -> bool:
+ return qctx.qtype == dns.rdatatype.NS and qctx.qname == _tld(qctx)
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ _reparse_with_keyring(qctx)
+ _add_cookie(qctx)
+ qctx.response.answer.append(_ns(qctx))
+ if self.evil_server:
+ qctx.response.authority.append(_spoofed_a(qctx))
+ else:
+ qctx.response.authority.append(_legit_a(qctx))
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+
+class GlueHandler(_SpoofableHandler):
+ def match(self, qctx: QueryContext) -> bool:
+ return qctx.qtype == dns.rdatatype.A and qctx.qname == _ns_name(qctx)
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ _reparse_with_keyring(qctx)
+ _add_cookie(qctx)
+ if self.evil_server:
+ qctx.response.answer.append(_spoofed_a(qctx))
+ else:
+ qctx.response.answer.append(_legit_a(qctx))
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+
+class TcpAHandler(ResponseHandler):
+ def match(self, qctx: QueryContext) -> bool:
+ return qctx.qtype == dns.rdatatype.A and qctx.protocol == DnsProtocol.TCP
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ _reparse_with_keyring(qctx)
+ if _first_label(qctx) != "nocookie":
+ _add_cookie(qctx)
+ qctx.response.answer.append(_legit_a(qctx))
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+
+class WithtsigUdpAHandler(ResponseHandler):
+ def match(self, qctx: QueryContext) -> bool:
+ return (
+ qctx.qtype == dns.rdatatype.A
+ and qctx.protocol == DnsProtocol.UDP
+ and _first_label(qctx) == "withtsig"
+ )
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ _reparse_with_keyring(qctx)
+ qctx.response.answer.append(_legit_a(qctx))
+ qctx.response.answer.append(_spoofed_a(qctx))
+ qctx.response.use_tsig(keyring=KEYRING, keyname="fake")
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+ _reparse_with_keyring(qctx)
+ _add_cookie(qctx)
+ qctx.response.answer.append(_legit_a(qctx))
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+
+class UdpAHandler(ResponseHandler):
+ def match(self, qctx: QueryContext) -> bool:
+ return qctx.qtype == dns.rdatatype.A and qctx.protocol == DnsProtocol.UDP
+
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ _reparse_with_keyring(qctx)
+ qctx.response.answer.append(_legit_a(qctx))
+ if _first_label(qctx) not in ("nocookie", "tcponly"):
+ _add_cookie(qctx)
+ else:
+ qctx.response.answer.append(_spoofed_a(qctx))
+
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+
+class FallbackHandler(ResponseHandler):
+ async def get_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[DnsResponseSend, None]:
+ _reparse_with_keyring(qctx)
+ _add_cookie(qctx)
+ if qctx.qtype == dns.rdatatype.SOA:
+ qctx.response.answer.append(_soa(qctx))
+ else:
+ qctx.response.authority.append(_soa(qctx))
+ yield DnsResponseSend(qctx.response, authoritative=True)
+
+
+def cookie_server(evil: bool) -> AsyncDnsServer:
+ server = AsyncDnsServer(acknowledge_tsig_dnspython_hacks=True)
+ server.install_response_handler(NsHandler(evil))
+ server.install_response_handler(GlueHandler(evil))
+ server.install_response_handler(TcpAHandler())
+ server.install_response_handler(WithtsigUdpAHandler())
+ server.install_response_handler(UdpAHandler())
+ server.install_response_handler(FallbackHandler())
+ return server