]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
7 import clientsubnetoption
9 from authtests
import AuthTest
11 from BaseHTTPServer
import BaseHTTPRequestHandler
, HTTPServer
13 class FakeHTTPServer(BaseHTTPRequestHandler
):
14 def _set_headers(self
):
15 self
.send_response(200)
16 self
.send_header('Content-type', 'text/html')
21 if (self
.path
== '/ping.json'):
22 self
.wfile
.write('{"ping":"pong"}')
24 self
.wfile
.write("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")
26 def log_message(self
, format
, *args
):
32 class TestLuaRecords(AuthTest
):
35 example.org. 3600 IN SOA {soa}
36 example.org. 3600 IN NS ns1.example.org.
37 example.org. 3600 IN NS ns2.example.org.
38 ns1.example.org. 3600 IN A {prefix}.10
39 ns2.example.org. 3600 IN A {prefix}.11
41 web1.example.org. 3600 IN A {prefix}.101
42 web2.example.org. 3600 IN A {prefix}.102
43 web3.example.org. 3600 IN A {prefix}.103
45 all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
46 some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
47 none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
49 whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
50 rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
51 v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
52 v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
53 closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
54 empty.rand.example.org. 3600 IN LUA A "pickrandom()"
55 timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
56 wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
58 config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
59 "EUWips={{'{prefix}.101','{prefix}.102'}} "
60 "EUEips={{'192.168.42.101','192.168.42.102'}} "
61 "NLips={{'{prefix}.111', '{prefix}.112'}} "
62 "USAips={{'{prefix}.103'}} ")
64 usa IN LUA A ( ";include('config') "
65 "return ifurlup('http://www.lua.org:8080/', "
66 "{{USAips, EUEips}}, settings) ")
68 mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
69 "{{ '192.168.42.101', '{prefix}.101' }}, "
70 "{{ stringmatch='pong' }}) ")
72 eu-west IN LUA A ( ";include('config') "
73 "return ifurlup('http://www.lua.org:8080/', "
74 "{{EUWips, EUEips, USAips}}, settings) ")
76 nl IN LUA A ( ";include('config') "
77 "return ifportup(8081, NLips) ")
78 latlon.geo IN LUA TXT "latlon()"
79 continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
80 asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
81 country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
82 latlonloc.geo IN LUA TXT "latlonloc()"
84 true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
86 "else return 'false' end " )
87 false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
89 "else return 'false' end " )
91 view IN LUA A ("view({{ "
92 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
93 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
94 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
96 txt.view IN LUA TXT ("view({{ "
97 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
98 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
100 none.view IN LUA A ("view({{ "
101 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
102 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
104 *.magic IN LUA A "closestMagic()"
105 www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
107 any IN LUA A "'192.0.2.1'"
108 any IN TXT "hello there"
115 def startResponders(cls
):
116 webserver
= threading
.Thread(name
='HTTP Listener',
117 target
=cls
.HTTPResponder
,
120 webserver
.setDaemon(True)
def HTTPResponder(cls, port):
    """
    Run the fake HTTP backend on `port`.

    An HTTPServer bound to ('', port) is created with FakeHTTPServer as its
    request handler and then left running in serve_forever(); the method is
    handed to a thread as its target by startResponders().
    """
    listener = HTTPServer(('', port), FakeHTTPServer)
    listener.serve_forever()
132 super(TestLuaRecords
, cls
).setUpClass()
134 cls
._web
_rrsets
= [dns
.rrset
.from_text('web1.example.org.', 0, dns
.rdataclass
.IN
, 'A',
135 '{prefix}.101'.format(prefix
=cls
._PREFIX
)),
136 dns
.rrset
.from_text('web2.example.org.', 0, dns
.rdataclass
.IN
, 'A',
137 '{prefix}.102'.format(prefix
=cls
._PREFIX
)),
138 dns
.rrset
.from_text('web3.example.org.', 0, dns
.rdataclass
.IN
, 'A',
139 '{prefix}.103'.format(prefix
=cls
._PREFIX
))
def testPickRandom(self):
    """
    pickrandom() over a set of A records

    Queries rand.example.org/A, expects NOERROR and hands both candidate
    rrsets ({prefix}.101 and {prefix}.102) to assertAnyRRsetInAnswer().
    """
    addresses = ['{prefix}.101'.format(prefix=self._PREFIX),
                 '{prefix}.102'.format(prefix=self._PREFIX)]
    expected = [dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A', address)
                for address in addresses]
    query = dns.message.make_query('rand.example.org', 'A')

    response = self.sendUDPQuery(query)
    self.assertRcodeEqual(response, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(response, expected)
def testBogusV6PickRandom(self):
    """
    Bogus AAAA pickrandom() record whose candidates are v4 addresses

    The zone defines v6-bogus.rand as LUA AAAA over IPv4 candidates; the
    query is expected to come back as SERVFAIL.
    """
    response = self.sendUDPQuery(
        dns.message.make_query('v6-bogus.rand.example.org', 'AAAA'))
    self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
def testV6PickRandom(self):
    """
    pickrandom() on an AAAA record

    Expects NOERROR and checks the answer with assertAnyRRsetInAnswer()
    against the two IPv6 candidates configured in the zone.
    """
    owner = 'v6.rand.example.org.'
    expected = [dns.rrset.from_text(owner, 0, dns.rdataclass.IN, 'AAAA', address)
                for address in ('2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268')]
    query = dns.message.make_query('v6.rand.example.org', 'AAAA')

    response = self.sendUDPQuery(query)
    self.assertRcodeEqual(response, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(response, expected)
def testEmptyRandom(self):
    """
    pickrandom() called with an empty set

    empty.rand.example.org is defined as "pickrandom()" without arguments;
    the query is expected to come back as SERVFAIL.
    """
    response = self.sendUDPQuery(
        dns.message.make_query('empty.rand.example.org', 'A'))
    self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
def testWRandom(self):
    """
    pickwrandom() over a set of A records

    Expects NOERROR and hands both weighted candidates ({prefix}.103 and
    {prefix}.102) to assertAnyRRsetInAnswer().
    """
    addresses = ['{prefix}.103'.format(prefix=self._PREFIX),
                 '{prefix}.102'.format(prefix=self._PREFIX)]
    expected = [dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A', address)
                for address in addresses]
    query = dns.message.make_query('wrand.example.org', 'A')

    response = self.sendUDPQuery(query)
    self.assertRcodeEqual(response, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(response, expected)
def testIfportup(self):
    """
    Basic ifportup() test

    Queries all.ifportup.example.org/A, expects NOERROR and hands both
    candidate rrsets ({prefix}.101 and {prefix}.102) to
    assertAnyRRsetInAnswer().
    """
    query = dns.message.make_query('all.ifportup.example.org', 'A')
    # The list must be bound to `expected`: it is consumed by the final
    # assertion below.
    expected = [
        dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '{prefix}.101'.format(prefix=self._PREFIX)),
        dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '{prefix}.102'.format(prefix=self._PREFIX))]

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
def testIfportupWithSomeDown(self):
    """
    Basic ifportup() test with some ports DOWN

    The same query is sent twice: the first answer is checked against both
    candidates, the second one against the second candidate only.
    """
    query = dns.message.make_query('some.ifportup.example.org', 'A')
    # Candidates mirror the zone's some.ifportup record:
    # ifportup(8080, {'192.168.42.21', '{prefix}.102'})
    expected = [
        dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '192.168.42.21'),
        dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '{prefix}.102'.format(prefix=self._PREFIX))]

    # we first expect any of the IPs as no check has been performed yet
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)

    # the first IP should not be up so only the second should be returned
    expected = [expected[1]]
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
def testIfportupWithAllDown(self):
    """
    Basic ifportup() test with all ports DOWN

    The same query is sent twice and both answers are checked against the
    full candidate list.
    """
    query = dns.message.make_query('none.ifportup.example.org', 'A')
    # Candidates mirror the zone's none.ifportup record:
    # ifportup(8080, {'192.168.42.21', '192.168.21.42'})
    # Both are plain literals, so no .format() call is needed on them.
    expected = [
        dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '192.168.42.21'),
        dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '192.168.21.42')]

    # we first expect any of the IPs as no check has been performed yet
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)

    # no port should be up so we expect any
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
260 def testIfurlup(self
):
265 '{prefix}.103'.format(prefix
=self
._PREFIX
)
267 unreachable
= ['192.168.42.101', '192.168.42.102']
268 ips
= reachable
+ unreachable
272 rr
= dns
.rrset
.from_text('usa.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
275 reachable_rrs
.append(rr
)
277 query
= dns
.message
.make_query('usa.example.org', 'A')
278 res
= self
.sendUDPQuery(query
)
279 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
280 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
283 res
= self
.sendUDPQuery(query
)
284 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
285 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
287 def testIfurlupSimplified(self
):
289 Basic ifurlup() test with the simplified list of ips
290 Also ensures the correct path is queried
293 '{prefix}.101'.format(prefix
=self
._PREFIX
)
295 unreachable
= ['192.168.42.101']
296 ips
= reachable
+ unreachable
300 rr
= dns
.rrset
.from_text('mix.ifurlup.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
303 reachable_rrs
.append(rr
)
305 query
= dns
.message
.make_query('mix.ifurlup.example.org', 'A')
306 res
= self
.sendUDPQuery(query
)
307 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
308 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
311 res
= self
.sendUDPQuery(query
)
312 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
313 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
def testLatlon(self):
    """
    latlon() TXT lookup

    The query carries an EDNS Client Subnet option for 1.2.3.0/24 and the
    answer is expected to hold the TXT value "47.913000 -122.304200".
    """
    name = 'latlon.geo.example.org.'
    subnet_option = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
    query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True,
                                   payload=4096, options=[subnet_option])
    expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT',
                                   '"47.913000 -122.304200"')

    response = self.sendUDPQuery(query)
    self.assertRcodeEqual(response, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(response, expected)
def testLatlonloc(self):
    """
    latlonloc() TXT lookup

    The query carries an EDNS Client Subnet option for 1.2.3.0/24 and the
    answer is expected to hold the LOC-style TXT value checked below.
    """
    name = 'latlonloc.geo.example.org.'
    loc_text = '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"'
    expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', loc_text)
    subnet_option = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
    query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True,
                                   payload=4096, options=[subnet_option])

    response = self.sendUDPQuery(query)
    self.assertRcodeEqual(response, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(response, expected)
def testClosestMagic(self):
    """
    Basic closestMagic() test

    www-balanced is a CNAME to a *.magic name whose labels encode three
    addresses. For each client subnet the complete answer section must be
    the CNAME followed by the A record listed for that subnet.
    """
    name = 'www-balanced.example.org.'
    cname = '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
    # (client subnet, prefix length, address expected in the answer)
    queries = [
        ('1.1.1.0', 24, '1.1.1.3'),
        ('1.2.3.0', 24, '1.2.3.5'),
        ('17.1.0.0', 16, '17.1.2.4')
    ]

    for (subnet, mask, ip) in queries:
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])

        # Build the answer section we expect, in order: CNAME, then A.
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.CNAME, cname))
        response.answer.append(dns.rrset.from_text(cname, 0, dns.rdataclass.IN, 'A', ip))

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertEqual(res.answer, response.answer)
374 ('1.1.1.0', 24, '"true"'),
375 ('1.2.3.0', 24, '"false"'),
376 ('17.1.0.0', 16, '"false"')
378 name
= 'asnum.geo.example.org.'
379 for (subnet
, mask
, txt
) in queries
:
380 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
381 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
382 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
384 res
= self
.sendUDPQuery(query
)
385 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
386 self
.assertRRsetInAnswer(res
, expected
)
def testCountry(self):
    """
    Basic country() test

    country.geo answers 'true' or 'false' from country('US'); each client
    subnet below is paired with the TXT value expected for it.
    """
    # (client subnet, prefix length, expected TXT rdata)
    queries = [
        ('1.1.1.0', 24, '"false"'),
        ('1.2.3.0', 24, '"true"'),
        ('17.1.0.0', 16, '"false"')
    ]
    name = 'country.geo.example.org.'
    for (subnet, mask, txt) in queries:
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
def testContinent(self):
    """
    Basic continent() test

    continent.geo answers 'true' or 'false' from continent('NA'); each
    client subnet below is paired with the TXT value expected for it.
    """
    # (client subnet, prefix length, expected TXT rdata)
    queries = [
        ('1.1.1.0', 24, '"false"'),
        ('1.2.3.0', 24, '"true"'),
        ('17.1.0.0', 16, '"false"')
    ]
    name = 'continent.geo.example.org.'
    for (subnet, mask, txt) in queries:
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
def testClosest(self):
    """
    Basic pickclosest() test

    closest.geo picks between 1.1.1.2 and 1.2.3.4; each client subnet below
    is paired with the address expected in the answer.
    """
    # (client subnet, prefix length, expected A rdata)
    queries = [
        ('1.1.1.0', 24, '1.1.1.2'),
        ('1.2.3.0', 24, '1.2.3.4'),
        ('17.1.0.0', 16, '1.1.1.2')
    ]
    name = 'closest.geo.example.org.'
    for (subnet, mask, ip) in queries:
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', ip)

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
445 def testNetmask(self
):
451 'expected': dns
.rrset
.from_text('true.netmask.example.org.', 0,
452 dns
.rdataclass
.IN
, 'TXT',
454 'query': dns
.message
.make_query('true.netmask.example.org', 'TXT')
457 'expected': dns
.rrset
.from_text('false.netmask.example.org.', 0,
458 dns
.rdataclass
.IN
, 'TXT',
460 'query': dns
.message
.make_query('false.netmask.example.org', 'TXT')
463 for query
in queries
:
464 res
= self
.sendUDPQuery(query
['query'])
465 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
466 self
.assertRRsetInAnswer(res
, query
['expected'])
474 'expected': dns
.rrset
.from_text('view.example.org.', 0,
475 dns
.rdataclass
.IN
, 'A',
476 '{prefix}.54'.format(prefix
=self
._PREFIX
)),
477 'query': dns
.message
.make_query('view.example.org', 'A')
480 'expected': dns
.rrset
.from_text('txt.view.example.org.', 0,
481 dns
.rdataclass
.IN
, 'TXT',
483 'query': dns
.message
.make_query('txt.view.example.org', 'TXT')
486 for query
in queries
:
487 res
= self
.sendUDPQuery(query
['query'])
488 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
489 self
.assertRRsetInAnswer(res
, query
['expected'])
def testViewNoMatch(self):
    """
    view() test where no netmask matches

    Queries none.view.example.org/A and expects SERVFAIL with an empty
    answer section. No expected rrset is built, since nothing is compared
    against the answer.
    """
    query = dns.message.make_query('none.view.example.org', 'A')

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
    self.assertAnswerEmpty(res)
def testWHashed(self):
    """
    pickwhashed() over a set of A records

    As the `bestwho` is hashed, asking twice has to give the same answer:
    the first reply is checked against both candidates, the second one
    against the first rrset of the first reply.
    """
    owner = 'whashed.example.org.'
    candidates = [dns.rrset.from_text(owner, 0, dns.rdataclass.IN, 'A', address)
                  for address in ('1.2.3.4', '4.3.2.1')]
    query = dns.message.make_query('whashed.example.org', 'A')

    initial = self.sendUDPQuery(query)
    self.assertRcodeEqual(initial, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(initial, candidates)

    repeat = self.sendUDPQuery(query)
    self.assertRcodeEqual(repeat, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(repeat, initial.answer[0])
def testTimeout(self):
    """
    LUA scripts that take too long to execute must be aborted

    timeout.example.org loops over pickrandom() before returning; the
    query is expected to come back as SERVFAIL.
    """
    response = self.sendUDPQuery(
        dns.message.make_query('timeout.example.org', 'A'))
    self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
532 Test A query against `any`
534 name
= 'any.example.org.'
536 query
= dns
.message
.make_query(name
, 'A')
538 response
= dns
.message
.make_response(query
)
540 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.0.2.1'))
542 res
= self
.sendUDPQuery(query
)
543 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
544 self
.assertEqual(res
.answer
, response
.answer
)
548 Test ANY query against `any`
551 name
= 'any.example.org.'
553 query
= dns
.message
.make_query(name
, 'ANY')
555 response
= dns
.message
.make_response(query
)
557 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.0.2.1'))
558 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', '"hello there"'))
560 res
= self
.sendUDPQuery(query
)
561 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
562 self
.assertEqual(self
.sortRRsets(res
.answer
), self
.sortRRsets(response
.answer
))
564 if __name__
== '__main__':