sender = getattr(self, method)
(_, receivedResponse) = sender(query, response=None, useQueue=False)
self.assertEqual(receivedResponse, None)
+
+class TestProtocols(DNSDistTest):
+ _config_template = """
+ function checkUDP(dq)
+ if dq:getProtocol() ~= "Do53 UDP" then
+ return DNSAction.Spoof, '1.2.3.4'
+ end
+ return DNSAction.None
+ end
+
+ function checkTCP(dq)
+ if dq:getProtocol() ~= "Do53 TCP" then
+ return DNSAction.Spoof, '1.2.3.4'
+ end
+ return DNSAction.None
+ end
+
+ addAction("udp.protocols.advanced.tests.powerdns.com.", LuaAction(checkUDP))
+ addAction("tcp.protocols.advanced.tests.powerdns.com.", LuaAction(checkTCP))
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testProtocolUDP(self):
+ """
+ Advanced: Test DNSQuestion.Protocol over UDP
+ """
+ name = 'udp.protocols.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEqual(receivedQuery, query)
+ self.assertEqual(receivedResponse, response)
+
+ def testProtocolTCP(self):
+ """
+ Advanced: Test DNSQuestion.Protocol over TCP
+ """
+ name = 'tcp.protocols.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEqual(receivedQuery, query)
+ self.assertEqual(receivedResponse, response)
generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
newServer{address="127.0.0.1:%s"}
+
+ function checkDNSCryptUDP(dq)
+ if dq:getProtocol() ~= "DNSCrypt UDP" then
+ return DNSAction.Spoof, '1.2.3.4'
+ end
+ return DNSAction.None
+ end
+
+ function checkDNSCryptTCP(dq)
+ if dq:getProtocol() ~= "DNSCrypt TCP" then
+ return DNSAction.Spoof, '1.2.3.4'
+ end
+ return DNSAction.None
+ end
+
+ addAction("udp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptUDP))
+ addAction("tcp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptTCP))
"""
_config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
self.assertTrue(cert)
self.assertEqual(cert.serial, self._resolverCertificateSerial + 3)
+ def testProtocolUDP(self):
+ """
+ DNSCrypt: Test DNSQuestion.Protocol over UDP
+ """
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ name = 'udp.protocols.dnscrypt.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+
+ self.doDNSCryptQuery(client, query, response, False)
+
+ def testProtocolTCP(self):
+ """
+ DNSCrypt: Test DNSQuestion.Protocol over TCP
+ """
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ name = 'tcp.protocols.dnscrypt.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+
+ self.doDNSCryptQuery(client, query, response, True)
+
class TestDNSCryptWithCache(DNSCryptTest):
_config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, { maxConcurrentTCPConnections=%d })
"""
_config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerDOHFrontend']
- _verboseMode = True
def testTCPConnsPerDOHFrontend(self):
"""
self.assertEqual(count, self._maxTCPConnsPerDOHFrontend)
self.assertEqual(failed, 1)
+
+class TestProtocols(DNSDistDOHTest):
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _dohServerPort = 8443
+ _customResponseHeader1 = 'access-control-allow-origin: *'
+ _customResponseHeader2 = 'user-agent: derp'
+ _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
+ _config_template = """
+ function checkDOH(dq)
+ if dq:getProtocol() ~= "DNS over HTTPS" then
+ return DNSAction.Spoof, '1.2.3.4'
+ end
+ return DNSAction.None
+ end
+
+ addAction("protocols.doh.tests.powerdns.com.", LuaAction(checkDOH))
+ newServer{address="127.0.0.1:%s"}
+ addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
+ """
+ _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
+
+ def testProtocolDOH(self):
+ """
+ DoH: Test DNSQuestion.Protocol
+ """
+ name = 'protocols.doh.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+ expectedQuery.id = 0
+
+ (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEqual(expectedQuery, receivedQuery)
+ self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+ self.assertEqual(response, receivedResponse)
addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", maxConcurrentTCPConnections=%d })
"""
_config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerTLSFrontend']
- _verboseMode = True
def testTCPConnsPerTLSFrontend(self):
"""
self.assertEqual(count, self._maxTCPConnsPerTLSFrontend)
self.assertEqual(failed, 1)
+
+class TestProtocols(DNSDistTest):
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _tlsServerPort = 8453
+
+ _config_template = """
+ function checkDOT(dq)
+ if dq:getProtocol() ~= "DNS over TLS" then
+ return DNSAction.Spoof, '1.2.3.4'
+ end
+ return DNSAction.None
+ end
+
+ addAction("protocols.tls.tests.powerdns.com.", LuaAction(checkDOT))
+ newServer{address="127.0.0.1:%s"}
+ addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
+ """
+ _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+
+ def testProtocolDOT(self):
+ """
+ DoT: Test DNSQuestion.Protocol
+ """
+ name = 'protocols.tls.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+
+ conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
+ self.sendTCPQueryOverConnection(conn, query, response=response)
+ (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)