class TestTrailingDataToDnsdist(DNSDistTest):
_config_template = """
newServer{address="127.0.0.1:%s"}
+ function reportTrailingData(dq)
+ local tailBytes = dq:getTrailingData()
+ local tailChars = string.char(unpack(tailBytes))
+ return DNSAction.Spoof, tailChars .. ".echoed.trailing.tests.powerdns.com."
+ end
addAction(AndRule({QNameRule("dropped.trailing.tests.powerdns.com."), TrailingDataRule()}), DropAction())
+ addLuaAction("echoed.trailing.tests.powerdns.com.", reportTrailingData)
"""
def testTrailingDropped(self):
# (_, receivedResponse) = self.sendTCPQuery(raw, response, rawQuery=True)
(_, receivedResponse) = sender(raw, response, rawQuery=True)
self.assertEquals(receivedResponse, None)
+
+ def testTrailingRead(self):
+ """
+ Trailing data: Count
+
+ """
+ name = 'echoed.trailing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.SERVFAIL)
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ 'TrailingData.echoed.trailing.tests.powerdns.com.')
+ expectedResponse.answer.append(rrset)
+
+ raw = query.to_wire()
+ raw = raw + b'TrailingData'
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ # (receivedQuery, receivedResponse) = self.sendUDPQuery(raw, response, rawQuery=True)
+ # (receivedQuery, receivedResponse) = self.sendTCPQuery(raw, response, rawQuery=True)
+ (_, receivedResponse) = sender(raw, response, rawQuery=True)
+ self.assertTrue(receivedResponse)
+ expectedResponse.flags = receivedResponse.flags
+ self.assertEquals(receivedResponse, expectedResponse)