]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
index 601167e824572eb550cc030b1657bd8cf3be86a0..2ae03bf8668a178a0b846bdd8c9d5ab3b6102567 100644 (file)
@@ -6,6 +6,10 @@ class TestSpoofingSpoof(DNSDistTest):
 
     _config_template = """
     addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
+    addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {aa=true}))
+    addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ad=true}))
+    addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=true}))
+    addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=false}))
     addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
     addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
     newServer{address="127.0.0.1:%s"}
@@ -30,13 +34,11 @@ class TestSpoofingSpoof(DNSDistTest):
                                     '192.0.2.1')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
     def testSpoofActionAAAA(self):
         """
@@ -57,13 +59,11 @@ class TestSpoofingSpoof(DNSDistTest):
                                     '2001:DB8::1')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
     def testSpoofActionCNAME(self):
         """
@@ -84,13 +84,11 @@ class TestSpoofingSpoof(DNSDistTest):
                                     'cnameaction.spoofing.tests.powerdns.com.')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
     def testSpoofActionMultiA(self):
         """
@@ -111,13 +109,11 @@ class TestSpoofingSpoof(DNSDistTest):
                                     '192.0.2.2', '192.0.2.1')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
     def testSpoofActionMultiAAAA(self):
         """
@@ -138,13 +134,11 @@ class TestSpoofingSpoof(DNSDistTest):
                                     '2001:DB8::1', '2001:DB8::2')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
     def testSpoofActionMultiANY(self):
         """
@@ -173,13 +167,101 @@ class TestSpoofingSpoof(DNSDistTest):
                                     '2001:DB8::1', '2001:DB8::2')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+    def testSpoofActionSetAA(self):
+        """
+        Spoofing: Spoof via Action, setting AA=1
+        """
+        name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'AAAA', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags |= dns.flags.AA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '2001:DB8::1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+
+    def testSpoofActionSetAD(self):
+        """
+        Spoofing: Spoof via Action, setting AD=1
+        """
+        name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'AAAA', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags |= dns.flags.AD
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '2001:DB8::1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+
+    def testSpoofActionSetRA(self):
+        """
+        Spoofing: Spoof via Action, setting RA=1
+        """
+        name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'AAAA', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags |= dns.flags.RA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '2001:DB8::1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+
+    def testSpoofActionSetNoRA(self):
+        """
+        Spoofing: Spoof via Action, setting RA=0
+        """
+        name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'AAAA', 'IN')
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags &= ~dns.flags.RA
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '2001:DB8::1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
 class TestSpoofingLuaSpoof(DNSDistTest):
 
@@ -222,13 +304,11 @@ class TestSpoofingLuaSpoof(DNSDistTest):
                                     '192.0.2.1', '192.0.2.2')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
     def testLuaSpoofAAAA(self):
         """
@@ -249,13 +329,11 @@ class TestSpoofingLuaSpoof(DNSDistTest):
                                     '2001:DB8::1')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
     def testLuaSpoofAWithCNAME(self):
         """
@@ -276,13 +354,11 @@ class TestSpoofingLuaSpoof(DNSDistTest):
                                     'spoofedcname.spoofing.tests.powerdns.com.')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
     def testLuaSpoofAAAAWithCNAME(self):
         """
@@ -303,13 +379,11 @@ class TestSpoofingLuaSpoof(DNSDistTest):
                                     'spoofedcname.spoofing.tests.powerdns.com.')
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponse, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
 
 class TestSpoofingLuaWithStatistics(DNSDistTest):
 
@@ -367,10 +441,8 @@ class TestSpoofingLuaWithStatistics(DNSDistTest):
         self.assertTrue(receivedResponse)
         self.assertEquals(expectedResponse2, receivedResponse)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponseAfterwards, receivedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertTrue(receivedResponse)
-        self.assertEquals(expectedResponseAfterwards, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponseAfterwards, receivedResponse)