]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_DOQ.py
dnsdist: Add a regression test for DoQ certs/keys reloading
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOQ.py
index 9a87b62255fc274ea858c2597fc49c80d547dd41..9af5d8a9387bfceebd462feeea7a1ad59fdb599b 100644 (file)
@@ -1,11 +1,14 @@
 #!/usr/bin/env python
+import base64
 import dns
 import clientsubnetoption
 
 from dnsdisttests import DNSDistTest
 from dnsdisttests import pickAvailablePort
 from doqclient import quic_bogus_query
+from quictests import QUICTests, QUICWithCacheTests, QUICACLTests
 import doqclient
+from doqclient import quic_query
 
 class TestDOQBogus(DNSDistTest):
     _serverKey = 'server.key'
@@ -19,7 +22,6 @@ class TestDOQBogus(DNSDistTest):
     addDOQLocal("127.0.0.1:%d", "%s", "%s")
     """
     _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
-    _verboseMode = True
 
     def testDOQBogus(self):
         """
@@ -37,7 +39,7 @@ class TestDOQBogus(DNSDistTest):
         except doqclient.StreamResetError as e :
             self.assertEqual(e.error, 2);
 
-class TestDOQ(DNSDistTest):
+class TestDOQ(QUICTests, DNSDistTest):
     _serverKey = 'server.key'
     _serverCert = 'server.chain'
     _serverName = 'tls.tests.dnsdist.org'
@@ -54,122 +56,14 @@ class TestDOQ(DNSDistTest):
     addDOQLocal("127.0.0.1:%d", "%s", "%s")
     """
     _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
-    _verboseMode = True
 
-    def testDOQSimple(self):
-        """
-        DOQ: Simple query
-        """
-        name = 'simple.doq.tests.powerdns.com.'
-        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
-        query.id = 0
-        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
-        expectedQuery.id = 0
-        response = dns.message.make_response(query)
-        rrset = dns.rrset.from_text(name,
-                                    3600,
-                                    dns.rdataclass.IN,
-                                    dns.rdatatype.A,
-                                    '127.0.0.1')
-        response.answer.append(rrset)
-        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = expectedQuery.id
-        self.assertEqual(expectedQuery, receivedQuery)
-
-    def testDOQMultipleStreams(self):
-        """
-        DOQ: Test multiple queries using the same connection
-        """
+    def getQUICConnection(self):
+        return self.getDOQConnection(self._doqServerPort, self._caCert)
 
-        name = 'simple.doq.tests.powerdns.com.'
-        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
-        query.id = 0
-        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
-        expectedQuery.id = 0
-        response = dns.message.make_response(query)
-        rrset = dns.rrset.from_text(name,
-                                    3600,
-                                    dns.rdataclass.IN,
-                                    dns.rdatatype.A,
-                                    '127.0.0.1')
-        response.answer.append(rrset)
-
-        connection = self.getDOQConnection(self._doqServerPort, self._caCert)
-
-        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName, connection=connection)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = expectedQuery.id
-        self.assertEqual(expectedQuery, receivedQuery)
-
-        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName, connection=connection)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = expectedQuery.id
-        self.assertEqual(expectedQuery, receivedQuery)
-
-    def testDropped(self):
-        """
-        DOQ: Dropped query
-        """
-        name = 'drop.doq.tests.powerdns.com.'
-        query = dns.message.make_query(name, 'A', 'IN')
-        dropped = False
-        try:
-            (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
-            self.assertTrue(False)
-        except doqclient.StreamResetError as e :
-            self.assertEqual(e.error, 5);
+    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
+        return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
 
-    def testRefused(self):
-        """
-        DOQ: Refused
-        """
-        name = 'refused.doq.tests.powerdns.com.'
-        query = dns.message.make_query(name, 'A', 'IN')
-        query.id = 0
-        query.flags &= ~dns.flags.RD
-        expectedResponse = dns.message.make_response(query)
-        expectedResponse.set_rcode(dns.rcode.REFUSED)
-
-        (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
-        self.assertEqual(receivedResponse, expectedResponse)
-
-    def testSpoof(self):
-        """
-        DOQ: Spoofed
-        """
-        name = 'spoof.doq.tests.powerdns.com.'
-        query = dns.message.make_query(name, 'A', 'IN')
-        query.id = 0
-        query.flags &= ~dns.flags.RD
-        expectedResponse = dns.message.make_response(query)
-        rrset = dns.rrset.from_text(name,
-                                    3600,
-                                    dns.rdataclass.IN,
-                                    dns.rdatatype.A,
-                                    '1.2.3.4')
-        expectedResponse.answer.append(rrset)
-
-        (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
-        self.assertEqual(receivedResponse, expectedResponse)
-
-    def testDOQNoBackend(self):
-        """
-        DOQ: No backend
-        """
-        name = 'no-backend.doq.tests.powerdns.com.'
-        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
-        dropped = False
-        try:
-            (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
-            self.assertTrue(False)
-        except doqclient.StreamResetError as e :
-            self.assertEqual(e.error, 5);
-
-class TestDOQWithCache(DNSDistTest):
+class TestDOQWithCache(QUICWithCacheTests, DNSDistTest):
     _serverKey = 'server.key'
     _serverCert = 'server.chain'
     _serverName = 'tls.tests.dnsdist.org'
@@ -184,43 +78,67 @@ class TestDOQWithCache(DNSDistTest):
     getPool(""):setCache(pc)
     """
     _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
-    _verboseMode = True
 
-    def testCached(self):
-        """
-        Cache: Served from cache
+    def getQUICConnection(self):
+        return self.getDOQConnection(self._doqServerPort, self._caCert)
 
-        dnsdist is configured to cache entries, we are sending several
-        identical requests and checking that the backend only receive
-        the first one.
-        """
-        numberOfQueries = 10
-        name = 'cached.cache.tests.powerdns.com.'
-        query = dns.message.make_query(name, 'AAAA', 'IN')
+    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
+        return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
+
+class TestDOQWithACL(QUICACLTests, DNSDistTest):
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _doqServerPort = pickAvailablePort()
+    _config_template = """
+    newServer{address="127.0.0.1:%d"}
+
+    setACL("192.0.2.1/32")
+    addDOQLocal("127.0.0.1:%d", "%s", "%s")
+    """
+    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
+
+    def getQUICConnection(self):
+        return self.getDOQConnection(self._doqServerPort, self._caCert)
+
+    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
+        return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
+
+class TestDOQCertificateReloading(DNSDistTest):
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _serverKey = 'server-doq.key'
+    _serverCert = 'server-doq.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _doqServerPort = pickAvailablePort()
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+
+    newServer{address="127.0.0.1:%d"}
+
+    addDOQLocal("127.0.0.1:%d", "%s", "%s")
+    """
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
+
+    @classmethod
+    def setUpClass(cls):
+        cls.generateNewCertificateAndKey('server-doq')
+        cls.startResponders()
+        cls.startDNSDist()
+        cls.setUpSockets()
+
+    def testCertificateReloaded(self):
+        name = 'certificate-reload.doq.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
         query.id = 0
-        response = dns.message.make_response(query)
-        rrset = dns.rrset.from_text(name,
-                                    3600,
-                                    dns.rdataclass.IN,
-                                    dns.rdatatype.AAAA,
-                                    '::1')
-        response.answer.append(rrset)
-
-        # first query to fill the cache
-        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEqual(query, receivedQuery)
-        self.assertEqual(receivedResponse, response)
-
-        for _ in range(numberOfQueries):
-            (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
-            self.assertEqual(receivedResponse, response)
-
-        total = 0
-        for key in self._responsesCounter:
-            total += self._responsesCounter[key]
-            TestDOQWithCache._responsesCounter[key] = 0
-
-        self.assertEqual(total, 1)
+        (_, serial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
+
+        self.generateNewCertificateAndKey('server-doq')
+        self.sendConsoleCommand("reloadAllCertificates()")
+
+        (_, secondSerial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
+        # check that the serial is different
+        self.assertNotEqual(serial, secondSerial)