]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
Merge pull request #6967 from gibson042/2018-08-trailing-data
[thirdparty/pdns.git] / regression-tests.auth-py / test_LuaRecords.py
1 #!/usr/bin/env python
2 import unittest
3 import requests
4 import threading
5 import dns
6 import time
7 import clientsubnetoption
8
9 from authtests import AuthTest
10
11 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
12
13 class FakeHTTPServer(BaseHTTPRequestHandler):
14 def _set_headers(self):
15 self.send_response(200)
16 self.send_header('Content-type', 'text/html')
17 self.end_headers()
18
19 def do_GET(self):
20 self._set_headers()
21 if (self.path == '/ping.json'):
22 self.wfile.write('{"ping":"pong"}')
23 else:
24 self.wfile.write("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")
25
26 def log_message(self, format, *args):
27 return
28
29 def do_HEAD(self):
30 self._set_headers()
31
32 class TestLuaRecords(AuthTest):
33 _zones = {
34 'example.org': """
35 example.org. 3600 IN SOA {soa}
36 example.org. 3600 IN NS ns1.example.org.
37 example.org. 3600 IN NS ns2.example.org.
38 ns1.example.org. 3600 IN A {prefix}.10
39 ns2.example.org. 3600 IN A {prefix}.11
40
41 web1.example.org. 3600 IN A {prefix}.101
42 web2.example.org. 3600 IN A {prefix}.102
43 web3.example.org. 3600 IN A {prefix}.103
44
45 all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
46 some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
47 none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
48 all.noneup.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}}, {{ backupSelector='all' }})"
49
50 whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
51 rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
52 v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
53 v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
54 closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
55 empty.rand.example.org. 3600 IN LUA A "pickrandom()"
56 timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
57 wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
58
59 config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
60 "EUWips={{'{prefix}.101','{prefix}.102'}} "
61 "EUEips={{'192.168.42.101','192.168.42.102'}} "
62 "NLips={{'{prefix}.111', '{prefix}.112'}} "
63 "USAips={{'{prefix}.103'}} ")
64
65 usa IN LUA A ( ";include('config') "
66 "return ifurlup('http://www.lua.org:8080/', "
67 "{{USAips, EUEips}}, settings) ")
68
69 mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
70 "{{ '192.168.42.101', '{prefix}.101' }}, "
71 "{{ stringmatch='pong' }}) ")
72
73 eu-west IN LUA A ( ";include('config') "
74 "return ifurlup('http://www.lua.org:8080/', "
75 "{{EUWips, EUEips, USAips}}, settings) ")
76
77 nl IN LUA A ( ";include('config') "
78 "return ifportup(8081, NLips) ")
79 latlon.geo IN LUA TXT "latlon()"
80 continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
81 asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
82 country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
83 latlonloc.geo IN LUA TXT "latlonloc()"
84
85 true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
86 "then return 'true' "
87 "else return 'false' end " )
88 false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
89 "then return 'true' "
90 "else return 'false' end " )
91
92 view IN LUA A ("view({{ "
93 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
94 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
95 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
96 " }}) " )
97 txt.view IN LUA TXT ("view({{ "
98 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
99 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
100 " }}) " )
101 none.view IN LUA A ("view({{ "
102 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
103 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
104 " }}) " )
105 *.magic IN LUA A "closestMagic()"
106 www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
107
108 any IN LUA A "'192.0.2.1'"
109 any IN TXT "hello there"
110
111 """,
112 }
113 _web_rrsets = []
114
115 @classmethod
116 def startResponders(cls):
117 webserver = threading.Thread(name='HTTP Listener',
118 target=cls.HTTPResponder,
119 args=[8080]
120 )
121 webserver.setDaemon(True)
122 webserver.start()
123
124 @classmethod
125 def HTTPResponder(cls, port):
126 server_address = ('', port)
127 httpd = HTTPServer(server_address, FakeHTTPServer)
128 httpd.serve_forever()
129
130 @classmethod
131 def setUpClass(cls):
132
133 super(TestLuaRecords, cls).setUpClass()
134
135 cls._web_rrsets = [dns.rrset.from_text('web1.example.org.', 0, dns.rdataclass.IN, 'A',
136 '{prefix}.101'.format(prefix=cls._PREFIX)),
137 dns.rrset.from_text('web2.example.org.', 0, dns.rdataclass.IN, 'A',
138 '{prefix}.102'.format(prefix=cls._PREFIX)),
139 dns.rrset.from_text('web3.example.org.', 0, dns.rdataclass.IN, 'A',
140 '{prefix}.103'.format(prefix=cls._PREFIX))
141 ]
142
143 def testPickRandom(self):
144 """
145 Basic pickrandom() test with a set of A records
146 """
147 expected = [dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
148 '{prefix}.101'.format(prefix=self._PREFIX)),
149 dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
150 '{prefix}.102'.format(prefix=self._PREFIX))]
151 query = dns.message.make_query('rand.example.org', 'A')
152
153 res = self.sendUDPQuery(query)
154 self.assertRcodeEqual(res, dns.rcode.NOERROR)
155 self.assertAnyRRsetInAnswer(res, expected)
156
157 def testBogusV6PickRandom(self):
158 """
159 Test a bogus AAAA pickrandom() record with a set of v4 addr
160 """
161 query = dns.message.make_query('v6-bogus.rand.example.org', 'AAAA')
162
163 res = self.sendUDPQuery(query)
164 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
165
166 def testV6PickRandom(self):
167 """
168 Test pickrandom() AAAA record
169 """
170 expected = [dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
171 '2001:db8:a0b:12f0::1'),
172 dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
173 'fe80::2a1:9bff:fe9b:f268')]
174 query = dns.message.make_query('v6.rand.example.org', 'AAAA')
175
176 res = self.sendUDPQuery(query)
177 self.assertRcodeEqual(res, dns.rcode.NOERROR)
178 self.assertAnyRRsetInAnswer(res, expected)
179
180 def testEmptyRandom(self):
181 """
182 Basic pickrandom() test with an empty set
183 """
184 query = dns.message.make_query('empty.rand.example.org', 'A')
185
186 res = self.sendUDPQuery(query)
187 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
188
189 def testWRandom(self):
190 """
191 Basic pickwrandom() test with a set of A records
192 """
193 expected = [dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
194 '{prefix}.103'.format(prefix=self._PREFIX)),
195 dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
196 '{prefix}.102'.format(prefix=self._PREFIX))]
197 query = dns.message.make_query('wrand.example.org', 'A')
198
199 res = self.sendUDPQuery(query)
200 self.assertRcodeEqual(res, dns.rcode.NOERROR)
201 self.assertAnyRRsetInAnswer(res, expected)
202
203 def testIfportup(self):
204 """
205 Basic ifportup() test
206 """
207 query = dns.message.make_query('all.ifportup.example.org', 'A')
208 expected = [
209 dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
210 '{prefix}.101'.format(prefix=self._PREFIX)),
211 dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
212 '{prefix}.102'.format(prefix=self._PREFIX))]
213
214 res = self.sendUDPQuery(query)
215 self.assertRcodeEqual(res, dns.rcode.NOERROR)
216 self.assertAnyRRsetInAnswer(res, expected)
217
218 def testIfportupWithSomeDown(self):
219 """
220 Basic ifportup() test with some ports DOWN
221 """
222 query = dns.message.make_query('some.ifportup.example.org', 'A')
223 expected = [
224 dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
225 '192.168.42.21'),
226 dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
227 '{prefix}.102'.format(prefix=self._PREFIX))]
228
229 # we first expect any of the IPs as no check has been performed yet
230 res = self.sendUDPQuery(query)
231 self.assertRcodeEqual(res, dns.rcode.NOERROR)
232 self.assertAnyRRsetInAnswer(res, expected)
233
234 # the first IP should not be up so only second shoud be returned
235 expected = [expected[1]]
236 res = self.sendUDPQuery(query)
237 self.assertRcodeEqual(res, dns.rcode.NOERROR)
238 self.assertAnyRRsetInAnswer(res, expected)
239
240 def testIfportupWithAllDown(self):
241 """
242 Basic ifportup() test with all ports DOWN
243 """
244 query = dns.message.make_query('none.ifportup.example.org', 'A')
245 expected = [
246 dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
247 '192.168.42.21'),
248 dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
249 '192.168.21.42'.format(prefix=self._PREFIX))]
250
251 # we first expect any of the IPs as no check has been performed yet
252 res = self.sendUDPQuery(query)
253 self.assertRcodeEqual(res, dns.rcode.NOERROR)
254 self.assertAnyRRsetInAnswer(res, expected)
255
256 # no port should be up so we expect any
257 res = self.sendUDPQuery(query)
258 self.assertRcodeEqual(res, dns.rcode.NOERROR)
259 self.assertAnyRRsetInAnswer(res, expected)
260
261 def testIfportupWithAllDownAndAllBackupSelector(self):
262 """
263 Basic ifportup() test with all ports DOWN, fallback to 'all' backup selector
264 """
265 name = 'all.noneup.ifportup.example.org.'
266 query = dns.message.make_query(name, dns.rdatatype.A)
267 expected = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.168.42.21', '192.168.21.42')]
268
269 res = self.sendUDPQuery(query)
270 self.assertRcodeEqual(res, dns.rcode.NOERROR)
271
272 time.sleep(1)
273 res = self.sendUDPQuery(query)
274 self.assertRcodeEqual(res, dns.rcode.NOERROR)
275 self.assertEqual(res.answer, expected)
276
277 def testIfurlup(self):
278 """
279 Basic ifurlup() test
280 """
281 reachable = [
282 '{prefix}.103'.format(prefix=self._PREFIX)
283 ]
284 unreachable = ['192.168.42.101', '192.168.42.102']
285 ips = reachable + unreachable
286 all_rrs = []
287 reachable_rrs = []
288 for ip in ips:
289 rr = dns.rrset.from_text('usa.example.org.', 0, dns.rdataclass.IN, 'A', ip)
290 all_rrs.append(rr)
291 if ip in reachable:
292 reachable_rrs.append(rr)
293
294 query = dns.message.make_query('usa.example.org', 'A')
295 res = self.sendUDPQuery(query)
296 self.assertRcodeEqual(res, dns.rcode.NOERROR)
297 self.assertAnyRRsetInAnswer(res, all_rrs)
298
299 time.sleep(1)
300 res = self.sendUDPQuery(query)
301 self.assertRcodeEqual(res, dns.rcode.NOERROR)
302 self.assertAnyRRsetInAnswer(res, reachable_rrs)
303
304 def testIfurlupSimplified(self):
305 """
306 Basic ifurlup() test with the simplified list of ips
307 Also ensures the correct path is queried
308 """
309 reachable = [
310 '{prefix}.101'.format(prefix=self._PREFIX)
311 ]
312 unreachable = ['192.168.42.101']
313 ips = reachable + unreachable
314 all_rrs = []
315 reachable_rrs = []
316 for ip in ips:
317 rr = dns.rrset.from_text('mix.ifurlup.example.org.', 0, dns.rdataclass.IN, 'A', ip)
318 all_rrs.append(rr)
319 if ip in reachable:
320 reachable_rrs.append(rr)
321
322 query = dns.message.make_query('mix.ifurlup.example.org', 'A')
323 res = self.sendUDPQuery(query)
324 self.assertRcodeEqual(res, dns.rcode.NOERROR)
325 self.assertAnyRRsetInAnswer(res, all_rrs)
326
327 time.sleep(1)
328 res = self.sendUDPQuery(query)
329 self.assertRcodeEqual(res, dns.rcode.NOERROR)
330 self.assertAnyRRsetInAnswer(res, reachable_rrs)
331
332 def testLatlon(self):
333 """
334 Basic latlon() test
335 """
336 name = 'latlon.geo.example.org.'
337 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
338 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
339 expected = dns.rrset.from_text(name, 0,
340 dns.rdataclass.IN, 'TXT',
341 '"47.913000 -122.304200"')
342
343 res = self.sendUDPQuery(query)
344 self.assertRcodeEqual(res, dns.rcode.NOERROR)
345 self.assertRRsetInAnswer(res, expected)
346
347 def testLatlonloc(self):
348 """
349 Basic latlonloc() test
350 """
351 name = 'latlonloc.geo.example.org.'
352 expected = dns.rrset.from_text(name, 0,dns.rdataclass.IN, 'TXT',
353 '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
354 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
355 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
356
357 res = self.sendUDPQuery(query)
358 self.assertRcodeEqual(res, dns.rcode.NOERROR)
359 self.assertRRsetInAnswer(res, expected)
360
361 def testWildcardError(self):
362 """
363 Ensure errors coming from LUA wildcards are reported
364 """
365 query = dns.message.make_query('failure.magic.example.org', 'A')
366
367 res = self.sendUDPQuery(query)
368 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
369 self.assertAnswerEmpty(res)
370
371 def testClosestMagic(self):
372 """
373 Basic closestMagic() test
374 """
375 name = 'www-balanced.example.org.'
376 cname = '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
377 queries = [
378 ('1.1.1.0', 24, '1.1.1.3'),
379 ('1.2.3.0', 24, '1.2.3.5'),
380 ('17.1.0.0', 16, '17.1.2.4')
381 ]
382
383 for (subnet, mask, ip) in queries:
384 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
385 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
386
387 response = dns.message.make_response(query)
388
389 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.CNAME, cname))
390 response.answer.append(dns.rrset.from_text(cname, 0, dns.rdataclass.IN, 'A', ip))
391
392 res = self.sendUDPQuery(query)
393 self.assertRcodeEqual(res, dns.rcode.NOERROR)
394 self.assertEqual(res.answer, response.answer)
395
396 def testAsnum(self):
397 """
398 Basic asnum() test
399 """
400 queries = [
401 ('1.1.1.0', 24, '"true"'),
402 ('1.2.3.0', 24, '"false"'),
403 ('17.1.0.0', 16, '"false"')
404 ]
405 name = 'asnum.geo.example.org.'
406 for (subnet, mask, txt) in queries:
407 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
408 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
409 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
410
411 res = self.sendUDPQuery(query)
412 self.assertRcodeEqual(res, dns.rcode.NOERROR)
413 self.assertRRsetInAnswer(res, expected)
414
415 def testCountry(self):
416 """
417 Basic country() test
418 """
419 queries = [
420 ('1.1.1.0', 24, '"false"'),
421 ('1.2.3.0', 24, '"true"'),
422 ('17.1.0.0', 16, '"false"')
423 ]
424 name = 'country.geo.example.org.'
425 for (subnet, mask, txt) in queries:
426 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
427 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
428 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
429
430 res = self.sendUDPQuery(query)
431 self.assertRcodeEqual(res, dns.rcode.NOERROR)
432 self.assertRRsetInAnswer(res, expected)
433
434 def testContinent(self):
435 """
436 Basic continent() test
437 """
438 queries = [
439 ('1.1.1.0', 24, '"false"'),
440 ('1.2.3.0', 24, '"true"'),
441 ('17.1.0.0', 16, '"false"')
442 ]
443 name = 'continent.geo.example.org.'
444 for (subnet, mask, txt) in queries:
445 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
446 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
447 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)
448
449 res = self.sendUDPQuery(query)
450 self.assertRcodeEqual(res, dns.rcode.NOERROR)
451 self.assertRRsetInAnswer(res, expected)
452
453 def testClosest(self):
454 """
455 Basic pickclosest() test
456 """
457 queries = [
458 ('1.1.1.0', 24, '1.1.1.2'),
459 ('1.2.3.0', 24, '1.2.3.4'),
460 ('17.1.0.0', 16, '1.1.1.2')
461 ]
462 name = 'closest.geo.example.org.'
463 for (subnet, mask, ip) in queries:
464 ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
465 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
466 expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', ip)
467
468 res = self.sendUDPQuery(query)
469 self.assertRcodeEqual(res, dns.rcode.NOERROR)
470 self.assertRRsetInAnswer(res, expected)
471
472 def testNetmask(self):
473 """
474 Basic netmask() test
475 """
476 queries = [
477 {
478 'expected': dns.rrset.from_text('true.netmask.example.org.', 0,
479 dns.rdataclass.IN, 'TXT',
480 '"true"'),
481 'query': dns.message.make_query('true.netmask.example.org', 'TXT')
482 },
483 {
484 'expected': dns.rrset.from_text('false.netmask.example.org.', 0,
485 dns.rdataclass.IN, 'TXT',
486 '"false"'),
487 'query': dns.message.make_query('false.netmask.example.org', 'TXT')
488 }
489 ]
490 for query in queries :
491 res = self.sendUDPQuery(query['query'])
492 self.assertRcodeEqual(res, dns.rcode.NOERROR)
493 self.assertRRsetInAnswer(res, query['expected'])
494
495 def testView(self):
496 """
497 Basic view() test
498 """
499 queries = [
500 {
501 'expected': dns.rrset.from_text('view.example.org.', 0,
502 dns.rdataclass.IN, 'A',
503 '{prefix}.54'.format(prefix=self._PREFIX)),
504 'query': dns.message.make_query('view.example.org', 'A')
505 },
506 {
507 'expected': dns.rrset.from_text('txt.view.example.org.', 0,
508 dns.rdataclass.IN, 'TXT',
509 '"else"'),
510 'query': dns.message.make_query('txt.view.example.org', 'TXT')
511 }
512 ]
513 for query in queries :
514 res = self.sendUDPQuery(query['query'])
515 self.assertRcodeEqual(res, dns.rcode.NOERROR)
516 self.assertRRsetInAnswer(res, query['expected'])
517
518 def testViewNoMatch(self):
519 """
520 view() test where no netmask match
521 """
522 query = dns.message.make_query('none.view.example.org', 'A')
523
524 res = self.sendUDPQuery(query)
525 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
526 self.assertAnswerEmpty(res)
527
528 def testWHashed(self):
529 """
530 Basic pickwhashed() test with a set of A records
531 As the `bestwho` is hashed, we should always get the same answer
532 """
533 expected = [dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '1.2.3.4'),
534 dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '4.3.2.1')]
535 query = dns.message.make_query('whashed.example.org', 'A')
536
537 first = self.sendUDPQuery(query)
538 self.assertRcodeEqual(first, dns.rcode.NOERROR)
539 self.assertAnyRRsetInAnswer(first, expected)
540 for _ in range(5):
541 res = self.sendUDPQuery(query)
542 self.assertRcodeEqual(res, dns.rcode.NOERROR)
543 self.assertRRsetInAnswer(res, first.answer[0])
544
545 def testTimeout(self):
546 """
547 Test if LUA scripts are aborted if script execution takes too long
548 """
549 query = dns.message.make_query('timeout.example.org', 'A')
550
551 first = self.sendUDPQuery(query)
552 self.assertRcodeEqual(first, dns.rcode.SERVFAIL)
553
554
555 def testA(self):
556 """
557 Test A query against `any`
558 """
559 name = 'any.example.org.'
560
561 query = dns.message.make_query(name, 'A')
562
563 response = dns.message.make_response(query)
564
565 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
566
567 res = self.sendUDPQuery(query)
568 self.assertRcodeEqual(res, dns.rcode.NOERROR)
569 self.assertEqual(res.answer, response.answer)
570
571 def testANY(self):
572 """
573 Test ANY query against `any`
574 """
575
576 name = 'any.example.org.'
577
578 query = dns.message.make_query(name, 'ANY')
579
580 response = dns.message.make_response(query)
581
582 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
583 response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', '"hello there"'))
584
585 res = self.sendUDPQuery(query)
586 self.assertRcodeEqual(res, dns.rcode.NOERROR)
587 self.assertEqual(self.sortRRsets(res.answer), self.sortRRsets(response.answer))
588
589 if __name__ == '__main__':
590 unittest.main()
591 exit(0)