]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_Advanced.py
spelling: version
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Advanced.py
index d65cca8e4b63f1ba39b11ec64d7093ed5f93e71d..192f937e9afbedf85efa907c65f8bf2fcc95e549 100644 (file)
@@ -5,6 +5,7 @@ import os
 import string
 import time
 import dns
+import clientsubnetoption
 from dnsdisttests import DNSDistTest
 
 class TestAdvancedAllow(DNSDistTest):
@@ -1581,6 +1582,44 @@ class TestAdvancedGetLocalPortOnAnyBind(DNSDistTest):
         (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
         self.assertEquals(receivedResponse, response)
 
+class TestAdvancedGetLocalAddressOnAnyBind(DNSDistTest):
+
+    _config_template = """
+    function answerBasedOnLocalAddress(dq)
+      local dest = dq.localaddr:toString()
+      local i, j = string.find(dest, "[0-9.]+")
+      local addr = string.sub(dest, i, j)
+      local dashAddr = string.gsub(addr, "[.]", "-")
+      return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
+    end
+    addAction("local-address-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
+    newServer{address="127.0.0.1:%s"}
+    """
+    _dnsDistListeningAddr = "0.0.0.0"
+
+    def testAdvancedGetLocalAddressOnAnyBind(self):
+        """
+        Advanced: Return CNAME containing the local address for an ANY bind
+        """
+        name = 'local-address-any.advanced.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    'address-was-127-0-0-1.local-address-any.advanced.tests.powerdns.com.')
+        response.answer.append(rrset)
+
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
 class TestAdvancedLuaTempFailureTTL(DNSDistTest):
 
     _config_template = """
@@ -1604,7 +1643,7 @@ class TestAdvancedLuaTempFailureTTL(DNSDistTest):
 
     def testTempFailureTTLBinding(self):
         """
-        Exercise dq.tempFailureTTL Lua binding
+        Advanced: Exercise dq.tempFailureTTL Lua binding
         """
         name = 'tempfailurettlbinding.advanced.tests.powerdns.com.'
         query = dns.message.make_query(name, 'A', 'IN')
@@ -1622,3 +1661,240 @@ class TestAdvancedLuaTempFailureTTL(DNSDistTest):
         receivedQuery.id = query.id
         self.assertEquals(query, receivedQuery)
         self.assertEquals(receivedResponse, response)
+
+class TestAdvancedEDNSOptionRule(DNSDistTest):
+
+    _config_template = """
+    newServer{address="127.0.0.1:%s"}
+    addAction(EDNSOptionRule(EDNSOptionCode.ECS), DropAction())
+    """
+
+    def testDropped(self):
+        """
+        Advanced: A question with ECS is dropped
+        """
+
+        name = 'ednsoptionrule.advanced.tests.powerdns.com.'
+
+        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
+
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None)
+        self.assertEquals(receivedResponse, None)
+        (_, receivedResponse) = self.sendTCPQuery(query, response=None)
+        self.assertEquals(receivedResponse, None)
+
+    def testReplied(self):
+        """
+        Advanced: A question without ECS is answered
+        """
+
+        name = 'ednsoptionrule.advanced.tests.powerdns.com.'
+
+        # both with EDNS
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[], payload=512)
+        response = dns.message.make_response(query)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(receivedResponse, response)
+
+        # and with no EDNS at all
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        response = dns.message.make_response(query)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(receivedResponse, response)
+
+class TestAdvancedAllowHeaderOnly(DNSDistTest):
+
+    _config_template = """
+    newServer{address="127.0.0.1:%s"}
+    setAllowEmptyResponse(true)
+    """
+
+    def testHeaderOnlyRefused(self):
+        """
+        Advanced: Header-only refused response
+        """
+        name = 'header-only-refused-response.advanced.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        response.set_rcode(dns.rcode.REFUSED)
+        response.question = []
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(receivedResponse, response)
+
+    def testHeaderOnlyNoErrorResponse(self):
+        """
+        Advanced: Header-only NoError response should be allowed
+        """
+        name = 'header-only-noerror-response.advanced.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        response.question = []
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(receivedResponse, response)
+
+    def testHeaderOnlyNXDResponse(self):
+        """
+        Advanced: Header-only NXD response should be allowed
+        """
+        name = 'header-only-nxd-response.advanced.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        response.set_rcode(dns.rcode.NXDOMAIN)
+        response.question = []
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(receivedResponse, response)
+
+class TestAdvancedEDNSVersionRule(DNSDistTest):
+
+    _config_template = """
+    newServer{address="127.0.0.1:%s"}
+    addAction(EDNSVersionRule(0), ERCodeAction(dnsdist.BADVERS))
+    """
+
+    def testDropped(self):
+        """
+        Advanced: A question with ECS version larger than 0 is dropped
+        """
+
+        name = 'ednsversionrule.advanced.tests.powerdns.com.'
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=1)
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.set_rcode(dns.rcode.BADVERS)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None)
+            self.assertEquals(receivedResponse, expectedResponse)
+
+    def testNoEDNS0Pass(self):
+        """
+        Advanced: A question with ECS version 0 goes through
+        """
+
+        name = 'ednsversionrule.advanced.tests.powerdns.com.'
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
+        response = dns.message.make_response(query)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(receivedResponse, response)
+
+    def testReplied(self):
+        """
+        Advanced: A question without ECS goes through
+        """
+
+        name = 'ednsoptionrule.advanced.tests.powerdns.com.'
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        response = dns.message.make_response(query)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(receivedResponse, response)
+
+class TestSetRules(DNSDistTest):
+
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    newServer{address="127.0.0.1:%s"}
+    addAction(AllRule(), SpoofAction("192.0.2.1"))
+    """
+
+    def testClearThenSetRules(self):
+        """
+        Advanced: Clear rules, set rules
+
+        """
+        name = 'clearthensetrules.advanced.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+
+        # clear all the rules, we should not be spoofing and get a SERVFAIL from the responder instead
+        self.sendConsoleCommand("clearRules()")
+
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.set_rcode(dns.rcode.SERVFAIL)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+
+        # insert a new spoofing rule
+        self.sendConsoleCommand("setRules({ newRuleAction(AllRule(), SpoofAction(\"192.0.2.2\")) })")
+
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.2')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)