self._set_headers()
class TestLuaRecords(AuthTest):
+ _config_template = """
+geoip-database-files=../modules/geoipbackend/regression-tests/GeoLiteCity.mmdb
+edns-subnet-processing=yes
+launch=bind geoip
+any-to-tcp=no
+"""
+
_zones = {
'example.org': """
example.org. 3600 IN SOA {soa}
all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
+all.noneup.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}}, {{ backupSelector='all' }})"
whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
" }}) " )
*.magic IN LUA A "closestMagic()"
www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
+
+any IN LUA A "'192.0.2.1'"
+any IN TXT "hello there"
+
""",
}
_web_rrsets = []
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected)
+ def testIfportupWithAllDownAndAllBackupSelector(self):
+ """
+ Basic ifportup() test with all ports DOWN, fallback to 'all' backup selector
+ """
+ name = 'all.noneup.ifportup.example.org.'
+ query = dns.message.make_query(name, dns.rdatatype.A)
+ expected = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.168.42.21', '192.168.21.42')]
+
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ time.sleep(1)
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEqual(res.answer, expected)
+
def testIfurlup(self):
"""
Basic ifurlup() test
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertRRsetInAnswer(res, expected)
+ def testWildcardError(self):
+ """
+ Ensure errors coming from LUA wildcards are reported
+ """
+ query = dns.message.make_query('failure.magic.example.org', 'A')
+
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.assertAnswerEmpty(res)
+
def testClosestMagic(self):
"""
Basic closestMagic() test
('1.2.3.0', 24, '1.2.3.5'),
('17.1.0.0', 16, '17.1.2.4')
]
-
+
for (subnet, mask, ip) in queries:
ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
"""
view() test where no netmask match
"""
- expected = dns.rrset.from_text('none.view.example.org.', 0,
- dns.rdataclass.IN, 'A')
query = dns.message.make_query('none.view.example.org', 'A')
res = self.sendUDPQuery(query)
first = self.sendUDPQuery(query)
self.assertRcodeEqual(first, dns.rcode.SERVFAIL)
+
+ def testA(self):
+ """
+ Test A query against `any`
+ """
+ name = 'any.example.org.'
+
+ query = dns.message.make_query(name, 'A')
+
+ response = dns.message.make_response(query)
+
+ response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
+
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEqual(res.answer, response.answer)
+
+ def testANY(self):
+ """
+ Test ANY query against `any`
+ """
+
+ name = 'any.example.org.'
+
+ query = dns.message.make_query(name, 'ANY')
+
+ response = dns.message.make_response(query)
+
+ response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
+ response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', '"hello there"'))
+
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEqual(self.sortRRsets(res.answer), self.sortRRsets(response.answer))
+
if __name__ == '__main__':
unittest.main()
exit(0)