]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_Basics.py
rec: ensure correct service user on debian
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
index a4b891269edab51008c548f8d8c6fc6d7868d271..5dfda57d21467c8a49411f3564aeb5cc23c87b37 100644 (file)
@@ -9,23 +9,15 @@ class TestBasics(DNSDistTest):
     _config_template = """
     newServer{address="127.0.0.1:%s"}
     truncateTC(true)
-    addAnyTCRule()
-    addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
+    addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
+    addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
     mySMN = newSuffixMatchNode()
     mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
-    addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
+    addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
     addAction(makeRule("drop.test.powerdns.com."), DropAction())
-    addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
-    addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
-    block=newDNSName("powerdns.org.")
-    function blockFilter(dq)
-        if(dq.qname:isPartOf(block))
-        then
-            print("Blocking *.powerdns.org")
-            return true
-        end
-        return false
-    end
+    addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
+    addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
+    addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
     """
 
     def testDropped(self):
@@ -38,27 +30,10 @@ class TestBasics(DNSDistTest):
         """
         name = 'drop.test.powerdns.com.'
         query = dns.message.make_query(name, 'A', 'IN')
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, None)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, None)
-
-    def testBlockedA(self):
-        """
-        Basics: Blocked A query
-
-        Send an A query for the powerdns.org domain,
-        which is blocked by configuration. We expect
-        no response.
-        """
-        name = 'blockeda.tests.powerdns.org.'
-        query = dns.message.make_query(name, 'A', 'IN')
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, None)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, None)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, None)
 
     def testAWithECS(self):
         """
@@ -69,22 +44,19 @@ class TestBasics(DNSDistTest):
         query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
         response = dns.message.make_response(query)
         rrset = dns.rrset.from_text(name,
-                                    3600,
+                                    60,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
 
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
 
     def testSimpleA(self):
         """
@@ -100,19 +72,14 @@ class TestBasics(DNSDistTest):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
 
     def testAnyIsTruncated(self):
         """
@@ -166,6 +133,48 @@ class TestBasics(DNSDistTest):
 
         response.answer.append(rrset)
         response.flags |= dns.flags.TC
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags |= dns.flags.TC
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(expectedResponse.flags, receivedResponse.flags)
+        self.assertEquals(expectedResponse.question, receivedResponse.question)
+        self.assertFalse(response.answer == receivedResponse.answer)
+        self.assertEquals(len(receivedResponse.answer), 0)
+        self.assertEquals(len(receivedResponse.authority), 0)
+        self.assertEquals(len(receivedResponse.additional), 0)
+        self.checkMessageNoEDNS(expectedResponse, receivedResponse)
+
+    def testTruncateTCEDNS(self):
+        """
+        Basics: Truncate TC with EDNS
+
+        dnsdist is configured to truncate TC (default),
+        we make the backend send responses
+        with TC set and additional content,
+        and check that the received response has been fixed.
+        Note that the query and initial response had EDNS,
+        so the final response should have it too.
+        """
+        name = 'atruncatetc.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
+        response = dns.message.make_response(query)
+        # force a different responder payload than the one in the query,
+        # so we check that we don't just mirror it
+        response.payload = 4242
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+
+        response.answer.append(rrset)
+        response.flags |= dns.flags.TC
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.payload = 4242
+        expectedResponse.flags |= dns.flags.TC
 
         (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
         receivedQuery.id = query.id
@@ -176,6 +185,11 @@ class TestBasics(DNSDistTest):
         self.assertEquals(len(receivedResponse.answer), 0)
         self.assertEquals(len(receivedResponse.authority), 0)
         self.assertEquals(len(receivedResponse.additional), 0)
+        print(expectedResponse)
+        print(receivedResponse)
+        self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
+        self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
+        self.assertEquals(receivedResponse.payload, 4242)
 
     def testRegexReturnsRefused(self):
         """
@@ -191,11 +205,33 @@ class TestBasics(DNSDistTest):
         expectedResponse = dns.message.make_response(query)
         expectedResponse.set_rcode(dns.rcode.REFUSED)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+    def testQNameReturnsSpoofed(self):
+        """
+        Basics: test QNameRule and Spoof
+
+        dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
+        """
+        name = 'ds9a.nl.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.set_rcode(dns.rcode.NOERROR)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '1.2.3.4')
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
     def testDomainAndQTypeReturnsNotImplemented(self):
         """
@@ -211,11 +247,10 @@ class TestBasics(DNSDistTest):
         expectedResponse = dns.message.make_response(query)
         expectedResponse.set_rcode(dns.rcode.NOTIMP)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
-
-        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
     def testDomainWithoutQTypeIsNotAffected(self):
         """
@@ -236,19 +271,14 @@ class TestBasics(DNSDistTest):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
 
     def testOtherDomainANDQTypeIsNotAffected(self):
         """
@@ -269,19 +299,14 @@ class TestBasics(DNSDistTest):
                                     'nothing to see here')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
 
     def testWrongResponse(self):
         """
@@ -305,17 +330,13 @@ class TestBasics(DNSDistTest):
                                     'nothing to see here')
         unrelatedResponse.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, unrelatedResponse)
-        self.assertTrue(receivedQuery)
-        self.assertEquals(receivedResponse, None)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, unrelatedResponse)
-        self.assertTrue(receivedQuery)
-        self.assertEquals(receivedResponse, None)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
+            self.assertTrue(receivedQuery)
+            self.assertEquals(receivedResponse, None)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
 
     def testHeaderOnlyRefused(self):
         """
@@ -327,17 +348,13 @@ class TestBasics(DNSDistTest):
         response.set_rcode(dns.rcode.REFUSED)
         response.question = []
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(receivedResponse, response)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(receivedResponse, response)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(receivedResponse, response)
 
     def testHeaderOnlyNoErrorResponse(self):
         """
@@ -348,17 +365,13 @@ class TestBasics(DNSDistTest):
         response = dns.message.make_response(query)
         response.question = []
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(receivedResponse, None)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(receivedResponse, None)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(receivedResponse, None)
 
     def testHeaderOnlyNXDResponse(self):
         """
@@ -370,17 +383,13 @@ class TestBasics(DNSDistTest):
         response.set_rcode(dns.rcode.NXDOMAIN)
         response.question = []
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(receivedResponse, None)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        receivedQuery.id = query.id
-        self.assertEquals(query, receivedQuery)
-        self.assertEquals(receivedResponse, None)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(receivedResponse, None)
 
     def testAddActionDNSName(self):
         """
@@ -391,8 +400,10 @@ class TestBasics(DNSDistTest):
         expectedResponse = dns.message.make_response(query)
         expectedResponse.set_rcode(dns.rcode.REFUSED)
 
-        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-        self.assertEquals(receivedResponse, expectedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (_, receivedResponse) = sender(query, response=None, useQueue=False)
+            self.assertEquals(receivedResponse, expectedResponse)
 
     def testAddActionDNSNames(self):
         """
@@ -403,9 +414,8 @@ class TestBasics(DNSDistTest):
             expectedResponse = dns.message.make_response(query)
             expectedResponse.set_rcode(dns.rcode.REFUSED)
 
-            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
-            self.assertEquals(receivedResponse, expectedResponse)
+            for method in ("sendUDPQuery", "sendTCPQuery"):
+                sender = getattr(self, method)
+                (_, receivedResponse) = sender(query, response=None, useQueue=False)
+                self.assertEquals(receivedResponse, expectedResponse)
 
-if __name__ == '__main__':
-    unittest.main()
-    exit(0)