self.assertEqual(res.opcode(), 4)
self.assertEqual(res.question[0].to_text(), 'zone.rpz. IN SOA')
- def assertAdditionalHasSOA(self, msg):
+ def assertAdditionalHasSOA(self, msg, name):
if not isinstance(msg, dns.message.Message):
raise TypeError("msg is not a dns.message.Message but a %s" % type(msg))
found = False
for rrset in msg.additional:
- if rrset.rdtype == dns.rdatatype.SOA:
+ if rrset.rdtype == dns.rdatatype.SOA and str(rrset.name) == name:
found = True
break
if not found:
- raise AssertionError("No SOA record found in the authority section:\n%s" % msg.to_text())
+ raise AssertionError("No %s SOA record found in the additional section:\n%s" % (name, msg.to_text()))
- def checkBlocked(self, name, shouldBeBlocked=True, adQuery=False, singleCheck=False, soa=False):
+ def checkBlocked(self, name, shouldBeBlocked=True, adQuery=False, singleCheck=False, soa=None):
query = dns.message.make_query(name, 'A', want_dnssec=True)
query.flags |= dns.flags.CD
if adQuery:
self.assertRRsetInAnswer(res, expected)
if soa:
- self.assertAdditionalHasSOA(res)
+ self.assertAdditionalHasSOA(res, soa)
if singleCheck:
break
def checkNotBlocked(self, name, adQuery=False, singleCheck=False):
self.checkBlocked(name, False, adQuery, singleCheck)
- def checkCustom(self, qname, qtype, expected, soa=False):
+ def checkCustom(self, qname, qtype, expected, soa=None):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
query.flags |= dns.flags.CD
for method in ("sendUDPQuery", "sendTCPQuery"):
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertRRsetInAnswer(res, expected)
if soa:
- self.assertAdditionalHasSOA(res)
+ self.assertAdditionalHasSOA(res, soa)
- def checkNoData(self, qname, qtype, soa=False):
+ def checkNoData(self, qname, qtype, soa=None):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
query.flags |= dns.flags.CD
for method in ("sendUDPQuery", "sendTCPQuery"):
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertEqual(len(res.answer), 0)
if soa:
- self.assertAdditionalHasSOA(res)
+ self.assertAdditionalHasSOA(res, soa)
def checkNXD(self, qname, qtype='A'):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
self.assertEqual(len(res.answer), 0)
self.assertEqual(len(res.authority), 1)
- def checkTruncated(self, qname, qtype='A', soa=False):
+ def checkTruncated(self, qname, qtype='A', soa=None):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
query.flags |= dns.flags.CD
res = self.sendUDPQuery(query)
self.assertEqual(len(res.answer), 0)
self.assertEqual(len(res.authority), 0)
if soa:
- self.assertAdditionalHasSOA(res)
+ self.assertAdditionalHasSOA(res, soa)
res = self.sendTCPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
# first zone, only a should be blocked
self.waitUntilCorrectSerialIsLoaded(1)
self.checkRPZStats(1, 1, 1, self._xfrDone)
- self.checkBlocked('a.example.', soa=True)
+ self.checkBlocked('a.example.', soa='zone.rpz.')
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(2)
self.checkRPZStats(2, 2, 1, self._xfrDone)
- self.checkBlocked('a.example.', soa=True)
- self.checkBlocked('b.example.', soa=True)
+ self.checkBlocked('a.example.', soa='zone.rpz.')
+ self.checkBlocked('b.example.', soa='zone.rpz.')
self.checkNotBlocked('c.example.')
# third zone, only b should be blocked
self.waitUntilCorrectSerialIsLoaded(3)
self.checkRPZStats(3, 1, 1, self._xfrDone)
self.checkNotBlocked('a.example.')
- self.checkBlocked('b.example.', soa=True)
+ self.checkBlocked('b.example.', soa='zone.rpz.')
self.checkNotBlocked('c.example.')
# fourth zone, only c should be blocked
self.checkRPZStats(4, 1, 1, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkNotBlocked('b.example.')
- self.checkBlocked('c.example.', soa=True)
+ self.checkBlocked('c.example.', soa='zone.rpz.')
# fifth zone, we should get a full AXFR this time, and only d should be blocked
self.sendNotify()
self.checkNotBlocked('a.example.')
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
- self.checkBlocked('d.example.', soa=True)
+ self.checkBlocked('d.example.', soa='zone.rpz.')
# sixth zone, only e should be blocked, f is a local data record
self.sendNotify()
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
self.checkNotBlocked('d.example.')
- self.checkCustom('e.example.', 'A', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1', '192.0.2.2'), soa=True)
+ self.checkCustom('e.example.', 'A', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1', '192.0.2.2'), soa='zone.rpz.')
self.checkCustom('e.example.', 'MX', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'MX', '10 mx.example.'))
- self.checkNoData('e.example.', 'AAAA', soa=True)
- self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'), soa=True)
+ self.checkNoData('e.example.', 'AAAA', soa='zone.rpz.')
+ self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'), soa='zone.rpz.')
# seventh zone, e should only have one A
self.sendNotify()
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
self.checkNotBlocked('d.example.')
- self.checkCustom('e.example.', 'A', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.2'), soa=True)
- self.checkCustom('e.example.', 'MX', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'MX', '10 mx.example.'), soa=True)
- self.checkNoData('e.example.', 'AAAA', soa=True)
- self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'), soa=True)
+ self.checkCustom('e.example.', 'A', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.2'), soa='zone.rpz.')
+ self.checkCustom('e.example.', 'MX', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'MX', '10 mx.example.'), soa='zone.rpz.')
+ self.checkNoData('e.example.', 'AAAA', soa='zone.rpz.')
+ self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'), soa='zone.rpz.')
# check that the policy is disabled for AD=1 queries
self.checkNotBlocked('e.example.', True)
# check non-custom policies
- self.checkTruncated('tc.example.', soa=True)
+ self.checkTruncated('tc.example.', soa='zone.rpz.')
self.checkDropped('drop.example.')
# eighth zone, all entries should be gone
self.checkNotBlocked('c.example.')
self.checkNotBlocked('d.example.')
self.checkNotBlocked('e.example.')
- self.checkBlocked('f.example.', soa=True)
+ self.checkBlocked('f.example.', soa='zone.rpz.')
self.checkNXD('tc.example.')
self.checkNXD('drop.example.')
self.checkNotBlocked('d.example.')
self.checkNotBlocked('e.example.')
self.checkNXD('f.example.')
- self.checkBlocked('g.example.', soa=True)
+ self.checkBlocked('g.example.', soa='zone.rpz.')
self.checkNXD('tc.example.')
self.checkNXD('drop.example.')
def testRPZ(self):
self.checkCustom('a.example.', 'A', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.42', '192.0.2.43'))
self.checkCustom('a.example.', 'TXT', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'TXT', '"some text"'))
- self.checkBlocked('z.example.', soa=True)
+ self.checkBlocked('z.example.', soa='zone.rpz.')
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
self.checkNotBlocked('d.example.')
# check that the policy is disabled for AD=1 queries
self.checkNotBlocked('z.example.', True)
# check non-custom policies
- self.checkTruncated('tc.example.', soa=True)
+ self.checkTruncated('tc.example.', soa='zone.rpz.')
self.checkDropped('drop.example.')
class RPZFileDefaultPolRecursorTest(RPZRecursorTest):