import dns.message
import requests
import threading
+import ssl
+import copy
from twisted.internet import reactor
from proxyprotocol import ProxyProtocol
cls.Responder = threading.Thread(name='Responder', target=reactor.run, args=(False,))
cls.Responder.daemon = True
cls.Responder.start()
+
+
+ @classmethod
+ def _ResponderIncrementCounter(cls):
+ if threading.current_thread().name in cls._responsesCounter:
+ cls._responsesCounter[threading.current_thread().name] += 1
+ else:
+ cls._responsesCounter[threading.current_thread().name] = 1
+
+ @classmethod
+ def _getResponse(cls, request, fromQueue, toQueue, synthesize=None):
+ response = None
+ if len(request.question) != 1:
+ print("Skipping query with question count %d" % (len(request.question)))
+ return None
+ cls._ResponderIncrementCounter()
+ if not fromQueue.empty():
+ toQueue.put(request, True, cls._queueTimeout)
+ response = fromQueue.get(True, cls._queueTimeout)
+ if response:
+ response = copy.copy(response)
+ response.id = request.id
+
+ if synthesize is not None:
+ response = dns.message.make_response(request)
+ response.set_rcode(synthesize)
+
+ if not response:
+ if cls._answerUnexpected:
+ response = dns.message.make_response(request)
+ response.set_rcode(dns.rcode.SERVFAIL)
+
+ return response
+
+ @classmethod
+ def handleTCPConnection(cls, conn, fromQueue, toQueue, trailingDataResponse=False, multipleResponses=False, callback=None, partialWrite=False):
+ ignoreTrailing = trailingDataResponse is True
+ try:
+ data = conn.recv(2)
+ except Exception as err:
+ data = None
+ print(f'Error while reading query size in TCP responder thread {err=}, {type(err)=}')
+ if not data:
+ conn.close()
+ return
+
+ (datalen,) = struct.unpack("!H", data)
+ data = conn.recv(datalen)
+ forceRcode = None
+ try:
+ request = dns.message.from_wire(data, ignore_trailing=ignoreTrailing)
+ except dns.message.TrailingJunk as e:
+ if trailingDataResponse is False or forceRcode is True:
+ raise
+ print("TCP query with trailing data, synthesizing response")
+ request = dns.message.from_wire(data, ignore_trailing=True)
+ forceRcode = trailingDataResponse
+
+ if callback:
+ wire = callback(request)
+ else:
+ if request.edns > 1:
+ forceRcode = dns.rcode.BADVERS
+ response = cls._getResponse(request, fromQueue, toQueue, synthesize=forceRcode)
+ if response:
+ wire = response.to_wire(max_size=65535)
+
+ if not wire:
+ conn.close()
+ return
+ elif isinstance(wire, ResponderDropAction):
+ return
+
+ wireLen = struct.pack("!H", len(wire))
+ if partialWrite:
+ for b in wireLen:
+ conn.send(bytes([b]))
+ time.sleep(0.5)
+ else:
+ conn.send(wireLen)
+ conn.send(wire)
+
+ while multipleResponses:
+ # do not block, and stop as soon as the queue is empty, either the next response is already here or we are done
+ # otherwise we might read responses intended for the next connection
+ if fromQueue.empty():
+ break
+
+ response = fromQueue.get(False)
+ if not response:
+ break
+
+ response = copy.copy(response)
+ response.id = request.id
+ wire = response.to_wire(max_size=65535)
+ try:
+ conn.send(struct.pack("!H", len(wire)))
+ conn.send(wire)
+ except socket.error as e:
+ # some of the tests are going to close
+ # the connection on us, just deal with it
+ break
+
+ conn.close()
+
+ @classmethod
+ def TCPResponder(cls, port, fromQueue, toQueue, trailingDataResponse=False, multipleResponses=False, callback=None, tlsContext=None, multipleConnections=False, listeningAddr='127.0.0.1', partialWrite=False):
+ cls._backgroundThreads[threading.get_native_id()] = True
+ # trailingDataResponse=True means "ignore trailing data".
+ # Other values are either False (meaning "raise an exception")
+ # or are interpreted as a response RCODE for queries with trailing data.
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ try:
+ sock.bind((listeningAddr, port))
+ except socket.error as e:
+ print(listeningAddr)
+ print(port)
+ print("Error binding in the TCP responder: %s" % str(e))
+ sys.exit(1)
+
+ sock.listen(100)
+ sock.settimeout(0.5)
+ if tlsContext:
+ sock = tlsContext.wrap_socket(sock, server_side=True)
+
+ while True:
+ try:
+ (conn, _) = sock.accept()
+ except ssl.SSLError:
+ continue
+ except ConnectionResetError:
+ continue
+ except socket.timeout:
+ if cls._backgroundThreads.get(threading.get_native_id(), False) == False:
+ del cls._backgroundThreads[threading.get_native_id()]
+ break
+ else:
+ continue
+
+ conn.settimeout(5.0)
+ if multipleConnections:
+ thread = threading.Thread(name='TCP Connection Handler',
+ target=cls.handleTCPConnection,
+ args=[conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, partialWrite])
+ thread.daemon = True
+ thread.start()
+ else:
+ cls.handleTCPConnection(conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, partialWrite)
+
+ sock.close()
+
+class ResponderDropAction(object):
+ """
+ An object to indicate a drop action shall be taken
+ """
+ pass
import dns
import os
import subprocess
+import ssl
+import threading
+from queue import Queue
+
+
from recursortests import RecursorTest
class SimpleDoTTest(RecursorTest):
except subprocess.CalledProcessError as e:
print(e.output)
raise
-
+
+class DoTWithLocalResponderTests(RecursorTest):
+ """
+ This tests DoT to responder with validation"
+ """
+
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _tlsBackendPort = 853 # If binding to this port fails, add an empty !853 file to /etc/authbind/byport with execute permissons for you
+ _queueTimeout = 1
+ _toResponderQueue = Queue()
+ _fromResponderQueue = Queue()
+ _backgroundThreads = {}
+ _responsesCounter = {}
+ _answerUnexpected = True
+ _roothints = None
+
+ @staticmethod
+ def sniCallback(sslSocket, sni, sslContext):
+ assert(sni == 'tls.tests.powerdns.com')
+ return None
+
+ @classmethod
+ def sendUDPQuery(cls, query, response, useQueue=True, timeout=2.0, rawQuery=False):
+ if useQueue and response is not None:
+ cls._toResponderQueue.put(response, True, timeout)
+
+ if timeout:
+ cls._sock.settimeout(timeout)
+
+ try:
+ if not rawQuery:
+ query = query.to_wire()
+ cls._sock.send(query)
+ data = cls._sock.recv(4096)
+ except socket.timeout:
+ data = None
+ finally:
+ if timeout:
+ cls._sock.settimeout(None)
+
+ receivedQuery = None
+ message = None
+ if useQueue and not cls._fromResponderQueue.empty():
+ receivedQuery = cls._fromResponderQueue.get(True, timeout)
+ if data:
+ message = dns.message.from_wire(data)
+ return (receivedQuery, message)
+
+ @classmethod
+ def startResponders(cls):
+ tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ tlsContext.load_cert_chain('server.chain', 'server.key')
+ # requires Python 3.7+
+ if hasattr(tlsContext, 'sni_callback'):
+ tlsContext.sni_callback = cls.sniCallback
+
+ print("Launching TLS responder..")
+ cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
+ cls._TLSResponder.daemon = True
+ cls._TLSResponder.start()
+
+ def checkOnlyTLSResponderHit(self, numberOfTLSQueries=1):
+ self.assertNotIn('UDP Responder', self._responsesCounter)
+ self.assertNotIn('TCP Responder', self._responsesCounter)
+ self.assertEqual(self._responsesCounter['TLS Responder'], numberOfTLSQueries)
+
+class DoTOKOpenSSLTest(DoTWithLocalResponderTests):
+ """
+ This tests DoT to responder with openssl validation using a proper CA store for the locally generated cert"
+ """
+
+ _confdir = 'DoTOKOpenSSL'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _config_template = """
+dnssec:
+ validation: off
+outgoing:
+ dot_to_auth_names: [powerdns.com]
+ tls_configurations:
+ - name: dotwithverifygnu
+ ca_store: 'ca.pem'
+ subject_name: tls.tests.powerdns.com
+ subnets: ['127.0.0.1']
+ validate_certificate: true
+ verbose_logging: true
+recursor:
+ forward_zones_recurse:
+ - zone: powerdns.com
+ forwarders: ['127.0.0.1:853']
+ devonly_regression_test_mode: true
+webservice:
+ webserver: true
+ port: %d
+ address: 127.0.0.1
+ password: %s
+ api_key: %s
+ """ % (_wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ super(DoTOKOpenSSLTest, cls).generateRecursorYamlConfig(confdir, False)
+
+ def testUDP(self):
+ """
+ Outgoing TLS: UDP query is sent via TLS
+ """
+ name = 'udp.outgoing-tls.test.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedResponse = dns.message.make_response(query, True)
+ rrset = dns.rrset.from_text(name,
+ 15,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ expectedResponse.answer.append(rrset)
+
+ currentCount = 0
+ if 'TLS Responder' in self._responsesCounter:
+ currentCount = self._responsesCounter['TLS Responder']
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, expectedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(receivedResponse, expectedResponse)
+
+ # there was one TCP query
+ self.checkOnlyTLSResponderHit(currentCount + 1)
+ self.checkMetrics({
+ 'dot-outqueries': 1
+ })
+
+
+class DoTOKGnuTLSTest(DoTWithLocalResponderTests):
+ """
+ This tests DoT to responder with gnutls validation using a proper CA store for the locally generated cert"
+ """
+
+ _confdir = 'DoTOKGnuTLS'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _config_template = """
+dnssec:
+ validation: off
+outgoing:
+ dot_to_auth_names: [powerdns.com]
+ tls_configurations:
+ - name: dotwithverifygnu
+ provider: gnutls
+ ca_store: 'ca.pem'
+ subject_name: tls.tests.powerdns.com
+ subnets: ['127.0.0.1']
+ validate_certificate: true
+ verbose_logging: true
+recursor:
+ forward_zones_recurse:
+ - zone: powerdns.com
+ forwarders: ['127.0.0.1:853']
+ devonly_regression_test_mode: true
+webservice:
+ webserver: true
+ port: %d
+ address: 127.0.0.1
+ password: %s
+ api_key: %s
+ """ % (_wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ super(DoTOKGnuTLSTest, cls).generateRecursorYamlConfig(confdir, False)
+
+ def testUDP(self):
+ """
+ Outgoing TLS: UDP query is sent via TLS
+ """
+ name = 'udp.outgoing-tls.test.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedResponse = dns.message.make_response(query, True)
+ rrset = dns.rrset.from_text(name,
+ 15,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ expectedResponse.answer.append(rrset)
+
+ currentCount = 0
+ if 'TLS Responder' in self._responsesCounter:
+ currentCount = self._responsesCounter['TLS Responder']
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, expectedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(receivedResponse, expectedResponse)
+
+ # there was one TCP query
+ self.checkOnlyTLSResponderHit(currentCount + 1)
+ self.checkMetrics({
+ 'dot-outqueries': 1
+ })
+
+class DoTNOKOpenSSLTest(DoTWithLocalResponderTests):
+ """
+ This tests DoT to responder with openssl validation using a missing CA store for the locally generated cert"
+ """
+
+ _confdir = 'DoTNOKOpenSSL'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _config_template = """
+dnssec:
+ validation: off
+outgoing:
+ dot_to_auth_names: [powerdns.com]
+ tls_configurations:
+ - name: dotwithverifygnu
+ subject_name: tls.tests.powerdns.com
+ subnets: ['127.0.0.1']
+ validate_certificate: true
+ verbose_logging: true
+recursor:
+ forward_zones_recurse:
+ - zone: powerdns.com
+ forwarders: ['127.0.0.1:853']
+ devonly_regression_test_mode: true
+webservice:
+ webserver: true
+ port: %d
+ address: 127.0.0.1
+ password: %s
+ api_key: %s
+ """ % (_wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ super(DoTNOKOpenSSLTest, cls).generateRecursorYamlConfig(confdir, False)
+
+ def testUDP(self):
+ """
+ Outgoing TLS: UDP query is sent via TLS
+ """
+ name = 'udp.outgoing-tls.test.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedResponse = dns.message.make_response(query, True)
+ rrset = dns.rrset.from_text(name,
+ 15,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ expectedResponse.answer.append(rrset)
+
+ currentCount = 0
+ if 'TLS Responder' in self._responsesCounter:
+ currentCount = self._responsesCounter['TLS Responder']
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, expectedResponse)
+
+ self.assertRcodeEqual(receivedResponse, dns.rcode.SERVFAIL)
+
+ # there was no succesfull DoT query
+ self.checkOnlyTLSResponderHit(currentCount)
+ self.checkMetrics({
+ 'dot-outqueries': 1
+ })
+
+
+class DoTNOKGnuTLSTest(DoTWithLocalResponderTests):
+ """
+ This tests DoT to responder with gnutls validation using a missing CA store for the locally generated cert"
+ """
+
+ _confdir = 'DoTNOKGnuTLS'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _config_template = """
+dnssec:
+ validation: off
+outgoing:
+ dot_to_auth_names: [powerdns.com]
+ tls_configurations:
+ - name: dotwithverifygnu
+ provider: gnutls
+ subject_name: tls.tests.powerdns.com
+ subnets: ['127.0.0.1']
+ validate_certificate: true
+ verbose_logging: true
+recursor:
+ forward_zones_recurse:
+ - zone: powerdns.com
+ forwarders: ['127.0.0.1:853']
+ devonly_regression_test_mode: true
+webservice:
+ webserver: true
+ port: %d
+ address: 127.0.0.1
+ password: %s
+ api_key: %s
+ """ % (_wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ super(DoTNOKGnuTLSTest, cls).generateRecursorYamlConfig(confdir, False)
+
+ def testUDP(self):
+ """
+ Outgoing TLS: UDP query is sent via TLS
+ """
+ name = 'udp.outgoing-tls.test.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedResponse = dns.message.make_response(query, True)
+ rrset = dns.rrset.from_text(name,
+ 15,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ expectedResponse.answer.append(rrset)
+
+ currentCount = 0
+ if 'TLS Responder' in self._responsesCounter:
+ currentCount = self._responsesCounter['TLS Responder']
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, expectedResponse)
+
+ self.assertRcodeEqual(receivedResponse, dns.rcode.SERVFAIL)
+
+ # there was no succesful DoT query
+ self.checkOnlyTLSResponderHit(currentCount)
+ self.checkMetrics({
+ 'dot-outqueries': 1
+ })
+