git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
Limit the number of queries sent out to get NS addresses per query.
[thirdparty/pdns.git] / regression-tests.auth-py / test_LuaRecords.py
1 #!/usr/bin/env python
2 import unittest
3 import requests
4 import threading
5 import dns
6 import time
7 import clientsubnetoption
8
9 from authtests import AuthTest
10
11 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
12
class FakeHTTPServer(BaseHTTPRequestHandler):
    """Minimal HTTP request handler answering every GET/HEAD with ``200``.

    GET requests for ``/ping.json`` receive a small JSON document; every
    other path receives a fixed HTML page. HEAD requests receive the
    headers only.
    """

    def _set_headers(self):
        """Send the ``200`` status line and a ``text/html`` content type."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def do_GET(self):
        """Send the headers followed by the body selected by ``self.path``."""
        self._set_headers()
        # wfile is a binary stream: bytes literals behave the same as str on
        # Python 2 and are required on Python 3.
        if self.path == '/ping.json':
            self.wfile.write(b'{"ping":"pong"}')
        else:
            self.wfile.write(b"<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")

    def log_message(self, format, *args):
        """Discard the log line instead of printing it (keeps test output quiet)."""
        return

    def do_HEAD(self):
        """Send the headers only."""
        self._set_headers()
31
class TestLuaRecords(AuthTest):
    """Regression tests for LUA records, built on the ``AuthTest`` base class.

    The ``example.org`` zone below defines one LUA record per feature under
    test; each ``test*`` method sends a UDP query for such a record and
    checks the response code and answer section.
    """

    # Server configuration fragment: GeoIP database, EDNS client-subnet
    # processing and the bind + geoip backends.
    _config_template = """
geoip-database-files=../modules/geoipbackend/regression-tests/GeoLiteCity.mmdb
edns-subnet-processing=yes
launch=bind geoip
any-to-tcp=no
"""

    # Zone text keyed by zone name. `{soa}` and `{prefix}` are template
    # fields; Lua table braces are doubled (`{{ }}`), presumably because the
    # base class runs str.format() over this text -- TODO confirm in AuthTest.
    _zones = {
        'example.org': """
example.org. 3600 IN SOA {soa}
example.org. 3600 IN NS ns1.example.org.
example.org. 3600 IN NS ns2.example.org.
ns1.example.org. 3600 IN A {prefix}.10
ns2.example.org. 3600 IN A {prefix}.11

web1.example.org. 3600 IN A {prefix}.101
web2.example.org. 3600 IN A {prefix}.102
web3.example.org. 3600 IN A {prefix}.103

all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
all.noneup.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}}, {{ backupSelector='all' }})"

whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
empty.rand.example.org. 3600 IN LUA A "pickrandom()"
timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"

config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
"EUWips={{'{prefix}.101','{prefix}.102'}} "
"EUEips={{'192.168.42.101','192.168.42.102'}} "
"NLips={{'{prefix}.111', '{prefix}.112'}} "
"USAips={{'{prefix}.103'}} ")

usa IN LUA A ( ";include('config') "
"return ifurlup('http://www.lua.org:8080/', "
"{{USAips, EUEips}}, settings) ")

mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
"{{ '192.168.42.101', '{prefix}.101' }}, "
"{{ stringmatch='pong' }}) ")

eu-west IN LUA A ( ";include('config') "
"return ifurlup('http://www.lua.org:8080/', "
"{{EUWips, EUEips, USAips}}, settings) ")

nl IN LUA A ( ";include('config') "
"return ifportup(8081, NLips) ")
latlon.geo IN LUA TXT "latlon()"
continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
latlonloc.geo IN LUA TXT "latlonloc()"

true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
"then return 'true' "
"else return 'false' end " )
false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
"then return 'true' "
"else return 'false' end " )

view IN LUA A ("view({{ "
"{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
"{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
"{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
" }}) " )
txt.view IN LUA TXT ("view({{ "
"{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
"{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
" }}) " )
none.view IN LUA A ("view({{ "
"{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
"{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
" }}) " )
*.magic IN LUA A "closestMagic()"
www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.

any IN LUA A "'192.0.2.1'"
any IN TXT "hello there"

resolve IN LUA A ";local r=resolve('localhost', 1) local t={{}} for _,v in ipairs(r) do table.insert(t, v:toString()) end return t"

""",
    }
    # Replaced in setUpClass() by the A rrsets of web1/web2/web3.example.org.
    _web_rrsets = []

    @classmethod
    def startResponders(cls):
        """Start the fake HTTP server on port 8080 in a background daemon thread."""
        webserver = threading.Thread(name='HTTP Listener',
                                     target=cls.HTTPResponder,
                                     args=[8080])
        # Thread.setDaemon() is deprecated; the `daemon` attribute is the
        # supported spelling and has the same effect.
        webserver.daemon = True
        webserver.start()

    @classmethod
    def HTTPResponder(cls, port):
        """Serve ``FakeHTTPServer`` on *port*; blocks in ``serve_forever()``."""
        server_address = ('', port)
        httpd = HTTPServer(server_address, FakeHTTPServer)
        httpd.serve_forever()

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then build the web1..web3 A rrsets."""

        super(TestLuaRecords, cls).setUpClass()

        cls._web_rrsets = [dns.rrset.from_text('web1.example.org.', 0, dns.rdataclass.IN, 'A',
                                               '{prefix}.101'.format(prefix=cls._PREFIX)),
                           dns.rrset.from_text('web2.example.org.', 0, dns.rdataclass.IN, 'A',
                                               '{prefix}.102'.format(prefix=cls._PREFIX)),
                           dns.rrset.from_text('web3.example.org.', 0, dns.rdataclass.IN, 'A',
                                               '{prefix}.103'.format(prefix=cls._PREFIX))
                           ]

    @staticmethod
    def _makeECSQuery(name, rdtype, subnet, mask):
        """Build an EDNS query for *name*/*rdtype* carrying a client-subnet option."""
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        return dns.message.make_query(name, rdtype, 'IN', use_edns=True, payload=4096, options=[ecso])

    def _checkGeoTXT(self, name, queries):
        """Query TXT *name* once per ``(subnet, mask, txt)`` and expect *txt* back."""
        for (subnet, mask, txt) in queries:
            query = self._makeECSQuery(name, 'TXT', subnet, mask)
            expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)

            res = self.sendUDPQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, expected)

    def testPickRandom(self):
        """
        Basic pickrandom() test with a set of A records
        """
        expected = [dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
                                        '{prefix}.101'.format(prefix=self._PREFIX)),
                    dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
                                        '{prefix}.102'.format(prefix=self._PREFIX))]
        query = dns.message.make_query('rand.example.org', 'A')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, expected)

    def testBogusV6PickRandom(self):
        """
        Test a bogus AAAA pickrandom() record with a set of v4 addr
        """
        query = dns.message.make_query('v6-bogus.rand.example.org', 'AAAA')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

    def testV6PickRandom(self):
        """
        Test pickrandom() AAAA record
        """
        expected = [dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
                                        '2001:db8:a0b:12f0::1'),
                    dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
                                        'fe80::2a1:9bff:fe9b:f268')]
        query = dns.message.make_query('v6.rand.example.org', 'AAAA')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, expected)

    def testEmptyRandom(self):
        """
        Basic pickrandom() test with an empty set
        """
        query = dns.message.make_query('empty.rand.example.org', 'A')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

    def testWRandom(self):
        """
        Basic pickwrandom() test with a set of A records
        """
        expected = [dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
                                        '{prefix}.103'.format(prefix=self._PREFIX)),
                    dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
                                        '{prefix}.102'.format(prefix=self._PREFIX))]
        query = dns.message.make_query('wrand.example.org', 'A')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, expected)

    def testIfportup(self):
        """
        Basic ifportup() test
        """
        query = dns.message.make_query('all.ifportup.example.org', 'A')
        expected = [
            dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                                '{prefix}.101'.format(prefix=self._PREFIX)),
            dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                                '{prefix}.102'.format(prefix=self._PREFIX))]

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, expected)

    def testIfportupWithSomeDown(self):
        """
        Basic ifportup() test with some ports DOWN
        """
        query = dns.message.make_query('some.ifportup.example.org', 'A')
        expected = [
            dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                                '192.168.42.21'),
            dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                                '{prefix}.102'.format(prefix=self._PREFIX))]

        # we first expect any of the IPs as no check has been performed yet
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, expected)

        # the first IP should not be up so only second should be returned.
        # Wait first, as the other ifportup/ifurlup tests do: the port check
        # presumably runs in the background after the first query, so asking
        # again immediately would race with it.
        expected = [expected[1]]
        time.sleep(1)
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, expected)

    def testIfportupWithAllDown(self):
        """
        Basic ifportup() test with all ports DOWN
        """
        query = dns.message.make_query('none.ifportup.example.org', 'A')
        expected = [
            dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                                '192.168.42.21'),
            dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                                '192.168.21.42')]

        # we first expect any of the IPs as no check has been performed yet
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, expected)

        # no port should be up so we expect any; wait like the sibling tests
        # so this second query is answered after the checks had time to run
        time.sleep(1)
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, expected)

    def testIfportupWithAllDownAndAllBackupSelector(self):
        """
        Basic ifportup() test with all ports DOWN, fallback to 'all' backup selector
        """
        name = 'all.noneup.ifportup.example.org.'
        query = dns.message.make_query(name, dns.rdatatype.A)
        expected = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.168.42.21', '192.168.21.42')]

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)

        time.sleep(1)
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertEqual(res.answer, expected)

    def testIfurlup(self):
        """
        Basic ifurlup() test
        """
        reachable = [
            '{prefix}.103'.format(prefix=self._PREFIX)
        ]
        unreachable = ['192.168.42.101', '192.168.42.102']
        ips = reachable + unreachable
        all_rrs = []
        reachable_rrs = []
        for ip in ips:
            rr = dns.rrset.from_text('usa.example.org.', 0, dns.rdataclass.IN, 'A', ip)
            all_rrs.append(rr)
            if ip in reachable:
                reachable_rrs.append(rr)

        query = dns.message.make_query('usa.example.org', 'A')
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, all_rrs)

        time.sleep(1)
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, reachable_rrs)

    def testIfurlupSimplified(self):
        """
        Basic ifurlup() test with the simplified list of ips
        Also ensures the correct path is queried
        """
        reachable = [
            '{prefix}.101'.format(prefix=self._PREFIX)
        ]
        unreachable = ['192.168.42.101']
        ips = reachable + unreachable
        all_rrs = []
        reachable_rrs = []
        for ip in ips:
            rr = dns.rrset.from_text('mix.ifurlup.example.org.', 0, dns.rdataclass.IN, 'A', ip)
            all_rrs.append(rr)
            if ip in reachable:
                reachable_rrs.append(rr)

        query = dns.message.make_query('mix.ifurlup.example.org', 'A')
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, all_rrs)

        time.sleep(1)
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, reachable_rrs)

    def testLatlon(self):
        """
        Basic latlon() test
        """
        name = 'latlon.geo.example.org.'
        query = self._makeECSQuery(name, 'TXT', '1.2.3.0', 24)
        expected = dns.rrset.from_text(name, 0,
                                       dns.rdataclass.IN, 'TXT',
                                       '"47.913000 -122.304200"')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)

    def testLatlonloc(self):
        """
        Basic latlonloc() test
        """
        name = 'latlonloc.geo.example.org.'
        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT',
                                       '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
        query = self._makeECSQuery(name, 'TXT', '1.2.3.0', 24)

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)

    def testWildcardError(self):
        """
        Ensure errors coming from LUA wildcards are reported
        """
        query = dns.message.make_query('failure.magic.example.org', 'A')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
        self.assertAnswerEmpty(res)

    def testClosestMagic(self):
        """
        Basic closestMagic() test
        """
        name = 'www-balanced.example.org.'
        cname = '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
        queries = [
            ('1.1.1.0', 24, '1.1.1.3'),
            ('1.2.3.0', 24, '1.2.3.5'),
            ('17.1.0.0', 16, '17.1.2.4')
        ]

        for (subnet, mask, ip) in queries:
            query = self._makeECSQuery(name, 'A', subnet, mask)

            # expected answer section: the CNAME followed by the chosen A record
            expected = [
                dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.CNAME, cname),
                dns.rrset.from_text(cname, 0, dns.rdataclass.IN, 'A', ip),
            ]

            res = self.sendUDPQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertEqual(res.answer, expected)

    def testAsnum(self):
        """
        Basic asnum() test
        """
        queries = [
            ('1.1.1.0', 24, '"true"'),
            ('1.2.3.0', 24, '"false"'),
            ('17.1.0.0', 16, '"false"')
        ]
        self._checkGeoTXT('asnum.geo.example.org.', queries)

    def testCountry(self):
        """
        Basic country() test
        """
        queries = [
            ('1.1.1.0', 24, '"false"'),
            ('1.2.3.0', 24, '"true"'),
            ('17.1.0.0', 16, '"false"')
        ]
        self._checkGeoTXT('country.geo.example.org.', queries)

    def testContinent(self):
        """
        Basic continent() test
        """
        queries = [
            ('1.1.1.0', 24, '"false"'),
            ('1.2.3.0', 24, '"true"'),
            ('17.1.0.0', 16, '"false"')
        ]
        self._checkGeoTXT('continent.geo.example.org.', queries)

    def testClosest(self):
        """
        Basic pickclosest() test
        """
        queries = [
            ('1.1.1.0', 24, '1.1.1.2'),
            ('1.2.3.0', 24, '1.2.3.4'),
            ('17.1.0.0', 16, '1.1.1.2')
        ]
        name = 'closest.geo.example.org.'
        for (subnet, mask, ip) in queries:
            query = self._makeECSQuery(name, 'A', subnet, mask)
            expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', ip)

            res = self.sendUDPQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, expected)

    def testNetmask(self):
        """
        Basic netmask() test
        """
        queries = [
            {
                'expected': dns.rrset.from_text('true.netmask.example.org.', 0,
                                                dns.rdataclass.IN, 'TXT',
                                                '"true"'),
                'query': dns.message.make_query('true.netmask.example.org', 'TXT')
            },
            {
                'expected': dns.rrset.from_text('false.netmask.example.org.', 0,
                                                dns.rdataclass.IN, 'TXT',
                                                '"false"'),
                'query': dns.message.make_query('false.netmask.example.org', 'TXT')
            }
        ]
        for query in queries:
            res = self.sendUDPQuery(query['query'])
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, query['expected'])

    def testView(self):
        """
        Basic view() test
        """
        queries = [
            {
                'expected': dns.rrset.from_text('view.example.org.', 0,
                                                dns.rdataclass.IN, 'A',
                                                '{prefix}.54'.format(prefix=self._PREFIX)),
                'query': dns.message.make_query('view.example.org', 'A')
            },
            {
                'expected': dns.rrset.from_text('txt.view.example.org.', 0,
                                                dns.rdataclass.IN, 'TXT',
                                                '"else"'),
                'query': dns.message.make_query('txt.view.example.org', 'TXT')
            }
        ]
        for query in queries:
            res = self.sendUDPQuery(query['query'])
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, query['expected'])

    def testViewNoMatch(self):
        """
        view() test where no netmask match
        """
        query = dns.message.make_query('none.view.example.org', 'A')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
        self.assertAnswerEmpty(res)

    def testWHashed(self):
        """
        Basic pickwhashed() test with a set of A records
        As the `bestwho` is hashed, we should always get the same answer
        """
        expected = [dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '1.2.3.4'),
                    dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '4.3.2.1')]
        query = dns.message.make_query('whashed.example.org', 'A')

        first = self.sendUDPQuery(query)
        self.assertRcodeEqual(first, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(first, expected)
        # every repeat must return the rrset the first query returned
        for _ in range(5):
            res = self.sendUDPQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, first.answer[0])

    def testTimeout(self):
        """
        Test if LUA scripts are aborted if script execution takes too long
        """
        query = dns.message.make_query('timeout.example.org', 'A')

        first = self.sendUDPQuery(query)
        self.assertRcodeEqual(first, dns.rcode.SERVFAIL)

    def testA(self):
        """
        Test A query against `any`
        """
        name = 'any.example.org.'

        query = dns.message.make_query(name, 'A')

        expected = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1')]

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertEqual(res.answer, expected)

    def testANY(self):
        """
        Test ANY query against `any`
        """

        name = 'any.example.org.'

        query = dns.message.make_query(name, 'ANY')

        expected = [
            dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
            dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', '"hello there"'),
        ]

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        # rrset order in the answer is not fixed, so compare sorted copies
        self.assertEqual(self.sortRRsets(res.answer), self.sortRRsets(expected))

    def testResolve(self):
        """
        Test resolve() function
        """
        name = 'resolve.example.org.'

        query = dns.message.make_query(name, 'A')

        expected = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')]

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertEqual(res.answer, expected)
613
# Script entry point: run every test in this module.
if __name__ == '__main__':
    # unittest.main() calls sys.exit() itself with a status reflecting the
    # test result, so nothing placed after it would ever run.
    unittest.main()