import string
import time
import dns
+import clientsubnetoption
from dnsdisttests import DNSDistTest
class TestAdvancedAllow(DNSDistTest):
(_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
self.assertEquals(receivedResponse, response)
+class TestAdvancedGetLocalAddressOnAnyBind(DNSDistTest):
+
+ _config_template = """
+ function answerBasedOnLocalAddress(dq)
+ local dest = dq.localaddr:toString()
+ local i, j = string.find(dest, "[0-9.]+")
+ local addr = string.sub(dest, i, j)
+ local dashAddr = string.gsub(addr, "[.]", "-")
+ return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
+ end
+ addAction("local-address-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
+ newServer{address="127.0.0.1:%s"}
+ """
+ _dnsDistListeningAddr = "0.0.0.0"
+
+ def testAdvancedGetLocalAddressOnAnyBind(self):
+ """
+ Advanced: Return CNAME containing the local address for an ANY bind
+ """
+ name = 'local-address-any.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ 'address-was-127-0-0-1.local-address-any.advanced.tests.powerdns.com.')
+ response.answer.append(rrset)
+
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, response)
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, response)
+
class TestAdvancedLuaTempFailureTTL(DNSDistTest):
_config_template = """
def testTempFailureTTLBinding(self):
"""
- Exercise dq.tempFailureTTL Lua binding
+ Advanced: Exercise dq.tempFailureTTL Lua binding
"""
name = 'tempfailurettlbinding.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
receivedQuery.id = query.id
self.assertEquals(query, receivedQuery)
self.assertEquals(receivedResponse, response)
+
+class TestAdvancedEDNSOptionRule(DNSDistTest):
+
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ addAction(EDNSOptionRule(EDNSOptionCode.ECS), DropAction())
+ """
+
+ def testDropped(self):
+ """
+ Advanced: A question with ECS is dropped
+ """
+
+ name = 'ednsoptionrule.advanced.tests.powerdns.com.'
+
+ ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
+
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None)
+ self.assertEquals(receivedResponse, None)
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None)
+ self.assertEquals(receivedResponse, None)
+
+ def testReplied(self):
+ """
+ Advanced: A question without ECS is answered
+ """
+
+ name = 'ednsoptionrule.advanced.tests.powerdns.com.'
+
+ # both with EDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[], payload=512)
+ response = dns.message.make_response(query)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+ # and with no EDNS at all
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ response = dns.message.make_response(query)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+class TestAdvancedAllowHeaderOnly(DNSDistTest):
+
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ setAllowEmptyResponse(true)
+ """
+
+ def testHeaderOnlyRefused(self):
+ """
+ Advanced: Header-only refused response
+ """
+ name = 'header-only-refused-response.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.REFUSED)
+ response.question = []
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+ def testHeaderOnlyNoErrorResponse(self):
+ """
+ Advanced: Header-only NoError response should be allowed
+ """
+ name = 'header-only-noerror-response.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.question = []
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+ def testHeaderOnlyNXDResponse(self):
+ """
+ Advanced: Header-only NXD response should be allowed
+ """
+ name = 'header-only-nxd-response.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.NXDOMAIN)
+ response.question = []
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+class TestAdvancedEDNSVersionnRule(DNSDistTest):
+
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ addAction(EDNSVersionRule(0), ERCodeAction(dnsdist.BADVERS))
+ """
+
+ def testDropped(self):
+ """
+ Advanced: A question with ECS version larger than 0 is dropped
+ """
+
+ name = 'ednsversionrule.advanced.tests.powerdns.com.'
+
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=1)
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.BADVERS)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testNoEDNS0Pass(self):
+ """
+ Advanced: A question with ECS version 0 goes through
+ """
+
+ name = 'ednsversionrule.advanced.tests.powerdns.com.'
+
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
+ response = dns.message.make_response(query)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+ def testReplied(self):
+ """
+ Advanced: A question without ECS goes through
+ """
+
+ name = 'ednsoptionrule.advanced.tests.powerdns.com.'
+
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ response = dns.message.make_response(query)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+class TestSetRules(DNSDistTest):
+
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+ newServer{address="127.0.0.1:%s"}
+ addAction(AllRule(), SpoofAction("192.0.2.1"))
+ """
+
+ def testClearThenSetRules(self):
+ """
+ Advanced: Clear rules, set rules
+
+ """
+ name = 'clearthensetrules.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ expectedResponse.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ # clear all the rules, we should not be spoofing and get a SERVFAIL from the responder instead
+ self.sendConsoleCommand("clearRules()")
+
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.SERVFAIL)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ # insert a new spoofing rule
+ self.sendConsoleCommand("setRules({ newRuleAction(AllRule(), SpoofAction(\"192.0.2.2\")) })")
+
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.2')
+ expectedResponse.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)