self.assertEquals(record.ttl, rttl)
self.assertTrue(record.HasField('rdata'))
- def checkProtobufPolicy(self, msg, policyType, reason):
+ def checkProtobufPolicy(self, msg, policyType, reason, name):
self.assertEquals(msg.type, dnsmessage_pb2.PBDNSMessage.DNSResponseType)
self.assertTrue(msg.response.HasField('appliedPolicyType'))
self.assertTrue(msg.response.HasField('appliedPolicy'))
+ self.assertTrue(msg.response.HasField('appliedPolicyTrigger'))
self.assertEquals(msg.response.appliedPolicy, reason)
self.assertEquals(msg.response.appliedPolicyType, policyType)
+ self.assertEquals(msg.response.appliedPolicyTrigger, name)
def checkProtobufTags(self, msg, tags):
print(tags)
# then the response
msg = self.getFirstProtobufMessage()
self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
- self.checkProtobufPolicy(msg, dnsmessage_pb2.PBDNSMessage.PolicyType.QNAME, 'zone.rpz.')
+ self.checkProtobufPolicy(msg, dnsmessage_pb2.PBDNSMessage.PolicyType.QNAME, 'zone.rpz.', '*.test.example.')
self.assertEquals(len(msg.response.rrs), 1)
rr = msg.response.rrs[0]
# we have max-cache-ttl set to 15
# then the response
msg = self.getFirstProtobufMessage()
self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
- self.checkProtobufPolicy(msg, dnsmessage_pb2.PBDNSMessage.PolicyType.QNAME, 'zone.rpz.')
+ self.checkProtobufPolicy(msg, dnsmessage_pb2.PBDNSMessage.PolicyType.QNAME, 'zone.rpz.', '*.test.example.')
self.checkProtobufTags(msg, self._tags + self._tags_from_gettag + self._tags_from_rpz)
self.assertEquals(len(msg.response.rrs), 1)
rr = msg.response.rrs[0]