]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.auth-py/test_LuaRecords.py
Merge pull request #14200 from rgacogne/auth-enable-leak-detection-unit-tests
[thirdparty/pdns.git] / regression-tests.auth-py / test_LuaRecords.py
CommitLineData
1bc56192
CHB
1#!/usr/bin/env python
2import unittest
3import requests
4import threading
5import dns
6import time
847e724e 7import clientsubnetoption
1bc56192
CHB
8
9from authtests import AuthTest
10
12c2f4d7 11from http.server import BaseHTTPRequestHandler, HTTPServer
1bc56192 12
876a35c3
PD
13webserver = None
14
1bc56192 15class FakeHTTPServer(BaseHTTPRequestHandler):
95d0df69
PD
16 def _set_headers(self, response_code=200):
17 self.send_response(response_code)
1bc56192
CHB
18 self.send_header('Content-type', 'text/html')
19 self.end_headers()
20
21 def do_GET(self):
95d0df69
PD
22 if self.path == '/404':
23 self._set_headers(404)
24 self.wfile.write(bytes('this page does not exist', 'utf-8'))
25 return
26
1bc56192 27 self._set_headers()
95d0df69 28 if self.path == '/ping.json':
12c2f4d7 29 self.wfile.write(bytes('{"ping":"pong"}', 'utf-8'))
1bc56192 30 else:
12c2f4d7 31 self.wfile.write(bytes("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>", "utf-8"))
1bc56192
CHB
32
33 def log_message(self, format, *args):
34 return
35
36 def do_HEAD(self):
37 self._set_headers()
38
39class TestLuaRecords(AuthTest):
06c14069
PL
40 _config_template = """
41geoip-database-files=../modules/geoipbackend/regression-tests/GeoLiteCity.mmdb
42edns-subnet-processing=yes
43launch=bind geoip
44any-to-tcp=no
58a4b8a1 45enable-lua-records
26dbeed8 46lua-records-insert-whitespace=yes
58a4b8a1 47lua-health-checks-interval=1
06c14069
PL
48"""
49
1bc56192
CHB
50 _zones = {
51 'example.org': """
52example.org. 3600 IN SOA {soa}
53example.org. 3600 IN NS ns1.example.org.
54example.org. 3600 IN NS ns2.example.org.
55ns1.example.org. 3600 IN A {prefix}.10
56ns2.example.org. 3600 IN A {prefix}.11
57
58web1.example.org. 3600 IN A {prefix}.101
59web2.example.org. 3600 IN A {prefix}.102
60web3.example.org. 3600 IN A {prefix}.103
61
62all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
63some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
05d6f9ed 64multi.ifportup 3600 IN LUA A "ifportup(8080, {{ {{'192.168.42.23'}}, {{'192.168.42.21', '{prefix}.102'}}, {{'{prefix}.101'}} }})"
1bc56192 65none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
707cedf3 66all.noneup.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}}, {{ backupSelector='all' }})"
1bc56192 67
60f9d9a2 68hashed.example.org. 3600 IN LUA A "pickhashed({{ '1.2.3.4', '4.3.2.1' }})"
69hashed-v6.example.org. 3600 IN LUA AAAA "pickhashed({{ '2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268' }})"
70hashed-txt.example.org. 3600 IN LUA TXT "pickhashed({{ 'bob', 'alice' }})"
1bc56192 71whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
d33b8bf4 72*.namehashed.example.org. 3600 IN LUA A "picknamehashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
60f9d9a2 73whashed-txt.example.org. 3600 IN LUA TXT "pickwhashed({{ {{15, 'bob'}}, {{42, 'alice'}} }})"
4a0d414b 74chashed.example.org. 3600 IN LUA A "pickchashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
82ed6bee 75chashed-txt.example.org. 3600 IN LUA TXT "pickchashed({{ {{15, 'bob'}}, {{42, 'alice'}} }})"
1bc56192 76rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
60f9d9a2 77rand-txt.example.org. 3600 IN LUA TXT "pickrandom({{ 'bob', 'alice' }})"
78randn-txt.example.org. 3600 IN LUA TXT "pickrandomsample( 2, {{ 'bob', 'alice', 'john' }} )"
1bc56192 79v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
60f9d9a2 80v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{ '2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268' }})"
81closest.geo 3600 IN LUA A "pickclosest({{ '1.1.1.2', '1.2.3.4' }})"
1bc56192 82empty.rand.example.org. 3600 IN LUA A "pickrandom()"
5b3590c8 83timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
1bc56192 84wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
60f9d9a2 85wrand-txt.example.org. 3600 IN LUA TXT "pickwrandom({{ {{30, 'bob'}}, {{15, 'alice'}} }})"
86all.example.org. 3600 IN LUA A "all({{'1.2.3.4','4.3.2.1'}})"
1bc56192
CHB
87
88config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
89 "EUWips={{'{prefix}.101','{prefix}.102'}} "
90 "EUEips={{'192.168.42.101','192.168.42.102'}} "
05d6f9ed
CHB
91 "NLips={{'{prefix}.111', '{prefix}.112'}} "
92 "USAips={{'{prefix}.103', '192.168.42.105'}} ")
1bc56192
CHB
93
94usa IN LUA A ( ";include('config') "
95 "return ifurlup('http://www.lua.org:8080/', "
05d6f9ed
CHB
96 "USAips, settings) ")
97
98usa-ext IN LUA A ( ";include('config') "
99 "return ifurlup('http://www.lua.org:8080/', "
100 "{{EUEips, USAips}}, settings) ")
1bc56192
CHB
101
102mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
103 "{{ '192.168.42.101', '{prefix}.101' }}, "
104 "{{ stringmatch='pong' }}) ")
105
95d0df69
PD
106ifurlextup IN LUA A "ifurlextup({{{{['192.168.0.1']='http://{prefix}.101:8080/404',['192.168.0.2']='http://{prefix}.102:8080/404'}}, {{['192.168.0.3']='http://{prefix}.101:8080/'}}}})"
107
1bc56192
CHB
108nl IN LUA A ( ";include('config') "
109 "return ifportup(8081, NLips) ")
110latlon.geo IN LUA TXT "latlon()"
847e724e 111continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
60f9d9a2 112continent-code.geo IN LUA TXT ";return continentCode()"
847e724e
CHB
113asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
114country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
60f9d9a2 115country-code.geo IN LUA TXT ";return countryCode()"
116region.geo IN LUA TXT ";if(region('CA')) then return 'true' else return 'false' end"
117region-code.geo IN LUA TXT ";return regionCode()"
1bc56192
CHB
118latlonloc.geo IN LUA TXT "latlonloc()"
119
120true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
121 "then return 'true' "
122 "else return 'false' end " )
123false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
124 "then return 'true' "
125 "else return 'false' end " )
126
127view IN LUA A ("view({{ "
128 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
129 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
130 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
131 " }}) " )
132txt.view IN LUA TXT ("view({{ "
133 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
134 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
135 " }}) " )
136none.view IN LUA A ("view({{ "
137 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
138 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
139 " }}) " )
847e724e
CHB
140*.magic IN LUA A "closestMagic()"
141www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
658417e4
PD
142
143any IN LUA A "'192.0.2.1'"
144any IN TXT "hello there"
145
09f4b0d9
PD
146resolve IN LUA A ";local r=resolve('localhost', 1) local t={{}} for _,v in ipairs(r) do table.insert(t, v:toString()) end return t"
147
98301eb0
PD
148filterforwardempty IN LUA A "filterForward('192.0.2.1', newNMG{{'192.1.2.0/24'}}, '')"
149
223bfcad 150*.createforward IN LUA A "filterForward(createForward(), newNMG{{'1.0.0.0/8', '64.0.0.0/8'}})"
b652505d
PD
151*.createreverse IN LUA PTR "createReverse('%5%.example.com', {{['10.10.10.10'] = 'quad10.example.com.'}})"
152*.createreverse6 IN LUA PTR "createReverse6('%33%.example.com', {{['2001:db8::1'] = 'example.example.com.'}})"
448e7a2d 153
aec9c907
PD
154newcafromraw IN LUA A "newCAFromRaw('ABCD'):toString()"
155newcafromraw IN LUA AAAA "newCAFromRaw('ABCD020340506070'):toString()"
21f08f59
PD
156
157counter IN LUA TXT ";counter = counter or 0 counter=counter+1 return tostring(counter)"
3bf2f94d
PD
158
159lookmeup IN A 192.0.2.5
2ef08930 160dblookup IN LUA A "dblookup('lookmeup.example.org', pdns.A)[1]"
26dbeed8
PD
161
162whitespace IN LUA TXT "'foo" "bar'"
1bc56192 163 """,
6c88aa06
PD
164 'createforward6.example.org': """
165createforward6.example.org. 3600 IN SOA {soa}
166createforward6.example.org. 3600 IN NS ns1.example.org.
167createforward6.example.org. 3600 IN NS ns2.example.org.
223bfcad 168* IN LUA AAAA "filterForward(createForward6(), newNMG{{'2000::/3'}}, 'fe80::1')"
6c88aa06
PD
169 """
170# the separate createforward6 zone is because some of the code in lua-record.cc insists on working relatively to the zone apex
1bc56192
CHB
171 }
172 _web_rrsets = []
173
174 @classmethod
175 def startResponders(cls):
876a35c3
PD
176 global webserver
177 if webserver: return # it is already running
178
1bc56192
CHB
179 webserver = threading.Thread(name='HTTP Listener',
180 target=cls.HTTPResponder,
181 args=[8080]
182 )
183 webserver.setDaemon(True)
184 webserver.start()
185
186 @classmethod
187 def HTTPResponder(cls, port):
188 server_address = ('', port)
189 httpd = HTTPServer(server_address, FakeHTTPServer)
190 httpd.serve_forever()
191
192 @classmethod
193 def setUpClass(cls):
194
195 super(TestLuaRecords, cls).setUpClass()
196
197 cls._web_rrsets = [dns.rrset.from_text('web1.example.org.', 0, dns.rdataclass.IN, 'A',
198 '{prefix}.101'.format(prefix=cls._PREFIX)),
199 dns.rrset.from_text('web2.example.org.', 0, dns.rdataclass.IN, 'A',
200 '{prefix}.102'.format(prefix=cls._PREFIX)),
201 dns.rrset.from_text('web3.example.org.', 0, dns.rdataclass.IN, 'A',
202 '{prefix}.103'.format(prefix=cls._PREFIX))
203 ]
204
205 def testPickRandom(self):
206 """
207 Basic pickrandom() test with a set of A records
208 """
209 expected = [dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
210 '{prefix}.101'.format(prefix=self._PREFIX)),
211 dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
212 '{prefix}.102'.format(prefix=self._PREFIX))]
213 query = dns.message.make_query('rand.example.org', 'A')
214
215 res = self.sendUDPQuery(query)
216 self.assertRcodeEqual(res, dns.rcode.NOERROR)
217 self.assertAnyRRsetInAnswer(res, expected)
218
60f9d9a2 219 def testPickRandomTxt(self):
220 """
221 Basic pickrandom() test with a set of TXT records
222 """
223 expected = [dns.rrset.from_text('rand-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'bob'),
224 dns.rrset.from_text('rand-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'alice')]
225 query = dns.message.make_query('rand-txt.example.org', 'TXT')
226
227 res = self.sendUDPQuery(query)
228 self.assertRcodeEqual(res, dns.rcode.NOERROR)
229 self.assertAnyRRsetInAnswer(res, expected)
230
1bc56192
CHB
231 def testBogusV6PickRandom(self):
232 """
233 Test a bogus AAAA pickrandom() record with a set of v4 addr
234 """
235 query = dns.message.make_query('v6-bogus.rand.example.org', 'AAAA')
236
237 res = self.sendUDPQuery(query)
238 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
239
240 def testV6PickRandom(self):
241 """
242 Test pickrandom() AAAA record
243 """
244 expected = [dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
245 '2001:db8:a0b:12f0::1'),
246 dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
247 'fe80::2a1:9bff:fe9b:f268')]
248 query = dns.message.make_query('v6.rand.example.org', 'AAAA')
249
250 res = self.sendUDPQuery(query)
251 self.assertRcodeEqual(res, dns.rcode.NOERROR)
252 self.assertAnyRRsetInAnswer(res, expected)
253
254 def testEmptyRandom(self):
255 """
256 Basic pickrandom() test with an empty set
257 """
258 query = dns.message.make_query('empty.rand.example.org', 'A')
259
260 res = self.sendUDPQuery(query)
261 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
262
60f9d9a2 263 def testPickRandomSampleTxt(self):
264 """
265 Basic pickrandomsample() test with a set of TXT records
266 """
267 expected = [dns.rrset.from_text('randn-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'bob', 'alice'),
268 dns.rrset.from_text('randn-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'bob', 'john'),
269 dns.rrset.from_text('randn-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'alice', 'bob'),
270 dns.rrset.from_text('randn-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'alice', 'john'),
271 dns.rrset.from_text('randn-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'john', 'bob'),
272 dns.rrset.from_text('randn-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'john', 'alice')]
273 query = dns.message.make_query('randn-txt.example.org', 'TXT')
274
275 res = self.sendUDPQuery(query)
276 self.assertRcodeEqual(res, dns.rcode.NOERROR)
277 self.assertIn(res.answer[0], expected)
278
1bc56192
CHB
279 def testWRandom(self):
280 """
281 Basic pickwrandom() test with a set of A records
282 """
283 expected = [dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
284 '{prefix}.103'.format(prefix=self._PREFIX)),
285 dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
286 '{prefix}.102'.format(prefix=self._PREFIX))]
287 query = dns.message.make_query('wrand.example.org', 'A')
288
289 res = self.sendUDPQuery(query)
290 self.assertRcodeEqual(res, dns.rcode.NOERROR)
291 self.assertAnyRRsetInAnswer(res, expected)
292
60f9d9a2 293 def testWRandomTxt(self):
294 """
295 Basic pickwrandom() test with a set of TXT records
296 """
297 expected = [dns.rrset.from_text('wrand-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'bob'),
298 dns.rrset.from_text('wrand-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'alice')]
299 query = dns.message.make_query('wrand-txt.example.org', 'TXT')
300
301 res = self.sendUDPQuery(query)
302 self.assertRcodeEqual(res, dns.rcode.NOERROR)
303 self.assertAnyRRsetInAnswer(res, expected)
304
1bc56192
CHB
305 def testIfportup(self):
306 """
307 Basic ifportup() test
308 """
309 query = dns.message.make_query('all.ifportup.example.org', 'A')
310 expected = [
311 dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
312 '{prefix}.101'.format(prefix=self._PREFIX)),
313 dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
314 '{prefix}.102'.format(prefix=self._PREFIX))]
315
316 res = self.sendUDPQuery(query)
317 self.assertRcodeEqual(res, dns.rcode.NOERROR)
318 self.assertAnyRRsetInAnswer(res, expected)
319
320 def testIfportupWithSomeDown(self):
321 """
322 Basic ifportup() test with some ports DOWN
323 """
324 query = dns.message.make_query('some.ifportup.example.org', 'A')
325 expected = [
326 dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
327 '192.168.42.21'),
328 dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
329 '{prefix}.102'.format(prefix=self._PREFIX))]
330
331 # we first expect any of the IPs as no check has been performed yet
332 res = self.sendUDPQuery(query)
333 self.assertRcodeEqual(res, dns.rcode.NOERROR)
334 self.assertAnyRRsetInAnswer(res, expected)
335
ef2ea4bf 336 # the first IP should not be up so only second should be returned
1bc56192
CHB
337 expected = [expected[1]]
338 res = self.sendUDPQuery(query)
339 self.assertRcodeEqual(res, dns.rcode.NOERROR)
340 self.assertAnyRRsetInAnswer(res, expected)
341
05d6f9ed
CHB
342 def testIfportupWithSomeDownMultiset(self):
343 """
344 Basic ifportup() test with some ports DOWN from multiple sets
345 """
346 query = dns.message.make_query('multi.ifportup.example.org', 'A')
347 expected = [
348 dns.rrset.from_text('multi.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
349 '192.168.42.21'),
350 dns.rrset.from_text('multi.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
351 '192.168.42.23'),
352 dns.rrset.from_text('multi.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
353 '{prefix}.102'.format(prefix=self._PREFIX)),
354 dns.rrset.from_text('multi.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
355 '{prefix}.101'.format(prefix=self._PREFIX))
356 ]
357
358 # we first expect any of the IPs as no check has been performed yet
359 res = self.sendUDPQuery(query)
360 self.assertRcodeEqual(res, dns.rcode.NOERROR)
361 self.assertAnyRRsetInAnswer(res, expected)
362
363 # An ip is up in 2 sets, but we expect only the one from the middle set
364 expected = [expected[2]]
365 res = self.sendUDPQuery(query)
366 self.assertRcodeEqual(res, dns.rcode.NOERROR)
367 self.assertAnyRRsetInAnswer(res, expected)
368
1bc56192
CHB
369 def testIfportupWithAllDown(self):
370 """
371 Basic ifportup() test with all ports DOWN
372 """
373 query = dns.message.make_query('none.ifportup.example.org', 'A')
374 expected = [
375 dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
376 '192.168.42.21'),
377 dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
378 '192.168.21.42'.format(prefix=self._PREFIX))]
379
380 # we first expect any of the IPs as no check has been performed yet
381 res = self.sendUDPQuery(query)
382 self.assertRcodeEqual(res, dns.rcode.NOERROR)
383 self.assertAnyRRsetInAnswer(res, expected)
384
385 # no port should be up so we expect any
386 res = self.sendUDPQuery(query)
387 self.assertRcodeEqual(res, dns.rcode.NOERROR)
388 self.assertAnyRRsetInAnswer(res, expected)
389
707cedf3
CHB
390 def testIfportupWithAllDownAndAllBackupSelector(self):
391 """
392 Basic ifportup() test with all ports DOWN, fallback to 'all' backup selector
393 """
394 name = 'all.noneup.ifportup.example.org.'
395 query = dns.message.make_query(name, dns.rdatatype.A)
396 expected = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.168.42.21', '192.168.21.42')]
397
398 res = self.sendUDPQuery(query)
399 self.assertRcodeEqual(res, dns.rcode.NOERROR)
400
2abc1539 401 time.sleep(3)
707cedf3
CHB
402 res = self.sendUDPQuery(query)
403 self.assertRcodeEqual(res, dns.rcode.NOERROR)
404 self.assertEqual(res.answer, expected)
405
1bc56192
CHB
406 def testIfurlup(self):
407 """
408 Basic ifurlup() test
409 """
410 reachable = [
411 '{prefix}.103'.format(prefix=self._PREFIX)
412 ]
05d6f9ed 413 unreachable = ['192.168.42.105']
1bc56192
CHB
414 ips = reachable + unreachable
415 all_rrs = []
416 reachable_rrs = []
417 for ip in ips:
418 rr = dns.rrset.from_text('usa.example.org.', 0, dns.rdataclass.IN, 'A', ip)
419 all_rrs.append(rr)
420 if ip in reachable:
421 reachable_rrs.append(rr)
422
423 query = dns.message.make_query('usa.example.org', 'A')
424 res = self.sendUDPQuery(query)
425 self.assertRcodeEqual(res, dns.rcode.NOERROR)
426 self.assertAnyRRsetInAnswer(res, all_rrs)
427
2abc1539
OM
428 # the timeout in the LUA health checker is 2 second, so we make sure to wait slightly longer here
429 time.sleep(3)
1bc56192
CHB
430 res = self.sendUDPQuery(query)
431 self.assertRcodeEqual(res, dns.rcode.NOERROR)
432 self.assertAnyRRsetInAnswer(res, reachable_rrs)
433
05d6f9ed
CHB
434 def testIfurlupMultiSet(self):
435 """
436 Basic ifurlup() test with mutiple sets
437 """
438 reachable = [
439 '{prefix}.103'.format(prefix=self._PREFIX)
440 ]
441 unreachable = ['192.168.42.101', '192.168.42.102', '192.168.42.105']
442 ips = reachable + unreachable
443 all_rrs = []
444 reachable_rrs = []
445 for ip in ips:
446 rr = dns.rrset.from_text('usa-ext.example.org.', 0, dns.rdataclass.IN, 'A', ip)
447 all_rrs.append(rr)
448 if ip in reachable:
449 reachable_rrs.append(rr)
450
451 query = dns.message.make_query('usa-ext.example.org', 'A')
452 res = self.sendUDPQuery(query)
453 self.assertRcodeEqual(res, dns.rcode.NOERROR)
454 self.assertAnyRRsetInAnswer(res, all_rrs)
455
456 # the timeout in the LUA health checker is 2 second, so we make sure to wait slightly longer here
457 time.sleep(3)
458 res = self.sendUDPQuery(query)
459 self.assertRcodeEqual(res, dns.rcode.NOERROR)
460 self.assertAnyRRsetInAnswer(res, reachable_rrs)
461
95d0df69
PD
462 def testIfurlextup(self):
463 expected = [dns.rrset.from_text('ifurlextup.example.org.', 0, dns.rdataclass.IN, dns.rdatatype.A, '192.168.0.3')]
464
465 query = dns.message.make_query('ifurlextup.example.org', 'A')
466 res = self.sendUDPQuery(query)
467
468 # wait for health checks to happen
469 time.sleep(5)
470
471 res = self.sendUDPQuery(query)
472
473 self.assertRcodeEqual(res, dns.rcode.NOERROR)
474 self.assertEqual(res.answer, expected)
475
1bc56192
CHB
476 def testIfurlupSimplified(self):
477 """
478 Basic ifurlup() test with the simplified list of ips
479 Also ensures the correct path is queried
480 """
481 reachable = [
482 '{prefix}.101'.format(prefix=self._PREFIX)
483 ]
484 unreachable = ['192.168.42.101']
485 ips = reachable + unreachable
486 all_rrs = []
487 reachable_rrs = []
488 for ip in ips:
489 rr = dns.rrset.from_text('mix.ifurlup.example.org.', 0, dns.rdataclass.IN, 'A', ip)
490 all_rrs.append(rr)
491 if ip in reachable:
492 reachable_rrs.append(rr)
493
494 query = dns.message.make_query('mix.ifurlup.example.org', 'A')
495 res = self.sendUDPQuery(query)
496 self.assertRcodeEqual(res, dns.rcode.NOERROR)
497 self.assertAnyRRsetInAnswer(res, all_rrs)
498
2abc1539 499 time.sleep(3)
1bc56192
CHB
500 res = self.sendUDPQuery(query)
501 self.assertRcodeEqual(res, dns.rcode.NOERROR)
502 self.assertAnyRRsetInAnswer(res, reachable_rrs)
503
504 def testLatlon(self):
505 """
506 Basic latlon() test
507 """
847e724e
CHB
508 name = 'latlon.geo.example.org.'
509 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
510 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
511 expected = dns.rrset.from_text(name, 0,
1bc56192 512 dns.rdataclass.IN, 'TXT',
847e724e 513 '"47.913000 -122.304200"')
1bc56192
CHB
514
515 res = self.sendUDPQuery(query)
516 self.assertRcodeEqual(res, dns.rcode.NOERROR)
517 self.assertRRsetInAnswer(res, expected)
518
519 def testLatlonloc(self):
520 """
521 Basic latlonloc() test
522 """
847e724e
CHB
523 name = 'latlonloc.geo.example.org.'
524 expected = dns.rrset.from_text(name, 0,dns.rdataclass.IN, 'TXT',
525 '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
526 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
527 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
1bc56192
CHB
528
529 res = self.sendUDPQuery(query)
530 self.assertRcodeEqual(res, dns.rcode.NOERROR)
531 self.assertRRsetInAnswer(res, expected)
532
cc5c4f6b
CHB
533 def testWildcardError(self):
534 """
535 Ensure errors coming from LUA wildcards are reported
536 """
537 query = dns.message.make_query('failure.magic.example.org', 'A')
538
539 res = self.sendUDPQuery(query)
540 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
541 self.assertAnswerEmpty(res)
542
847e724e
CHB
543 def testClosestMagic(self):
544 """
545 Basic closestMagic() test
546 """
547 name = 'www-balanced.example.org.'
548 cname = '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
549 queries = [
550 ('1.1.1.0', 24, '1.1.1.3'),
551 ('1.2.3.0', 24, '1.2.3.5'),
552 ('17.1.0.0', 16, '17.1.2.4')
553 ]
707cedf3 554
847e724e
CHB
555 for (subnet, mask, ip) in queries:
556 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
557 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
558
559 response = dns.message.make_response(query)
560
561 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.CNAME, cname))
562 response.answer.append(dns.rrset.from_text(cname, 0, dns.rdataclass.IN, 'A', ip))
563
564 res = self.sendUDPQuery(query)
565 self.assertRcodeEqual(res, dns.rcode.NOERROR)
566 self.assertEqual(res.answer, response.answer)
567
568 def testAsnum(self):
569 """
570 Basic asnum() test
571 """
572 queries = [
573 ('1.1.1.0', 24, '"true"'),
574 ('1.2.3.0', 24, '"false"'),
575 ('17.1.0.0', 16, '"false"')
576 ]
577 name = 'asnum.geo.example.org.'
578 for (subnet, mask, txt) in queries:
579 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
580 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
581 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
582
583 res = self.sendUDPQuery(query)
584 self.assertRcodeEqual(res, dns.rcode.NOERROR)
585 self.assertRRsetInAnswer(res, expected)
586
587 def testCountry(self):
588 """
589 Basic country() test
590 """
591 queries = [
592 ('1.1.1.0', 24, '"false"'),
593 ('1.2.3.0', 24, '"true"'),
594 ('17.1.0.0', 16, '"false"')
595 ]
596 name = 'country.geo.example.org.'
597 for (subnet, mask, txt) in queries:
598 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
599 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
600 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
601
602 res = self.sendUDPQuery(query)
603 self.assertRcodeEqual(res, dns.rcode.NOERROR)
604 self.assertRRsetInAnswer(res, expected)
605
60f9d9a2 606 def testCountryCode(self):
607 """
608 Basic countryCode() test
609 """
610 queries = [
611 ('1.1.1.0', 24, '"au"'),
612 ('1.2.3.0', 24, '"us"'),
613 ('17.1.0.0', 16, '"--"')
614 ]
615 name = 'country-code.geo.example.org.'
616 for (subnet, mask, txt) in queries:
617 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
618 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
619 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
620
621 res = self.sendUDPQuery(query)
622 self.assertRcodeEqual(res, dns.rcode.NOERROR)
623 self.assertRRsetInAnswer(res, expected)
624
625 def testRegion(self):
626 """
627 Basic region() test
628 """
629 queries = [
630 ('1.1.1.0', 24, '"false"'),
631 ('1.2.3.0', 24, '"true"'),
632 ('17.1.0.0', 16, '"false"')
633 ]
634 name = 'region.geo.example.org.'
635 for (subnet, mask, txt) in queries:
636 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
637 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
638 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
639
640 res = self.sendUDPQuery(query)
641 self.assertRcodeEqual(res, dns.rcode.NOERROR)
642 self.assertRRsetInAnswer(res, expected)
643
644 def testRegionCode(self):
645 """
646 Basic regionCode() test
647 """
648 queries = [
649 ('1.1.1.0', 24, '"--"'),
650 ('1.2.3.0', 24, '"ca"'),
651 ('17.1.0.0', 16, '"--"')
652 ]
653 name = 'region-code.geo.example.org.'
654 for (subnet, mask, txt) in queries:
655 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
656 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
657 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
658
659 res = self.sendUDPQuery(query)
660 self.assertRcodeEqual(res, dns.rcode.NOERROR)
661 self.assertRRsetInAnswer(res, expected)
662
847e724e
CHB
663 def testContinent(self):
664 """
665 Basic continent() test
666 """
667 queries = [
668 ('1.1.1.0', 24, '"false"'),
669 ('1.2.3.0', 24, '"true"'),
670 ('17.1.0.0', 16, '"false"')
671 ]
672 name = 'continent.geo.example.org.'
673 for (subnet, mask, txt) in queries:
674 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
675 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
676 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
677
678 res = self.sendUDPQuery(query)
679 self.assertRcodeEqual(res, dns.rcode.NOERROR)
680 self.assertRRsetInAnswer(res, expected)
681
60f9d9a2 682 def testContinentCode(self):
683 """
684 Basic continentCode() test
685 """
686 queries = [
687 ('1.1.1.0', 24, '"oc"'),
688 ('1.2.3.0', 24, '"na"'),
689 ('17.1.0.0', 16, '"--"')
690 ]
691 name = 'continent-code.geo.example.org.'
692 for (subnet, mask, txt) in queries:
693 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
694 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
695 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
696
697 res = self.sendUDPQuery(query)
698 self.assertRcodeEqual(res, dns.rcode.NOERROR)
699 self.assertRRsetInAnswer(res, expected)
700
847e724e
CHB
701 def testClosest(self):
702 """
703 Basic pickclosest() test
704 """
705 queries = [
706 ('1.1.1.0', 24, '1.1.1.2'),
707 ('1.2.3.0', 24, '1.2.3.4'),
708 ('17.1.0.0', 16, '1.1.1.2')
709 ]
710 name = 'closest.geo.example.org.'
711 for (subnet, mask, ip) in queries:
712 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
713 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
714 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', ip)
715
716 res = self.sendUDPQuery(query)
717 self.assertRcodeEqual(res, dns.rcode.NOERROR)
718 self.assertRRsetInAnswer(res, expected)
719
60f9d9a2 720 def testAll(self):
721 """
722 Basic all() test
723 """
724 expected = [dns.rrset.from_text('all.example.org.', 0, dns.rdataclass.IN, dns.rdatatype.A, '1.2.3.4', '4.3.2.1')]
725 query = dns.message.make_query('all.example.org.', 'A')
726
727 res = self.sendUDPQuery(query)
728 self.assertRcodeEqual(res, dns.rcode.NOERROR)
729 self.assertEqual(res.answer, expected)
730
1bc56192
CHB
731 def testNetmask(self):
732 """
733 Basic netmask() test
734 """
735 queries = [
736 {
737 'expected': dns.rrset.from_text('true.netmask.example.org.', 0,
738 dns.rdataclass.IN, 'TXT',
739 '"true"'),
740 'query': dns.message.make_query('true.netmask.example.org', 'TXT')
741 },
742 {
743 'expected': dns.rrset.from_text('false.netmask.example.org.', 0,
744 dns.rdataclass.IN, 'TXT',
745 '"false"'),
746 'query': dns.message.make_query('false.netmask.example.org', 'TXT')
747 }
748 ]
749 for query in queries :
750 res = self.sendUDPQuery(query['query'])
751 self.assertRcodeEqual(res, dns.rcode.NOERROR)
752 self.assertRRsetInAnswer(res, query['expected'])
753
754 def testView(self):
755 """
756 Basic view() test
757 """
758 queries = [
759 {
760 'expected': dns.rrset.from_text('view.example.org.', 0,
761 dns.rdataclass.IN, 'A',
762 '{prefix}.54'.format(prefix=self._PREFIX)),
763 'query': dns.message.make_query('view.example.org', 'A')
764 },
765 {
766 'expected': dns.rrset.from_text('txt.view.example.org.', 0,
767 dns.rdataclass.IN, 'TXT',
768 '"else"'),
769 'query': dns.message.make_query('txt.view.example.org', 'TXT')
770 }
771 ]
772 for query in queries :
773 res = self.sendUDPQuery(query['query'])
774 self.assertRcodeEqual(res, dns.rcode.NOERROR)
775 self.assertRRsetInAnswer(res, query['expected'])
776
777 def testViewNoMatch(self):
778 """
779 view() test where no netmask match
780 """
1bc56192
CHB
781 query = dns.message.make_query('none.view.example.org', 'A')
782
783 res = self.sendUDPQuery(query)
784 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
785 self.assertAnswerEmpty(res)
786
4a0d414b 787 def testCWHashed(self):
1bc56192 788 """
4a0d414b 789 Basic pickwhashed() and pickchashed() test with a set of A records
1bc56192
CHB
790 As the `bestwho` is hashed, we should always get the same answer
791 """
4a0d414b
CHB
792 for qname in ['whashed.example.org.', 'chashed.example.org.']:
793 expected = [dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'A', '1.2.3.4'),
794 dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'A', '4.3.2.1')]
795 query = dns.message.make_query(qname, 'A')
60f9d9a2 796
4a0d414b
CHB
797 first = self.sendUDPQuery(query)
798 self.assertRcodeEqual(first, dns.rcode.NOERROR)
799 self.assertAnyRRsetInAnswer(first, expected)
800 for _ in range(5):
801 res = self.sendUDPQuery(query)
802 self.assertRcodeEqual(res, dns.rcode.NOERROR)
803 self.assertRRsetInAnswer(res, first.answer[0])
d33b8bf4
BR
804
805 def testNamehashed(self):
806 """
807 Basic picknamehashed() test with a set of A records
808 As the name is hashed, we should always get the same IP back for the same record name.
809 """
810
811 queries = [
812 {
33e306dc 813 'query': dns.message.make_query('test.namehashed.example.org', 'A'),
d33b8bf4
BR
814 'expected': dns.rrset.from_text('test.namehashed.example.org.', 0,
815 dns.rdataclass.IN, 'A',
816 '1.2.3.4'),
d33b8bf4
BR
817 },
818 {
33e306dc 819 'query': dns.message.make_query('test2.namehashed.example.org', 'A'),
d33b8bf4
BR
820 'expected': dns.rrset.from_text('test2.namehashed.example.org.', 0,
821 dns.rdataclass.IN, 'A',
822 '4.3.2.1'),
d33b8bf4
BR
823 }
824 ]
825 for query in queries :
826 res = self.sendUDPQuery(query['query'])
827 self.assertRcodeEqual(res, dns.rcode.NOERROR)
828 self.assertRRsetInAnswer(res, query['expected'])
829
4a0d414b 830 def testCWHashedTxt(self):
60f9d9a2 831 """
832 Basic pickwhashed() test with a set of TXT records
833 As the `bestwho` is hashed, we should always get the same answer
834 """
4a0d414b
CHB
835 for qname in ['whashed-txt.example.org.', 'chashed-txt.example.org.']:
836 expected = [dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'TXT', 'bob'),
837 dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'TXT', 'alice')]
838 query = dns.message.make_query(qname,'TXT')
839
840 first = self.sendUDPQuery(query)
841 self.assertRcodeEqual(first, dns.rcode.NOERROR)
842 self.assertAnyRRsetInAnswer(first, expected)
843 for _ in range(5):
844 res = self.sendUDPQuery(query)
845 self.assertRcodeEqual(res, dns.rcode.NOERROR)
846 self.assertRRsetInAnswer(res, first.answer[0])
60f9d9a2 847
848 def testHashed(self):
849 """
850 Basic pickhashed() test with a set of A records
851 As the `bestwho` is hashed, we should always get the same answer
852 """
853 expected = [dns.rrset.from_text('hashed.example.org.', 0, dns.rdataclass.IN, 'A', '1.2.3.4'),
854 dns.rrset.from_text('hashed.example.org.', 0, dns.rdataclass.IN, 'A', '4.3.2.1')]
855 query = dns.message.make_query('hashed.example.org', 'A')
856
857 first = self.sendUDPQuery(query)
858 self.assertRcodeEqual(first, dns.rcode.NOERROR)
859 self.assertAnyRRsetInAnswer(first, expected)
860 for _ in range(5):
861 res = self.sendUDPQuery(query)
862 self.assertRcodeEqual(res, dns.rcode.NOERROR)
863 self.assertRRsetInAnswer(res, first.answer[0])
864
865 def testHashedV6(self):
866 """
867 Basic pickhashed() test with a set of AAAA records
868 As the `bestwho` is hashed, we should always get the same answer
869 """
870 expected = [dns.rrset.from_text('hashed-v6.example.org.', 0, dns.rdataclass.IN, 'AAAA', '2001:db8:a0b:12f0::1'),
871 dns.rrset.from_text('hashed-v6.example.org.', 0, dns.rdataclass.IN, 'AAAA', 'fe80::2a1:9bff:fe9b:f268')]
872 query = dns.message.make_query('hashed-v6.example.org', 'AAAA')
873
874 first = self.sendUDPQuery(query)
875 self.assertRcodeEqual(first, dns.rcode.NOERROR)
876 self.assertAnyRRsetInAnswer(first, expected)
877 for _ in range(5):
878 res = self.sendUDPQuery(query)
879 self.assertRcodeEqual(res, dns.rcode.NOERROR)
880 self.assertRRsetInAnswer(res, first.answer[0])
881
882 def testHashedTXT(self):
883 """
884 Basic pickhashed() test with a set of TXT records
885 As the `bestwho` is hashed, we should always get the same answer
886 """
887 expected = [dns.rrset.from_text('hashed-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'bob'),
888 dns.rrset.from_text('hashed-txt.example.org.', 0, dns.rdataclass.IN, 'TXT', 'alice')]
889 query = dns.message.make_query('hashed-txt.example.org', 'TXT')
890
891 first = self.sendUDPQuery(query)
892 self.assertRcodeEqual(first, dns.rcode.NOERROR)
893 self.assertAnyRRsetInAnswer(first, expected)
894 for _ in range(5):
895 res = self.sendUDPQuery(query)
896 self.assertRcodeEqual(res, dns.rcode.NOERROR)
1bc56192
CHB
897 self.assertRRsetInAnswer(res, first.answer[0])
898
af68014f
CHB
899 def testTimeout(self):
900 """
901 Test if LUA scripts are aborted if script execution takes too long
902 """
903 query = dns.message.make_query('timeout.example.org', 'A')
904
905 first = self.sendUDPQuery(query)
906 self.assertRcodeEqual(first, dns.rcode.SERVFAIL)
907
658417e4
PD
908
909 def testA(self):
910 """
911 Test A query against `any`
912 """
913 name = 'any.example.org.'
914
915 query = dns.message.make_query(name, 'A')
916
917 response = dns.message.make_response(query)
918
919 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
920
921 res = self.sendUDPQuery(query)
922 self.assertRcodeEqual(res, dns.rcode.NOERROR)
923 self.assertEqual(res.answer, response.answer)
924
925 def testANY(self):
926 """
927 Test ANY query against `any`
928 """
929
930 name = 'any.example.org.'
931
932 query = dns.message.make_query(name, 'ANY')
933
934 response = dns.message.make_response(query)
935
936 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
937 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', '"hello there"'))
938
939 res = self.sendUDPQuery(query)
940 self.assertRcodeEqual(res, dns.rcode.NOERROR)
4d4c592e 941 self.assertEqual(self.sortRRsets(res.answer), self.sortRRsets(response.answer))
658417e4 942
aec9c907
PD
943 def testCAFromRaw(self):
944 """
945 Test newCAFromRaw() function
946 """
947 name = 'newcafromraw.example.org.'
948
949 query = dns.message.make_query(name, 'A')
950
951 response = dns.message.make_response(query)
952
953 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '65.66.67.68'))
954
955 res = self.sendUDPQuery(query)
956 self.assertRcodeEqual(res, dns.rcode.NOERROR)
957 self.assertEqual(res.answer, response.answer)
958
959 query = dns.message.make_query(name, 'AAAA')
960
961 response = dns.message.make_response(query)
962
963 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.AAAA, '4142:4344:3032:3033:3430:3530:3630:3730'))
964
965 res = self.sendUDPQuery(query)
966 self.assertRcodeEqual(res, dns.rcode.NOERROR)
967 self.assertEqual(res.answer, response.answer)
968
09f4b0d9
PD
969 def testResolve(self):
970 """
971 Test resolve() function
972 """
973 name = 'resolve.example.org.'
974
975 query = dns.message.make_query(name, 'A')
976
977 response = dns.message.make_response(query)
978
979 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))
980
981 res = self.sendUDPQuery(query)
982 self.assertRcodeEqual(res, dns.rcode.NOERROR)
983 self.assertEqual(res.answer, response.answer)
984
98301eb0
PD
985 def testFilterForwardEmpty(self):
986 """
987 Test filterForward() function with empty fallback
988 """
989 name = 'filterforwardempty.example.org.'
990
991 query = dns.message.make_query(name, 'A')
992
993 res = self.sendUDPQuery(query)
994 self.assertRcodeEqual(res, dns.rcode.NOERROR)
995 self.assertEqual(res.answer, [])
996
6c88aa06
PD
997 def testCreateForwardAndReverse(self):
998 expected = {
999 ".createforward.example.org." : (dns.rdatatype.A, {
1000 "1.2.3.4": "1.2.3.4",
1001 "1.2.3.4.static": "1.2.3.4",
1002 "1.2.3.4.5.6": "1.2.3.4",
1003 "invalid.1.2.3.4": "0.0.0.0",
1004 "invalid": "0.0.0.0",
1005 "1-2-3-4": "1.2.3.4",
1006 "1-2-3-4.foo": "1.2.3.4",
1007 "1-2-3-4.foo.bar": "0.0.0.0",
1008 "1-2-3-4.foo.bar.baz": "0.0.0.0",
1009 "1-2-3-4.foo.bar.baz.quux": "0.0.0.0",
1010 "ip-1-2-3-4": "1.2.3.4",
1011 "ip-is-here-for-you-1-2-3-4": "1.2.3.4",
1010ac95
PD
1012 "40414243": "64.65.66.67",
1013 "p40414243": "64.65.66.67",
6c88aa06 1014 "ip40414243": "64.65.66.67",
1effebed 1015 "ipp40414243": "64.65.66.67",
6c88aa06 1016 "ip4041424": "0.0.0.0",
f7deb7d0 1017 "host64-22-33-44": "64.22.33.44",
223bfcad 1018 "2.2.2.2": "0.0.0.0" # filtered
6c88aa06
PD
1019 }),
1020 ".createreverse.example.org." : (dns.rdatatype.PTR, {
b652505d 1021 "4.3.2.1": "1-2-3-4.example.com.",
223bfcad 1022 "10.10.10.10": "quad10.example.com." # exception
6c88aa06
PD
1023 }),
1024 ".createforward6.example.org." : (dns.rdatatype.AAAA, {
223bfcad 1025 "2001--db8" : "2001::db8",
a5d82a72
PD
1026 "20010002000300040005000600070db8" : "2001:2:3:4:5:6:7:db8",
1027 "blabla20010002000300040005000600070db8" : "2001:2:3:4:5:6:7:db8",
223bfcad 1028 "4000-db8--1" : "fe80::1" # filtered, with fallback address override
6c88aa06
PD
1029 }),
1030 ".createreverse6.example.org." : (dns.rdatatype.PTR, {
b652505d 1031 "8.b.d.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1.0.0.2" : "2001--db8.example.com.",
223bfcad 1032 "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2" : "example.example.com." # exception
6c88aa06 1033 })
448e7a2d
PL
1034 }
1035
6c88aa06
PD
1036 for suffix, v in expected.items():
1037 qtype, pairs = v
1038 for prefix, target in pairs.items():
1039 name = prefix + suffix
448e7a2d 1040
6c88aa06
PD
1041 query = dns.message.make_query(name, qtype)
1042 response = dns.message.make_response(query)
1043 response.answer.append(dns.rrset.from_text(
1044 name, 0, dns.rdataclass.IN, qtype, target))
448e7a2d 1045
6c88aa06
PD
1046 res = self.sendUDPQuery(query)
1047 print(res)
1048 self.assertRcodeEqual(res, dns.rcode.NOERROR)
1049 self.assertEqual(res.answer, response.answer)
448e7a2d 1050
1d76a103 1051 def _getCounter(self, tcp=False):
21f08f59
PD
1052 """
1053 Helper function for shared/non-shared testing
1054 """
1055 name = 'counter.example.org.'
1056
1057 query = dns.message.make_query(name, 'TXT')
1058 responses = []
1059
1d76a103
PD
1060 sender = self.sendTCPQuery if tcp else self.sendUDPQuery
1061
21f08f59 1062 for i in range(50):
1d76a103 1063 res = sender(query)
21f08f59
PD
1064 responses.append(res.answer[0][0])
1065
1066 return(responses)
1067
1068 def testCounter(self):
1069 """
1070 Test non-shared behaviour
1071 """
1072
1d76a103
PD
1073 resUDP = set(self._getCounter(tcp=False))
1074 resTCP = set(self._getCounter(tcp=True))
21f08f59 1075
1d76a103
PD
1076 self.assertEqual(len(resUDP), 1)
1077 self.assertEqual(len(resTCP), 1)
21f08f59 1078
3bf2f94d
PD
1079 def testDblookup(self):
1080 """
1081 Test dblookup() function
1082 """
1083
1084 name = 'dblookup.example.org.'
1085
1086 query = dns.message.make_query(name, 'A')
1087
1088 response = dns.message.make_response(query)
1089
1090 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.5'))
1091
1092 res = self.sendUDPQuery(query)
1093 self.assertRcodeEqual(res, dns.rcode.NOERROR)
1094 self.assertEqual(self.sortRRsets(res.answer), self.sortRRsets(response.answer))
1095
26dbeed8
PD
1096 def testWhitespace(self, expectws=True):
1097 """
1098 Test TXT query for whitespace
1099 """
1100 name = 'whitespace.example.org.'
1101
1102 query = dns.message.make_query(name, 'TXT')
1103
1104 response = dns.message.make_response(query)
1105
1106 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.TXT, '"foo bar"' if expectws else '"foobar"'))
1107
1108 res = self.sendUDPQuery(query)
1109 self.assertRcodeEqual(res, dns.rcode.NOERROR)
1110 self.assertEqual(res.answer, response.answer)
1111
3bf2f94d 1112
876a35c3
PD
1113class TestLuaRecordsShared(TestLuaRecords):
1114 _config_template = """
1115geoip-database-files=../modules/geoipbackend/regression-tests/GeoLiteCity.mmdb
1116edns-subnet-processing=yes
1117launch=bind geoip
1118any-to-tcp=no
1119enable-lua-records=shared
26dbeed8 1120lua-records-insert-whitespace=yes
876a35c3
PD
1121lua-health-checks-interval=1
1122"""
448e7a2d 1123
21f08f59
PD
1124 def testCounter(self):
1125 """
1126 Test shared behaviour
1127 """
1128
1d76a103
PD
1129 resUDP = set(self._getCounter(tcp=False))
1130 resTCP = set(self._getCounter(tcp=True))
21f08f59 1131
1d76a103
PD
1132 self.assertEqual(len(resUDP), 50)
1133 self.assertEqual(len(resTCP), 50)
21f08f59 1134
26dbeed8
PD
1135class TestLuaRecordsNoWhiteSpace(TestLuaRecords):
1136 _config_template = """
1137geoip-database-files=../modules/geoipbackend/regression-tests/GeoLiteCity.mmdb
1138edns-subnet-processing=yes
1139launch=bind geoip
1140any-to-tcp=no
1141enable-lua-records
1142lua-records-insert-whitespace=no
1143lua-health-checks-interval=1
1144"""
1145
1146 def testWhitespace(self):
1147 return TestLuaRecords.testWhitespace(self, False)
1148
1bc56192
CHB
1149if __name__ == '__main__':
1150 unittest.main()
1151 exit(0)