]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #8945 from rgacogne/ddist-x-forwarded-for
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
index 4cedcd32bf59294dfdc00fea6839cc146d168946..fa81ccf263a8c1edb2196914e2fa9a0dfcf7a3c1 100644 (file)
@@ -6,8 +6,15 @@ class TestSpoofingSpoof(DNSDistTest):
 
     _config_template = """
     addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
+    addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {aa=true}))
+    addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ad=true}))
+    addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=true}))
+    addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=false}))
     addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
     addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
+    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
+    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
+    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
     newServer{address="127.0.0.1:%s"}
     """
 
@@ -169,6 +176,164 @@ class TestSpoofingSpoof(DNSDistTest):
             self.assertTrue(receivedResponse)
             self.assertEquals(expectedResponse, receivedResponse)
 
+    def testSpoofActionSetAA(self):
+        """
+        Spoofing: Spoof via Action, setting AA=1
+        """
+        name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'AAAA', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags |= dns.flags.AA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '2001:DB8::1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 60)
+
+    def testSpoofActionSetAD(self):
+        """
+        Spoofing: Spoof via Action, setting AD=1
+        """
+        name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'AAAA', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags |= dns.flags.AD
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '2001:DB8::1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 60)
+
+    def testSpoofActionSetRA(self):
+        """
+        Spoofing: Spoof via Action, setting RA=1
+        """
+        name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'AAAA', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags |= dns.flags.RA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '2001:DB8::1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 60)
+
+    def testSpoofActionSetNoRA(self):
+        """
+        Spoofing: Spoof via Action, setting RA=0
+        """
+        name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'AAAA', 'IN')
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags &= ~dns.flags.RA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '2001:DB8::1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 60)
+
+    def testSpoofRawAction(self):
+        """
+        Spoofing: Spoof a response from raw bytes
+        """
+        name = 'raw.spoofing.tests.powerdns.com.'
+
+        # A
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags &= ~dns.flags.AA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 60)
+
+        # TXT
+        query = dns.message.make_query(name, 'TXT', 'IN')
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags &= ~dns.flags.AA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.TXT,
+                                    '"aaa" "bbbb" "ccccccccccc"')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 60)
+
+        # SRV
+        query = dns.message.make_query(name, 'SRV', 'IN')
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        # this one should have the AA flag set
+        expectedResponse.flags |= dns.flags.AA
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.SRV,
+                                    '0 0 65535 srv.powerdns.com.')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 3600)
+
 class TestSpoofingLuaSpoof(DNSDistTest):
 
     _config_template = """
@@ -183,11 +348,29 @@ class TestSpoofingLuaSpoof(DNSDistTest):
                 return DNSAction.None, ""
         end
     end
+
     function spoof2rule(dq)
         return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
     end
+
+    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
+    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
+
+    function spoofrawrule(dq)
+        if dq.qtype == DNSQType.A then
+             return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
+        elseif dq.qtype == DNSQType.TXT then
+             return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
+        elseif dq.qtype == DNSQType.SRV then
+            dq.dh:setAA(true)
+            return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
+        end
+        return DNSAction.None, ""
+    end
+
     addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
     addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
+    addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
     newServer{address="127.0.0.1:%s"}
     """
 
@@ -291,6 +474,71 @@ class TestSpoofingLuaSpoof(DNSDistTest):
             self.assertTrue(receivedResponse)
             self.assertEquals(expectedResponse, receivedResponse)
 
+    def testLuaSpoofRawAction(self):
+        """
+        Spoofing: Spoof a response from raw bytes via Lua
+        """
+        name = 'lua-raw.spoofing.tests.powerdns.com.'
+
+        # A
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags &= ~dns.flags.AA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 60)
+
+        # TXT
+        query = dns.message.make_query(name, 'TXT', 'IN')
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags &= ~dns.flags.AA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.TXT,
+                                    '"aaa" "bbbb" "ccccccccccc"')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            self.assertEquals(receivedResponse.answer[0].ttl, 60)
+
+        # SRV
+        query = dns.message.make_query(name, 'SRV', 'IN')
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        # this one should have the AA flag set
+        expectedResponse.flags |= dns.flags.AA
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.SRV,
+                                    '0 0 65535 srv.powerdns.com.')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+            # sorry, we can't set the TTL from the Lua API right now
+            #self.assertEquals(receivedResponse.answer[0].ttl, 3600)
+
 class TestSpoofingLuaWithStatistics(DNSDistTest):
 
     _config_template = """