]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
Merge pull request #6838 from mind04/autoserial
[thirdparty/pdns.git] / regression-tests.auth-py / test_LuaRecords.py
1 #!/usr/bin/env python
2 import unittest
3 import requests
4 import threading
5 import dns
6 import time
7 import clientsubnetoption
8
9 from authtests import AuthTest
10
11 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
12
class FakeHTTPServer(BaseHTTPRequestHandler):
    """
    Minimal HTTP request handler used as the health-check target of the tests.

    Every GET and HEAD request is answered with status 200 and a
    'text/html' content type. A GET for '/ping.json' gets a small JSON
    body, any other path gets a fixed HTML page.
    """

    def _set_headers(self):
        """Send the 200 status line and the response headers."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def do_GET(self):
        """Answer a GET request with the body matching the requested path."""
        self._set_headers()
        # wfile is a binary stream: write bytes literals so the handler
        # behaves the same on Python 2 (where b'' is str) and on Python 3.
        if self.path == '/ping.json':
            self.wfile.write(b'{"ping":"pong"}')
        else:
            self.wfile.write(b"<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")

    def log_message(self, format, *args):
        """Discard the per-request log line the base class would print."""
        return

    def do_HEAD(self):
        """Answer a HEAD request with the headers only."""
        self._set_headers()
31
class TestLuaRecords(AuthTest):
    """
    Regression tests for LUA records.

    Each test sends one or more UDP queries for a record of the example.org
    zone defined below and checks the response code and the answer section.
    """

    # Zone contents keyed by zone name.  The text uses {soa} and {prefix}
    # placeholders and doubles every literal Lua brace, so it is presumably
    # passed through str.format() by AuthTest before being loaded -- verify
    # against the AuthTest implementation.
    _zones = {
        'example.org': """
example.org. 3600 IN SOA {soa}
example.org. 3600 IN NS ns1.example.org.
example.org. 3600 IN NS ns2.example.org.
ns1.example.org. 3600 IN A {prefix}.10
ns2.example.org. 3600 IN A {prefix}.11

web1.example.org. 3600 IN A {prefix}.101
web2.example.org. 3600 IN A {prefix}.102
web3.example.org. 3600 IN A {prefix}.103

all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"

whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
empty.rand.example.org. 3600 IN LUA A "pickrandom()"
timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"

config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
"EUWips={{'{prefix}.101','{prefix}.102'}} "
"EUEips={{'192.168.42.101','192.168.42.102'}} "
"NLips={{'{prefix}.111', '{prefix}.112'}} "
"USAips={{'{prefix}.103'}} ")

usa IN LUA A ( ";include('config') "
"return ifurlup('http://www.lua.org:8080/', "
"{{USAips, EUEips}}, settings) ")

mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
"{{ '192.168.42.101', '{prefix}.101' }}, "
"{{ stringmatch='pong' }}) ")

eu-west IN LUA A ( ";include('config') "
"return ifurlup('http://www.lua.org:8080/', "
"{{EUWips, EUEips, USAips}}, settings) ")

nl IN LUA A ( ";include('config') "
"return ifportup(8081, NLips) ")
latlon.geo IN LUA TXT "latlon()"
continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
latlonloc.geo IN LUA TXT "latlonloc()"

true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
"then return 'true' "
"else return 'false' end " )
false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
"then return 'true' "
"else return 'false' end " )

view IN LUA A ("view({{ "
"{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
"{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
"{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
" }}) " )
txt.view IN LUA TXT ("view({{ "
"{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
"{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
" }}) " )
none.view IN LUA A ("view({{ "
"{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
"{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
" }}) " )
*.magic IN LUA A "closestMagic()"
www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.

any IN LUA A "'192.0.2.1'"
any IN TXT "hello there"

""",
    }
    # A rrsets for web1..web3.example.org, filled in by setUpClass().
    _web_rrsets = []

    @classmethod
    def startResponders(cls):
        """Start the fake HTTP server on port 8080 in a daemon thread."""
        webserver = threading.Thread(name='HTTP Listener',
                                     target=cls.HTTPResponder,
                                     args=[8080])
        # Daemon thread: serve_forever() never returns, so the thread must
        # not keep the interpreter alive once the tests are done.
        webserver.daemon = True
        webserver.start()

    @classmethod
    def HTTPResponder(cls, port):
        """Serve FakeHTTPServer on all interfaces at `port`, forever."""
        server_address = ('', port)
        httpd = HTTPServer(server_address, FakeHTTPServer)
        httpd.serve_forever()

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then build the web1..web3 A rrsets."""
        super(TestLuaRecords, cls).setUpClass()

        cls._web_rrsets = [
            dns.rrset.from_text('web1.example.org.', 0, dns.rdataclass.IN, 'A',
                                '{prefix}.101'.format(prefix=cls._PREFIX)),
            dns.rrset.from_text('web2.example.org.', 0, dns.rdataclass.IN, 'A',
                                '{prefix}.102'.format(prefix=cls._PREFIX)),
            dns.rrset.from_text('web3.example.org.', 0, dns.rdataclass.IN, 'A',
                                '{prefix}.103'.format(prefix=cls._PREFIX)),
        ]

    # -- private helpers (underscore-prefixed so they are not collected as tests)

    @staticmethod
    def _makeRRsets(owner, rdtype, rdatas):
        """Return one single-record rrset (TTL 0, class IN) per entry of `rdatas`."""
        return [dns.rrset.from_text(owner, 0, dns.rdataclass.IN, rdtype, rdata)
                for rdata in rdatas]

    @staticmethod
    def _makeECSQuery(name, rdtype, subnet, mask):
        """Build an EDNS query for `name` carrying a Client Subnet option."""
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        return dns.message.make_query(name, rdtype, 'IN', use_edns=True,
                                      payload=4096, options=[ecso])

    def _assertAnyOf(self, query, rrsets):
        """
        Send `query` over UDP, require NOERROR and at least one of `rrsets`
        in the answer, and return the response.
        """
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnyRRsetInAnswer(res, rrsets)
        return res

    def _checkAnswers(self, cases):
        """
        For each (qname, rdtype, rdata) in `cases`, query qname/rdtype and
        require NOERROR with exactly that record in the answer.
        """
        for qname, rdtype, rdata in cases:
            expected = dns.rrset.from_text(qname + '.', 0, dns.rdataclass.IN,
                                           rdtype, rdata)
            query = dns.message.make_query(qname, rdtype)

            res = self.sendUDPQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, expected)

    def _checkECSAnswers(self, name, rdtype, cases):
        """
        For each (subnet, mask, rdata) in `cases`, query `name` with that
        client subnet and require NOERROR with `rdata` in the answer.
        """
        for subnet, mask, rdata in cases:
            query = self._makeECSQuery(name, rdtype, subnet, mask)
            expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, rdtype, rdata)

            res = self.sendUDPQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, expected)

    def _checkIfurlup(self, qname, reachable, unreachable):
        """
        Query `qname` twice, one second apart: the first answer may be any
        of the listed addresses, the second must be one of `reachable`.
        """
        owner = qname + '.'
        reachable_rrs = self._makeRRsets(owner, 'A', reachable)
        all_rrs = reachable_rrs + self._makeRRsets(owner, 'A', unreachable)

        query = dns.message.make_query(qname, 'A')
        self._assertAnyOf(query, all_rrs)

        # NOTE(review): presumably gives the server time to finish the URL
        # checks triggered by the first query -- confirm against the server.
        time.sleep(1)
        self._assertAnyOf(query, reachable_rrs)

    # -- tests

    def testPickRandom(self):
        """
        Basic pickrandom() test with a set of A records
        """
        expected = self._makeRRsets('rand.example.org.', 'A',
                                    ['{prefix}.101'.format(prefix=self._PREFIX),
                                     '{prefix}.102'.format(prefix=self._PREFIX)])
        query = dns.message.make_query('rand.example.org', 'A')

        self._assertAnyOf(query, expected)

    def testBogusV6PickRandom(self):
        """
        Test a bogus AAAA pickrandom() record with a set of v4 addr
        """
        query = dns.message.make_query('v6-bogus.rand.example.org', 'AAAA')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

    def testV6PickRandom(self):
        """
        Test pickrandom() AAAA record
        """
        expected = self._makeRRsets('v6.rand.example.org.', 'AAAA',
                                    ['2001:db8:a0b:12f0::1',
                                     'fe80::2a1:9bff:fe9b:f268'])
        query = dns.message.make_query('v6.rand.example.org', 'AAAA')

        self._assertAnyOf(query, expected)

    def testEmptyRandom(self):
        """
        Basic pickrandom() test with an empty set
        """
        query = dns.message.make_query('empty.rand.example.org', 'A')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

    def testWRandom(self):
        """
        Basic pickwrandom() test with a set of A records
        """
        expected = self._makeRRsets('wrand.example.org.', 'A',
                                    ['{prefix}.103'.format(prefix=self._PREFIX),
                                     '{prefix}.102'.format(prefix=self._PREFIX)])
        query = dns.message.make_query('wrand.example.org', 'A')

        self._assertAnyOf(query, expected)

    def testIfportup(self):
        """
        Basic ifportup() test
        """
        query = dns.message.make_query('all.ifportup.example.org', 'A')
        expected = self._makeRRsets('all.ifportup.example.org.', 'A',
                                    ['{prefix}.101'.format(prefix=self._PREFIX),
                                     '{prefix}.102'.format(prefix=self._PREFIX)])

        self._assertAnyOf(query, expected)

    def testIfportupWithSomeDown(self):
        """
        Basic ifportup() test with some ports DOWN
        """
        query = dns.message.make_query('some.ifportup.example.org', 'A')
        expected = self._makeRRsets('some.ifportup.example.org.', 'A',
                                    ['192.168.42.21',
                                     '{prefix}.102'.format(prefix=self._PREFIX)])

        # we first expect any of the IPs as no check has been performed yet
        self._assertAnyOf(query, expected)

        # the first IP should not be up so only the second should be returned
        self._assertAnyOf(query, [expected[1]])

    def testIfportupWithAllDown(self):
        """
        Basic ifportup() test with all ports DOWN
        """
        query = dns.message.make_query('none.ifportup.example.org', 'A')
        expected = self._makeRRsets('none.ifportup.example.org.', 'A',
                                    ['192.168.42.21', '192.168.21.42'])

        # we first expect any of the IPs as no check has been performed yet
        self._assertAnyOf(query, expected)

        # no port should be up so we expect any
        self._assertAnyOf(query, expected)

    def testIfurlup(self):
        """
        Basic ifurlup() test
        """
        self._checkIfurlup('usa.example.org',
                           ['{prefix}.103'.format(prefix=self._PREFIX)],
                           ['192.168.42.101', '192.168.42.102'])

    def testIfurlupSimplified(self):
        """
        Basic ifurlup() test with the simplified list of ips
        Also ensures the correct path is queried
        """
        self._checkIfurlup('mix.ifurlup.example.org',
                           ['{prefix}.101'.format(prefix=self._PREFIX)],
                           ['192.168.42.101'])

    def testLatlon(self):
        """
        Basic latlon() test
        """
        self._checkECSAnswers('latlon.geo.example.org.', 'TXT', [
            ('1.2.3.0', 24, '"47.913000 -122.304200"'),
        ])

    def testLatlonloc(self):
        """
        Basic latlonloc() test
        """
        self._checkECSAnswers('latlonloc.geo.example.org.', 'TXT', [
            ('1.2.3.0', 24,
             '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"'),
        ])

    def testClosestMagic(self):
        """
        Basic closestMagic() test
        """
        name = 'www-balanced.example.org.'
        cname = '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
        cases = [
            ('1.1.1.0', 24, '1.1.1.3'),
            ('1.2.3.0', 24, '1.2.3.5'),
            ('17.1.0.0', 16, '17.1.2.4'),
        ]

        for subnet, mask, ip in cases:
            query = self._makeECSQuery(name, 'A', subnet, mask)

            # The whole answer section must match: the CNAME, then the A
            # record at the CNAME target.
            response = dns.message.make_response(query)
            response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.CNAME, cname))
            response.answer.append(dns.rrset.from_text(cname, 0, dns.rdataclass.IN, 'A', ip))

            res = self.sendUDPQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertEqual(res.answer, response.answer)

    def testAsnum(self):
        """
        Basic asnum() test
        """
        self._checkECSAnswers('asnum.geo.example.org.', 'TXT', [
            ('1.1.1.0', 24, '"true"'),
            ('1.2.3.0', 24, '"false"'),
            ('17.1.0.0', 16, '"false"'),
        ])

    def testCountry(self):
        """
        Basic country() test
        """
        self._checkECSAnswers('country.geo.example.org.', 'TXT', [
            ('1.1.1.0', 24, '"false"'),
            ('1.2.3.0', 24, '"true"'),
            ('17.1.0.0', 16, '"false"'),
        ])

    def testContinent(self):
        """
        Basic continent() test
        """
        self._checkECSAnswers('continent.geo.example.org.', 'TXT', [
            ('1.1.1.0', 24, '"false"'),
            ('1.2.3.0', 24, '"true"'),
            ('17.1.0.0', 16, '"false"'),
        ])

    def testClosest(self):
        """
        Basic pickclosest() test
        """
        self._checkECSAnswers('closest.geo.example.org.', 'A', [
            ('1.1.1.0', 24, '1.1.1.2'),
            ('1.2.3.0', 24, '1.2.3.4'),
            ('17.1.0.0', 16, '1.1.1.2'),
        ])

    def testNetmask(self):
        """
        Basic netmask() test
        """
        self._checkAnswers([
            ('true.netmask.example.org', 'TXT', '"true"'),
            ('false.netmask.example.org', 'TXT', '"false"'),
        ])

    def testView(self):
        """
        Basic view() test
        """
        self._checkAnswers([
            ('view.example.org', 'A', '{prefix}.54'.format(prefix=self._PREFIX)),
            ('txt.view.example.org', 'TXT', '"else"'),
        ])

    def testViewNoMatch(self):
        """
        view() test where no netmask match
        """
        query = dns.message.make_query('none.view.example.org', 'A')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
        self.assertAnswerEmpty(res)

    def testWHashed(self):
        """
        Basic pickwhashed() test with a set of A records
        As the `bestwho` is hashed, we should always get the same answer
        """
        expected = self._makeRRsets('whashed.example.org.', 'A',
                                    ['1.2.3.4', '4.3.2.1'])
        query = dns.message.make_query('whashed.example.org', 'A')

        first = self._assertAnyOf(query, expected)
        # Every later query must return the same record as the first one.
        for _ in range(5):
            res = self.sendUDPQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, first.answer[0])

    def testTimeout(self):
        """
        Test if LUA scripts are aborted if script execution takes too long
        """
        query = dns.message.make_query('timeout.example.org', 'A')

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

    def testA(self):
        """
        Test A query against `any`
        """
        name = 'any.example.org.'

        query = dns.message.make_query(name, 'A')

        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertEqual(res.answer, response.answer)

    def testANY(self):
        """
        Test ANY query against `any`
        """
        name = 'any.example.org.'

        query = dns.message.make_query(name, 'ANY')

        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'))
        response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', '"hello there"'))

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        # Both sides are sorted so the order of the two rrsets does not matter.
        self.assertEqual(self.sortRRsets(res.answer), self.sortRRsets(response.answer))
563
if __name__ == '__main__':
    # unittest.main() terminates the interpreter itself, with an exit status
    # reflecting the test result, so nothing placed after it would run.
    unittest.main()