def checkResponseNoEDNS(self, expected, received):
self.checkMessageNoEDNS(expected, received)
+ def generateNewCertificateAndKey(self):
+ # generate and sign a new cert
+ cmd = ['openssl', 'req', '-new', '-newkey', 'rsa:2048', '-nodes', '-keyout', 'server.key', '-out', 'server.csr', '-config', 'configServer.conf']
+ output = None
+ try:
+ process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+ output = process.communicate(input='')
+ except subprocess.CalledProcessError as exc:
+ raise AssertionError('openssl req failed (%d): %s' % (exc.returncode, exc.output))
+ cmd = ['openssl', 'x509', '-req', '-days', '1', '-CA', 'ca.pem', '-CAkey', 'ca.key', '-CAcreateserial', '-in', 'server.csr', '-out', 'server.pem', '-extfile', 'configServer.conf', '-extensions', 'v3_req']
+ output = None
+ try:
+ process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+ output = process.communicate(input='')
+ except subprocess.CalledProcessError as exc:
+ raise AssertionError('openssl x509 failed (%d): %s' % (exc.returncode, exc.output))
+
+ with open('server.chain', 'w') as outFile:
+ for inFileName in ['server.pem', 'ca.pem']:
+ with open(inFileName) as inFile:
+ outFile.write(inFile.read())
#!/usr/bin/env python
+import base64
import dns
import os
import subprocess
process = subprocess.Popen(testcmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
output = process.communicate(input='')
except subprocess.CalledProcessError as exc:
- raise AssertionError('dnsdist --check-config failed (%d): %s' % (exc.returncode, exc.output))
+ raise AssertionError('openssl s_client failed (%d): %s' % (exc.returncode, exc.output))
return output[0].decode()
+ @classmethod
+ def getOCSPSerial(cls, output):
+ serialNumber = None
+ for line in output.splitlines():
+ line = line.strip()
+ print(line)
+ if line.startswith('Serial Number:'):
+ (_, serialNumber) = line.split(':')
+ break
+
+ return serialNumber
+
@unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest):
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_dohServerPort = 8443
_config_template = """
newServer{address="127.0.0.1:%s"}
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
-- generate an OCSP response file for our certificate, valid one day
generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, { ocspResponses={"%s"}})
"""
- _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_dohServerPort', '_serverCert', '_serverKey', '_ocspFile']
+ _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_dohServerPort', '_serverCert', '_serverKey', '_ocspFile']
@classmethod
def setUpClass(cls):
output = self.checkOCSPStaplingStatus('127.0.0.1', self._dohServerPort, self._serverName, self._caCert)
self.assertIn('OCSP Response Status: successful (0x0)', output)
+ serialNumber = self.getOCSPSerial(output)
+ self.assertTrue(serialNumber)
+
+ self.generateNewCertificateAndKey()
+ self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+ self.sendConsoleCommand("reloadAllCertificates()")
+
+ output = self.checkOCSPStaplingStatus('127.0.0.1', self._dohServerPort, self._serverName, self._caCert)
+ self.assertIn('OCSP Response Status: successful (0x0)', output)
+ serialNumber2 = self.getOCSPSerial(output)
+ self.assertTrue(serialNumber2)
+ self.assertNotEquals(serialNumber, serialNumber2)
+
class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_tlsServerPort = 8443
_config_template = """
newServer{address="127.0.0.1:%s"}
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
-- generate an OCSP response file for our certificate, valid one day
generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls", ocspResponses={"%s"}})
"""
- _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+ _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
def testOCSPStapling(self):
"""
output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
self.assertIn('OCSP Response Status: successful (0x0)', output)
+ serialNumber = self.getOCSPSerial(output)
+ self.assertTrue(serialNumber)
+
+ self.generateNewCertificateAndKey()
+ self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+ self.sendConsoleCommand("reloadAllCertificates()")
+
+ output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
+ self.assertIn('OCSP Response Status: successful (0x0)', output)
+ serialNumber2 = self.getOCSPSerial(output)
+ self.assertTrue(serialNumber2)
+ self.assertNotEquals(serialNumber, serialNumber2)
+
class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_tlsServerPort = 8443
_config_template = """
newServer{address="127.0.0.1:%s"}
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
-- generate an OCSP response file for our certificate, valid one day
generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", ocspResponses={"%s"}})
"""
- _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+ _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
def testOCSPStapling(self):
"""
"""
output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
self.assertIn('OCSP Response Status: successful (0x0)', output)
+
+ serialNumber = self.getOCSPSerial(output)
+ self.assertTrue(serialNumber)
+
+ self.generateNewCertificateAndKey()
+ self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+ self.sendConsoleCommand("reloadAllCertificates()")
+
+ output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
+ self.assertIn('OCSP Response Status: successful (0x0)', output)
+ serialNumber2 = self.getOCSPSerial(output)
+ self.assertTrue(serialNumber2)
+ self.assertNotEquals(serialNumber, serialNumber2)
#!/usr/bin/env python
+import base64
import dns
import socket
import ssl
+import subprocess
import unittest
from dnsdisttests import DNSDistTest
class TLSTests(object):
+ def getServerCertificate(self):
+ conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
+ return conn.getpeercert()
+
def testTLSSimple(self):
"""
TLS: Single query
self.assertEquals(query, receivedQuery)
self.assertEquals(response, receivedResponse)
+ # check the certificate
+ cert = self.getServerCertificate()
+ self.assertIn('subject', cert)
+ self.assertIn('serialNumber', cert)
+ self.assertIn('subjectAltName', cert)
+ subject = cert['subject']
+ altNames = cert['subjectAltName']
+ self.assertEquals(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org')
+ self.assertEquals(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV')
+ names = []
+ for entry in altNames:
+ names.append(entry[1])
+ self.assertEquals(names, ['tls.tests.dnsdist.org', 'powerdns.com'])
+ serialNumber = cert['serialNumber']
+
+ self.generateNewCertificateAndKey()
+ self.sendConsoleCommand("reloadAllCertificates()")
+
+ # open a new connection
+ conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
+
+ self.sendTCPQueryOverConnection(conn, query, response=response)
+ (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # check that the certificate is OK
+ cert = self.getServerCertificate()
+ self.assertIn('subject', cert)
+ self.assertIn('serialNumber', cert)
+ self.assertIn('subjectAltName', cert)
+ subject = cert['subject']
+ altNames = cert['subjectAltName']
+ self.assertEquals(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org')
+ self.assertEquals(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV')
+ names = []
+ for entry in altNames:
+ names.append(entry[1])
+ self.assertEquals(names, ['tls.tests.dnsdist.org', 'powerdns.com'])
+
+ # and that the serial is different
+ self.assertNotEquals(serialNumber, cert['serialNumber'])
+
def testTLKA(self):
"""
TLS: Several queries over the same connection
class TestOpenSSL(DNSDistTest, TLSTests):
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
_tlsServerPort = 8453
_config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+
newServer{address="127.0.0.1:%s"}
addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
"""
- _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
class TestGnuTLS(DNSDistTest, TLSTests):
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
_tlsServerPort = 8453
_config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+
newServer{address="127.0.0.1:%s"}
addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls" })
addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
"""
- _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
class TestDOTWithCache(DNSDistTest):
-
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'