]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_RecordsCount.py
Merge pull request #8701 from pieterlexis/remote-support-also-notify
[thirdparty/pdns.git] / regression-tests.dnsdist / test_RecordsCount.py
index 1193fdd4180f779328e7c79a3ace9b36c23b6cc9..1abe575fd0991168e83c3ccbcc18d26311b05d9f 100644 (file)
@@ -7,13 +7,13 @@ from dnsdisttests import DNSDistTest
 class TestRecordsCountOnlyOneAR(DNSDistTest):
 
     _config_template = """
-    addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(dnsdist.REFUSED))
+    addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED))
     newServer{address="127.0.0.1:%s"}
     """
 
     def testRecordsCountRefuseEmptyAR(self):
         """
-        RecordsCount: Refuse arcount == 0
+        RecordsCount: Refuse arcount == 0 (No OPT)
 
         Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
         check that we are getting a REFUSED response.
@@ -23,15 +23,14 @@ class TestRecordsCountOnlyOneAR(DNSDistTest):
         expectedResponse = dns.message.make_response(query)
         expectedResponse.set_rcode(dns.rcode.REFUSED)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
     def testRecordsCountAllowOneAR(self):
         """
-        RecordsCount: Allow arcount == 1
+        RecordsCount: Allow arcount == 1 (OPT)
 
         Send a query to "allowonear.recordscount.tests.powerdns.com.",
         check that we are getting a valid response.
@@ -45,23 +44,18 @@ class TestRecordsCountOnlyOneAR(DNSDistTest):
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
 
     def testRecordsCountRefuseTwoAR(self):
         """
-        RecordsCount: Refuse arcount > 1
+        RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)
 
         Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
         check that we are getting a REFUSED response.
@@ -76,17 +70,16 @@ class TestRecordsCountOnlyOneAR(DNSDistTest):
         expectedResponse = dns.message.make_response(query)
         expectedResponse.set_rcode(dns.rcode.REFUSED)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
 class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
 
     _config_template = """
     addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
-    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
+    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
     newServer{address="127.0.0.1:%s"}
     """
 
@@ -102,11 +95,10 @@ class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
         expectedResponse = dns.message.make_response(query)
         expectedResponse.set_rcode(dns.rcode.REFUSED)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
     def testRecordsCountAllowTwoAN(self):
         """
@@ -126,19 +118,14 @@ class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
         response = dns.message.make_response(query)
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
 
     def testRecordsCountRefuseFourAN(self):
         """
@@ -160,17 +147,16 @@ class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
         expectedResponse.set_rcode(dns.rcode.REFUSED)
         expectedResponse.answer.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
 class TestRecordsCountNothingInNS(DNSDistTest):
 
     _config_template = """
     addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
-    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
+    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
     newServer{address="127.0.0.1:%s"}
     """
 
@@ -193,11 +179,10 @@ class TestRecordsCountNothingInNS(DNSDistTest):
         expectedResponse.set_rcode(dns.rcode.REFUSED)
         expectedResponse.authority.append(rrset)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
 
     def testRecordsCountAllowEmptyNS(self):
@@ -216,24 +201,19 @@ class TestRecordsCountNothingInNS(DNSDistTest):
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
 
 class TestRecordsCountNoOPTInAR(DNSDistTest):
 
     _config_template = """
-    addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, dnsdist.OPT, 0, 0)), RCodeAction(dnsdist.REFUSED))
+    addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED))
     newServer{address="127.0.0.1:%s"}
     """
 
@@ -249,11 +229,10 @@ class TestRecordsCountNoOPTInAR(DNSDistTest):
         expectedResponse = dns.message.make_response(query)
         expectedResponse.set_rcode(dns.rcode.REFUSED)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
     def testRecordsCountAllowNoOPTInAR(self):
         """
@@ -271,16 +250,47 @@ class TestRecordsCountNoOPTInAR(DNSDistTest):
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
+
+    def testRecordsCountAllowTwoARButNoOPT(self):
+        """
+        RecordsTypeCount: Allow arcount > 1 without OPT
+
+        Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.",
+        check that we are getting a valid response.
+        """
+        name = 'allowtwoarnoopt.recordscount.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.additional.append(dns.rrset.from_text(name,
+                                                    3600,
+                                                    dns.rdataclass.IN,
+                                                    dns.rdatatype.A,
+                                                    '127.0.0.1'))
+        query.additional.append(dns.rrset.from_text(name,
+                                                    3600,
+                                                    dns.rdataclass.IN,
+                                                    dns.rdatatype.A,
+                                                    '127.0.0.1'))
+
+        response = dns.message.make_response(query)
+        response.answer.append(dns.rrset.from_text(name,
+                                                   3600,
+                                                   dns.rdataclass.IN,
+                                                   dns.rdatatype.A,
+                                                   '127.0.0.1'))
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)