]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.auth-py/test_LuaRecords.py
Merge pull request #7908 from omoerbeek/rec-4.1.14-changelog
[thirdparty/pdns.git] / regression-tests.auth-py / test_LuaRecords.py
CommitLineData
1bc56192
CHB
1#!/usr/bin/env python
2import unittest
3import requests
4import threading
5import dns
6import time
847e724e 7import clientsubnetoption
1bc56192
CHB
8
9from authtests import AuthTest
10
11from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
12
13class FakeHTTPServer(BaseHTTPRequestHandler):
14 def _set_headers(self):
15 self.send_response(200)
16 self.send_header('Content-type', 'text/html')
17 self.end_headers()
18
19 def do_GET(self):
20 self._set_headers()
21 if (self.path == '/ping.json'):
22 self.wfile.write('{"ping":"pong"}')
23 else:
24 self.wfile.write("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")
25
26 def log_message(self, format, *args):
27 return
28
29 def do_HEAD(self):
30 self._set_headers()
31
32class TestLuaRecords(AuthTest):
06c14069
PL
33 _config_template = """
34geoip-database-files=../modules/geoipbackend/regression-tests/GeoLiteCity.mmdb
35edns-subnet-processing=yes
36launch=bind geoip
37any-to-tcp=no
38"""
39
1bc56192
CHB
40 _zones = {
41 'example.org': """
42example.org. 3600 IN SOA {soa}
43example.org. 3600 IN NS ns1.example.org.
44example.org. 3600 IN NS ns2.example.org.
45ns1.example.org. 3600 IN A {prefix}.10
46ns2.example.org. 3600 IN A {prefix}.11
47
48web1.example.org. 3600 IN A {prefix}.101
49web2.example.org. 3600 IN A {prefix}.102
50web3.example.org. 3600 IN A {prefix}.103
51
52all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
53some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
54none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
707cedf3 55all.noneup.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}}, {{ backupSelector='all' }})"
1bc56192
CHB
56
57whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
58rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
59v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
60v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
847e724e 61closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
1bc56192 62empty.rand.example.org. 3600 IN LUA A "pickrandom()"
5b3590c8 63timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
1bc56192
CHB
64wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
65
66config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
67 "EUWips={{'{prefix}.101','{prefix}.102'}} "
68 "EUEips={{'192.168.42.101','192.168.42.102'}} "
69 "NLips={{'{prefix}.111', '{prefix}.112'}} "
70 "USAips={{'{prefix}.103'}} ")
71
72usa IN LUA A ( ";include('config') "
73 "return ifurlup('http://www.lua.org:8080/', "
74 "{{USAips, EUEips}}, settings) ")
75
76mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
77 "{{ '192.168.42.101', '{prefix}.101' }}, "
78 "{{ stringmatch='pong' }}) ")
79
80eu-west IN LUA A ( ";include('config') "
81 "return ifurlup('http://www.lua.org:8080/', "
82 "{{EUWips, EUEips, USAips}}, settings) ")
83
84nl IN LUA A ( ";include('config') "
85 "return ifportup(8081, NLips) ")
86latlon.geo IN LUA TXT "latlon()"
847e724e
CHB
87continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
88asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
89country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
1bc56192
CHB
90latlonloc.geo IN LUA TXT "latlonloc()"
91
92true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
93 "then return 'true' "
94 "else return 'false' end " )
95false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
96 "then return 'true' "
97 "else return 'false' end " )
98
99view IN LUA A ("view({{ "
100 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
101 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
102 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
103 " }}) " )
104txt.view IN LUA TXT ("view({{ "
105 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
106 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
107 " }}) " )
108none.view IN LUA A ("view({{ "
109 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
110 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
111 " }}) " )
847e724e
CHB
112*.magic IN LUA A "closestMagic()"
113www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
658417e4
PD
114
115any IN LUA A "'192.0.2.1'"
116any IN TXT "hello there"
117
1bc56192
CHB
118 """,
119 }
120 _web_rrsets = []
121
122 @classmethod
123 def startResponders(cls):
124 webserver = threading.Thread(name='HTTP Listener',
125 target=cls.HTTPResponder,
126 args=[8080]
127 )
128 webserver.setDaemon(True)
129 webserver.start()
130
131 @classmethod
132 def HTTPResponder(cls, port):
133 server_address = ('', port)
134 httpd = HTTPServer(server_address, FakeHTTPServer)
135 httpd.serve_forever()
136
137 @classmethod
138 def setUpClass(cls):
139
140 super(TestLuaRecords, cls).setUpClass()
141
142 cls._web_rrsets = [dns.rrset.from_text('web1.example.org.', 0, dns.rdataclass.IN, 'A',
143 '{prefix}.101'.format(prefix=cls._PREFIX)),
144 dns.rrset.from_text('web2.example.org.', 0, dns.rdataclass.IN, 'A',
145 '{prefix}.102'.format(prefix=cls._PREFIX)),
146 dns.rrset.from_text('web3.example.org.', 0, dns.rdataclass.IN, 'A',
147 '{prefix}.103'.format(prefix=cls._PREFIX))
148 ]
149
150 def testPickRandom(self):
151 """
152 Basic pickrandom() test with a set of A records
153 """
154 expected = [dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
155 '{prefix}.101'.format(prefix=self._PREFIX)),
156 dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
157 '{prefix}.102'.format(prefix=self._PREFIX))]
158 query = dns.message.make_query('rand.example.org', 'A')
159
160 res = self.sendUDPQuery(query)
161 self.assertRcodeEqual(res, dns.rcode.NOERROR)
162 self.assertAnyRRsetInAnswer(res, expected)
163
164 def testBogusV6PickRandom(self):
165 """
166 Test a bogus AAAA pickrandom() record with a set of v4 addr
167 """
168 query = dns.message.make_query('v6-bogus.rand.example.org', 'AAAA')
169
170 res = self.sendUDPQuery(query)
171 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
172
173 def testV6PickRandom(self):
174 """
175 Test pickrandom() AAAA record
176 """
177 expected = [dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
178 '2001:db8:a0b:12f0::1'),
179 dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
180 'fe80::2a1:9bff:fe9b:f268')]
181 query = dns.message.make_query('v6.rand.example.org', 'AAAA')
182
183 res = self.sendUDPQuery(query)
184 self.assertRcodeEqual(res, dns.rcode.NOERROR)
185 self.assertAnyRRsetInAnswer(res, expected)
186
187 def testEmptyRandom(self):
188 """
189 Basic pickrandom() test with an empty set
190 """
191 query = dns.message.make_query('empty.rand.example.org', 'A')
192
193 res = self.sendUDPQuery(query)
194 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
195
196 def testWRandom(self):
197 """
198 Basic pickwrandom() test with a set of A records
199 """
200 expected = [dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
201 '{prefix}.103'.format(prefix=self._PREFIX)),
202 dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
203 '{prefix}.102'.format(prefix=self._PREFIX))]
204 query = dns.message.make_query('wrand.example.org', 'A')
205
206 res = self.sendUDPQuery(query)
207 self.assertRcodeEqual(res, dns.rcode.NOERROR)
208 self.assertAnyRRsetInAnswer(res, expected)
209
1bc56192
CHB
210 def testIfportup(self):
211 """
212 Basic ifportup() test
213 """
214 query = dns.message.make_query('all.ifportup.example.org', 'A')
215 expected = [
216 dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
217 '{prefix}.101'.format(prefix=self._PREFIX)),
218 dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
219 '{prefix}.102'.format(prefix=self._PREFIX))]
220
221 res = self.sendUDPQuery(query)
222 self.assertRcodeEqual(res, dns.rcode.NOERROR)
223 self.assertAnyRRsetInAnswer(res, expected)
224
225 def testIfportupWithSomeDown(self):
226 """
227 Basic ifportup() test with some ports DOWN
228 """
229 query = dns.message.make_query('some.ifportup.example.org', 'A')
230 expected = [
231 dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
232 '192.168.42.21'),
233 dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
234 '{prefix}.102'.format(prefix=self._PREFIX))]
235
236 # we first expect any of the IPs as no check has been performed yet
237 res = self.sendUDPQuery(query)
238 self.assertRcodeEqual(res, dns.rcode.NOERROR)
239 self.assertAnyRRsetInAnswer(res, expected)
240
241 # the first IP should not be up so only second shoud be returned
242 expected = [expected[1]]
243 res = self.sendUDPQuery(query)
244 self.assertRcodeEqual(res, dns.rcode.NOERROR)
245 self.assertAnyRRsetInAnswer(res, expected)
246
247 def testIfportupWithAllDown(self):
248 """
249 Basic ifportup() test with all ports DOWN
250 """
251 query = dns.message.make_query('none.ifportup.example.org', 'A')
252 expected = [
253 dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
254 '192.168.42.21'),
255 dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
256 '192.168.21.42'.format(prefix=self._PREFIX))]
257
258 # we first expect any of the IPs as no check has been performed yet
259 res = self.sendUDPQuery(query)
260 self.assertRcodeEqual(res, dns.rcode.NOERROR)
261 self.assertAnyRRsetInAnswer(res, expected)
262
263 # no port should be up so we expect any
264 res = self.sendUDPQuery(query)
265 self.assertRcodeEqual(res, dns.rcode.NOERROR)
266 self.assertAnyRRsetInAnswer(res, expected)
267
707cedf3
CHB
268 def testIfportupWithAllDownAndAllBackupSelector(self):
269 """
270 Basic ifportup() test with all ports DOWN, fallback to 'all' backup selector
271 """
272 name = 'all.noneup.ifportup.example.org.'
273 query = dns.message.make_query(name, dns.rdatatype.A)
274 expected = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.168.42.21', '192.168.21.42')]
275
276 res = self.sendUDPQuery(query)
277 self.assertRcodeEqual(res, dns.rcode.NOERROR)
278
279 time.sleep(1)
280 res = self.sendUDPQuery(query)
281 self.assertRcodeEqual(res, dns.rcode.NOERROR)
282 self.assertEqual(res.answer, expected)
283
1bc56192
CHB
284 def testIfurlup(self):
285 """
286 Basic ifurlup() test
287 """
288 reachable = [
289 '{prefix}.103'.format(prefix=self._PREFIX)
290 ]
291 unreachable = ['192.168.42.101', '192.168.42.102']
292 ips = reachable + unreachable
293 all_rrs = []
294 reachable_rrs = []
295 for ip in ips:
296 rr = dns.rrset.from_text('usa.example.org.', 0, dns.rdataclass.IN, 'A', ip)
297 all_rrs.append(rr)
298 if ip in reachable:
299 reachable_rrs.append(rr)
300
301 query = dns.message.make_query('usa.example.org', 'A')
302 res = self.sendUDPQuery(query)
303 self.assertRcodeEqual(res, dns.rcode.NOERROR)
304 self.assertAnyRRsetInAnswer(res, all_rrs)
305
306 time.sleep(1)
307 res = self.sendUDPQuery(query)
308 self.assertRcodeEqual(res, dns.rcode.NOERROR)
309 self.assertAnyRRsetInAnswer(res, reachable_rrs)
310
311 def testIfurlupSimplified(self):
312 """
313 Basic ifurlup() test with the simplified list of ips
314 Also ensures the correct path is queried
315 """
316 reachable = [
317 '{prefix}.101'.format(prefix=self._PREFIX)
318 ]
319 unreachable = ['192.168.42.101']
320 ips = reachable + unreachable
321 all_rrs = []
322 reachable_rrs = []
323 for ip in ips:
324 rr = dns.rrset.from_text('mix.ifurlup.example.org.', 0, dns.rdataclass.IN, 'A', ip)
325 all_rrs.append(rr)
326 if ip in reachable:
327 reachable_rrs.append(rr)
328
329 query = dns.message.make_query('mix.ifurlup.example.org', 'A')
330 res = self.sendUDPQuery(query)
331 self.assertRcodeEqual(res, dns.rcode.NOERROR)
332 self.assertAnyRRsetInAnswer(res, all_rrs)
333
334 time.sleep(1)
335 res = self.sendUDPQuery(query)
336 self.assertRcodeEqual(res, dns.rcode.NOERROR)
337 self.assertAnyRRsetInAnswer(res, reachable_rrs)
338
339 def testLatlon(self):
340 """
341 Basic latlon() test
342 """
847e724e
CHB
343 name = 'latlon.geo.example.org.'
344 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
345 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
346 expected = dns.rrset.from_text(name, 0,
1bc56192 347 dns.rdataclass.IN, 'TXT',
847e724e 348 '"47.913000 -122.304200"')
1bc56192
CHB
349
350 res = self.sendUDPQuery(query)
351 self.assertRcodeEqual(res, dns.rcode.NOERROR)
352 self.assertRRsetInAnswer(res, expected)
353
354 def testLatlonloc(self):
355 """
356 Basic latlonloc() test
357 """
847e724e
CHB
358 name = 'latlonloc.geo.example.org.'
359 expected = dns.rrset.from_text(name, 0,dns.rdataclass.IN, 'TXT',
360 '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
361 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
362 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
1bc56192
CHB
363
364 res = self.sendUDPQuery(query)
365 self.assertRcodeEqual(res, dns.rcode.NOERROR)
366 self.assertRRsetInAnswer(res, expected)
367
cc5c4f6b
CHB
368 def testWildcardError(self):
369 """
370 Ensure errors coming from LUA wildcards are reported
371 """
372 query = dns.message.make_query('failure.magic.example.org', 'A')
373
374 res = self.sendUDPQuery(query)
375 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
376 self.assertAnswerEmpty(res)
377
847e724e
CHB
378 def testClosestMagic(self):
379 """
380 Basic closestMagic() test
381 """
382 name = 'www-balanced.example.org.'
383 cname = '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
384 queries = [
385 ('1.1.1.0', 24, '1.1.1.3'),
386 ('1.2.3.0', 24, '1.2.3.5'),
387 ('17.1.0.0', 16, '17.1.2.4')
388 ]
707cedf3 389
847e724e
CHB
390 for (subnet, mask, ip) in queries:
391 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
392 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
393
394 response = dns.message.make_response(query)
395
396 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.CNAME, cname))
397 response.answer.append(dns.rrset.from_text(cname, 0, dns.rdataclass.IN, 'A', ip))
398
399 res = self.sendUDPQuery(query)
400 self.assertRcodeEqual(res, dns.rcode.NOERROR)
401 self.assertEqual(res.answer, response.answer)
402
403 def testAsnum(self):
404 """
405 Basic asnum() test
406 """
407 queries = [
408 ('1.1.1.0', 24, '"true"'),
409 ('1.2.3.0', 24, '"false"'),
410 ('17.1.0.0', 16, '"false"')
411 ]
412 name = 'asnum.geo.example.org.'
413 for (subnet, mask, txt) in queries:
414 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
415 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
416 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
417
418 res = self.sendUDPQuery(query)
419 self.assertRcodeEqual(res, dns.rcode.NOERROR)
420 self.assertRRsetInAnswer(res, expected)
421
422 def testCountry(self):
423 """
424 Basic country() test
425 """
426 queries = [
427 ('1.1.1.0', 24, '"false"'),
428 ('1.2.3.0', 24, '"true"'),
429 ('17.1.0.0', 16, '"false"')
430 ]
431 name = 'country.geo.example.org.'
432 for (subnet, mask, txt) in queries:
433 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
434 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
435 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
436
437 res = self.sendUDPQuery(query)
438 self.assertRcodeEqual(res, dns.rcode.NOERROR)
439 self.assertRRsetInAnswer(res, expected)
440
441 def testContinent(self):
442 """
443 Basic continent() test
444 """
445 queries = [
446 ('1.1.1.0', 24, '"false"'),
447 ('1.2.3.0', 24, '"true"'),
448 ('17.1.0.0', 16, '"false"')
449 ]
450 name = 'continent.geo.example.org.'
451 for (subnet, mask, txt) in queries:
452 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
453 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
454 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
455
456 res = self.sendUDPQuery(query)
457 self.assertRcodeEqual(res, dns.rcode.NOERROR)
458 self.assertRRsetInAnswer(res, expected)
459
460 def testClosest(self):
461 """
462 Basic pickclosest() test
463 """
464 queries = [
465 ('1.1.1.0', 24, '1.1.1.2'),
466 ('1.2.3.0', 24, '1.2.3.4'),
467 ('17.1.0.0', 16, '1.1.1.2')
468 ]
469 name = 'closest.geo.example.org.'
470 for (subnet, mask, ip) in queries:
471 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
472 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
473 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', ip)
474
475 res = self.sendUDPQuery(query)
476 self.assertRcodeEqual(res, dns.rcode.NOERROR)
477 self.assertRRsetInAnswer(res, expected)
478
1bc56192
CHB
479 def testNetmask(self):
480 """
481 Basic netmask() test
482 """
483 queries = [
484 {
485 'expected': dns.rrset.from_text('true.netmask.example.org.', 0,
486 dns.rdataclass.IN, 'TXT',
487 '"true"'),
488 'query': dns.message.make_query('true.netmask.example.org', 'TXT')
489 },
490 {
491 'expected': dns.rrset.from_text('false.netmask.example.org.', 0,
492 dns.rdataclass.IN, 'TXT',
493 '"false"'),
494 'query': dns.message.make_query('false.netmask.example.org', 'TXT')
495 }
496 ]
497 for query in queries :
498 res = self.sendUDPQuery(query['query'])
499 self.assertRcodeEqual(res, dns.rcode.NOERROR)
500 self.assertRRsetInAnswer(res, query['expected'])
501
502 def testView(self):
503 """
504 Basic view() test
505 """
506 queries = [
507 {
508 'expected': dns.rrset.from_text('view.example.org.', 0,
509 dns.rdataclass.IN, 'A',
510 '{prefix}.54'.format(prefix=self._PREFIX)),
511 'query': dns.message.make_query('view.example.org', 'A')
512 },
513 {
514 'expected': dns.rrset.from_text('txt.view.example.org.', 0,
515 dns.rdataclass.IN, 'TXT',
516 '"else"'),
517 'query': dns.message.make_query('txt.view.example.org', 'TXT')
518 }
519 ]
520 for query in queries :
521 res = self.sendUDPQuery(query['query'])
522 self.assertRcodeEqual(res, dns.rcode.NOERROR)
523 self.assertRRsetInAnswer(res, query['expected'])
524
525 def testViewNoMatch(self):
526 """
527 view() test where no netmask match
528 """
1bc56192
CHB
529 query = dns.message.make_query('none.view.example.org', 'A')
530
531 res = self.sendUDPQuery(query)
532 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
533 self.assertAnswerEmpty(res)
534
535 def testWHashed(self):
536 """
537 Basic pickwhashed() test with a set of A records
538 As the `bestwho` is hashed, we should always get the same answer
539 """
540 expected = [dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '1.2.3.4'),
541 dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '4.3.2.1')]
542 query = dns.message.make_query('whashed.example.org', 'A')
543
544 first = self.sendUDPQuery(query)
545 self.assertRcodeEqual(first, dns.rcode.NOERROR)
546 self.assertAnyRRsetInAnswer(first, expected)
547 for _ in range(5):
548 res = self.sendUDPQuery(query)
549 self.assertRcodeEqual(res, dns.rcode.NOERROR)
550 self.assertRRsetInAnswer(res, first.answer[0])
551
af68014f
CHB
552 def testTimeout(self):
553 """
554 Test if LUA scripts are aborted if script execution takes too long
555 """
556 query = dns.message.make_query('timeout.example.org', 'A')
557
558 first = self.sendUDPQuery(query)
559 self.assertRcodeEqual(first, dns.rcode.SERVFAIL)
560
658417e4
PD
561
562 def testA(self):
563 """
564 Test A query against `any`
565 """
566 name = 'any.example.org.'
567
568 query = dns.message.make_query(name, 'A')
569
570 response = dns.message.make_response(query)
571
572 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
573
574 res = self.sendUDPQuery(query)
575 self.assertRcodeEqual(res, dns.rcode.NOERROR)
576 self.assertEqual(res.answer, response.answer)
577
578 def testANY(self):
579 """
580 Test ANY query against `any`
581 """
582
583 name = 'any.example.org.'
584
585 query = dns.message.make_query(name, 'ANY')
586
587 response = dns.message.make_response(query)
588
589 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
590 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', '"hello there"'))
591
592 res = self.sendUDPQuery(query)
593 self.assertRcodeEqual(res, dns.rcode.NOERROR)
4d4c592e 594 self.assertEqual(self.sortRRsets(res.answer), self.sortRRsets(response.answer))
658417e4 595
1bc56192
CHB
596if __name__ == '__main__':
597 unittest.main()
598 exit(0)