end
addLuaAction("replaced.trailing.tests.powerdns.com.", replaceTrailingData)
addLuaAction("replaced.trailing.tests.powerdns.com.", reportTrailingData)
+
+ function reportTrailingHex(dq)
+ local tail = dq:getTrailingData()
+ local hex = string.gsub(tail, ".", function(ch)
+ return string.format("\\x2502X", string.byte(ch))
+ end)
+ return DNSAction.Spoof, "-0x" .. hex .. ".echoed-hex.trailing.tests.powerdns.com."
+ end
+ addLuaAction("echoed-hex.trailing.tests.powerdns.com.", reportTrailingHex)
+
+ function replaceTrailingData_unsafe(dq)
+ local success = dq:setTrailingData("\\xB0\\x00\\x00\\xDE\\xAD.")
+ if not success then
+ return DNSAction.ServFail, ""
+ end
+ return DNSAction.None, ""
+ end
+ addLuaAction("replaced-unsafe.trailing.tests.powerdns.com.", replaceTrailingData_unsafe)
+ addLuaAction("replaced-unsafe.trailing.tests.powerdns.com.", reportTrailingHex)
"""
def testTrailingDropped(self):
def testTrailingRead(self):
"""
- Trailing data: Count
+ Trailing data: Echo
"""
name = 'echoed.trailing.tests.powerdns.com.'
self.assertTrue(receivedResponse)
expectedResponse.flags = receivedResponse.flags
self.assertEquals(receivedResponse, expectedResponse)
+
+ def testTrailingReadUnsafe(self):
+ """
+ Trailing data: Echo as hex
+
+ """
+ name = 'echoed-hex.trailing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.SERVFAIL)
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ '-0x0000DEAD.echoed-hex.trailing.tests.powerdns.com.')
+ expectedResponse.answer.append(rrset)
+
+ raw = query.to_wire()
+ raw = raw + b'\x00\x00\xDE\xAD'
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ # (receivedQuery, receivedResponse) = self.sendUDPQuery(raw, response, rawQuery=True)
+ # (receivedQuery, receivedResponse) = self.sendTCPQuery(raw, response, rawQuery=True)
+ (_, receivedResponse) = sender(raw, response, rawQuery=True)
+ self.assertTrue(receivedResponse)
+ expectedResponse.flags = receivedResponse.flags
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testTrailingReplacedUnsafe(self):
+ """
+ Trailing data: Replace with null and/or non-ASCII bytes
+
+ """
+ name = 'replaced-unsafe.trailing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.SERVFAIL)
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ '-0xB00000DEAD2E.echoed-hex.trailing.tests.powerdns.com.')
+ expectedResponse.answer.append(rrset)
+
+ raw = query.to_wire()
+ raw = raw + b'TrailingData'
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ # (receivedQuery, receivedResponse) = self.sendUDPQuery(raw, response, rawQuery=True)
+ # (receivedQuery, receivedResponse) = self.sendTCPQuery(raw, response, rawQuery=True)
+ (_, receivedResponse) = sender(raw, response, rawQuery=True)
+ self.assertTrue(receivedResponse)
+ expectedResponse.flags = receivedResponse.flags
+ self.assertEquals(receivedResponse, expectedResponse)