import subprocess
import ssl
import threading
+import time
from queue import Queue
tlsContext.sni_callback = cls.sniCallback
if cls._clientCert:
- tlsContext.verify_mode = ssl.CERT_REQUIRED
+ # we check ourselves in the TCP handler if needed, if we would set cert_REQUIRED comms
+ # just fail and that type of problem is harder to diagnose
+ tlsContext.verify_mode = ssl.CERT_OPTIONAL
tlsContext.load_verify_locations(cafile="ca.pem")
print("Launching TLS responder..")
cls._TLSResponder.daemon = True
cls._TLSResponder.start()
+ @classmethod
+ def tearDownResponders(cls):
+ cls._backgroundThreads[cls._TLSResponder.native_id] = False
+ count = 0
+ while count < 20 and len(cls._backgroundThreads) != 0:
+ print(f'Waiting for background responder thread to exit {count}...')
+ time.sleep(0.1)
+ count = count + 1
+
def checkOnlyTLSResponderHit(self, numberOfTLSQueries=1):
self.assertNotIn('UDP Responder', self._responsesCounter)
self.assertNotIn('TCP Responder', self._responsesCounter)
'dot-outqueries': 1
})
-class DoTOKWithClientCertOpenSSLTest(DoTWithLocalResponderTests):
+class DoTOKWithClientCertPEMTest(DoTWithLocalResponderTests):
+ """
+ This tests DoT to responder with openssl validation using a proper CA store for the locally generated cert
+ """
+
+ _confdir = 'DoTOKWithClientCertPEM'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _clientCert = True
+ _config_template = """
+dnssec:
+ validation: off
+outgoing:
+ dot_to_auth_names: [powerdns.com]
+ tls_configurations:
+ - name: dotwithverifyopensslandclientcert
+ ca_store: 'ca.pem'
+ subject_name: tls.tests.powerdns.com
+ subnets: ['127.0.0.1']
+ validate_certificate: true
+ verbose_logging: true
+ client_certificate: client.pem
+ client_certificate_key: client.key
+recursor:
+ forward_zones_recurse:
+ - zone: powerdns.com
+ forwarders: ['127.0.0.1:853']
+ devonly_regression_test_mode: true
+webservice:
+ webserver: true
+ port: %d
+ address: 127.0.0.1
+ password: %s
+ api_key: %s
+ """ % (_wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ super(DoTOKWithClientCertPEMTest, cls).generateRecursorYamlConfig(confdir, False)
+
+ def testUDP(self):
+ """
+ Outgoing TLS: UDP query is sent via TLS
+ """
+ name = 'udp.outgoing-tls.test.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedResponse = dns.message.make_response(query, True)
+ rrset = dns.rrset.from_text(name,
+ 15,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ expectedResponse.answer.append(rrset)
+
+ currentCount = 0
+ if 'TLS Responder' in self._responsesCounter:
+ currentCount = self._responsesCounter['TLS Responder']
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, expectedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(receivedResponse, expectedResponse)
+
+ # there was one TCP query
+ self.checkOnlyTLSResponderHit(currentCount + 1)
+ self.checkMetrics({
+ 'dot-outqueries': 1
+ })
+
+class DoTOKWithClientCertPKCS12Test(DoTWithLocalResponderTests):
"""
This tests DoT to responder with openssl validation using a proper CA store for the locally generated cert
"""
- _confdir = 'DoTOKWithClientCertOpenSSL'
+ _confdir = 'DoTOKWithClientCertPKCS12'
_wsPort = 8042
_wsTimeout = 2
_wsPassword = 'secretpassword'
@classmethod
def generateRecursorConfig(cls, confdir):
- super(DoTOKWithClientCertOpenSSLTest, cls).generateRecursorYamlConfig(confdir, False)
+ super(DoTOKWithClientCertPKCS12Test, cls).generateRecursorYamlConfig(confdir, False)
def testUDP(self):
"""
self.checkMetrics({
'dot-outqueries': 1
})
-
+
class DoTOKGnuTLSTest(DoTWithLocalResponderTests):
"""
This tests DoT to responder with gnutls validation using a proper CA store for the locally generated cert