]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #12769 from neilcook/patch-1
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
index 241783fba29efbed8ebf1f6e0645a026bd261765..b1131ceafc5c2062d6b12b3a34b7bd9862fa338f 100644 (file)
@@ -204,6 +204,7 @@ class TestEdnsClientSubnetNoOverride(DNSDistTest):
         ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
         response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
         expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
         rrset = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
@@ -242,6 +243,7 @@ class TestEdnsClientSubnetNoOverride(DNSDistTest):
         ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
         response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
         expectedResponse = dns.message.make_response(query, our_payload=4096)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
         rrset = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
@@ -280,6 +282,7 @@ class TestEdnsClientSubnetNoOverride(DNSDistTest):
         ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
         response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
         expectedResponse = dns.message.make_response(query, our_payload=4096)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse, ecoResponse])
         rrset = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
@@ -482,10 +485,185 @@ class TestEdnsClientSubnetOverride(DNSDistTest):
             self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
             self.checkResponseEDNSWithECS(response, receivedResponse)
 
+    def testWithECSFollowedByAnother(self):
+        """
+        ECS: Existing EDNS with ECS, followed by another record
+
+        Send a query with EDNS and an existing ECS value.
+        The OPT record is not the last one in the query
+        and is followed by another one.
+        Check that the query received by the responder
+        has a valid ECS value and that the response
+        received from dnsdist contains an EDNS pseudo-RR.
+        """
+        name = 'withecs-followedbyanother.ecs.tests.powerdns.com.'
+        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
+        # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
+        # it while parsing the message in the receiver :-/
+        query.additional.append(rrset)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
+        expectedQuery.additional.append(rrset)
+
+        response = dns.message.make_response(expectedQuery)
+        response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        response.answer.append(rrset)
+        response.additional.append(rrset)
+        expectedResponse.answer.append(rrset)
+        expectedResponse.additional.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
+            self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
+
+    def testWithAnswerThenECS(self):
+        """
+        ECS: Record in answer followed by an existing EDNS with ECS
+
+        Send a query with a record in the answer section, EDNS and an existing ECS value.
+        Check that the query received by the responder
+        has a valid ECS value and that the response
+        received from dnsdist contains an EDNS pseudo-RR.
+        """
+        name = 'record-in-an-withecs.ecs.tests.powerdns.com.'
+        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
+        query.answer.append(rrset)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
+        expectedQuery.answer.append(rrset)
+
+        response = dns.message.make_response(expectedQuery)
+        response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        response.answer.append(rrset)
+        response.additional.append(rrset)
+        expectedResponse.answer.append(rrset)
+        expectedResponse.additional.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
+            self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
+
+    def testWithAuthThenECS(self):
+        """
+        ECS: Record in authority followed by an existing EDNS with ECS
+
+        Send a query with a record in the authority section, EDNS and an existing ECS value.
+        Check that the query received by the responder
+        has a valid ECS value and that the response
+        received from dnsdist contains an EDNS pseudo-RR.
+        """
+        name = 'record-in-an-withecs.ecs.tests.powerdns.com.'
+        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
+        query.authority.append(rrset)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
+        expectedQuery.authority.append(rrset)
+
+        response = dns.message.make_response(expectedQuery)
+        response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        response.answer.append(rrset)
+        response.additional.append(rrset)
+        expectedResponse.answer.append(rrset)
+        expectedResponse.additional.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
+            self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
+
+    def testWithEDNSNoECSFollowedByAnother(self):
+        """
+        ECS: Existing EDNS without ECS, followed by another record
+
+        Send a query with EDNS but no ECS value.
+        The OPT record is not the last one in the query
+        and is followed by another one.
+        Check that the query received by the responder
+        has a valid ECS value and that the response
+        received from dnsdist contains an EDNS pseudo-RR.
+        """
+        name = 'withedns-no-ecs-followedbyanother.ecs.tests.powerdns.com.'
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
+        # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
+        # it while parsing the message in the receiver :-/
+        query.additional.append(rrset)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,rewrittenEcso])
+        expectedQuery.additional.append(rrset)
+
+        response = dns.message.make_response(expectedQuery)
+        response.use_edns(edns=True, payload=4096, options=[eco, rewrittenEcso, eco])
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, eco])
+        response.answer.append(rrset)
+        response.additional.append(rrset)
+        expectedResponse.answer.append(rrset)
+        expectedResponse.additional.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
+            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, 2)
+
 class TestECSDisabledByRuleOrLua(DNSDistTest):
     """
     dnsdist is configured to add the EDNS0 Client Subnet
-    option, but we disable it via DisableECSAction()
+    option, but we disable it via SetDisableECSAction()
     or Lua.
     """
 
@@ -494,7 +672,7 @@ class TestECSDisabledByRuleOrLua(DNSDistTest):
     setECSSourcePrefixV4(16)
     setECSSourcePrefixV6(16)
     newServer{address="127.0.0.1:%s", useClientSubnet=true}
-    addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
+    addAction(SuffixMatchNodeRule("disabled.ecsrules.tests.powerdns.com."), SetDisableECSAction())
     function disableECSViaLua(dq)
         dq.useECS = false
         return DNSAction.None, ""
@@ -579,7 +757,7 @@ class TestECSOverrideSetByRuleOrLua(DNSDistTest):
     """
     dnsdist is configured to set the EDNS0 Client Subnet
     option without overriding an existing one, but we
-    force the overriding via ECSOverrideAction() or Lua.
+    force the overriding via SetECSOverrideAction() or Lua.
     """
 
     _config_template = """
@@ -587,7 +765,7 @@ class TestECSOverrideSetByRuleOrLua(DNSDistTest):
     setECSSourcePrefixV4(24)
     setECSSourcePrefixV6(56)
     newServer{address="127.0.0.1:%s", useClientSubnet=true}
-    addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
+    addAction(SuffixMatchNodeRule("overridden.ecsrules.tests.powerdns.com."), SetECSOverrideAction(true))
     function overrideECSViaLua(dq)
         dq.ecsOverride = true
         return DNSAction.None, ""
@@ -678,7 +856,7 @@ class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
     """
     dnsdist is configured to set the EDNS0 Client Subnet
     option with a prefix length of 24 for IPv4 and 56 for IPv6,
-    but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
+    but we override that to 32 and 128 via SetECSPrefixLengthAction() or Lua.
     """
 
     _config_template = """
@@ -686,7 +864,7 @@ class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
     setECSSourcePrefixV4(24)
     setECSSourcePrefixV6(56)
     newServer{address="127.0.0.1:%s", useClientSubnet=true}
-    addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
+    addAction(SuffixMatchNodeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), SetECSPrefixLengthAction(32, 128))
     function overrideECSPrefixLengthViaLua(dq)
         dq.ecsPrefixLength = 32
         return DNSAction.None, ""
@@ -788,7 +966,7 @@ class TestECSPrefixSetByRule(DNSDistTest):
     setECSSourcePrefixV4(32)
     setECSSourcePrefixV6(128)
     newServer{address="127.0.0.1:%s", useClientSubnet=true}
-    addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
+    addAction(SuffixMatchNodeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
     """
 
     def testWithRegularECS(self):