]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
index 87acf47da254373bff73743a83ef0bba60036d46..efa909df5c3f85c42cfe2dfbe2c52eb1459e6428 100644 (file)
@@ -166,7 +166,7 @@ class TestEdnsClientSubnetNoOverride(DNSDistTest):
         response = dns.message.make_response(expectedQuery)
         ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
         response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
-        expectedResponse = dns.message.make_response(query)
+        expectedResponse = dns.message.make_response(query, our_payload=4096)
         rrset = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
@@ -204,6 +204,7 @@ class TestEdnsClientSubnetNoOverride(DNSDistTest):
         ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
         response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
         expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
         rrset = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
@@ -241,7 +242,8 @@ class TestEdnsClientSubnetNoOverride(DNSDistTest):
         ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
         ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
         response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
-        expectedResponse = dns.message.make_response(query)
+        expectedResponse = dns.message.make_response(query, our_payload=4096)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
         rrset = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
@@ -279,7 +281,8 @@ class TestEdnsClientSubnetNoOverride(DNSDistTest):
         ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
         ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
         response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
-        expectedResponse = dns.message.make_response(query)
+        expectedResponse = dns.message.make_response(query, our_payload=4096)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse, ecoResponse])
         rrset = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
@@ -365,7 +368,7 @@ class TestEdnsClientSubnetOverride(DNSDistTest):
                                     dns.rdatatype.A,
                                     '127.0.0.1')
         response.answer.append(rrset)
-        expectedResponse = dns.message.make_response(query)
+        expectedResponse = dns.message.make_response(query, our_payload=4096)
         expectedResponse.answer.append(rrset)
 
         for method in ("sendUDPQuery", "sendTCPQuery"):
@@ -482,6 +485,181 @@ class TestEdnsClientSubnetOverride(DNSDistTest):
             self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
             self.checkResponseEDNSWithECS(response, receivedResponse)
 
+    def testWithECSFollowedByAnother(self):
+        """
+        ECS: Existing EDNS with ECS, followed by another record
+
+        Send a query with EDNS and an existing ECS value.
+        The OPT record is not the last one in the query
+        and is followed by another one.
+        Check that the query received by the responder
+        has a valid ECS value and that the response
+        received from dnsdist contains an EDNS pseudo-RR.
+        """
+        name = 'withecs-followedbyanother.ecs.tests.powerdns.com.'
+        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
+        # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
+        # it while parsing the message in the receiver :-/
+        query.additional.append(rrset)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
+        expectedQuery.additional.append(rrset)
+
+        response = dns.message.make_response(expectedQuery)
+        response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        response.answer.append(rrset)
+        response.additional.append(rrset)
+        expectedResponse.answer.append(rrset)
+        expectedResponse.additional.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
+            self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
+
+    def testWithAnswerThenECS(self):
+        """
+        ECS: Record in answer followed by an existing EDNS with ECS
+
+        Send a query with a record in the answer section, EDNS and an existing ECS value.
+        Check that the query received by the responder
+        has a valid ECS value and that the response
+        received from dnsdist contains an EDNS pseudo-RR.
+        """
+        name = 'record-in-an-withecs.ecs.tests.powerdns.com.'
+        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
+        query.answer.append(rrset)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
+        expectedQuery.answer.append(rrset)
+
+        response = dns.message.make_response(expectedQuery)
+        response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        response.answer.append(rrset)
+        response.additional.append(rrset)
+        expectedResponse.answer.append(rrset)
+        expectedResponse.additional.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
+            self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
+
+    def testWithAuthThenECS(self):
+        """
+        ECS: Record in authority followed by an existing EDNS with ECS
+
+        Send a query with a record in the authority section, EDNS and an existing ECS value.
+        Check that the query received by the responder
+        has a valid ECS value and that the response
+        received from dnsdist contains an EDNS pseudo-RR.
+        """
+        name = 'record-in-an-withecs.ecs.tests.powerdns.com.'
+        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
+        query.authority.append(rrset)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
+        expectedQuery.authority.append(rrset)
+
+        response = dns.message.make_response(expectedQuery)
+        response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
+        response.answer.append(rrset)
+        response.additional.append(rrset)
+        expectedResponse.answer.append(rrset)
+        expectedResponse.additional.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
+            self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
+
+    def testWithEDNSNoECSFollowedByAnother(self):
+        """
+        ECS: Existing EDNS without ECS, followed by another record
+
+        Send a query with EDNS but no ECS value.
+        The OPT record is not the last one in the query
+        and is followed by another one.
+        Check that the query received by the responder
+        has a valid ECS value and that the response
+        received from dnsdist contains an EDNS pseudo-RR.
+        """
+        name = 'withedns-no-ecs-followedbyanother.ecs.tests.powerdns.com.'
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
+        # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
+        # it while parsing the message in the receiver :-/
+        query.additional.append(rrset)
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,rewrittenEcso])
+        expectedQuery.additional.append(rrset)
+
+        response = dns.message.make_response(expectedQuery)
+        response.use_edns(edns=True, payload=4096, options=[eco, rewrittenEcso, eco])
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, eco])
+        response.answer.append(rrset)
+        response.additional.append(rrset)
+        expectedResponse.answer.append(rrset)
+        expectedResponse.additional.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
+            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, 2)
+
 class TestECSDisabledByRuleOrLua(DNSDistTest):
     """
     dnsdist is configured to add the EDNS0 Client Subnet