clean-certs:
- rm -f ca.key ca.pem ca.srl server.csr server.key server.pem server.chain server.ocsp client csr clien.pem client.key client.p12
+ rm -f ca.key ca.pem ca.srl server.csr server.key server.pem server.chain server.ocsp client.csr clien.pem client.key client.p12
clean-configs:
rm -rf configs/*
certs:
--- /dev/null
+[req]
+default_bits = 2048
+encrypt_key = no
+prompt = no
+distinguished_name = client_distinguished_name
+req_extensions = v3_req
+
+[client_distinguished_name]
+CN = client.tests.powerdns.com
+OU = PowerDNS.com BV
+countryName = NL
+
+[v3_req]
+basicConstraints = CA:FALSE
+keyUsage = nonRepudiation, digitalSignature, keyEncipherment
+subjectAltName = @alt_names
+
+[alt_names]
+DNS.1 = client.tests.powerdns.com
+DNS.2 = powerdns.com
+IP.3 = 127.0.0.1
return response
@classmethod
- def handleTCPConnection(cls, conn, fromQueue, toQueue, trailingDataResponse=False, multipleResponses=False, callback=None, partialWrite=False):
+ def handleTCPConnection(cls, conn, fromQueue, toQueue, trailingDataResponse=False, multipleResponses=False, callback=None, partialWrite=False,clientCert=False):
ignoreTrailing = trailingDataResponse is True
try:
data = conn.recv(2)
conn.close()
return
+ if clientCert:
+ print("Checking client cert")
+ cert = conn.getpeercert()
+ if cert is None:
+ raise AssertionError("Client certificate expected, got none")
+
(datalen,) = struct.unpack("!H", data)
data = conn.recv(datalen)
forceRcode = None
conn.close()
@classmethod
- def TCPResponder(cls, port, fromQueue, toQueue, trailingDataResponse=False, multipleResponses=False, callback=None, tlsContext=None, multipleConnections=False, listeningAddr='127.0.0.1', partialWrite=False):
+ def TCPResponder(cls, port, fromQueue, toQueue, trailingDataResponse=False, multipleResponses=False, callback=None, tlsContext=None, multipleConnections=False, listeningAddr='127.0.0.1', partialWrite=False,clientCert=False):
cls._backgroundThreads[threading.get_native_id()] = True
# trailingDataResponse=True means "ignore trailing data".
# Other values are either False (meaning "raise an exception")
if multipleConnections:
thread = threading.Thread(name='TCP Connection Handler',
target=cls.handleTCPConnection,
- args=[conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, partialWrite])
+ args=[conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, partialWrite,clientCert])
thread.daemon = True
thread.start()
else:
- cls.handleTCPConnection(conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, partialWrite)
+ cls.handleTCPConnection(conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, partialWrite,clientCert)
sock.close()
_responsesCounter = {}
_answerUnexpected = True
_roothints = None
+ _clientCert = False
@staticmethod
def sniCallback(sslSocket, sni, sslContext):
if hasattr(tlsContext, 'sni_callback'):
tlsContext.sni_callback = cls.sniCallback
+ if cls._clientCert:
+ tlsContext.verify_mode = ssl.CERT_REQUIRED
+ tlsContext.load_verify_locations(cafile="ca.pem")
+
print("Launching TLS responder..")
- cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
+ cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext, False, '127.0.0.1', False, cls._clientCert])
cls._TLSResponder.daemon = True
cls._TLSResponder.start()
'dot-outqueries': 1
})
+class DoTOKWithClientCertOpenSSLTest(DoTWithLocalResponderTests):
+ """
+ This tests DoT to responder with openssl validation using a proper CA store for the locally generated cert
+ """
+
+ _confdir = 'DoTOKWithClientCertOpenSSL'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _clientCert = True
+ _config_template = """
+dnssec:
+ validation: off
+outgoing:
+ dot_to_auth_names: [powerdns.com]
+ tls_configurations:
+ - name: dotwithverifyopensslandclientcert
+ ca_store: 'ca.pem'
+ subject_name: tls.tests.powerdns.com
+ subnets: ['127.0.0.1']
+ validate_certificate: true
+ verbose_logging: true
+ client_certificate: client.p12
+ client_certificate_password: passw0rd
+recursor:
+ forward_zones_recurse:
+ - zone: powerdns.com
+ forwarders: ['127.0.0.1:853']
+ devonly_regression_test_mode: true
+webservice:
+ webserver: true
+ port: %d
+ address: 127.0.0.1
+ password: %s
+ api_key: %s
+ """ % (_wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ super(DoTOKWithClientCertOpenSSLTest, cls).generateRecursorYamlConfig(confdir, False)
+
+ def testUDP(self):
+ """
+ Outgoing TLS: UDP query is sent via TLS
+ """
+ name = 'udp.outgoing-tls.test.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedResponse = dns.message.make_response(query, True)
+ rrset = dns.rrset.from_text(name,
+ 15,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ expectedResponse.answer.append(rrset)
+
+ currentCount = 0
+ if 'TLS Responder' in self._responsesCounter:
+ currentCount = self._responsesCounter['TLS Responder']
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, expectedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(receivedResponse, expectedResponse)
+
+ # there was one TCP query
+ self.checkOnlyTLSResponderHit(currentCount + 1)
+ self.checkMetrics({
+ 'dot-outqueries': 1
+ })
class DoTOKGnuTLSTest(DoTWithLocalResponderTests):
"""