self.assertRRsetInAnswer(res, dns.rrset.from_text('cname-custom-a.example.', 0, dns.rdataclass.IN, 'CNAME', 'cname-custom-a-target.example.'))
self.assertRRsetInAnswer(res, dns.rrset.from_text('cname-custom-a-target.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.103'))
+class RPZCustomDefpolTest(RPZRecursorTest):
+ """
+ This test makes sure that the recursor applies defpol to hits and follows the CNAME
+ """
+
+ _PREFIX = os.environ['PREFIX']
+ _confdir = 'RPZCustomDefpol'
+ _lua_config_file = """
+ rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", defpol=Policy.Custom, defcontent="a.secure.example"})
+ """ % (_confdir)
+ _config_template = ""
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ rpzFilePath = os.path.join(confdir, 'zone.rpz')
+ with open(rpzFilePath, 'w') as rpzZone:
+ rpzZone.write("""$ORIGIN zone.rpz.
+@ 3600 IN SOA {soa}
+hit.example IN CNAME .
+""".format(soa=cls._SOA))
+
+ super(RPZCustomDefpolTest, cls).generateRecursorConfig(confdir)
+
+ def testRPDefpol(self):
+ # two times to check the cache
+ for _ in range(2):
+ query = dns.message.make_query('hit.example.', 'A', want_dnssec=True)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ # the RPZ CNAME record is not signed
+ self.assertEqual(len(res.answer), 3)
+ self.assertRRsetInAnswer(res, dns.rrset.from_text('hit.example.', 0, dns.rdataclass.IN, 'CNAME', 'a.secure.example.'))
+ self.assertRRsetInAnswer(res, dns.rrset.from_text('a.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.20', '192.0.2.22'))
+
class RPZFileModByLuaRecursorTest(RPZRecursorTest):
"""
This test makes sure that we correctly load RPZ zones from a file while being modified by Lua callbacks