]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
7 import clientsubnetoption
9 from authtests
import AuthTest
11 from BaseHTTPServer
import BaseHTTPRequestHandler
, HTTPServer
13 class FakeHTTPServer(BaseHTTPRequestHandler
):
14 def _set_headers(self
):
15 self
.send_response(200)
16 self
.send_header('Content-type', 'text/html')
21 if (self
.path
== '/ping.json'):
22 self
.wfile
.write('{"ping":"pong"}')
24 self
.wfile
.write("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")
26 def log_message(self
, format
, *args
):
32 class TestLuaRecords(AuthTest
):
35 example.org. 3600 IN SOA {soa}
36 example.org. 3600 IN NS ns1.example.org.
37 example.org. 3600 IN NS ns2.example.org.
38 ns1.example.org. 3600 IN A {prefix}.10
39 ns2.example.org. 3600 IN A {prefix}.11
41 web1.example.org. 3600 IN A {prefix}.101
42 web2.example.org. 3600 IN A {prefix}.102
43 web3.example.org. 3600 IN A {prefix}.103
45 all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
46 some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
47 none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
48 all.noneup.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}}, {{ backupSelector='all' }})"
50 whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
51 rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
52 v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
53 v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
54 closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
55 empty.rand.example.org. 3600 IN LUA A "pickrandom()"
56 timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
57 wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
59 config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
60 "EUWips={{'{prefix}.101','{prefix}.102'}} "
61 "EUEips={{'192.168.42.101','192.168.42.102'}} "
62 "NLips={{'{prefix}.111', '{prefix}.112'}} "
63 "USAips={{'{prefix}.103'}} ")
65 usa IN LUA A ( ";include('config') "
66 "return ifurlup('http://www.lua.org:8080/', "
67 "{{USAips, EUEips}}, settings) ")
69 mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
70 "{{ '192.168.42.101', '{prefix}.101' }}, "
71 "{{ stringmatch='pong' }}) ")
73 eu-west IN LUA A ( ";include('config') "
74 "return ifurlup('http://www.lua.org:8080/', "
75 "{{EUWips, EUEips, USAips}}, settings) ")
77 nl IN LUA A ( ";include('config') "
78 "return ifportup(8081, NLips) ")
79 latlon.geo IN LUA TXT "latlon()"
80 continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
81 asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
82 country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
83 latlonloc.geo IN LUA TXT "latlonloc()"
85 true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
87 "else return 'false' end " )
88 false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
90 "else return 'false' end " )
92 view IN LUA A ("view({{ "
93 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
94 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
95 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
97 txt.view IN LUA TXT ("view({{ "
98 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
99 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
101 none.view IN LUA A ("view({{ "
102 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
103 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
105 *.magic IN LUA A "closestMagic()"
106 www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
108 any IN LUA A "'192.0.2.1'"
109 any IN TXT "hello there"
116 def startResponders(cls
):
117 webserver
= threading
.Thread(name
='HTTP Listener',
118 target
=cls
.HTTPResponder
,
121 webserver
.setDaemon(True)
125 def HTTPResponder(cls
, port
):
126 server_address
= ('', port
)
127 httpd
= HTTPServer(server_address
, FakeHTTPServer
)
128 httpd
.serve_forever()
133 super(TestLuaRecords
, cls
).setUpClass()
135 cls
._web
_rrsets
= [dns
.rrset
.from_text('web1.example.org.', 0, dns
.rdataclass
.IN
, 'A',
136 '{prefix}.101'.format(prefix
=cls
._PREFIX
)),
137 dns
.rrset
.from_text('web2.example.org.', 0, dns
.rdataclass
.IN
, 'A',
138 '{prefix}.102'.format(prefix
=cls
._PREFIX
)),
139 dns
.rrset
.from_text('web3.example.org.', 0, dns
.rdataclass
.IN
, 'A',
140 '{prefix}.103'.format(prefix
=cls
._PREFIX
))
143 def testPickRandom(self
):
145 Basic pickrandom() test with a set of A records
147 expected
= [dns
.rrset
.from_text('rand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
148 '{prefix}.101'.format(prefix
=self
._PREFIX
)),
149 dns
.rrset
.from_text('rand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
150 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
151 query
= dns
.message
.make_query('rand.example.org', 'A')
153 res
= self
.sendUDPQuery(query
)
154 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
155 self
.assertAnyRRsetInAnswer(res
, expected
)
157 def testBogusV6PickRandom(self
):
159 Test a bogus AAAA pickrandom() record with a set of v4 addr
161 query
= dns
.message
.make_query('v6-bogus.rand.example.org', 'AAAA')
163 res
= self
.sendUDPQuery(query
)
164 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
166 def testV6PickRandom(self
):
168 Test pickrandom() AAAA record
170 expected
= [dns
.rrset
.from_text('v6.rand.example.org.', 0, dns
.rdataclass
.IN
, 'AAAA',
171 '2001:db8:a0b:12f0::1'),
172 dns
.rrset
.from_text('v6.rand.example.org.', 0, dns
.rdataclass
.IN
, 'AAAA',
173 'fe80::2a1:9bff:fe9b:f268')]
174 query
= dns
.message
.make_query('v6.rand.example.org', 'AAAA')
176 res
= self
.sendUDPQuery(query
)
177 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
178 self
.assertAnyRRsetInAnswer(res
, expected
)
180 def testEmptyRandom(self
):
182 Basic pickrandom() test with an empty set
184 query
= dns
.message
.make_query('empty.rand.example.org', 'A')
186 res
= self
.sendUDPQuery(query
)
187 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
189 def testWRandom(self
):
191 Basic pickwrandom() test with a set of A records
193 expected
= [dns
.rrset
.from_text('wrand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
194 '{prefix}.103'.format(prefix
=self
._PREFIX
)),
195 dns
.rrset
.from_text('wrand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
196 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
197 query
= dns
.message
.make_query('wrand.example.org', 'A')
199 res
= self
.sendUDPQuery(query
)
200 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
201 self
.assertAnyRRsetInAnswer(res
, expected
)
203 def testIfportup(self
):
205 Basic ifportup() test
207 query
= dns
.message
.make_query('all.ifportup.example.org', 'A')
209 dns
.rrset
.from_text('all.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
210 '{prefix}.101'.format(prefix
=self
._PREFIX
)),
211 dns
.rrset
.from_text('all.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
212 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
214 res
= self
.sendUDPQuery(query
)
215 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
216 self
.assertAnyRRsetInAnswer(res
, expected
)
218 def testIfportupWithSomeDown(self
):
220 Basic ifportup() test with some ports DOWN
222 query
= dns
.message
.make_query('some.ifportup.example.org', 'A')
224 dns
.rrset
.from_text('some.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
226 dns
.rrset
.from_text('some.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
227 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
229 # we first expect any of the IPs as no check has been performed yet
230 res
= self
.sendUDPQuery(query
)
231 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
232 self
.assertAnyRRsetInAnswer(res
, expected
)
234 # the first IP should not be up so only second shoud be returned
235 expected
= [expected
[1]]
236 res
= self
.sendUDPQuery(query
)
237 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
238 self
.assertAnyRRsetInAnswer(res
, expected
)
240 def testIfportupWithAllDown(self
):
242 Basic ifportup() test with all ports DOWN
244 query
= dns
.message
.make_query('none.ifportup.example.org', 'A')
246 dns
.rrset
.from_text('none.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
248 dns
.rrset
.from_text('none.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
249 '192.168.21.42'.format(prefix
=self
._PREFIX
))]
251 # we first expect any of the IPs as no check has been performed yet
252 res
= self
.sendUDPQuery(query
)
253 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
254 self
.assertAnyRRsetInAnswer(res
, expected
)
256 # no port should be up so we expect any
257 res
= self
.sendUDPQuery(query
)
258 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
259 self
.assertAnyRRsetInAnswer(res
, expected
)
261 def testIfportupWithAllDownAndAllBackupSelector(self
):
263 Basic ifportup() test with all ports DOWN, fallback to 'all' backup selector
265 name
= 'all.noneup.ifportup.example.org.'
266 query
= dns
.message
.make_query(name
, dns
.rdatatype
.A
)
267 expected
= [dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.168.42.21', '192.168.21.42')]
269 res
= self
.sendUDPQuery(query
)
270 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
273 res
= self
.sendUDPQuery(query
)
274 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
275 self
.assertEqual(res
.answer
, expected
)
277 def testIfurlup(self
):
282 '{prefix}.103'.format(prefix
=self
._PREFIX
)
284 unreachable
= ['192.168.42.101', '192.168.42.102']
285 ips
= reachable
+ unreachable
289 rr
= dns
.rrset
.from_text('usa.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
292 reachable_rrs
.append(rr
)
294 query
= dns
.message
.make_query('usa.example.org', 'A')
295 res
= self
.sendUDPQuery(query
)
296 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
297 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
300 res
= self
.sendUDPQuery(query
)
301 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
302 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
304 def testIfurlupSimplified(self
):
306 Basic ifurlup() test with the simplified list of ips
307 Also ensures the correct path is queried
310 '{prefix}.101'.format(prefix
=self
._PREFIX
)
312 unreachable
= ['192.168.42.101']
313 ips
= reachable
+ unreachable
317 rr
= dns
.rrset
.from_text('mix.ifurlup.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
320 reachable_rrs
.append(rr
)
322 query
= dns
.message
.make_query('mix.ifurlup.example.org', 'A')
323 res
= self
.sendUDPQuery(query
)
324 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
325 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
328 res
= self
.sendUDPQuery(query
)
329 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
330 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
332 def testLatlon(self
):
336 name
= 'latlon.geo.example.org.'
337 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.0', 24)
338 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
339 expected
= dns
.rrset
.from_text(name
, 0,
340 dns
.rdataclass
.IN
, 'TXT',
341 '"47.913000 -122.304200"')
343 res
= self
.sendUDPQuery(query
)
344 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
345 self
.assertRRsetInAnswer(res
, expected
)
347 def testLatlonloc(self
):
349 Basic latlonloc() test
351 name
= 'latlonloc.geo.example.org.'
352 expected
= dns
.rrset
.from_text(name
, 0,dns
.rdataclass
.IN
, 'TXT',
353 '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
354 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.0', 24)
355 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
357 res
= self
.sendUDPQuery(query
)
358 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
359 self
.assertRRsetInAnswer(res
, expected
)
361 def testWildcardError(self
):
363 Ensure errors coming from LUA wildcards are reported
365 query
= dns
.message
.make_query('failure.magic.example.org', 'A')
367 res
= self
.sendUDPQuery(query
)
368 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
369 self
.assertAnswerEmpty(res
)
371 def testClosestMagic(self
):
373 Basic closestMagic() test
375 name
= 'www-balanced.example.org.'
376 cname
= '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
378 ('1.1.1.0', 24, '1.1.1.3'),
379 ('1.2.3.0', 24, '1.2.3.5'),
380 ('17.1.0.0', 16, '17.1.2.4')
383 for (subnet
, mask
, ip
) in queries
:
384 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
385 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
387 response
= dns
.message
.make_response(query
)
389 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.CNAME
, cname
))
390 response
.answer
.append(dns
.rrset
.from_text(cname
, 0, dns
.rdataclass
.IN
, 'A', ip
))
392 res
= self
.sendUDPQuery(query
)
393 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
394 self
.assertEqual(res
.answer
, response
.answer
)
401 ('1.1.1.0', 24, '"true"'),
402 ('1.2.3.0', 24, '"false"'),
403 ('17.1.0.0', 16, '"false"')
405 name
= 'asnum.geo.example.org.'
406 for (subnet
, mask
, txt
) in queries
:
407 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
408 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
409 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
411 res
= self
.sendUDPQuery(query
)
412 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
413 self
.assertRRsetInAnswer(res
, expected
)
415 def testCountry(self
):
420 ('1.1.1.0', 24, '"false"'),
421 ('1.2.3.0', 24, '"true"'),
422 ('17.1.0.0', 16, '"false"')
424 name
= 'country.geo.example.org.'
425 for (subnet
, mask
, txt
) in queries
:
426 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
427 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
428 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
430 res
= self
.sendUDPQuery(query
)
431 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
432 self
.assertRRsetInAnswer(res
, expected
)
434 def testContinent(self
):
436 Basic continent() test
439 ('1.1.1.0', 24, '"false"'),
440 ('1.2.3.0', 24, '"true"'),
441 ('17.1.0.0', 16, '"false"')
443 name
= 'continent.geo.example.org.'
444 for (subnet
, mask
, txt
) in queries
:
445 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
446 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
447 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
449 res
= self
.sendUDPQuery(query
)
450 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
451 self
.assertRRsetInAnswer(res
, expected
)
453 def testClosest(self
):
455 Basic pickclosest() test
458 ('1.1.1.0', 24, '1.1.1.2'),
459 ('1.2.3.0', 24, '1.2.3.4'),
460 ('17.1.0.0', 16, '1.1.1.2')
462 name
= 'closest.geo.example.org.'
463 for (subnet
, mask
, ip
) in queries
:
464 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
465 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
466 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', ip
)
468 res
= self
.sendUDPQuery(query
)
469 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
470 self
.assertRRsetInAnswer(res
, expected
)
472 def testNetmask(self
):
478 'expected': dns
.rrset
.from_text('true.netmask.example.org.', 0,
479 dns
.rdataclass
.IN
, 'TXT',
481 'query': dns
.message
.make_query('true.netmask.example.org', 'TXT')
484 'expected': dns
.rrset
.from_text('false.netmask.example.org.', 0,
485 dns
.rdataclass
.IN
, 'TXT',
487 'query': dns
.message
.make_query('false.netmask.example.org', 'TXT')
490 for query
in queries
:
491 res
= self
.sendUDPQuery(query
['query'])
492 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
493 self
.assertRRsetInAnswer(res
, query
['expected'])
501 'expected': dns
.rrset
.from_text('view.example.org.', 0,
502 dns
.rdataclass
.IN
, 'A',
503 '{prefix}.54'.format(prefix
=self
._PREFIX
)),
504 'query': dns
.message
.make_query('view.example.org', 'A')
507 'expected': dns
.rrset
.from_text('txt.view.example.org.', 0,
508 dns
.rdataclass
.IN
, 'TXT',
510 'query': dns
.message
.make_query('txt.view.example.org', 'TXT')
513 for query
in queries
:
514 res
= self
.sendUDPQuery(query
['query'])
515 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
516 self
.assertRRsetInAnswer(res
, query
['expected'])
518 def testViewNoMatch(self
):
520 view() test where no netmask match
522 query
= dns
.message
.make_query('none.view.example.org', 'A')
524 res
= self
.sendUDPQuery(query
)
525 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
526 self
.assertAnswerEmpty(res
)
528 def testWHashed(self
):
530 Basic pickwhashed() test with a set of A records
531 As the `bestwho` is hashed, we should always get the same answer
533 expected
= [dns
.rrset
.from_text('whashed.example.org.', 0, dns
.rdataclass
.IN
, 'A', '1.2.3.4'),
534 dns
.rrset
.from_text('whashed.example.org.', 0, dns
.rdataclass
.IN
, 'A', '4.3.2.1')]
535 query
= dns
.message
.make_query('whashed.example.org', 'A')
537 first
= self
.sendUDPQuery(query
)
538 self
.assertRcodeEqual(first
, dns
.rcode
.NOERROR
)
539 self
.assertAnyRRsetInAnswer(first
, expected
)
541 res
= self
.sendUDPQuery(query
)
542 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
543 self
.assertRRsetInAnswer(res
, first
.answer
[0])
545 def testTimeout(self
):
547 Test if LUA scripts are aborted if script execution takes too long
549 query
= dns
.message
.make_query('timeout.example.org', 'A')
551 first
= self
.sendUDPQuery(query
)
552 self
.assertRcodeEqual(first
, dns
.rcode
.SERVFAIL
)
557 Test A query against `any`
559 name
= 'any.example.org.'
561 query
= dns
.message
.make_query(name
, 'A')
563 response
= dns
.message
.make_response(query
)
565 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.0.2.1'))
567 res
= self
.sendUDPQuery(query
)
568 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
569 self
.assertEqual(res
.answer
, response
.answer
)
573 Test ANY query against `any`
576 name
= 'any.example.org.'
578 query
= dns
.message
.make_query(name
, 'ANY')
580 response
= dns
.message
.make_response(query
)
582 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.0.2.1'))
583 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', '"hello there"'))
585 res
= self
.sendUDPQuery(query
)
586 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
587 self
.assertEqual(self
.sortRRsets(res
.answer
), self
.sortRRsets(response
.answer
))
589 if __name__
== '__main__':