]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_DOQ.py
dnsdist: Add a regression test for DoQ certs/keys reloading
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOQ.py
index 965e9d6dd32ae15f5061a1b857aa2f550a75dd87..9af5d8a9387bfceebd462feeea7a1ad59fdb599b 100644 (file)
@@ -1,12 +1,14 @@
 #!/usr/bin/env python
+import base64
 import dns
 import clientsubnetoption
 
 from dnsdisttests import DNSDistTest
 from dnsdisttests import pickAvailablePort
 from doqclient import quic_bogus_query
-from quictests import QUICTests, QUICWithCacheTests
+from quictests import QUICTests, QUICWithCacheTests, QUICACLTests
 import doqclient
+from doqclient import quic_query
 
 class TestDOQBogus(DNSDistTest):
     _serverKey = 'server.key'
@@ -20,7 +22,6 @@ class TestDOQBogus(DNSDistTest):
     addDOQLocal("127.0.0.1:%d", "%s", "%s")
     """
     _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
-    _verboseMode = True
 
     def testDOQBogus(self):
         """
@@ -55,7 +56,6 @@ class TestDOQ(QUICTests, DNSDistTest):
     addDOQLocal("127.0.0.1:%d", "%s", "%s")
     """
     _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
-    _verboseMode = True
 
     def getQUICConnection(self):
         return self.getDOQConnection(self._doqServerPort, self._caCert)
@@ -78,10 +78,67 @@ class TestDOQWithCache(QUICWithCacheTests, DNSDistTest):
     getPool(""):setCache(pc)
     """
     _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
-    _verboseMode = True
 
     def getQUICConnection(self):
         return self.getDOQConnection(self._doqServerPort, self._caCert)
 
     def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
         return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
+
+class TestDOQWithACL(QUICACLTests, DNSDistTest):
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _doqServerPort = pickAvailablePort()
+    _config_template = """
+    newServer{address="127.0.0.1:%d"}
+
+    setACL("192.0.2.1/32")
+    addDOQLocal("127.0.0.1:%d", "%s", "%s")
+    """
+    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
+
+    def getQUICConnection(self):
+        return self.getDOQConnection(self._doqServerPort, self._caCert)
+
+    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
+        return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
+
+class TestDOQCertificateReloading(DNSDistTest):
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _serverKey = 'server-doq.key'
+    _serverCert = 'server-doq.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _doqServerPort = pickAvailablePort()
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+
+    newServer{address="127.0.0.1:%d"}
+
+    addDOQLocal("127.0.0.1:%d", "%s", "%s")
+    """
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
+
+    @classmethod
+    def setUpClass(cls):
+        cls.generateNewCertificateAndKey('server-doq')
+        cls.startResponders()
+        cls.startDNSDist()
+        cls.setUpSockets()
+
+    def testCertificateReloaded(self):
+        name = 'certificate-reload.doq.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        (_, serial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
+
+        self.generateNewCertificateAndKey('server-doq')
+        self.sendConsoleCommand("reloadAllCertificates()")
+
+        (_, secondSerial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
+        # check that the serial is different
+        self.assertNotEqual(serial, secondSerial)