]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add regression tests for certificates and OCSP reloading 9986/head
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 18 Jan 2021 14:41:10 +0000 (15:41 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 18 Jan 2021 14:41:10 +0000 (15:41 +0100)
regression-tests.dnsdist/dnsdisttests.py
regression-tests.dnsdist/test_OCSP.py
regression-tests.dnsdist/test_TLS.py

index f84569ffbe439e7d1dff06fbdf26dcffa4f8f4be..178b424365e45c988a59366c7639d5900c79e9af 100644 (file)
@@ -602,3 +602,24 @@ class DNSDistTest(AssertEqualDNSMessageMixin, unittest.TestCase):
     def checkResponseNoEDNS(self, expected, received):
         self.checkMessageNoEDNS(expected, received)
 
+    def generateNewCertificateAndKey(self):
+        # generate and sign a new cert
+        cmd = ['openssl', 'req', '-new', '-newkey', 'rsa:2048', '-nodes', '-keyout', 'server.key', '-out', 'server.csr', '-config', 'configServer.conf']
+        output = None
+        try:
+            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+            output = process.communicate(input='')
+        except subprocess.CalledProcessError as exc:
+            raise AssertionError('openssl req failed (%d): %s' % (exc.returncode, exc.output))
+        cmd = ['openssl', 'x509', '-req', '-days', '1', '-CA', 'ca.pem', '-CAkey', 'ca.key', '-CAcreateserial', '-in', 'server.csr', '-out', 'server.pem', '-extfile', 'configServer.conf', '-extensions', 'v3_req']
+        output = None
+        try:
+            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+            output = process.communicate(input='')
+        except subprocess.CalledProcessError as exc:
+            raise AssertionError('openssl x509 failed (%d): %s' % (exc.returncode, exc.output))
+
+        with open('server.chain', 'w') as outFile:
+            for inFileName in ['server.pem', 'ca.pem']:
+                with open(inFileName) as inFile:
+                    outFile.write(inFile.read())
index 5e2d5485bb64704c6206c44afecf57c0cccebf20..abbb16e76ef39259e4236617abb7550161a89e81 100644 (file)
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+import base64
 import dns
 import os
 import subprocess
@@ -15,13 +16,27 @@ class DNSDistOCSPStaplingTest(DNSDistTest):
             process = subprocess.Popen(testcmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
             output = process.communicate(input='')
         except subprocess.CalledProcessError as exc:
-            raise AssertionError('dnsdist --check-config failed (%d): %s' % (exc.returncode, exc.output))
+            raise AssertionError('openssl s_client failed (%d): %s' % (exc.returncode, exc.output))
 
         return output[0].decode()
 
+    @classmethod
+    def getOCSPSerial(cls, output):
+        serialNumber = None
+        for line in output.splitlines():
+            line = line.strip()
+            print(line)
+            if line.startswith('Serial Number:'):
+                (_, serialNumber) = line.split(':')
+                break
+
+        return serialNumber
+
 @unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
 class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest):
 
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
     _serverKey = 'server.key'
     _serverCert = 'server.chain'
     _serverName = 'tls.tests.dnsdist.org'
@@ -31,12 +46,14 @@ class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest):
     _dohServerPort = 8443
     _config_template = """
     newServer{address="127.0.0.1:%s"}
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
 
     -- generate an OCSP response file for our certificate, valid one day
     generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
     addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, { ocspResponses={"%s"}})
     """
-    _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_dohServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_dohServerPort', '_serverCert', '_serverKey', '_ocspFile']
 
     @classmethod
     def setUpClass(cls):
@@ -58,8 +75,23 @@ class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest):
         output = self.checkOCSPStaplingStatus('127.0.0.1', self._dohServerPort, self._serverName, self._caCert)
         self.assertIn('OCSP Response Status: successful (0x0)', output)
 
+        serialNumber = self.getOCSPSerial(output)
+        self.assertTrue(serialNumber)
+
+        self.generateNewCertificateAndKey()
+        self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+        self.sendConsoleCommand("reloadAllCertificates()")
+
+        output = self.checkOCSPStaplingStatus('127.0.0.1', self._dohServerPort, self._serverName, self._caCert)
+        self.assertIn('OCSP Response Status: successful (0x0)', output)
+        serialNumber2 = self.getOCSPSerial(output)
+        self.assertTrue(serialNumber2)
+        self.assertNotEquals(serialNumber, serialNumber2)
+
 class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
 
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
     _serverKey = 'server.key'
     _serverCert = 'server.chain'
     _serverName = 'tls.tests.dnsdist.org'
@@ -69,12 +101,14 @@ class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
     _tlsServerPort = 8443
     _config_template = """
     newServer{address="127.0.0.1:%s"}
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
 
     -- generate an OCSP response file for our certificate, valid one day
     generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
     addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls", ocspResponses={"%s"}})
     """
-    _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
 
     def testOCSPStapling(self):
         """
@@ -83,8 +117,23 @@ class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
         output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
         self.assertIn('OCSP Response Status: successful (0x0)', output)
 
+        serialNumber = self.getOCSPSerial(output)
+        self.assertTrue(serialNumber)
+
+        self.generateNewCertificateAndKey()
+        self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+        self.sendConsoleCommand("reloadAllCertificates()")
+
+        output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
+        self.assertIn('OCSP Response Status: successful (0x0)', output)
+        serialNumber2 = self.getOCSPSerial(output)
+        self.assertTrue(serialNumber2)
+        self.assertNotEquals(serialNumber, serialNumber2)
+
 class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
 
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
     _serverKey = 'server.key'
     _serverCert = 'server.chain'
     _serverName = 'tls.tests.dnsdist.org'
@@ -94,12 +143,14 @@ class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
     _tlsServerPort = 8443
     _config_template = """
     newServer{address="127.0.0.1:%s"}
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
 
     -- generate an OCSP response file for our certificate, valid one day
     generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
     addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", ocspResponses={"%s"}})
     """
-    _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
 
     def testOCSPStapling(self):
         """
@@ -107,3 +158,16 @@ class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
         """
         output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
         self.assertIn('OCSP Response Status: successful (0x0)', output)
+
+        serialNumber = self.getOCSPSerial(output)
+        self.assertTrue(serialNumber)
+
+        self.generateNewCertificateAndKey()
+        self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+        self.sendConsoleCommand("reloadAllCertificates()")
+
+        output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
+        self.assertIn('OCSP Response Status: successful (0x0)', output)
+        serialNumber2 = self.getOCSPSerial(output)
+        self.assertTrue(serialNumber2)
+        self.assertNotEquals(serialNumber, serialNumber2)
index f2c8a9ba83a3de172488c735ea609296a03b5288..e0bb572c025ecf7e6cdb69aae42a0024ff992071 100644 (file)
@@ -1,12 +1,18 @@
 #!/usr/bin/env python
+import base64
 import dns
 import socket
 import ssl
+import subprocess
 import unittest
 from dnsdisttests import DNSDistTest
 
 class TLSTests(object):
 
+    def getServerCertificate(self):
+        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
+        return conn.getpeercert()
+
     def testTLSSimple(self):
         """
         TLS: Single query
@@ -31,6 +37,52 @@ class TLSTests(object):
         self.assertEquals(query, receivedQuery)
         self.assertEquals(response, receivedResponse)
 
+        # check the certificate
+        cert = self.getServerCertificate()
+        self.assertIn('subject', cert)
+        self.assertIn('serialNumber', cert)
+        self.assertIn('subjectAltName', cert)
+        subject = cert['subject']
+        altNames = cert['subjectAltName']
+        self.assertEquals(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org')
+        self.assertEquals(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV')
+        names = []
+        for entry in altNames:
+            names.append(entry[1])
+        self.assertEquals(names, ['tls.tests.dnsdist.org', 'powerdns.com'])
+        serialNumber = cert['serialNumber']
+
+        self.generateNewCertificateAndKey()
+        self.sendConsoleCommand("reloadAllCertificates()")
+
+        # open a new connection
+        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
+
+        self.sendTCPQueryOverConnection(conn, query, response=response)
+        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+        # check that the certificate is OK
+        cert = self.getServerCertificate()
+        self.assertIn('subject', cert)
+        self.assertIn('serialNumber', cert)
+        self.assertIn('subjectAltName', cert)
+        subject = cert['subject']
+        altNames = cert['subjectAltName']
+        self.assertEquals(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org')
+        self.assertEquals(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV')
+        names = []
+        for entry in altNames:
+            names.append(entry[1])
+        self.assertEquals(names, ['tls.tests.dnsdist.org', 'powerdns.com'])
+
+        # and that the serial is different
+        self.assertNotEquals(serialNumber, cert['serialNumber'])
+
     def testTLKA(self):
         """
         TLS: Several queries over the same connection
@@ -199,34 +251,43 @@ class TLSTests(object):
 
 class TestOpenSSL(DNSDistTest, TLSTests):
 
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
     _serverKey = 'server.key'
     _serverCert = 'server.chain'
     _serverName = 'tls.tests.dnsdist.org'
     _caCert = 'ca.pem'
     _tlsServerPort = 8453
     _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+
     newServer{address="127.0.0.1:%s"}
     addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
     addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
     """
-    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
 
 class TestGnuTLS(DNSDistTest, TLSTests):
 
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
     _serverKey = 'server.key'
     _serverCert = 'server.chain'
     _serverName = 'tls.tests.dnsdist.org'
     _caCert = 'ca.pem'
     _tlsServerPort = 8453
     _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+
     newServer{address="127.0.0.1:%s"}
     addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls" })
     addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
     """
-    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
 
 class TestDOTWithCache(DNSDistTest):
-
     _serverKey = 'server.key'
     _serverCert = 'server.chain'
     _serverName = 'tls.tests.dnsdist.org'