]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
Merge pull request #6518 from rgacogne/rec-max-udp-queries-per-round
[thirdparty/pdns.git] / regression-tests.auth-py / test_LuaRecords.py
1 #!/usr/bin/env python
2 import unittest
3 import requests
4 import threading
5 import dns
6 import time
7 import clientsubnetoption
8
9 from authtests import AuthTest
10
11 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
12
13 class FakeHTTPServer(BaseHTTPRequestHandler):
14 def _set_headers(self):
15 self.send_response(200)
16 self.send_header('Content-type', 'text/html')
17 self.end_headers()
18
19 def do_GET(self):
20 self._set_headers()
21 if (self.path == '/ping.json'):
22 self.wfile.write('{"ping":"pong"}')
23 else:
24 self.wfile.write("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")
25
26 def log_message(self, format, *args):
27 return
28
29 def do_HEAD(self):
30 self._set_headers()
31
32 class TestLuaRecords(AuthTest):
33 _zones = {
34 'example.org': """
35 example.org. 3600 IN SOA {soa}
36 example.org. 3600 IN NS ns1.example.org.
37 example.org. 3600 IN NS ns2.example.org.
38 ns1.example.org. 3600 IN A {prefix}.10
39 ns2.example.org. 3600 IN A {prefix}.11
40
41 web1.example.org. 3600 IN A {prefix}.101
42 web2.example.org. 3600 IN A {prefix}.102
43 web3.example.org. 3600 IN A {prefix}.103
44
45 all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
46 some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
47 none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
48
49 whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
50 rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
51 v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
52 v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
53 closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
54 empty.rand.example.org. 3600 IN LUA A "pickrandom()"
55 timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
56 wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
57
58 config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
59 "EUWips={{'{prefix}.101','{prefix}.102'}} "
60 "EUEips={{'192.168.42.101','192.168.42.102'}} "
61 "NLips={{'{prefix}.111', '{prefix}.112'}} "
62 "USAips={{'{prefix}.103'}} ")
63
64 usa IN LUA A ( ";include('config') "
65 "return ifurlup('http://www.lua.org:8080/', "
66 "{{USAips, EUEips}}, settings) ")
67
68 mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
69 "{{ '192.168.42.101', '{prefix}.101' }}, "
70 "{{ stringmatch='pong' }}) ")
71
72 eu-west IN LUA A ( ";include('config') "
73 "return ifurlup('http://www.lua.org:8080/', "
74 "{{EUWips, EUEips, USAips}}, settings) ")
75
76 nl IN LUA A ( ";include('config') "
77 "return ifportup(8081, NLips) ")
78 latlon.geo IN LUA TXT "latlon()"
79 continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
80 asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
81 country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
82 latlonloc.geo IN LUA TXT "latlonloc()"
83
84 true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
85 "then return 'true' "
86 "else return 'false' end " )
87 false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
88 "then return 'true' "
89 "else return 'false' end " )
90
91 view IN LUA A ("view({{ "
92 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
93 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
94 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
95 " }}) " )
96 txt.view IN LUA TXT ("view({{ "
97 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
98 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
99 " }}) " )
100 none.view IN LUA A ("view({{ "
101 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
102 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
103 " }}) " )
104 *.magic IN LUA A "closestMagic()"
105 www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
106 """,
107 }
108 _web_rrsets = []
109
110 @classmethod
111 def startResponders(cls):
112 webserver = threading.Thread(name='HTTP Listener',
113 target=cls.HTTPResponder,
114 args=[8080]
115 )
116 webserver.setDaemon(True)
117 webserver.start()
118
119 @classmethod
120 def HTTPResponder(cls, port):
121 server_address = ('', port)
122 httpd = HTTPServer(server_address, FakeHTTPServer)
123 httpd.serve_forever()
124
125 @classmethod
126 def setUpClass(cls):
127
128 super(TestLuaRecords, cls).setUpClass()
129
130 cls._web_rrsets = [dns.rrset.from_text('web1.example.org.', 0, dns.rdataclass.IN, 'A',
131 '{prefix}.101'.format(prefix=cls._PREFIX)),
132 dns.rrset.from_text('web2.example.org.', 0, dns.rdataclass.IN, 'A',
133 '{prefix}.102'.format(prefix=cls._PREFIX)),
134 dns.rrset.from_text('web3.example.org.', 0, dns.rdataclass.IN, 'A',
135 '{prefix}.103'.format(prefix=cls._PREFIX))
136 ]
137
138 def testPickRandom(self):
139 """
140 Basic pickrandom() test with a set of A records
141 """
142 expected = [dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
143 '{prefix}.101'.format(prefix=self._PREFIX)),
144 dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
145 '{prefix}.102'.format(prefix=self._PREFIX))]
146 query = dns.message.make_query('rand.example.org', 'A')
147
148 res = self.sendUDPQuery(query)
149 self.assertRcodeEqual(res, dns.rcode.NOERROR)
150 self.assertAnyRRsetInAnswer(res, expected)
151
152 def testBogusV6PickRandom(self):
153 """
154 Test a bogus AAAA pickrandom() record with a set of v4 addr
155 """
156 query = dns.message.make_query('v6-bogus.rand.example.org', 'AAAA')
157
158 res = self.sendUDPQuery(query)
159 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
160
161 def testV6PickRandom(self):
162 """
163 Test pickrandom() AAAA record
164 """
165 expected = [dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
166 '2001:db8:a0b:12f0::1'),
167 dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
168 'fe80::2a1:9bff:fe9b:f268')]
169 query = dns.message.make_query('v6.rand.example.org', 'AAAA')
170
171 res = self.sendUDPQuery(query)
172 self.assertRcodeEqual(res, dns.rcode.NOERROR)
173 self.assertAnyRRsetInAnswer(res, expected)
174
175 def testEmptyRandom(self):
176 """
177 Basic pickrandom() test with an empty set
178 """
179 query = dns.message.make_query('empty.rand.example.org', 'A')
180
181 res = self.sendUDPQuery(query)
182 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
183
184 def testWRandom(self):
185 """
186 Basic pickwrandom() test with a set of A records
187 """
188 expected = [dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
189 '{prefix}.103'.format(prefix=self._PREFIX)),
190 dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
191 '{prefix}.102'.format(prefix=self._PREFIX))]
192 query = dns.message.make_query('wrand.example.org', 'A')
193
194 res = self.sendUDPQuery(query)
195 self.assertRcodeEqual(res, dns.rcode.NOERROR)
196 self.assertAnyRRsetInAnswer(res, expected)
197
198 def testIfportup(self):
199 """
200 Basic ifportup() test
201 """
202 query = dns.message.make_query('all.ifportup.example.org', 'A')
203 expected = [
204 dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
205 '{prefix}.101'.format(prefix=self._PREFIX)),
206 dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
207 '{prefix}.102'.format(prefix=self._PREFIX))]
208
209 res = self.sendUDPQuery(query)
210 self.assertRcodeEqual(res, dns.rcode.NOERROR)
211 self.assertAnyRRsetInAnswer(res, expected)
212
213 def testIfportupWithSomeDown(self):
214 """
215 Basic ifportup() test with some ports DOWN
216 """
217 query = dns.message.make_query('some.ifportup.example.org', 'A')
218 expected = [
219 dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
220 '192.168.42.21'),
221 dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
222 '{prefix}.102'.format(prefix=self._PREFIX))]
223
224 # we first expect any of the IPs as no check has been performed yet
225 res = self.sendUDPQuery(query)
226 self.assertRcodeEqual(res, dns.rcode.NOERROR)
227 self.assertAnyRRsetInAnswer(res, expected)
228
229 # the first IP should not be up so only second shoud be returned
230 expected = [expected[1]]
231 res = self.sendUDPQuery(query)
232 self.assertRcodeEqual(res, dns.rcode.NOERROR)
233 self.assertAnyRRsetInAnswer(res, expected)
234
235 def testIfportupWithAllDown(self):
236 """
237 Basic ifportup() test with all ports DOWN
238 """
239 query = dns.message.make_query('none.ifportup.example.org', 'A')
240 expected = [
241 dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
242 '192.168.42.21'),
243 dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
244 '192.168.21.42'.format(prefix=self._PREFIX))]
245
246 # we first expect any of the IPs as no check has been performed yet
247 res = self.sendUDPQuery(query)
248 self.assertRcodeEqual(res, dns.rcode.NOERROR)
249 self.assertAnyRRsetInAnswer(res, expected)
250
251 # no port should be up so we expect any
252 res = self.sendUDPQuery(query)
253 self.assertRcodeEqual(res, dns.rcode.NOERROR)
254 self.assertAnyRRsetInAnswer(res, expected)
255
256 def testIfurlup(self):
257 """
258 Basic ifurlup() test
259 """
260 reachable = [
261 '{prefix}.103'.format(prefix=self._PREFIX)
262 ]
263 unreachable = ['192.168.42.101', '192.168.42.102']
264 ips = reachable + unreachable
265 all_rrs = []
266 reachable_rrs = []
267 for ip in ips:
268 rr = dns.rrset.from_text('usa.example.org.', 0, dns.rdataclass.IN, 'A', ip)
269 all_rrs.append(rr)
270 if ip in reachable:
271 reachable_rrs.append(rr)
272
273 query = dns.message.make_query('usa.example.org', 'A')
274 res = self.sendUDPQuery(query)
275 self.assertRcodeEqual(res, dns.rcode.NOERROR)
276 self.assertAnyRRsetInAnswer(res, all_rrs)
277
278 time.sleep(1)
279 res = self.sendUDPQuery(query)
280 self.assertRcodeEqual(res, dns.rcode.NOERROR)
281 self.assertAnyRRsetInAnswer(res, reachable_rrs)
282
283 def testIfurlupSimplified(self):
284 """
285 Basic ifurlup() test with the simplified list of ips
286 Also ensures the correct path is queried
287 """
288 reachable = [
289 '{prefix}.101'.format(prefix=self._PREFIX)
290 ]
291 unreachable = ['192.168.42.101']
292 ips = reachable + unreachable
293 all_rrs = []
294 reachable_rrs = []
295 for ip in ips:
296 rr = dns.rrset.from_text('mix.ifurlup.example.org.', 0, dns.rdataclass.IN, 'A', ip)
297 all_rrs.append(rr)
298 if ip in reachable:
299 reachable_rrs.append(rr)
300
301 query = dns.message.make_query('mix.ifurlup.example.org', 'A')
302 res = self.sendUDPQuery(query)
303 self.assertRcodeEqual(res, dns.rcode.NOERROR)
304 self.assertAnyRRsetInAnswer(res, all_rrs)
305
306 time.sleep(1)
307 res = self.sendUDPQuery(query)
308 self.assertRcodeEqual(res, dns.rcode.NOERROR)
309 self.assertAnyRRsetInAnswer(res, reachable_rrs)
310
311 def testLatlon(self):
312 """
313 Basic latlon() test
314 """
315 name = 'latlon.geo.example.org.'
316 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
317 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
318 expected = dns.rrset.from_text(name, 0,
319 dns.rdataclass.IN, 'TXT',
320 '"47.913000 -122.304200"')
321
322 res = self.sendUDPQuery(query)
323 self.assertRcodeEqual(res, dns.rcode.NOERROR)
324 self.assertRRsetInAnswer(res, expected)
325
326 def testLatlonloc(self):
327 """
328 Basic latlonloc() test
329 """
330 name = 'latlonloc.geo.example.org.'
331 expected = dns.rrset.from_text(name, 0,dns.rdataclass.IN, 'TXT',
332 '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
333 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
334 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
335
336 res = self.sendUDPQuery(query)
337 self.assertRcodeEqual(res, dns.rcode.NOERROR)
338 self.assertRRsetInAnswer(res, expected)
339
340 def testClosestMagic(self):
341 """
342 Basic closestMagic() test
343 """
344 name = 'www-balanced.example.org.'
345 cname = '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
346 queries = [
347 ('1.1.1.0', 24, '1.1.1.3'),
348 ('1.2.3.0', 24, '1.2.3.5'),
349 ('17.1.0.0', 16, '17.1.2.4')
350 ]
351
352 for (subnet, mask, ip) in queries:
353 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
354 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
355
356 response = dns.message.make_response(query)
357
358 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.CNAME, cname))
359 response.answer.append(dns.rrset.from_text(cname, 0, dns.rdataclass.IN, 'A', ip))
360
361 res = self.sendUDPQuery(query)
362 self.assertRcodeEqual(res, dns.rcode.NOERROR)
363 self.assertEqual(res.answer, response.answer)
364
365 def testAsnum(self):
366 """
367 Basic asnum() test
368 """
369 queries = [
370 ('1.1.1.0', 24, '"true"'),
371 ('1.2.3.0', 24, '"false"'),
372 ('17.1.0.0', 16, '"false"')
373 ]
374 name = 'asnum.geo.example.org.'
375 for (subnet, mask, txt) in queries:
376 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
377 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
378 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
379
380 res = self.sendUDPQuery(query)
381 self.assertRcodeEqual(res, dns.rcode.NOERROR)
382 self.assertRRsetInAnswer(res, expected)
383
384 def testCountry(self):
385 """
386 Basic country() test
387 """
388 queries = [
389 ('1.1.1.0', 24, '"false"'),
390 ('1.2.3.0', 24, '"true"'),
391 ('17.1.0.0', 16, '"false"')
392 ]
393 name = 'country.geo.example.org.'
394 for (subnet, mask, txt) in queries:
395 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
396 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
397 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
398
399 res = self.sendUDPQuery(query)
400 self.assertRcodeEqual(res, dns.rcode.NOERROR)
401 self.assertRRsetInAnswer(res, expected)
402
403 def testContinent(self):
404 """
405 Basic continent() test
406 """
407 queries = [
408 ('1.1.1.0', 24, '"false"'),
409 ('1.2.3.0', 24, '"true"'),
410 ('17.1.0.0', 16, '"false"')
411 ]
412 name = 'continent.geo.example.org.'
413 for (subnet, mask, txt) in queries:
414 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
415 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
416 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
417
418 res = self.sendUDPQuery(query)
419 self.assertRcodeEqual(res, dns.rcode.NOERROR)
420 self.assertRRsetInAnswer(res, expected)
421
422 def testClosest(self):
423 """
424 Basic pickclosest() test
425 """
426 queries = [
427 ('1.1.1.0', 24, '1.1.1.2'),
428 ('1.2.3.0', 24, '1.2.3.4'),
429 ('17.1.0.0', 16, '1.1.1.2')
430 ]
431 name = 'closest.geo.example.org.'
432 for (subnet, mask, ip) in queries:
433 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
434 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
435 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', ip)
436
437 res = self.sendUDPQuery(query)
438 self.assertRcodeEqual(res, dns.rcode.NOERROR)
439 self.assertRRsetInAnswer(res, expected)
440
441 def testNetmask(self):
442 """
443 Basic netmask() test
444 """
445 queries = [
446 {
447 'expected': dns.rrset.from_text('true.netmask.example.org.', 0,
448 dns.rdataclass.IN, 'TXT',
449 '"true"'),
450 'query': dns.message.make_query('true.netmask.example.org', 'TXT')
451 },
452 {
453 'expected': dns.rrset.from_text('false.netmask.example.org.', 0,
454 dns.rdataclass.IN, 'TXT',
455 '"false"'),
456 'query': dns.message.make_query('false.netmask.example.org', 'TXT')
457 }
458 ]
459 for query in queries :
460 res = self.sendUDPQuery(query['query'])
461 self.assertRcodeEqual(res, dns.rcode.NOERROR)
462 self.assertRRsetInAnswer(res, query['expected'])
463
464 def testView(self):
465 """
466 Basic view() test
467 """
468 queries = [
469 {
470 'expected': dns.rrset.from_text('view.example.org.', 0,
471 dns.rdataclass.IN, 'A',
472 '{prefix}.54'.format(prefix=self._PREFIX)),
473 'query': dns.message.make_query('view.example.org', 'A')
474 },
475 {
476 'expected': dns.rrset.from_text('txt.view.example.org.', 0,
477 dns.rdataclass.IN, 'TXT',
478 '"else"'),
479 'query': dns.message.make_query('txt.view.example.org', 'TXT')
480 }
481 ]
482 for query in queries :
483 res = self.sendUDPQuery(query['query'])
484 self.assertRcodeEqual(res, dns.rcode.NOERROR)
485 self.assertRRsetInAnswer(res, query['expected'])
486
487 def testViewNoMatch(self):
488 """
489 view() test where no netmask match
490 """
491 expected = dns.rrset.from_text('none.view.example.org.', 0,
492 dns.rdataclass.IN, 'A')
493 query = dns.message.make_query('none.view.example.org', 'A')
494
495 res = self.sendUDPQuery(query)
496 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
497 self.assertAnswerEmpty(res)
498
499 def testWHashed(self):
500 """
501 Basic pickwhashed() test with a set of A records
502 As the `bestwho` is hashed, we should always get the same answer
503 """
504 expected = [dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '1.2.3.4'),
505 dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '4.3.2.1')]
506 query = dns.message.make_query('whashed.example.org', 'A')
507
508 first = self.sendUDPQuery(query)
509 self.assertRcodeEqual(first, dns.rcode.NOERROR)
510 self.assertAnyRRsetInAnswer(first, expected)
511 for _ in range(5):
512 res = self.sendUDPQuery(query)
513 self.assertRcodeEqual(res, dns.rcode.NOERROR)
514 self.assertRRsetInAnswer(res, first.answer[0])
515
516 def testTimeout(self):
517 """
518 Test if LUA scripts are aborted if script execution takes too long
519 """
520 query = dns.message.make_query('timeout.example.org', 'A')
521
522 first = self.sendUDPQuery(query)
523 self.assertRcodeEqual(first, dns.rcode.SERVFAIL)
524
525 if __name__ == '__main__':
526 unittest.main()
527 exit(0)