From: Richard Gibson Date: Tue, 16 Oct 2018 21:40:31 +0000 (-0400) Subject: dnsdist: Add tests for unsafe and/or non-ASCII trailing data X-Git-Tag: rec-4.2.0-alpha1~16^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6b32cb3efd34183e42ee6e6276257bfdc42cabfa;p=thirdparty%2Fpdns.git dnsdist: Add tests for unsafe and/or non-ASCII trailing data --- diff --git a/regression-tests.dnsdist/test_Trailing.py b/regression-tests.dnsdist/test_Trailing.py index 3803e3797d..adfb781b9e 100644 --- a/regression-tests.dnsdist/test_Trailing.py +++ b/regression-tests.dnsdist/test_Trailing.py @@ -200,6 +200,25 @@ class TestTrailingDataToDnsdist(DNSDistTest): end addLuaAction("replaced.trailing.tests.powerdns.com.", replaceTrailingData) addLuaAction("replaced.trailing.tests.powerdns.com.", reportTrailingData) + + function reportTrailingHex(dq) + local tail = dq:getTrailingData() + local hex = string.gsub(tail, ".", function(ch) + return string.format("\\x2502X", string.byte(ch)) + end) + return DNSAction.Spoof, "-0x" .. hex .. ".echoed-hex.trailing.tests.powerdns.com." + end + addLuaAction("echoed-hex.trailing.tests.powerdns.com.", reportTrailingHex) + + function replaceTrailingData_unsafe(dq) + local success = dq:setTrailingData("\\xB0\\x00\\x00\\xDE\\xAD.") + if not success then + return DNSAction.ServFail, "" + end + return DNSAction.None, "" + end + addLuaAction("replaced-unsafe.trailing.tests.powerdns.com.", replaceTrailingData_unsafe) + addLuaAction("replaced-unsafe.trailing.tests.powerdns.com.", reportTrailingHex) """ def testTrailingDropped(self): @@ -270,7 +289,7 @@ class TestTrailingDataToDnsdist(DNSDistTest): def testTrailingRead(self): """ - Trailing data: Count + Trailing data: Echo """ name = 'echoed.trailing.tests.powerdns.com.' @@ -325,3 +344,61 @@ class TestTrailingDataToDnsdist(DNSDistTest): self.assertTrue(receivedResponse) expectedResponse.flags = receivedResponse.flags self.assertEquals(receivedResponse, expectedResponse) + + def testTrailingReadUnsafe(self): + """ + Trailing data: Echo as hex + + """ + name = 'echoed-hex.trailing.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + response.set_rcode(dns.rcode.SERVFAIL) + expectedResponse = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 60, + dns.rdataclass.IN, + dns.rdatatype.CNAME, + '-0x0000DEAD.echoed-hex.trailing.tests.powerdns.com.') + expectedResponse.answer.append(rrset) + + raw = query.to_wire() + raw = raw + b'\x00\x00\xDE\xAD' + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + # (receivedQuery, receivedResponse) = self.sendUDPQuery(raw, response, rawQuery=True) + # (receivedQuery, receivedResponse) = self.sendTCPQuery(raw, response, rawQuery=True) + (_, receivedResponse) = sender(raw, response, rawQuery=True) + self.assertTrue(receivedResponse) + expectedResponse.flags = receivedResponse.flags + self.assertEquals(receivedResponse, expectedResponse) + + def testTrailingReplacedUnsafe(self): + """ + Trailing data: Replace with null and/or non-ASCII bytes + + """ + name = 'replaced-unsafe.trailing.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + response.set_rcode(dns.rcode.SERVFAIL) + expectedResponse = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 60, + dns.rdataclass.IN, + dns.rdatatype.CNAME, + '-0xB00000DEAD2E.echoed-hex.trailing.tests.powerdns.com.') + expectedResponse.answer.append(rrset) + + raw = query.to_wire() + raw = raw + b'TrailingData' + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + # (receivedQuery, receivedResponse) = self.sendUDPQuery(raw, response, rawQuery=True) + # (receivedQuery, receivedResponse) = self.sendTCPQuery(raw, response, rawQuery=True) + (_, receivedResponse) = sender(raw, response, rawQuery=True) + self.assertTrue(receivedResponse) + expectedResponse.flags = receivedResponse.flags + self.assertEquals(receivedResponse, expectedResponse)