records = [
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
dns.rrset.from_text('d.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
+ dns.rrset.from_text('tc.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-tcp-only.'),
+ dns.rrset.from_text('drop.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-drop.'),
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
]
elif newSerial == 6:
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % oldSerial),
dns.rrset.from_text('d.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
+ dns.rrset.from_text('tc.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-tcp-only.'),
+ dns.rrset.from_text('drop.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-drop.'),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
+ dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1', '192.0.2.2'),
+ dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.MX, '10 mx.example.'),
+ dns.rrset.from_text('f.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'e.example.'),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
+ ]
+ elif newSerial == 7:
+ records = [
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % oldSerial),
+ dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1', '192.0.2.2'),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
+ dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.2'),
+ dns.rrset.from_text('tc.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-tcp-only.'),
+ dns.rrset.from_text('drop.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-drop.'),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
+ ]
+ elif newSerial == 8:
+ # this one is a bit special too, we are answering with a full AXFR and the new zone is empty
+ records = [
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
- dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
]
print('Error in RPZ socket: %s' % str(e))
sock.close()
-rpzServerPort = 4250
-rpzServer = RPZServer(rpzServerPort)
-
class RPZRecursorTest(RecursorTest):
- """
- This test makes sure that we correctly update RPZ zones via AXFR then IXFR
- """
-
- global rpzServerPort
- _lua_config_file = """
- -- The first server is a bogus one, to test that we correctly fail over to the second one
- rpzMaster({'127.0.0.1:9999', '127.0.0.1:%d'}, 'zone.rpz.', { refresh=1 })
- """ % (rpzServerPort)
_wsPort = 8042
_wsTimeout = 2
_wsPassword = 'secretpassword'
webserver-address=127.0.0.1
webserver-password=%s
api-key=%s
+log-rpz-changes=yes
""" % (_confdir, _wsPort, _wsPassword, _apiKey)
- _xfrDone = 0
-
- @classmethod
- def generateRecursorConfig(cls, confdir):
- authzonepath = os.path.join(confdir, 'example.zone')
- with open(authzonepath, 'w') as authzone:
- authzone.write("""$ORIGIN example.
-@ 3600 IN SOA {soa}
-a 3600 IN A 192.0.2.42
-b 3600 IN A 192.0.2.42
-c 3600 IN A 192.0.2.42
-d 3600 IN A 192.0.2.42
-e 3600 IN A 192.0.2.42
-""".format(soa=cls._SOA))
- super(RPZRecursorTest, cls).generateRecursorConfig(confdir)
@classmethod
def setUpClass(cls):
query.flags |= dns.flags.CD
if adQuery:
query.flags |= dns.flags.AD
- res = self.sendUDPQuery(query)
- if shouldBeBlocked:
- expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1')
- else:
- expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
- self.assertRRsetInAnswer(res, expected)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ if shouldBeBlocked:
+ expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ else:
+ expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+
+ self.assertRRsetInAnswer(res, expected)
def checkNotBlocked(self, name, adQuery=False):
self.checkBlocked(name, False, adQuery)
- def waitUntilCorrectSerialIsLoaded(self, serial, timeout=5):
- global rpzServer
-
- rpzServer.moveToSerial(serial)
-
- attempts = 0
- while attempts < timeout:
- currentSerial = rpzServer.getCurrentSerial()
- if currentSerial > serial:
- raise AssertionError("Expected serial %d, got %d" % (serial, currentSerial))
- if currentSerial == serial:
- self._xfrDone = self._xfrDone + 1
- return
-
- attempts = attempts + 1
- time.sleep(1)
-
- raise AssertionError("Waited %d seconds for the serial to be updated to %d but the serial is still %d" % (timeout, serial, currentSerial))
+ def checkCustom(self, qname, qtype, expected):
+ query = dns.message.make_query(qname, qtype, want_dnssec=True)
+ query.flags |= dns.flags.CD
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertRRsetInAnswer(res, expected)
+
+ def checkNoData(self, qname, qtype):
+ query = dns.message.make_query(qname, qtype, want_dnssec=True)
+ query.flags |= dns.flags.CD
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEqual(len(res.answer), 0)
+
+ def checkNXD(self, qname, qtype='A'):
+ query = dns.message.make_query(qname, qtype, want_dnssec=True)
+ query.flags |= dns.flags.CD
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+ self.assertEqual(len(res.answer), 0)
+ self.assertEqual(len(res.authority), 1)
+
+ def checkTruncated(self, qname, qtype='A'):
+ query = dns.message.make_query(qname, qtype, want_dnssec=True)
+ query.flags |= dns.flags.CD
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD', 'TC'])
+ self.assertEqual(len(res.answer), 0)
+ self.assertEqual(len(res.authority), 0)
+ self.assertEqual(len(res.additional), 0)
+
+ res = self.sendTCPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertEqual(len(res.answer), 0)
+ self.assertEqual(len(res.authority), 1)
+ self.assertEqual(len(res.additional), 0)
+
+ def checkDropped(self, qname, qtype='A'):
+ query = dns.message.make_query(qname, qtype, want_dnssec=True)
+ query.flags |= dns.flags.CD
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertEqual(res, None)
def checkRPZStats(self, serial, recordsCount, fullXFRCount, totalXFRCount):
headers = {'x-api-key': self._apiKey}
self.assertEquals(zone['transfers_full'], fullXFRCount)
self.assertEquals(zone['transfers_success'], totalXFRCount)
+rpzServerPort = 4250
+rpzServer = RPZServer(rpzServerPort)
+
+class RPZXFRRecursorTest(RPZRecursorTest):
+ """
+ This test makes sure that we correctly update RPZ zones via AXFR then IXFR
+ """
+
+ global rpzServerPort
+ _lua_config_file = """
+ -- The first server is a bogus one, to test that we correctly fail over to the second one
+ rpzMaster({'127.0.0.1:9999', '127.0.0.1:%d'}, 'zone.rpz.', { refresh=1 })
+ """ % (rpzServerPort)
+ _confdir = 'RPZXFR'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _config_template = """
+auth-zones=example=configs/%s/example.zone
+webserver=yes
+webserver-port=%d
+webserver-address=127.0.0.1
+webserver-password=%s
+api-key=%s
+""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+ _xfrDone = 0
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ authzonepath = os.path.join(confdir, 'example.zone')
+ with open(authzonepath, 'w') as authzone:
+ authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+a 3600 IN A 192.0.2.42
+b 3600 IN A 192.0.2.42
+c 3600 IN A 192.0.2.42
+d 3600 IN A 192.0.2.42
+e 3600 IN A 192.0.2.42
+""".format(soa=cls._SOA))
+ super(RPZRecursorTest, cls).generateRecursorConfig(confdir)
+
+ def waitUntilCorrectSerialIsLoaded(self, serial, timeout=5):
+ global rpzServer
+
+ rpzServer.moveToSerial(serial)
+
+ attempts = 0
+ while attempts < timeout:
+ currentSerial = rpzServer.getCurrentSerial()
+ if currentSerial > serial:
+ raise AssertionError("Expected serial %d, got %d" % (serial, currentSerial))
+ if currentSerial == serial:
+ self._xfrDone = self._xfrDone + 1
+ return
+
+ attempts = attempts + 1
+ time.sleep(1)
+
+ raise AssertionError("Waited %d seconds for the serial to be updated to %d but the serial is still %d" % (timeout, serial, currentSerial))
+
def testRPZ(self):
# first zone, only a should be blocked
self.waitUntilCorrectSerialIsLoaded(1)
# fifth zone, we should get a full AXFR this time, and only d should be blocked
self.waitUntilCorrectSerialIsLoaded(5)
- self.checkRPZStats(5, 1, 2, self._xfrDone)
+ self.checkRPZStats(5, 3, 2, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
self.checkBlocked('d.example.')
- # sixth zone, only e should be blocked
+ # sixth zone, only e should be blocked, f is a local data record
self.waitUntilCorrectSerialIsLoaded(6)
- self.checkRPZStats(6, 1, 2, self._xfrDone)
+ self.checkRPZStats(6, 2, 2, self._xfrDone)
+ self.checkNotBlocked('a.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkNotBlocked('d.example.')
+ self.checkCustom('e.example.', 'A', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1', '192.0.2.2'))
+ self.checkCustom('e.example.', 'MX', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'MX', '10 mx.example.'))
+ self.checkNoData('e.example.', 'AAAA')
+ self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'))
+
+ # seventh zone, e should only have one A
+ self.waitUntilCorrectSerialIsLoaded(7)
+ self.checkRPZStats(7, 4, 2, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
self.checkNotBlocked('d.example.')
- self.checkBlocked('e.example.')
+ self.checkCustom('e.example.', 'A', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.2'))
+ self.checkCustom('e.example.', 'MX', dns.rrset.from_text('e.example.', 0, dns.rdataclass.IN, 'MX', '10 mx.example.'))
+ self.checkNoData('e.example.', 'AAAA')
+ self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'))
# check that the policy is disabled for AD=1 queries
self.checkNotBlocked('e.example.', True)
+ # check non-custom policies
+ self.checkTruncated('tc.example.')
+ self.checkDropped('drop.example.')
+
+ # eighth zone, all entries should be gone
+ self.waitUntilCorrectSerialIsLoaded(8)
+ self.checkRPZStats(8, 0, 3, self._xfrDone)
+ self.checkNotBlocked('a.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkNotBlocked('d.example.')
+ self.checkNotBlocked('e.example.')
+ self.checkNXD('f.example.')
+ self.checkNXD('tc.example.')
+ self.checkNXD('drop.example.')
+
+class RPZFileRecursorTest(RPZRecursorTest):
+ """
+ This test makes sure that we correctly load RPZ zones from a file
+ """
+
+ _confdir = 'RPZFile'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _lua_config_file = """
+ rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz." })
+ """ % (_confdir)
+ _config_template = """
+auth-zones=example=configs/%s/example.zone
+webserver=yes
+webserver-port=%d
+webserver-address=127.0.0.1
+webserver-password=%s
+api-key=%s
+""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ authzonepath = os.path.join(confdir, 'example.zone')
+ with open(authzonepath, 'w') as authzone:
+ authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+a 3600 IN A 192.0.2.42
+b 3600 IN A 192.0.2.42
+c 3600 IN A 192.0.2.42
+d 3600 IN A 192.0.2.42
+e 3600 IN A 192.0.2.42
+z 3600 IN A 192.0.2.42
+""".format(soa=cls._SOA))
+
+ rpzFilePath = os.path.join(confdir, 'zone.rpz')
+ with open(rpzFilePath, 'w') as rpzZone:
+ rpzZone.write("""$ORIGIN zone.rpz.
+@ 3600 IN SOA {soa}
+a.example.zone.rpz. 60 IN A 192.0.2.42
+a.example.zone.rpz. 60 IN A 192.0.2.43
+a.example.zone.rpz. 60 IN TXT "some text"
+drop.example.zone.rpz. 60 IN CNAME rpz-drop.
+z.example.zone.rpz. 60 IN A 192.0.2.1
+tc.example.zone.rpz. 60 IN CNAME rpz-tcp-only.
+""".format(soa=cls._SOA))
+ super(RPZFileRecursorTest, cls).generateRecursorConfig(confdir)
+
+ def testRPZ(self):
+ self.checkCustom('a.example.', 'A', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.42', '192.0.2.43'))
+ self.checkCustom('a.example.', 'TXT', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'TXT', '"some text"'))
+ self.checkBlocked('z.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkNotBlocked('d.example.')
+ self.checkNotBlocked('e.example.')
+ # check that the policy is disabled for AD=1 queries
+ self.checkNotBlocked('z.example.', True)
+ # check non-custom policies
+ self.checkTruncated('tc.example.')
+ self.checkDropped('drop.example.')
+
+class RPZFileDefaultPolRecursorTest(RPZRecursorTest):
+ """
+ This test makes sure that we correctly load RPZ zones from a file with a default policy
+ """
+
+ _confdir = 'RPZFileDefaultPolicy'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _lua_config_file = """
+ rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", defpol=Policy.NoAction })
+ """ % (_confdir)
+ _config_template = """
+auth-zones=example=configs/%s/example.zone
+webserver=yes
+webserver-port=%d
+webserver-address=127.0.0.1
+webserver-password=%s
+api-key=%s
+""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ authzonepath = os.path.join(confdir, 'example.zone')
+ with open(authzonepath, 'w') as authzone:
+ authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+a 3600 IN A 192.0.2.42
+b 3600 IN A 192.0.2.42
+c 3600 IN A 192.0.2.42
+d 3600 IN A 192.0.2.42
+drop 3600 IN A 192.0.2.42
+e 3600 IN A 192.0.2.42
+z 3600 IN A 192.0.2.42
+""".format(soa=cls._SOA))
+
+ rpzFilePath = os.path.join(confdir, 'zone.rpz')
+ with open(rpzFilePath, 'w') as rpzZone:
+ rpzZone.write("""$ORIGIN zone.rpz.
+@ 3600 IN SOA {soa}
+a.example.zone.rpz. 60 IN A 192.0.2.42
+drop.example.zone.rpz. 60 IN CNAME rpz-drop.
+z.example.zone.rpz. 60 IN A 192.0.2.1
+tc.example.zone.rpz. 60 IN CNAME rpz-tcp-only.
+""".format(soa=cls._SOA))
+ super(RPZFileDefaultPolRecursorTest, cls).generateRecursorConfig(confdir)
+
+ def testRPZ(self):
+ # local data entries are overridden by default
+ self.checkCustom('a.example.', 'A', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.42'))
+ self.checkNoData('a.example.', 'TXT')
+ # will not be blocked because the default policy overrides local data entries by default
+ self.checkNotBlocked('z.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkNotBlocked('d.example.')
+ self.checkNotBlocked('e.example.')
+ # check non-local policies, they should be overridden by the default policy
+ self.checkNXD('tc.example.', 'A')
+ self.checkNotBlocked('drop.example.')
+
+class RPZFileDefaultPolNotOverrideLocalRecursorTest(RPZRecursorTest):
+ """
+ This test makes sure that we correctly load RPZ zones from a file with a default policy, not overriding local data entries
+ """
+
+ _confdir = 'RPZFileDefaultPolicyNotOverrideLocal'
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _lua_config_file = """
+ rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", defpol=Policy.NoAction, defpolOverrideLocalData=false })
+ """ % (_confdir)
+ _config_template = """
+auth-zones=example=configs/%s/example.zone
+webserver=yes
+webserver-port=%d
+webserver-address=127.0.0.1
+webserver-password=%s
+api-key=%s
+""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ authzonepath = os.path.join(confdir, 'example.zone')
+ with open(authzonepath, 'w') as authzone:
+ authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+a 3600 IN A 192.0.2.42
+b 3600 IN A 192.0.2.42
+c 3600 IN A 192.0.2.42
+d 3600 IN A 192.0.2.42
+drop 3600 IN A 192.0.2.42
+e 3600 IN A 192.0.2.42
+z 3600 IN A 192.0.2.42
+""".format(soa=cls._SOA))
+
+ rpzFilePath = os.path.join(confdir, 'zone.rpz')
+ with open(rpzFilePath, 'w') as rpzZone:
+ rpzZone.write("""$ORIGIN zone.rpz.
+@ 3600 IN SOA {soa}
+a.example.zone.rpz. 60 IN A 192.0.2.42
+a.example.zone.rpz. 60 IN A 192.0.2.43
+a.example.zone.rpz. 60 IN TXT "some text"
+drop.example.zone.rpz. 60 IN CNAME rpz-drop.
+z.example.zone.rpz. 60 IN A 192.0.2.1
+tc.example.zone.rpz. 60 IN CNAME rpz-tcp-only.
+""".format(soa=cls._SOA))
+ super(RPZFileDefaultPolNotOverrideLocalRecursorTest, cls).generateRecursorConfig(confdir)
+
+ def testRPZ(self):
+ # local data entries will not be overridden by the default polic
+ self.checkCustom('a.example.', 'A', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.42', '192.0.2.43'))
+ self.checkCustom('a.example.', 'TXT', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'TXT', '"some text"'))
+ # will be blocked because the default policy does not override local data entries
+ self.checkBlocked('z.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkNotBlocked('d.example.')
+ self.checkNotBlocked('e.example.')
+ # check non-local policies, they should be overridden by the default policy
+ self.checkNXD('tc.example.', 'A')
+ self.checkNotBlocked('drop.example.')