From: Remi Gacogne Date: Mon, 18 Jan 2021 14:41:10 +0000 (+0100) Subject: dnsdist: Add regression tests for certificates and OCSP reloading X-Git-Tag: dnsdist-1.6.0-alpha1~28^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F9986%2Fhead;p=thirdparty%2Fpdns.git dnsdist: Add regression tests for certificates and OCSP reloading --- diff --git a/regression-tests.dnsdist/dnsdisttests.py b/regression-tests.dnsdist/dnsdisttests.py index f84569ffbe..178b424365 100644 --- a/regression-tests.dnsdist/dnsdisttests.py +++ b/regression-tests.dnsdist/dnsdisttests.py @@ -602,3 +602,24 @@ class DNSDistTest(AssertEqualDNSMessageMixin, unittest.TestCase): def checkResponseNoEDNS(self, expected, received): self.checkMessageNoEDNS(expected, received) + def generateNewCertificateAndKey(self): + # generate and sign a new cert + cmd = ['openssl', 'req', '-new', '-newkey', 'rsa:2048', '-nodes', '-keyout', 'server.key', '-out', 'server.csr', '-config', 'configServer.conf'] + output = None + try: + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) + output = process.communicate(input='') + except subprocess.CalledProcessError as exc: + raise AssertionError('openssl req failed (%d): %s' % (exc.returncode, exc.output)) + cmd = ['openssl', 'x509', '-req', '-days', '1', '-CA', 'ca.pem', '-CAkey', 'ca.key', '-CAcreateserial', '-in', 'server.csr', '-out', 'server.pem', '-extfile', 'configServer.conf', '-extensions', 'v3_req'] + output = None + try: + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) + output = process.communicate(input='') + except subprocess.CalledProcessError as exc: + raise AssertionError('openssl x509 failed (%d): %s' % (exc.returncode, exc.output)) + + with open('server.chain', 'w') as outFile: + for inFileName in ['server.pem', 'ca.pem']: + with open(inFileName) as inFile: + outFile.write(inFile.read()) diff --git a/regression-tests.dnsdist/test_OCSP.py b/regression-tests.dnsdist/test_OCSP.py index 5e2d5485bb..abbb16e76e 100644 --- a/regression-tests.dnsdist/test_OCSP.py +++ b/regression-tests.dnsdist/test_OCSP.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +import base64 import dns import os import subprocess @@ -15,13 +16,27 @@ class DNSDistOCSPStaplingTest(DNSDistTest): process = subprocess.Popen(testcmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) output = process.communicate(input='') except subprocess.CalledProcessError as exc: - raise AssertionError('dnsdist --check-config failed (%d): %s' % (exc.returncode, exc.output)) + raise AssertionError('openssl s_client failed (%d): %s' % (exc.returncode, exc.output)) return output[0].decode() + @classmethod + def getOCSPSerial(cls, output): + serialNumber = None + for line in output.splitlines(): + line = line.strip() + print(line) + if line.startswith('Serial Number:'): + (_, serialNumber) = line.split(':') + break + + return serialNumber + @unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled') class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest): + _consoleKey = DNSDistTest.generateConsoleKey() + _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii') _serverKey = 'server.key' _serverCert = 'server.chain' _serverName = 'tls.tests.dnsdist.org' @@ -31,12 +46,14 @@ class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest): _dohServerPort = 8443 _config_template = """ newServer{address="127.0.0.1:%s"} + setKey("%s") + controlSocket("127.0.0.1:%s") -- generate an OCSP response file for our certificate, valid one day generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0) addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, { ocspResponses={"%s"}}) """ - _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_dohServerPort', '_serverCert', '_serverKey', '_ocspFile'] + _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_dohServerPort', '_serverCert', '_serverKey', '_ocspFile'] @classmethod def setUpClass(cls): @@ -58,8 +75,23 @@ class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest): output = self.checkOCSPStaplingStatus('127.0.0.1', self._dohServerPort, self._serverName, self._caCert) self.assertIn('OCSP Response Status: successful (0x0)', output) + serialNumber = self.getOCSPSerial(output) + self.assertTrue(serialNumber) + + self.generateNewCertificateAndKey() + self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile)) + self.sendConsoleCommand("reloadAllCertificates()") + + output = self.checkOCSPStaplingStatus('127.0.0.1', self._dohServerPort, self._serverName, self._caCert) + self.assertIn('OCSP Response Status: successful (0x0)', output) + serialNumber2 = self.getOCSPSerial(output) + self.assertTrue(serialNumber2) + self.assertNotEquals(serialNumber, serialNumber2) + class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest): + _consoleKey = DNSDistTest.generateConsoleKey() + _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii') _serverKey = 'server.key' _serverCert = 'server.chain' _serverName = 'tls.tests.dnsdist.org' @@ -69,12 +101,14 @@ class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest): _tlsServerPort = 8443 _config_template = """ newServer{address="127.0.0.1:%s"} + setKey("%s") + controlSocket("127.0.0.1:%s") -- generate an OCSP response file for our certificate, valid one day generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0) addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls", ocspResponses={"%s"}}) """ - _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile'] + _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile'] def testOCSPStapling(self): """ @@ -83,8 +117,23 @@ class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest): output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert) self.assertIn('OCSP Response Status: successful (0x0)', output) + serialNumber = self.getOCSPSerial(output) + self.assertTrue(serialNumber) + + self.generateNewCertificateAndKey() + self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile)) + self.sendConsoleCommand("reloadAllCertificates()") + + output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert) + self.assertIn('OCSP Response Status: successful (0x0)', output) + serialNumber2 = self.getOCSPSerial(output) + self.assertTrue(serialNumber2) + self.assertNotEquals(serialNumber, serialNumber2) + class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest): + _consoleKey = DNSDistTest.generateConsoleKey() + _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii') _serverKey = 'server.key' _serverCert = 'server.chain' _serverName = 'tls.tests.dnsdist.org' @@ -94,12 +143,14 @@ class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest): _tlsServerPort = 8443 _config_template = """ newServer{address="127.0.0.1:%s"} + setKey("%s") + controlSocket("127.0.0.1:%s") -- generate an OCSP response file for our certificate, valid one day generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0) addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", ocspResponses={"%s"}}) """ - _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile'] + _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile'] def testOCSPStapling(self): """ @@ -107,3 +158,16 @@ class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest): """ output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert) self.assertIn('OCSP Response Status: successful (0x0)', output) + + serialNumber = self.getOCSPSerial(output) + self.assertTrue(serialNumber) + + self.generateNewCertificateAndKey() + self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile)) + self.sendConsoleCommand("reloadAllCertificates()") + + output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert) + self.assertIn('OCSP Response Status: successful (0x0)', output) + serialNumber2 = self.getOCSPSerial(output) + self.assertTrue(serialNumber2) + self.assertNotEquals(serialNumber, serialNumber2) diff --git a/regression-tests.dnsdist/test_TLS.py b/regression-tests.dnsdist/test_TLS.py index f2c8a9ba83..e0bb572c02 100644 --- a/regression-tests.dnsdist/test_TLS.py +++ b/regression-tests.dnsdist/test_TLS.py @@ -1,12 +1,18 @@ #!/usr/bin/env python +import base64 import dns import socket import ssl +import subprocess import unittest from dnsdisttests import DNSDistTest class TLSTests(object): + def getServerCertificate(self): + conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert) + return conn.getpeercert() + def testTLSSimple(self): """ TLS: Single query @@ -31,6 +37,52 @@ class TLSTests(object): self.assertEquals(query, receivedQuery) self.assertEquals(response, receivedResponse) + # check the certificate + cert = self.getServerCertificate() + self.assertIn('subject', cert) + self.assertIn('serialNumber', cert) + self.assertIn('subjectAltName', cert) + subject = cert['subject'] + altNames = cert['subjectAltName'] + self.assertEquals(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org') + self.assertEquals(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV') + names = [] + for entry in altNames: + names.append(entry[1]) + self.assertEquals(names, ['tls.tests.dnsdist.org', 'powerdns.com']) + serialNumber = cert['serialNumber'] + + self.generateNewCertificateAndKey() + self.sendConsoleCommand("reloadAllCertificates()") + + # open a new connection + conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert) + + self.sendTCPQueryOverConnection(conn, query, response=response) + (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(response, receivedResponse) + + # check that the certificate is OK + cert = self.getServerCertificate() + self.assertIn('subject', cert) + self.assertIn('serialNumber', cert) + self.assertIn('subjectAltName', cert) + subject = cert['subject'] + altNames = cert['subjectAltName'] + self.assertEquals(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org') + self.assertEquals(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV') + names = [] + for entry in altNames: + names.append(entry[1]) + self.assertEquals(names, ['tls.tests.dnsdist.org', 'powerdns.com']) + + # and that the serial is different + self.assertNotEquals(serialNumber, cert['serialNumber']) + def testTLKA(self): """ TLS: Several queries over the same connection @@ -199,34 +251,43 @@ class TLSTests(object): class TestOpenSSL(DNSDistTest, TLSTests): + _consoleKey = DNSDistTest.generateConsoleKey() + _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii') _serverKey = 'server.key' _serverCert = 'server.chain' _serverName = 'tls.tests.dnsdist.org' _caCert = 'ca.pem' _tlsServerPort = 8453 _config_template = """ + setKey("%s") + controlSocket("127.0.0.1:%s") + newServer{address="127.0.0.1:%s"} addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" }) addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4")) """ - _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey'] + _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey'] class TestGnuTLS(DNSDistTest, TLSTests): + _consoleKey = DNSDistTest.generateConsoleKey() + _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii') _serverKey = 'server.key' _serverCert = 'server.chain' _serverName = 'tls.tests.dnsdist.org' _caCert = 'ca.pem' _tlsServerPort = 8453 _config_template = """ + setKey("%s") + controlSocket("127.0.0.1:%s") + newServer{address="127.0.0.1:%s"} addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls" }) addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4")) """ - _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey'] + _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey'] class TestDOTWithCache(DNSDistTest): - _serverKey = 'server.key' _serverCert = 'server.chain' _serverName = 'tls.tests.dnsdist.org'