From: Remi Gacogne Date: Tue, 29 Jun 2021 13:38:39 +0000 (+0200) Subject: dnsdist: Add regression tests for DNSQuestion.getProtocol() X-Git-Tag: dnsdist-1.7.0-alpha1~100^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7d808ff47fedc70e005a9d5945776cc57be6994e;p=thirdparty%2Fpdns.git dnsdist: Add regression tests for DNSQuestion.getProtocol() --- diff --git a/regression-tests.dnsdist/test_Advanced.py b/regression-tests.dnsdist/test_Advanced.py index c5c5012d08..f784f9b33b 100644 --- a/regression-tests.dnsdist/test_Advanced.py +++ b/regression-tests.dnsdist/test_Advanced.py @@ -2148,3 +2148,50 @@ class TestAdvancedDropEmptyQueries(DNSDistTest): sender = getattr(self, method) (_, receivedResponse) = sender(query, response=None, useQueue=False) self.assertEqual(receivedResponse, None) + +class TestProtocols(DNSDistTest): + _config_template = """ + function checkUDP(dq) + if dq:getProtocol() ~= "Do53 UDP" then + return DNSAction.Spoof, '1.2.3.4' + end + return DNSAction.None + end + + function checkTCP(dq) + if dq:getProtocol() ~= "Do53 TCP" then + return DNSAction.Spoof, '1.2.3.4' + end + return DNSAction.None + end + + addAction("udp.protocols.advanced.tests.powerdns.com.", LuaAction(checkUDP)) + addAction("tcp.protocols.advanced.tests.powerdns.com.", LuaAction(checkTCP)) + newServer{address="127.0.0.1:%s"} + """ + + def testProtocolUDP(self): + """ + Advanced: Test DNSQuestion.Protocol over UDP + """ + name = 'udp.protocols.advanced.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + + (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) + receivedQuery.id = query.id + self.assertEqual(receivedQuery, query) + self.assertEqual(receivedResponse, response) + + def testProtocolTCP(self): + """ + Advanced: Test DNSQuestion.Protocol over TCP + """ + name = 'tcp.protocols.advanced.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + + (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) + receivedQuery.id = query.id + self.assertEqual(receivedQuery, query) + self.assertEqual(receivedResponse, response) diff --git a/regression-tests.dnsdist/test_DNSCrypt.py b/regression-tests.dnsdist/test_DNSCrypt.py index 1962342e57..15cc8dc08b 100644 --- a/regression-tests.dnsdist/test_DNSCrypt.py +++ b/regression-tests.dnsdist/test_DNSCrypt.py @@ -53,6 +53,23 @@ class TestDNSCrypt(DNSCryptTest): generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d) addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key") newServer{address="127.0.0.1:%s"} + + function checkDNSCryptUDP(dq) + if dq:getProtocol() ~= "DNSCrypt UDP" then + return DNSAction.Spoof, '1.2.3.4' + end + return DNSAction.None + end + + function checkDNSCryptTCP(dq) + if dq:getProtocol() ~= "DNSCrypt TCP" then + return DNSAction.Spoof, '1.2.3.4' + end + return DNSAction.None + end + + addAction("udp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptUDP)) + addAction("tcp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptTCP)) """ _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort'] @@ -229,6 +246,28 @@ class TestDNSCrypt(DNSCryptTest): self.assertTrue(cert) self.assertEqual(cert.serial, self._resolverCertificateSerial + 3) + def testProtocolUDP(self): + """ + DNSCrypt: Test DNSQuestion.Protocol over UDP + """ + client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443) + name = 'udp.protocols.dnscrypt.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + + self.doDNSCryptQuery(client, query, response, False) + + def testProtocolTCP(self): + """ + DNSCrypt: Test DNSQuestion.Protocol over TCP + """ + client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443) + name = 'tcp.protocols.dnscrypt.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + + self.doDNSCryptQuery(client, query, response, True) + class TestDNSCryptWithCache(DNSCryptTest): _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort'] diff --git a/regression-tests.dnsdist/test_DOH.py b/regression-tests.dnsdist/test_DOH.py index a7a4135fef..527090ac2e 100644 --- a/regression-tests.dnsdist/test_DOH.py +++ b/regression-tests.dnsdist/test_DOH.py @@ -1216,7 +1216,6 @@ class TestDOHFrontendLimits(DNSDistDOHTest): addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, { maxConcurrentTCPConnections=%d }) """ _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerDOHFrontend'] - _verboseMode = True def testTCPConnsPerDOHFrontend(self): """ @@ -1260,3 +1259,44 @@ class TestDOHFrontendLimits(DNSDistDOHTest): self.assertEqual(count, self._maxTCPConnsPerDOHFrontend) self.assertEqual(failed, 1) + +class TestProtocols(DNSDistDOHTest): + _serverKey = 'server.key' + _serverCert = 'server.chain' + _serverName = 'tls.tests.dnsdist.org' + _caCert = 'ca.pem' + _dohServerPort = 8443 + _customResponseHeader1 = 'access-control-allow-origin: *' + _customResponseHeader2 = 'user-agent: derp' + _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort)) + _config_template = """ + function checkDOH(dq) + if dq:getProtocol() ~= "DNS over HTTPS" then + return DNSAction.Spoof, '1.2.3.4' + end + return DNSAction.None + end + + addAction("protocols.doh.tests.powerdns.com.", LuaAction(checkDOH)) + newServer{address="127.0.0.1:%s"} + addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }) + """ + _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey'] + + def testProtocolDOH(self): + """ + DoH: Test DNSQuestion.Protocol + """ + name = 'protocols.doh.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096) + expectedQuery.id = 0 + + (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = expectedQuery.id + self.assertEqual(expectedQuery, receivedQuery) + self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery) + self.assertEqual(response, receivedResponse) diff --git a/regression-tests.dnsdist/test_TLS.py b/regression-tests.dnsdist/test_TLS.py index 9b69dc5dab..c29bec9a74 100644 --- a/regression-tests.dnsdist/test_TLS.py +++ b/regression-tests.dnsdist/test_TLS.py @@ -371,7 +371,6 @@ class TestTLSFrontendLimits(DNSDistTest): addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", maxConcurrentTCPConnections=%d }) """ _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerTLSFrontend'] - _verboseMode = True def testTCPConnsPerTLSFrontend(self): """ @@ -415,3 +414,41 @@ class TestTLSFrontendLimits(DNSDistTest): self.assertEqual(count, self._maxTCPConnsPerTLSFrontend) self.assertEqual(failed, 1) + +class TestProtocols(DNSDistTest): + _serverKey = 'server.key' + _serverCert = 'server.chain' + _serverName = 'tls.tests.dnsdist.org' + _caCert = 'ca.pem' + _tlsServerPort = 8453 + + _config_template = """ + function checkDOT(dq) + if dq:getProtocol() ~= "DNS over TLS" then + return DNSAction.Spoof, '1.2.3.4' + end + return DNSAction.None + end + + addAction("protocols.tls.tests.powerdns.com.", LuaAction(checkDOT)) + newServer{address="127.0.0.1:%s"} + addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" }) + """ + _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey'] + + def testProtocolDOT(self): + """ + DoT: Test DNSQuestion.Protocol + """ + name = 'protocols.tls.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + + conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert) + self.sendTCPQueryOverConnection(conn, query, response=response) + (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEqual(query, receivedQuery) + self.assertEqual(response, receivedResponse)