]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
7 import clientsubnetoption
9 from authtests
import AuthTest
11 from BaseHTTPServer
import BaseHTTPRequestHandler
, HTTPServer
13 class FakeHTTPServer(BaseHTTPRequestHandler
):
14 def _set_headers(self
):
15 self
.send_response(200)
16 self
.send_header('Content-type', 'text/html')
21 if (self
.path
== '/ping.json'):
22 self
.wfile
.write('{"ping":"pong"}')
24 self
.wfile
.write("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")
26 def log_message(self
, format
, *args
):
32 class TestLuaRecords(AuthTest
):
35 example.org. 3600 IN SOA {soa}
36 example.org. 3600 IN NS ns1.example.org.
37 example.org. 3600 IN NS ns2.example.org.
38 ns1.example.org. 3600 IN A {prefix}.10
39 ns2.example.org. 3600 IN A {prefix}.11
41 web1.example.org. 3600 IN A {prefix}.101
42 web2.example.org. 3600 IN A {prefix}.102
43 web3.example.org. 3600 IN A {prefix}.103
45 all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
46 some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
47 none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
49 whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
50 rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
51 v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
52 v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
53 closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
54 empty.rand.example.org. 3600 IN LUA A "pickrandom()"
55 timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
56 wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
58 config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
59 "EUWips={{'{prefix}.101','{prefix}.102'}} "
60 "EUEips={{'192.168.42.101','192.168.42.102'}} "
61 "NLips={{'{prefix}.111', '{prefix}.112'}} "
62 "USAips={{'{prefix}.103'}} ")
64 usa IN LUA A ( ";include('config') "
65 "return ifurlup('http://www.lua.org:8080/', "
66 "{{USAips, EUEips}}, settings) ")
68 mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
69 "{{ '192.168.42.101', '{prefix}.101' }}, "
70 "{{ stringmatch='pong' }}) ")
72 eu-west IN LUA A ( ";include('config') "
73 "return ifurlup('http://www.lua.org:8080/', "
74 "{{EUWips, EUEips, USAips}}, settings) ")
76 nl IN LUA A ( ";include('config') "
77 "return ifportup(8081, NLips) ")
78 latlon.geo IN LUA TXT "latlon()"
79 continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
80 asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
81 country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
82 latlonloc.geo IN LUA TXT "latlonloc()"
84 true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
86 "else return 'false' end " )
87 false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
89 "else return 'false' end " )
91 view IN LUA A ("view({{ "
92 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
93 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
94 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
96 txt.view IN LUA TXT ("view({{ "
97 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
98 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
100 none.view IN LUA A ("view({{ "
101 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
102 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
104 *.magic IN LUA A "closestMagic()"
105 www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
111 def startResponders(cls
):
112 webserver
= threading
.Thread(name
='HTTP Listener',
113 target
=cls
.HTTPResponder
,
116 webserver
.setDaemon(True)
120 def HTTPResponder(cls
, port
):
121 server_address
= ('', port
)
122 httpd
= HTTPServer(server_address
, FakeHTTPServer
)
123 httpd
.serve_forever()
128 super(TestLuaRecords
, cls
).setUpClass()
130 cls
._web
_rrsets
= [dns
.rrset
.from_text('web1.example.org.', 0, dns
.rdataclass
.IN
, 'A',
131 '{prefix}.101'.format(prefix
=cls
._PREFIX
)),
132 dns
.rrset
.from_text('web2.example.org.', 0, dns
.rdataclass
.IN
, 'A',
133 '{prefix}.102'.format(prefix
=cls
._PREFIX
)),
134 dns
.rrset
.from_text('web3.example.org.', 0, dns
.rdataclass
.IN
, 'A',
135 '{prefix}.103'.format(prefix
=cls
._PREFIX
))
138 def testPickRandom(self
):
140 Basic pickrandom() test with a set of A records
142 expected
= [dns
.rrset
.from_text('rand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
143 '{prefix}.101'.format(prefix
=self
._PREFIX
)),
144 dns
.rrset
.from_text('rand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
145 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
146 query
= dns
.message
.make_query('rand.example.org', 'A')
148 res
= self
.sendUDPQuery(query
)
149 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
150 self
.assertAnyRRsetInAnswer(res
, expected
)
152 def testBogusV6PickRandom(self
):
154 Test a bogus AAAA pickrandom() record with a set of v4 addr
156 query
= dns
.message
.make_query('v6-bogus.rand.example.org', 'AAAA')
158 res
= self
.sendUDPQuery(query
)
159 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
161 def testV6PickRandom(self
):
163 Test pickrandom() AAAA record
165 expected
= [dns
.rrset
.from_text('v6.rand.example.org.', 0, dns
.rdataclass
.IN
, 'AAAA',
166 '2001:db8:a0b:12f0::1'),
167 dns
.rrset
.from_text('v6.rand.example.org.', 0, dns
.rdataclass
.IN
, 'AAAA',
168 'fe80::2a1:9bff:fe9b:f268')]
169 query
= dns
.message
.make_query('v6.rand.example.org', 'AAAA')
171 res
= self
.sendUDPQuery(query
)
172 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
173 self
.assertAnyRRsetInAnswer(res
, expected
)
175 def testEmptyRandom(self
):
177 Basic pickrandom() test with an empty set
179 query
= dns
.message
.make_query('empty.rand.example.org', 'A')
181 res
= self
.sendUDPQuery(query
)
182 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
184 def testWRandom(self
):
186 Basic pickwrandom() test with a set of A records
188 expected
= [dns
.rrset
.from_text('wrand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
189 '{prefix}.103'.format(prefix
=self
._PREFIX
)),
190 dns
.rrset
.from_text('wrand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
191 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
192 query
= dns
.message
.make_query('wrand.example.org', 'A')
194 res
= self
.sendUDPQuery(query
)
195 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
196 self
.assertAnyRRsetInAnswer(res
, expected
)
198 def testIfportup(self
):
200 Basic ifportup() test
202 query
= dns
.message
.make_query('all.ifportup.example.org', 'A')
204 dns
.rrset
.from_text('all.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
205 '{prefix}.101'.format(prefix
=self
._PREFIX
)),
206 dns
.rrset
.from_text('all.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
207 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
209 res
= self
.sendUDPQuery(query
)
210 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
211 self
.assertAnyRRsetInAnswer(res
, expected
)
213 def testIfportupWithSomeDown(self
):
215 Basic ifportup() test with some ports DOWN
217 query
= dns
.message
.make_query('some.ifportup.example.org', 'A')
219 dns
.rrset
.from_text('some.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
221 dns
.rrset
.from_text('some.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
222 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
224 # we first expect any of the IPs as no check has been performed yet
225 res
= self
.sendUDPQuery(query
)
226 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
227 self
.assertAnyRRsetInAnswer(res
, expected
)
229 # the first IP should not be up so only second shoud be returned
230 expected
= [expected
[1]]
231 res
= self
.sendUDPQuery(query
)
232 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
233 self
.assertAnyRRsetInAnswer(res
, expected
)
235 def testIfportupWithAllDown(self
):
237 Basic ifportup() test with all ports DOWN
239 query
= dns
.message
.make_query('none.ifportup.example.org', 'A')
241 dns
.rrset
.from_text('none.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
243 dns
.rrset
.from_text('none.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
244 '192.168.21.42'.format(prefix
=self
._PREFIX
))]
246 # we first expect any of the IPs as no check has been performed yet
247 res
= self
.sendUDPQuery(query
)
248 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
249 self
.assertAnyRRsetInAnswer(res
, expected
)
251 # no port should be up so we expect any
252 res
= self
.sendUDPQuery(query
)
253 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
254 self
.assertAnyRRsetInAnswer(res
, expected
)
256 def testIfurlup(self
):
261 '{prefix}.103'.format(prefix
=self
._PREFIX
)
263 unreachable
= ['192.168.42.101', '192.168.42.102']
264 ips
= reachable
+ unreachable
268 rr
= dns
.rrset
.from_text('usa.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
271 reachable_rrs
.append(rr
)
273 query
= dns
.message
.make_query('usa.example.org', 'A')
274 res
= self
.sendUDPQuery(query
)
275 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
276 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
279 res
= self
.sendUDPQuery(query
)
280 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
281 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
283 def testIfurlupSimplified(self
):
285 Basic ifurlup() test with the simplified list of ips
286 Also ensures the correct path is queried
289 '{prefix}.101'.format(prefix
=self
._PREFIX
)
291 unreachable
= ['192.168.42.101']
292 ips
= reachable
+ unreachable
296 rr
= dns
.rrset
.from_text('mix.ifurlup.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
299 reachable_rrs
.append(rr
)
301 query
= dns
.message
.make_query('mix.ifurlup.example.org', 'A')
302 res
= self
.sendUDPQuery(query
)
303 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
304 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
307 res
= self
.sendUDPQuery(query
)
308 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
309 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
311 def testLatlon(self
):
315 name
= 'latlon.geo.example.org.'
316 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.0', 24)
317 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
318 expected
= dns
.rrset
.from_text(name
, 0,
319 dns
.rdataclass
.IN
, 'TXT',
320 '"47.913000 -122.304200"')
322 res
= self
.sendUDPQuery(query
)
323 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
324 self
.assertRRsetInAnswer(res
, expected
)
326 def testLatlonloc(self
):
328 Basic latlonloc() test
330 name
= 'latlonloc.geo.example.org.'
331 expected
= dns
.rrset
.from_text(name
, 0,dns
.rdataclass
.IN
, 'TXT',
332 '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
333 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.0', 24)
334 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
336 res
= self
.sendUDPQuery(query
)
337 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
338 self
.assertRRsetInAnswer(res
, expected
)
340 def testClosestMagic(self
):
342 Basic closestMagic() test
344 name
= 'www-balanced.example.org.'
345 cname
= '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
347 ('1.1.1.0', 24, '1.1.1.3'),
348 ('1.2.3.0', 24, '1.2.3.5'),
349 ('17.1.0.0', 16, '17.1.2.4')
352 for (subnet
, mask
, ip
) in queries
:
353 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
354 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
356 response
= dns
.message
.make_response(query
)
358 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.CNAME
, cname
))
359 response
.answer
.append(dns
.rrset
.from_text(cname
, 0, dns
.rdataclass
.IN
, 'A', ip
))
361 res
= self
.sendUDPQuery(query
)
362 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
363 self
.assertEqual(res
.answer
, response
.answer
)
370 ('1.1.1.0', 24, '"true"'),
371 ('1.2.3.0', 24, '"false"'),
372 ('17.1.0.0', 16, '"false"')
374 name
= 'asnum.geo.example.org.'
375 for (subnet
, mask
, txt
) in queries
:
376 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
377 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
378 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
380 res
= self
.sendUDPQuery(query
)
381 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
382 self
.assertRRsetInAnswer(res
, expected
)
384 def testCountry(self
):
389 ('1.1.1.0', 24, '"false"'),
390 ('1.2.3.0', 24, '"true"'),
391 ('17.1.0.0', 16, '"false"')
393 name
= 'country.geo.example.org.'
394 for (subnet
, mask
, txt
) in queries
:
395 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
396 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
397 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
399 res
= self
.sendUDPQuery(query
)
400 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
401 self
.assertRRsetInAnswer(res
, expected
)
403 def testContinent(self
):
405 Basic continent() test
408 ('1.1.1.0', 24, '"false"'),
409 ('1.2.3.0', 24, '"true"'),
410 ('17.1.0.0', 16, '"false"')
412 name
= 'continent.geo.example.org.'
413 for (subnet
, mask
, txt
) in queries
:
414 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
415 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
416 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
418 res
= self
.sendUDPQuery(query
)
419 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
420 self
.assertRRsetInAnswer(res
, expected
)
422 def testClosest(self
):
424 Basic pickclosest() test
427 ('1.1.1.0', 24, '1.1.1.2'),
428 ('1.2.3.0', 24, '1.2.3.4'),
429 ('17.1.0.0', 16, '1.1.1.2')
431 name
= 'closest.geo.example.org.'
432 for (subnet
, mask
, ip
) in queries
:
433 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
434 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
435 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', ip
)
437 res
= self
.sendUDPQuery(query
)
438 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
439 self
.assertRRsetInAnswer(res
, expected
)
441 def testNetmask(self
):
447 'expected': dns
.rrset
.from_text('true.netmask.example.org.', 0,
448 dns
.rdataclass
.IN
, 'TXT',
450 'query': dns
.message
.make_query('true.netmask.example.org', 'TXT')
453 'expected': dns
.rrset
.from_text('false.netmask.example.org.', 0,
454 dns
.rdataclass
.IN
, 'TXT',
456 'query': dns
.message
.make_query('false.netmask.example.org', 'TXT')
459 for query
in queries
:
460 res
= self
.sendUDPQuery(query
['query'])
461 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
462 self
.assertRRsetInAnswer(res
, query
['expected'])
470 'expected': dns
.rrset
.from_text('view.example.org.', 0,
471 dns
.rdataclass
.IN
, 'A',
472 '{prefix}.54'.format(prefix
=self
._PREFIX
)),
473 'query': dns
.message
.make_query('view.example.org', 'A')
476 'expected': dns
.rrset
.from_text('txt.view.example.org.', 0,
477 dns
.rdataclass
.IN
, 'TXT',
479 'query': dns
.message
.make_query('txt.view.example.org', 'TXT')
482 for query
in queries
:
483 res
= self
.sendUDPQuery(query
['query'])
484 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
485 self
.assertRRsetInAnswer(res
, query
['expected'])
487 def testViewNoMatch(self
):
489 view() test where no netmask match
491 expected
= dns
.rrset
.from_text('none.view.example.org.', 0,
492 dns
.rdataclass
.IN
, 'A')
493 query
= dns
.message
.make_query('none.view.example.org', 'A')
495 res
= self
.sendUDPQuery(query
)
496 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
497 self
.assertAnswerEmpty(res
)
499 def testWHashed(self
):
501 Basic pickwhashed() test with a set of A records
502 As the `bestwho` is hashed, we should always get the same answer
504 expected
= [dns
.rrset
.from_text('whashed.example.org.', 0, dns
.rdataclass
.IN
, 'A', '1.2.3.4'),
505 dns
.rrset
.from_text('whashed.example.org.', 0, dns
.rdataclass
.IN
, 'A', '4.3.2.1')]
506 query
= dns
.message
.make_query('whashed.example.org', 'A')
508 first
= self
.sendUDPQuery(query
)
509 self
.assertRcodeEqual(first
, dns
.rcode
.NOERROR
)
510 self
.assertAnyRRsetInAnswer(first
, expected
)
512 res
= self
.sendUDPQuery(query
)
513 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
514 self
.assertRRsetInAnswer(res
, first
.answer
[0])
516 def testTimeout(self
):
518 Test if LUA scripts are aborted if script execution takes too long
520 query
= dns
.message
.make_query('timeout.example.org', 'A')
522 first
= self
.sendUDPQuery(query
)
523 self
.assertRcodeEqual(first
, dns
.rcode
.SERVFAIL
)
525 if __name__
== '__main__':