authzone.write("""$ORIGIN example.
@ 3600 IN SOA {soa}
sub.test 3600 IN A 192.0.2.42
+ip 3600 IN A 33.22.11.99
""".format(soa=cls._SOA))
rpzFilePath = os.path.join(confdir, 'zone.rpz')
rpzZone.write("""$ORIGIN zone.rpz.
@ 3600 IN SOA {soa}
*.test.example.zone.rpz. 60 IN CNAME rpz-passthru.
+24.0.11.22.33.rpz-ip 60 IN A 1.2.3.4
""".format(soa=cls._SOA))
super(ProtobufRPZTest, cls).generateRecursorConfig(confdir)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
self.checkNoRemainingMessage()
+ def testB(self):
+ name = 'ip.example.'
+ expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '1.2.3.4')
+ query = dns.message.make_query(name, 'A', want_dnssec=True)
+ query.flags |= dns.flags.CD
+ res = self.sendUDPQuery(query)
+ self.assertRRsetInAnswer(res, expected)
+
+ # check the protobuf messages corresponding to the UDP query and answer
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+
+ # then the response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
+ self.checkProtobufPolicy(msg, dnsmessage_pb2.PBDNSMessage.PolicyType.RESPONSEIP, 'zone.rpz.', '24.0.11.22.33.rpz-ip.', '33.22.11.99', dnsmessage_pb2.PBDNSMessage.PolicyKind.Custom)
+ self.assertEqual(len(msg.response.rrs), 1)
+ self.checkNoRemainingMessage()
+
class ProtobufRPZTagsTest(TestRecursorProtobuf):
"""
This test makes sure that we correctly export the RPZ tags in our protobuf messages