]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add regression tests for DNSQuestion.getProtocol()
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 29 Jun 2021 13:38:39 +0000 (15:38 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 29 Jun 2021 14:49:52 +0000 (16:49 +0200)
regression-tests.dnsdist/test_Advanced.py
regression-tests.dnsdist/test_DNSCrypt.py
regression-tests.dnsdist/test_DOH.py
regression-tests.dnsdist/test_TLS.py

index c5c5012d082c4fdf65161d6adf4cc0efbdbdde65..f784f9b33b4956ed6a35c4d22e22856170bd6935 100644 (file)
@@ -2148,3 +2148,50 @@ class TestAdvancedDropEmptyQueries(DNSDistTest):
             sender = getattr(self, method)
             (_, receivedResponse) = sender(query, response=None, useQueue=False)
             self.assertEqual(receivedResponse, None)
+
+class TestProtocols(DNSDistTest):
+    _config_template = """
+    function checkUDP(dq)
+      if dq:getProtocol() ~= "Do53 UDP" then
+        return DNSAction.Spoof, '1.2.3.4'
+      end
+      return DNSAction.None
+    end
+
+    function checkTCP(dq)
+      if dq:getProtocol() ~= "Do53 TCP" then
+        return DNSAction.Spoof, '1.2.3.4'
+      end
+      return DNSAction.None
+    end
+
+    addAction("udp.protocols.advanced.tests.powerdns.com.", LuaAction(checkUDP))
+    addAction("tcp.protocols.advanced.tests.powerdns.com.", LuaAction(checkTCP))
+    newServer{address="127.0.0.1:%s"}
+    """
+
+    def testProtocolUDP(self):
+        """
+        Advanced: Test DNSQuestion.Protocol over UDP
+        """
+        name = 'udp.protocols.advanced.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        receivedQuery.id = query.id
+        self.assertEqual(receivedQuery, query)
+        self.assertEqual(receivedResponse, response)
+
+    def testProtocolTCP(self):
+        """
+        Advanced: Test DNSQuestion.Protocol over TCP
+        """
+        name = 'tcp.protocols.advanced.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+        receivedQuery.id = query.id
+        self.assertEqual(receivedQuery, query)
+        self.assertEqual(receivedResponse, response)
index 1962342e575a39ca2f74049be5bceac7ef3fa62c..15cc8dc08b5777008c3fe68228b9f792d4fbc2c5 100644 (file)
@@ -53,6 +53,23 @@ class TestDNSCrypt(DNSCryptTest):
     generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
     addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
     newServer{address="127.0.0.1:%s"}
+
+    function checkDNSCryptUDP(dq)
+      if dq:getProtocol() ~= "DNSCrypt UDP" then
+        return DNSAction.Spoof, '1.2.3.4'
+      end
+      return DNSAction.None
+    end
+
+    function checkDNSCryptTCP(dq)
+      if dq:getProtocol() ~= "DNSCrypt TCP" then
+        return DNSAction.Spoof, '1.2.3.4'
+      end
+      return DNSAction.None
+    end
+
+    addAction("udp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptUDP))
+    addAction("tcp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptTCP))
     """
 
     _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
@@ -229,6 +246,28 @@ class TestDNSCrypt(DNSCryptTest):
         self.assertTrue(cert)
         self.assertEqual(cert.serial, self._resolverCertificateSerial + 3)
 
+    def testProtocolUDP(self):
+        """
+        DNSCrypt: Test DNSQuestion.Protocol over UDP
+        """
+        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+        name = 'udp.protocols.dnscrypt.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        self.doDNSCryptQuery(client, query, response, False)
+
+    def testProtocolTCP(self):
+        """
+        DNSCrypt: Test DNSQuestion.Protocol over TCP
+        """
+        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+        name = 'tcp.protocols.dnscrypt.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        self.doDNSCryptQuery(client, query, response, True)
+
 class TestDNSCryptWithCache(DNSCryptTest):
 
     _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
index a7a4135fef3a7579a58fcda9d861259e8fe4c11a..527090ac2ee418a6ab702920e7a3f9dfa63978f2 100644 (file)
@@ -1216,7 +1216,6 @@ class TestDOHFrontendLimits(DNSDistDOHTest):
     addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, { maxConcurrentTCPConnections=%d })
     """
     _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerDOHFrontend']
-    _verboseMode = True
 
     def testTCPConnsPerDOHFrontend(self):
         """
@@ -1260,3 +1259,44 @@ class TestDOHFrontendLimits(DNSDistDOHTest):
 
         self.assertEqual(count, self._maxTCPConnsPerDOHFrontend)
         self.assertEqual(failed, 1)
+
+class TestProtocols(DNSDistDOHTest):
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _dohServerPort = 8443
+    _customResponseHeader1 = 'access-control-allow-origin: *'
+    _customResponseHeader2 = 'user-agent: derp'
+    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
+    _config_template = """
+    function checkDOH(dq)
+      if dq:getProtocol() ~= "DNS over HTTPS" then
+        return DNSAction.Spoof, '1.2.3.4'
+      end
+      return DNSAction.None
+    end
+
+    addAction("protocols.doh.tests.powerdns.com.", LuaAction(checkDOH))
+    newServer{address="127.0.0.1:%s"}
+    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
+    """
+    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
+
+    def testProtocolDOH(self):
+        """
+        DoH: Test DNSQuestion.Protocol
+        """
+        name = 'protocols.doh.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+
+        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEqual(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEqual(response, receivedResponse)
index 9b69dc5dab60ab09f7701f04c9ea4e5b5f9d8809..c29bec9a74de460ca7cb7a4d0e7e4788572a0854 100644 (file)
@@ -371,7 +371,6 @@ class TestTLSFrontendLimits(DNSDistTest):
     addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", maxConcurrentTCPConnections=%d })
     """
     _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerTLSFrontend']
-    _verboseMode = True
 
     def testTCPConnsPerTLSFrontend(self):
         """
@@ -415,3 +414,41 @@ class TestTLSFrontendLimits(DNSDistTest):
 
         self.assertEqual(count, self._maxTCPConnsPerTLSFrontend)
         self.assertEqual(failed, 1)
+
+class TestProtocols(DNSDistTest):
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _tlsServerPort = 8453
+
+    _config_template = """
+    function checkDOT(dq)
+      if dq:getProtocol() ~= "DNS over TLS" then
+        return DNSAction.Spoof, '1.2.3.4'
+      end
+      return DNSAction.None
+    end
+
+    addAction("protocols.tls.tests.powerdns.com.", LuaAction(checkDOT))
+    newServer{address="127.0.0.1:%s"}
+    addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
+    """
+    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+
+    def testProtocolDOT(self):
+        """
+        DoT: Test DNSQuestion.Protocol
+        """
+        name = 'protocols.tls.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
+        self.sendTCPQueryOverConnection(conn, query, response=response)
+        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEqual(query, receivedQuery)
+        self.assertEqual(response, receivedResponse)