]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add tests for unsafe and/or non-ASCII trailing data
authorRichard Gibson <richard.gibson@gmail.com>
Tue, 16 Oct 2018 21:40:31 +0000 (17:40 -0400)
committerRichard Gibson <richard.gibson@gmail.com>
Tue, 16 Oct 2018 21:46:27 +0000 (17:46 -0400)
regression-tests.dnsdist/test_Trailing.py

index 3803e3797d76afe5644062e55f49db2211201957..adfb781b9e3687615c5e119059b4ec0eea309393 100644 (file)
@@ -200,6 +200,25 @@ class TestTrailingDataToDnsdist(DNSDistTest):
     end
     addLuaAction("replaced.trailing.tests.powerdns.com.", replaceTrailingData)
     addLuaAction("replaced.trailing.tests.powerdns.com.", reportTrailingData)
+
+    function reportTrailingHex(dq)
+        local tail = dq:getTrailingData()
+        local hex = string.gsub(tail, ".", function(ch)
+            return string.format("\\x2502X", string.byte(ch))
+        end)
+        return DNSAction.Spoof, "-0x" .. hex .. ".echoed-hex.trailing.tests.powerdns.com."
+    end
+    addLuaAction("echoed-hex.trailing.tests.powerdns.com.", reportTrailingHex)
+
+    function replaceTrailingData_unsafe(dq)
+        local success = dq:setTrailingData("\\xB0\\x00\\x00\\xDE\\xAD.")
+        if not success then
+            return DNSAction.ServFail, ""
+        end
+        return DNSAction.None, ""
+    end
+    addLuaAction("replaced-unsafe.trailing.tests.powerdns.com.", replaceTrailingData_unsafe)
+    addLuaAction("replaced-unsafe.trailing.tests.powerdns.com.", reportTrailingHex)
     """
 
     def testTrailingDropped(self):
@@ -270,7 +289,7 @@ class TestTrailingDataToDnsdist(DNSDistTest):
 
     def testTrailingRead(self):
         """
-        Trailing data: Count
+        Trailing data: Echo
 
         """
         name = 'echoed.trailing.tests.powerdns.com.'
@@ -325,3 +344,61 @@ class TestTrailingDataToDnsdist(DNSDistTest):
             self.assertTrue(receivedResponse)
             expectedResponse.flags = receivedResponse.flags
             self.assertEquals(receivedResponse, expectedResponse)
+
+    def testTrailingReadUnsafe(self):
+        """
+        Trailing data: Echo as hex
+
+        """
+        name = 'echoed-hex.trailing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        response.set_rcode(dns.rcode.SERVFAIL)
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    '-0x0000DEAD.echoed-hex.trailing.tests.powerdns.com.')
+        expectedResponse.answer.append(rrset)
+
+        raw = query.to_wire()
+        raw = raw + b'\x00\x00\xDE\xAD'
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            # (receivedQuery, receivedResponse) = self.sendUDPQuery(raw, response, rawQuery=True)
+            # (receivedQuery, receivedResponse) = self.sendTCPQuery(raw, response, rawQuery=True)
+            (_, receivedResponse) = sender(raw, response, rawQuery=True)
+            self.assertTrue(receivedResponse)
+            expectedResponse.flags = receivedResponse.flags
+            self.assertEquals(receivedResponse, expectedResponse)
+
+    def testTrailingReplacedUnsafe(self):
+        """
+        Trailing data: Replace with null and/or non-ASCII bytes
+
+        """
+        name = 'replaced-unsafe.trailing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        response.set_rcode(dns.rcode.SERVFAIL)
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    '-0xB00000DEAD2E.echoed-hex.trailing.tests.powerdns.com.')
+        expectedResponse.answer.append(rrset)
+
+        raw = query.to_wire()
+        raw = raw + b'TrailingData'
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            # (receivedQuery, receivedResponse) = self.sendUDPQuery(raw, response, rawQuery=True)
+            # (receivedQuery, receivedResponse) = self.sendTCPQuery(raw, response, rawQuery=True)
+            (_, receivedResponse) = sender(raw, response, rawQuery=True)
+            self.assertTrue(receivedResponse)
+            expectedResponse.flags = receivedResponse.flags
+            self.assertEquals(receivedResponse, expectedResponse)