]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
chore(dnsdist): reformat test_OCSP.py
authorPieter Lexis <pieter.lexis@powerdns.com>
Thu, 15 Jan 2026 13:45:10 +0000 (14:45 +0100)
committerPieter Lexis <pieter.lexis@powerdns.com>
Fri, 16 Jan 2026 08:17:52 +0000 (09:17 +0100)
regression-tests.dnsdist/test_OCSP.py

index b3ab4ba5d6ad3e4bfdd0ef4214b10ad2970c865f..99a9d6b0f91bd6810c8ed5d39682258f8ec90d2d 100644 (file)
@@ -5,17 +5,36 @@ import subprocess
 import unittest
 from dnsdisttests import DNSDistTest, pickAvailablePort
 
+
 class DNSDistOCSPStaplingTest(DNSDistTest):
 
     @classmethod
     def checkOCSPStaplingStatus(cls, addr, port, serverName, caFile):
-        testcmd = ['openssl', 's_client', '-CAfile', caFile, '-connect', '%s:%d' % (addr, port), '-status', '-servername', serverName ]
+        testcmd = [
+            "openssl",
+            "s_client",
+            "-CAfile",
+            caFile,
+            "-connect",
+            "%s:%d" % (addr, port),
+            "-status",
+            "-servername",
+            serverName,
+        ]
         output = None
         try:
-            process = subprocess.Popen(testcmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
-            output = process.communicate(input='')
+            process = subprocess.Popen(
+                testcmd,
+                stdout=subprocess.PIPE,
+                stdin=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                close_fds=True,
+            )
+            output = process.communicate(input="")
         except subprocess.CalledProcessError as exc:
-            raise AssertionError('openssl s_client failed (%d): %s' % (exc.returncode, exc.output))
+            raise AssertionError(
+                "openssl s_client failed (%d): %s" % (exc.returncode, exc.output)
+            )
 
         return output[0].decode()
 
@@ -25,8 +44,8 @@ class DNSDistOCSPStaplingTest(DNSDistTest):
         for line in output.splitlines():
             line = line.strip()
             print(line)
-            if line.startswith('Serial Number:'):
-                (_, serialNumber) = line.split(':')
+            if line.startswith("Serial Number:"):
+                (_, serialNumber) = line.split(":")
                 break
 
         return serialNumber
@@ -36,22 +55,23 @@ class DNSDistOCSPStaplingTest(DNSDistTest):
 
     @classmethod
     def setUpClass(cls):
-        cls.generateNewCertificateAndKey('server-ocsp')
+        cls.generateNewCertificateAndKey("server-ocsp")
         cls.startResponders()
         cls.startDNSDist()
         cls.setUpSockets()
 
-@unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
+
+@unittest.skipIf("SKIP_DOH_TESTS" in os.environ, "DNS over HTTPS tests are disabled")
 class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest):
 
     _consoleKey = DNSDistTest.generateConsoleKey()
-    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
-    _serverKey = 'server-ocsp.key'
-    _serverCert = 'server-ocsp.chain'
-    _serverName = 'tls.tests.dnsdist.org'
-    _ocspFile = 'server.ocsp'
-    _caCert = 'ca.pem'
-    _caKey = 'ca.key'
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode("ascii")
+    _serverKey = "server-ocsp.key"
+    _serverCert = "server-ocsp.chain"
+    _serverName = "tls.tests.dnsdist.org"
+    _ocspFile = "server.ocsp"
+    _caCert = "ca.pem"
+    _caKey = "ca.key"
     _dohWithNGHTTP2ServerPort = pickAvailablePort()
     _dohWithH2OServerPort = pickAvailablePort()
     _config_template = """
@@ -64,16 +84,32 @@ class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest):
     addDOHLocal("127.0.0.1:%d", "%s", "%s", { "/" }, { ocspResponses={"%s"}, library='nghttp2'})
     addDOHLocal("127.0.0.1:%d", "%s", "%s", { "/" }, { ocspResponses={"%s"}, library='h2o'})
     """
-    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_dohWithNGHTTP2ServerPort', '_serverCert', '_serverKey', '_ocspFile', '_dohWithH2OServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = [
+        "_testServerPort",
+        "_consoleKeyB64",
+        "_consolePort",
+        "_serverCert",
+        "_caCert",
+        "_caKey",
+        "_ocspFile",
+        "_dohWithNGHTTP2ServerPort",
+        "_serverCert",
+        "_serverKey",
+        "_ocspFile",
+        "_dohWithH2OServerPort",
+        "_serverCert",
+        "_serverKey",
+        "_ocspFile",
+    ]
 
     @classmethod
     def setUpClass(cls):
 
         # for some reason, @unittest.skipIf() is not applied to derived classes with some versions of Python
-        if 'SKIP_DOH_TESTS' in os.environ:
-            raise unittest.SkipTest('DNS over HTTPS tests are disabled')
+        if "SKIP_DOH_TESTS" in os.environ:
+            raise unittest.SkipTest("DNS over HTTPS tests are disabled")
 
-        cls.generateNewCertificateAndKey('server-ocsp')
+        cls.generateNewCertificateAndKey("server-ocsp")
         cls.startResponders()
         cls.startDNSDist()
         cls.setUpSockets()
@@ -85,32 +121,40 @@ class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest):
         OCSP Stapling: DOH
         """
         for port in [self._dohWithNGHTTP2ServerPort, self._dohWithH2OServerPort]:
-            output = self.checkOCSPStaplingStatus('127.0.0.1', port, self._serverName, self._caCert)
-            self.assertIn('OCSP Response Status: successful (0x0)', output)
+            output = self.checkOCSPStaplingStatus(
+                "127.0.0.1", port, self._serverName, self._caCert
+            )
+            self.assertIn("OCSP Response Status: successful (0x0)", output)
 
             serialNumber = self.getOCSPSerial(output)
             self.assertTrue(serialNumber)
 
-            self.generateNewCertificateAndKey('server-ocsp')
-            self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+            self.generateNewCertificateAndKey("server-ocsp")
+            self.sendConsoleCommand(
+                "generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)"
+                % (self._serverCert, self._caCert, self._caKey, self._ocspFile)
+            )
             self.sendConsoleCommand("reloadAllCertificates()")
 
-            output = self.checkOCSPStaplingStatus('127.0.0.1', port, self._serverName, self._caCert)
-            self.assertIn('OCSP Response Status: successful (0x0)', output)
+            output = self.checkOCSPStaplingStatus(
+                "127.0.0.1", port, self._serverName, self._caCert
+            )
+            self.assertIn("OCSP Response Status: successful (0x0)", output)
             serialNumber2 = self.getOCSPSerial(output)
             self.assertTrue(serialNumber2)
             self.assertNotEqual(serialNumber, serialNumber2)
 
+
 class TestBrokenOCSPStaplingDoH(DNSDistOCSPStaplingTest):
 
     _consoleKey = DNSDistTest.generateConsoleKey()
-    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
-    _serverKey = 'server-ocsp.key'
-    _serverCert = 'server-ocsp.chain'
-    _serverName = 'tls.tests.dnsdist.org'
-    _caCert = 'ca.pem'
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode("ascii")
+    _serverKey = "server-ocsp.key"
+    _serverCert = "server-ocsp.chain"
+    _serverName = "tls.tests.dnsdist.org"
+    _caCert = "ca.pem"
     # invalid OCSP file!
-    _ocspFile = '/dev/null'
+    _ocspFile = "/dev/null"
     _dohWithNGHTTP2ServerPort = pickAvailablePort()
     _dohWithH2OServerPort = pickAvailablePort()
     _config_template = """
@@ -122,26 +166,41 @@ class TestBrokenOCSPStaplingDoH(DNSDistOCSPStaplingTest):
     addDOHLocal("127.0.0.1:%d", "%s", "%s", { "/" }, { ocspResponses={"%s"}, library='h2o'})
 
     """
-    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_dohWithNGHTTP2ServerPort', '_serverCert', '_serverKey', '_ocspFile', '_dohWithH2OServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = [
+        "_testServerPort",
+        "_consoleKeyB64",
+        "_consolePort",
+        "_dohWithNGHTTP2ServerPort",
+        "_serverCert",
+        "_serverKey",
+        "_ocspFile",
+        "_dohWithH2OServerPort",
+        "_serverCert",
+        "_serverKey",
+        "_ocspFile",
+    ]
 
     def testBrokenOCSPStapling(self):
         """
         OCSP Stapling: Broken (DoH)
         """
         for port in [self._dohWithNGHTTP2ServerPort, self._dohWithH2OServerPort]:
-            output = self.checkOCSPStaplingStatus('127.0.0.1', port, self._serverName, self._caCert)
-            self.assertNotIn('OCSP Response Status: successful (0x0)', output)
+            output = self.checkOCSPStaplingStatus(
+                "127.0.0.1", port, self._serverName, self._caCert
+            )
+            self.assertNotIn("OCSP Response Status: successful (0x0)", output)
+
 
 class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
 
     _consoleKey = DNSDistTest.generateConsoleKey()
-    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
-    _serverKey = 'server-ocsp.key'
-    _serverCert = 'server-ocsp.chain'
-    _serverName = 'tls.tests.dnsdist.org'
-    _ocspFile = 'server.ocsp'
-    _caCert = 'ca.pem'
-    _caKey = 'ca.key'
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode("ascii")
+    _serverKey = "server-ocsp.key"
+    _serverCert = "server-ocsp.chain"
+    _serverName = "tls.tests.dnsdist.org"
+    _ocspFile = "server.ocsp"
+    _caCert = "ca.pem"
+    _caKey = "ca.key"
     _tlsServerPort = pickAvailablePort()
     _config_template = """
     newServer{address="127.0.0.1:%d"}
@@ -152,39 +211,59 @@ class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
     generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
     addTLSLocal("127.0.0.1:%d", "%s", "%s", { provider="gnutls", ocspResponses={"%s"}})
     """
-    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = [
+        "_testServerPort",
+        "_consoleKeyB64",
+        "_consolePort",
+        "_serverCert",
+        "_caCert",
+        "_caKey",
+        "_ocspFile",
+        "_tlsServerPort",
+        "_serverCert",
+        "_serverKey",
+        "_ocspFile",
+    ]
 
     def testOCSPStapling(self):
         """
         OCSP Stapling: TLS (GnuTLS)
         """
-        output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
-        self.assertIn('OCSP Response Status: successful (0x0)', output)
+        output = self.checkOCSPStaplingStatus(
+            "127.0.0.1", self._tlsServerPort, self._serverName, self._caCert
+        )
+        self.assertIn("OCSP Response Status: successful (0x0)", output)
         self.assertEqual(self.getTLSProvider(), "gnutls")
 
         serialNumber = self.getOCSPSerial(output)
         self.assertTrue(serialNumber)
 
-        self.generateNewCertificateAndKey('server-ocsp')
-        self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+        self.generateNewCertificateAndKey("server-ocsp")
+        self.sendConsoleCommand(
+            "generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)"
+            % (self._serverCert, self._caCert, self._caKey, self._ocspFile)
+        )
         self.sendConsoleCommand("reloadAllCertificates()")
 
-        output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
-        self.assertIn('OCSP Response Status: successful (0x0)', output)
+        output = self.checkOCSPStaplingStatus(
+            "127.0.0.1", self._tlsServerPort, self._serverName, self._caCert
+        )
+        self.assertIn("OCSP Response Status: successful (0x0)", output)
         serialNumber2 = self.getOCSPSerial(output)
         self.assertTrue(serialNumber2)
         self.assertNotEqual(serialNumber, serialNumber2)
 
+
 class TestBrokenOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
 
     _consoleKey = DNSDistTest.generateConsoleKey()
-    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
-    _serverKey = 'server-ocsp.key'
-    _serverCert = 'server-ocsp.chain'
-    _serverName = 'tls.tests.dnsdist.org'
-    _caCert = 'ca.pem'
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode("ascii")
+    _serverKey = "server-ocsp.key"
+    _serverCert = "server-ocsp.chain"
+    _serverName = "tls.tests.dnsdist.org"
+    _caCert = "ca.pem"
     # invalid OCSP file!
-    _ocspFile = '/dev/null'
+    _ocspFile = "/dev/null"
     _tlsServerPort = pickAvailablePort()
     _config_template = """
     newServer{address="127.0.0.1:%d"}
@@ -193,26 +272,37 @@ class TestBrokenOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
 
     addTLSLocal("127.0.0.1:%d", "%s", "%s", { provider="gnutls", ocspResponses={"%s"}})
     """
-    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = [
+        "_testServerPort",
+        "_consoleKeyB64",
+        "_consolePort",
+        "_tlsServerPort",
+        "_serverCert",
+        "_serverKey",
+        "_ocspFile",
+    ]
 
     def testBrokenOCSPStapling(self):
         """
         OCSP Stapling: Broken (GnuTLS)
         """
-        output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
-        self.assertNotIn('OCSP Response Status: successful (0x0)', output)
+        output = self.checkOCSPStaplingStatus(
+            "127.0.0.1", self._tlsServerPort, self._serverName, self._caCert
+        )
+        self.assertNotIn("OCSP Response Status: successful (0x0)", output)
         self.assertEqual(self.getTLSProvider(), "gnutls")
 
+
 class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
 
     _consoleKey = DNSDistTest.generateConsoleKey()
-    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
-    _serverKey = 'server-ocsp.key'
-    _serverCert = 'server-ocsp.chain'
-    _serverName = 'tls.tests.dnsdist.org'
-    _ocspFile = 'server.ocsp'
-    _caCert = 'ca.pem'
-    _caKey = 'ca.key'
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode("ascii")
+    _serverKey = "server-ocsp.key"
+    _serverCert = "server-ocsp.chain"
+    _serverName = "tls.tests.dnsdist.org"
+    _ocspFile = "server.ocsp"
+    _caCert = "ca.pem"
+    _caKey = "ca.key"
     _tlsServerPort = pickAvailablePort()
     _config_template = """
     newServer{address="127.0.0.1:%d"}
@@ -223,39 +313,59 @@ class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
     generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
     addTLSLocal("127.0.0.1:%d", "%s", "%s", { provider="openssl", ocspResponses={"%s"}})
     """
-    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = [
+        "_testServerPort",
+        "_consoleKeyB64",
+        "_consolePort",
+        "_serverCert",
+        "_caCert",
+        "_caKey",
+        "_ocspFile",
+        "_tlsServerPort",
+        "_serverCert",
+        "_serverKey",
+        "_ocspFile",
+    ]
 
     def testOCSPStapling(self):
         """
         OCSP Stapling: TLS (OpenSSL)
         """
-        output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
-        self.assertIn('OCSP Response Status: successful (0x0)', output)
+        output = self.checkOCSPStaplingStatus(
+            "127.0.0.1", self._tlsServerPort, self._serverName, self._caCert
+        )
+        self.assertIn("OCSP Response Status: successful (0x0)", output)
         self.assertEqual(self.getTLSProvider(), "openssl")
 
         serialNumber = self.getOCSPSerial(output)
         self.assertTrue(serialNumber)
 
-        self.generateNewCertificateAndKey('server-ocsp')
-        self.sendConsoleCommand("generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)" % (self._serverCert, self._caCert, self._caKey, self._ocspFile))
+        self.generateNewCertificateAndKey("server-ocsp")
+        self.sendConsoleCommand(
+            "generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)"
+            % (self._serverCert, self._caCert, self._caKey, self._ocspFile)
+        )
         self.sendConsoleCommand("reloadAllCertificates()")
 
-        output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
-        self.assertIn('OCSP Response Status: successful (0x0)', output)
+        output = self.checkOCSPStaplingStatus(
+            "127.0.0.1", self._tlsServerPort, self._serverName, self._caCert
+        )
+        self.assertIn("OCSP Response Status: successful (0x0)", output)
         serialNumber2 = self.getOCSPSerial(output)
         self.assertTrue(serialNumber2)
         self.assertNotEqual(serialNumber, serialNumber2)
 
+
 class TestBrokenOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
 
     _consoleKey = DNSDistTest.generateConsoleKey()
-    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
-    _serverKey = 'server-ocsp.key'
-    _serverCert = 'server-ocsp.chain'
-    _serverName = 'tls.tests.dnsdist.org'
-    _caCert = 'ca.pem'
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode("ascii")
+    _serverKey = "server-ocsp.key"
+    _serverCert = "server-ocsp.chain"
+    _serverName = "tls.tests.dnsdist.org"
+    _caCert = "ca.pem"
     # invalid OCSP file!
-    _ocspFile = '/dev/null'
+    _ocspFile = "/dev/null"
     _tlsServerPort = pickAvailablePort()
     _config_template = """
     newServer{address="127.0.0.1:%d"}
@@ -264,12 +374,22 @@ class TestBrokenOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
 
     addTLSLocal("127.0.0.1:%d", "%s", "%s", { provider="openssl", ocspResponses={"%s"}})
     """
-    _config_params = ['_testServerPort', '_consoleKeyB64', '_consolePort', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+    _config_params = [
+        "_testServerPort",
+        "_consoleKeyB64",
+        "_consolePort",
+        "_tlsServerPort",
+        "_serverCert",
+        "_serverKey",
+        "_ocspFile",
+    ]
 
     def testBrokenOCSPStapling(self):
         """
         OCSP Stapling: Broken (OpenSSL)
         """
-        output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
-        self.assertNotIn('OCSP Response Status: successful (0x0)', output)
+        output = self.checkOCSPStaplingStatus(
+            "127.0.0.1", self._tlsServerPort, self._serverName, self._caCert
+        )
+        self.assertNotIn("OCSP Response Status: successful (0x0)", output)
         self.assertEqual(self.getTLSProvider(), "openssl")