dns.rrset.from_text('drop.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-drop.'),
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
]
+ elif newSerial == 8:
+ # this one is a bit special too, we are answering with a full AXFR and the new zone is empty
+ records = [
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
+ ]
response.answer = records
return (newSerial, response)
webserver-address=127.0.0.1
webserver-password=%s
api-key=%s
+log-rpz-changes=yes
""" % (_confdir, _wsPort, _wsPassword, _apiKey)
@classmethod
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertEqual(len(res.answer), 0)
- def checkNXD(self, qname, qtype):
+ def checkNXD(self, qname, qtype='A'):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
query.flags |= dns.flags.CD
for method in ("sendUDPQuery", "sendTCPQuery"):
self.checkTruncated('tc.example.')
self.checkDropped('drop.example.')
+ # eighth zone, all entries should be gone
+ self.waitUntilCorrectSerialIsLoaded(8)
+ self.checkRPZStats(8, 0, 3, self._xfrDone)
+ self.checkNotBlocked('a.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkNotBlocked('d.example.')
+ self.checkNotBlocked('e.example.')
+ self.checkNXD('f.example.')
+ self.checkNXD('tc.example.')
+ self.checkNXD('drop.example.')
+
class RPZFileRecursorTest(RPZRecursorTest):
"""
This test makes sure that we correctly load RPZ zones from a file