]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add a test for SetEDNSOptionResponseAction
authorSamir Aguiar <sjorgedeaguiar@netskope.com>
Wed, 28 May 2025 01:21:42 +0000 (01:21 +0000)
committerSamir Aguiar <sjorgedeaguiar@netskope.com>
Wed, 28 May 2025 14:49:45 +0000 (14:49 +0000)
regression-tests.dnsdist/test_Responses.py

index 26c1042c0a03111d20f85b9de570576178f6fd67..e6d209b900b5a691150cde77fdfdcfc4d554dd86 100644 (file)
@@ -2,6 +2,7 @@
 from datetime import datetime, timedelta
 import time
 import dns
+import cookiesoption
 from dnsdisttests import DNSDistTest
 
 class TestResponseRuleNXDelayed(DNSDistTest):
@@ -566,3 +567,91 @@ class TestResponseRewriteServFail(DNSDistTest):
             receivedQuery.id = query.id
             self.assertEqual(query, receivedQuery)
             self.assertEqual(response, receivedResponse)
+
+class TestAdvancedSetEDNSOptionResponseAction(DNSDistTest):
+    _config_template = """
+    addResponseAction(AllRule(), SetEDNSOptionResponseAction(10, "deadbeefdeadc0de"))
+    newServer{address="127.0.0.1:%s"}
+    """
+
+    def testAdvancedSetEDNSOptionResponse(self):
+        """
+        Responses: Set EDNS Option in response
+        """
+        name = 'setednsoptionresponse.responses.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadc0de')
+        expectedResponse = dns.message.make_response(query, use_edns=True, payload=512, options=[eco])
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEqual(query, receivedQuery)
+            self.checkResponseEDNS(expectedResponse, receivedResponse)
+
+    def testAdvancedSetEDNSOptionResponseOverwrite(self):
+        """
+        Responses: Set EDNS Option in response overwrites existing option
+        """
+        name = 'setednsoptionresponse-overwrite.responses.tests.powerdns.com.'
+        initialECO = cookiesoption.CookiesOption(b'aaaaaaaa', b'bbbbbbbb')
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query, use_edns=True, payload=512, options=[initialECO])
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        overWrittenECO = cookiesoption.CookiesOption(b'deadbeef', b'deadc0de')
+        expectedResponse = dns.message.make_response(query, use_edns=True, payload=512, options=[overWrittenECO])
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEqual(query, receivedQuery)
+            self.checkResponseEDNS(expectedResponse, receivedResponse)
+
+    def testAdvancedSetEDNSOptionResponseWithDOSet(self):
+        """
+        Responses: Set EDNS Option in response (DO bit set)
+        """
+        name = 'setednsoptionresponse-do.responses.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, want_dnssec=True, payload=4096)
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadc0de')
+        expectedResponse = dns.message.make_response(query, use_edns=True, payload=4096, options=[eco], want_dnssec=True)
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEqual(query, receivedQuery)
+            self.checkResponseEDNS(expectedResponse, receivedResponse)