]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add tests for the TimedIPSet feature
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 4 Nov 2025 13:35:55 +0000 (14:35 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 13 Nov 2025 09:42:20 +0000 (10:42 +0100)
Signed-off-by: Remi Gacogne <remi.gacogne@powerdns.com>
regression-tests.dnsdist/test_TimedIPSet.py [new file with mode: 0644]

diff --git a/regression-tests.dnsdist/test_TimedIPSet.py b/regression-tests.dnsdist/test_TimedIPSet.py
new file mode 100644 (file)
index 0000000..a7d0ead
--- /dev/null
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+import base64
+import time
+import dns
+from dnsdisttests import DNSDistTest, pickAvailablePort
+
+class TestTimeIPSetYaml(DNSDistTest):
+
+    _yaml_config_template = """---
+console:
+  listen_address: "127.0.0.1:%d"
+  key: "%s"
+  acl:
+    - 127.0.0.0/8
+
+binds:
+  - listen_address: "127.0.0.1:%d"
+    protocol: Do53
+
+backends:
+  - address: "127.0.0.1:%d"
+    protocol: Do53
+
+timed_ip_sets:
+  - name: "my-set"
+
+query_rules:
+  - name: "refuse names in the Timed IP set"
+    selector:
+      type: "TimedIPSet"
+      set_name: "my-set"
+    action:
+      type: "RCode"
+      rcode: "Refused"
+"""
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _consolePort = pickAvailablePort()
+    _testServerPort = pickAvailablePort()
+    _yaml_config_params = ['_consolePort', '_consoleKeyB64', '_dnsDistPort', '_testServerPort']
+    _config_params = []
+
+    def testTimedIPSet(self):
+        """
+        TimedIPSet from YAML configuration
+        """
+        name = 'timedipset-yaml.test.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+        refusedResponse = dns.message.make_response(query)
+        refusedResponse.set_rcode(dns.rcode.REFUSED)
+
+        for method in ["sendUDPQuery", "sendTCPQuery"]:
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response=response)
+            receivedQuery.id = query.id
+            self.assertEqual(receivedQuery, query)
+            self.assertEqual(receivedResponse, response)
+
+        # now we block it for one second
+        self.sendConsoleCommand('getObjectFromYAMLConfiguration(\'my-set\'):add(newCA(\'127.0.0.1\'), 1)')
+
+        for method in ["sendUDPQuery", "sendTCPQuery"]:
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEqual(receivedResponse, refusedResponse)
+
+        time.sleep(1)
+
+        # should be unblocked now
+        for method in ["sendUDPQuery", "sendTCPQuery"]:
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response=response)
+            receivedQuery.id = query.id
+            self.assertEqual(receivedQuery, query)
+            self.assertEqual(receivedResponse, response)
+
+class TestTimeIPSetLua(DNSDistTest):
+    _config_template = """---
+    setKey("%s")
+    controlSocket("127.0.0.1:%d")
+    newServer{address="127.0.0.1:%d"}
+
+    mySet = TimedIPSetRule()
+    addAction(mySet:slice(), RCodeAction(DNSRCode.REFUSED))
+"""
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _consolePort = pickAvailablePort()
+    _testServerPort = pickAvailablePort()
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
+
+    def testTimedIPSet(self):
+        """
+        TimedIPSet from Lua configuration
+        """
+        name = 'timedipset-lua.test.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.flags &= ~dns.flags.RD
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+        refusedResponse = dns.message.make_response(query)
+        refusedResponse.set_rcode(dns.rcode.REFUSED)
+
+        for method in ["sendUDPQuery", "sendTCPQuery"]:
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response=response)
+            receivedQuery.id = query.id
+            self.assertEqual(receivedQuery, query)
+            self.assertEqual(receivedResponse, response)
+
+        # now we block it for one second
+        self.sendConsoleCommand('mySet:add(newCA(\'127.0.0.1\'), 1)')
+
+        for method in ["sendUDPQuery", "sendTCPQuery"]:
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEqual(receivedResponse, refusedResponse)
+
+        time.sleep(1)
+
+        # should be unblocked now
+        for method in ["sendUDPQuery", "sendTCPQuery"]:
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response=response)
+            receivedQuery.id = query.id
+            self.assertEqual(receivedQuery, query)
+            self.assertEqual(receivedResponse, response)