]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_Advanced.py
Merge pull request #7594 from rgacogne/dnsdist-set-rules
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Advanced.py
index 04adcc0e066e07c092cfc0d22d7455f8eafb6587..ea943bb00cef825da4181b2c639a560202807037 100644 (file)
@@ -1831,3 +1831,70 @@ class TestAdvancedEDNSVersionnRule(DNSDistTest):
             receivedQuery.id = query.id
             self.assertEquals(query, receivedQuery)
             self.assertEquals(receivedResponse, response)
+
+class TestSetRules(DNSDistTest):
+
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    newServer{address="127.0.0.1:%s"}
+    addAction(AllRule(), SpoofAction("192.0.2.1"))
+    """
+
+    def testClearThenSetRules(self):
+        """
+        Advanced: Clear rules, set rules
+
+        """
+        name = 'clearthensetrules.advanced.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+
+        # clear all the rules, we should not be spoofing and get a SERVFAIL from the responder instead
+        self.sendConsoleCommand("clearRules()")
+
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.set_rcode(dns.rcode.SERVFAIL)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)
+
+        # insert a new spoofing rule
+        self.sendConsoleCommand("setRules({ newRuleAction(AllRule(), SpoofAction(\"192.0.2.2\")) })")
+
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.2')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEquals(expectedResponse, receivedResponse)