]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
7 import clientsubnetoption
9 from authtests
import AuthTest
11 from BaseHTTPServer
import BaseHTTPRequestHandler
, HTTPServer
13 class FakeHTTPServer(BaseHTTPRequestHandler
):
14 def _set_headers(self
):
15 self
.send_response(200)
16 self
.send_header('Content-type', 'text/html')
21 if (self
.path
== '/ping.json'):
22 self
.wfile
.write('{"ping":"pong"}')
24 self
.wfile
.write("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")
26 def log_message(self
, format
, *args
):
32 class TestLuaRecords(AuthTest
):
33 _config_template
= """
34 geoip-database-files=../modules/geoipbackend/regression-tests/GeoLiteCity.mmdb
35 edns-subnet-processing=yes
42 example.org. 3600 IN SOA {soa}
43 example.org. 3600 IN NS ns1.example.org.
44 example.org. 3600 IN NS ns2.example.org.
45 ns1.example.org. 3600 IN A {prefix}.10
46 ns2.example.org. 3600 IN A {prefix}.11
48 web1.example.org. 3600 IN A {prefix}.101
49 web2.example.org. 3600 IN A {prefix}.102
50 web3.example.org. 3600 IN A {prefix}.103
52 all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
53 some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
54 none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
55 all.noneup.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}}, {{ backupSelector='all' }})"
57 whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
58 rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
59 v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
60 v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
61 closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
62 empty.rand.example.org. 3600 IN LUA A "pickrandom()"
63 timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
64 wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
66 config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
67 "EUWips={{'{prefix}.101','{prefix}.102'}} "
68 "EUEips={{'192.168.42.101','192.168.42.102'}} "
69 "NLips={{'{prefix}.111', '{prefix}.112'}} "
70 "USAips={{'{prefix}.103'}} ")
72 usa IN LUA A ( ";include('config') "
73 "return ifurlup('http://www.lua.org:8080/', "
74 "{{USAips, EUEips}}, settings) ")
76 mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
77 "{{ '192.168.42.101', '{prefix}.101' }}, "
78 "{{ stringmatch='pong' }}) ")
80 eu-west IN LUA A ( ";include('config') "
81 "return ifurlup('http://www.lua.org:8080/', "
82 "{{EUWips, EUEips, USAips}}, settings) ")
84 nl IN LUA A ( ";include('config') "
85 "return ifportup(8081, NLips) ")
86 latlon.geo IN LUA TXT "latlon()"
87 continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
88 asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
89 country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
90 latlonloc.geo IN LUA TXT "latlonloc()"
92 true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
94 "else return 'false' end " )
95 false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
97 "else return 'false' end " )
99 view IN LUA A ("view({{ "
100 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
101 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
102 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
104 txt.view IN LUA TXT ("view({{ "
105 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
106 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
108 none.view IN LUA A ("view({{ "
109 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
110 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
112 *.magic IN LUA A "closestMagic()"
113 www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
115 any IN LUA A "'192.0.2.1'"
116 any IN TXT "hello there"
123 def startResponders(cls
):
124 webserver
= threading
.Thread(name
='HTTP Listener',
125 target
=cls
.HTTPResponder
,
128 webserver
.setDaemon(True)
@classmethod
def HTTPResponder(cls, port):
    """
    Run the fake HTTP backend used by the health-check tests.

    Binds an HTTPServer driven by FakeHTTPServer to every local
    interface on `port` and serves requests until the process exits.
    This call blocks, so it is meant to be used as a thread target.
    """
    # An empty host string makes the server listen on all interfaces.
    server_address = ('', port)
    httpd = HTTPServer(server_address, FakeHTTPServer)
    httpd.serve_forever()
140 super(TestLuaRecords
, cls
).setUpClass()
142 cls
._web
_rrsets
= [dns
.rrset
.from_text('web1.example.org.', 0, dns
.rdataclass
.IN
, 'A',
143 '{prefix}.101'.format(prefix
=cls
._PREFIX
)),
144 dns
.rrset
.from_text('web2.example.org.', 0, dns
.rdataclass
.IN
, 'A',
145 '{prefix}.102'.format(prefix
=cls
._PREFIX
)),
146 dns
.rrset
.from_text('web3.example.org.', 0, dns
.rdataclass
.IN
, 'A',
147 '{prefix}.103'.format(prefix
=cls
._PREFIX
))
def testPickRandom(self):
    """
    Basic pickrandom() test with a set of A records
    """
    # pickrandom() may return either address, so both are acceptable.
    expected = [dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
                                    '{prefix}.101'.format(prefix=self._PREFIX)),
                dns.rrset.from_text('rand.example.org.', 0, dns.rdataclass.IN, 'A',
                                    '{prefix}.102'.format(prefix=self._PREFIX))]
    query = dns.message.make_query('rand.example.org', 'A')

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
def testBogusV6PickRandom(self):
    """
    Test a bogus AAAA pickrandom() record with a set of v4 addr
    """
    query = dns.message.make_query('v6-bogus.rand.example.org', 'AAAA')

    res = self.sendUDPQuery(query)
    # IPv4 addresses cannot be served as AAAA, so the server must fail.
    self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
def testV6PickRandom(self):
    """
    Test pickrandom() AAAA record
    """
    # Either of the two configured IPv6 addresses is a valid answer.
    expected = [dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
                                    '2001:db8:a0b:12f0::1'),
                dns.rrset.from_text('v6.rand.example.org.', 0, dns.rdataclass.IN, 'AAAA',
                                    'fe80::2a1:9bff:fe9b:f268')]
    query = dns.message.make_query('v6.rand.example.org', 'AAAA')

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
def testEmptyRandom(self):
    """
    Basic pickrandom() test with an empty set
    """
    query = dns.message.make_query('empty.rand.example.org', 'A')

    res = self.sendUDPQuery(query)
    # pickrandom() without any candidate is expected to yield SERVFAIL.
    self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
def testWRandom(self):
    """
    Basic pickwrandom() test with a set of A records
    """
    # Weighted selection still may return either address.
    expected = [dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
                                    '{prefix}.103'.format(prefix=self._PREFIX)),
                dns.rrset.from_text('wrand.example.org.', 0, dns.rdataclass.IN, 'A',
                                    '{prefix}.102'.format(prefix=self._PREFIX))]
    query = dns.message.make_query('wrand.example.org', 'A')

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
def testIfportup(self):
    """
    Basic ifportup() test
    """
    query = dns.message.make_query('all.ifportup.example.org', 'A')
    # Both addresses listed for all.ifportup are acceptable answers.
    expected = [
        dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '{prefix}.101'.format(prefix=self._PREFIX)),
        dns.rrset.from_text('all.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '{prefix}.102'.format(prefix=self._PREFIX))]

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
def testIfportupWithSomeDown(self):
    """
    Basic ifportup() test with some ports DOWN
    """
    query = dns.message.make_query('some.ifportup.example.org', 'A')
    # Addresses mirror the some.ifportup zone entry: the first one is the
    # address expected to be down, the second one is expected to be up.
    expected = [
        dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '192.168.42.21'),
        dns.rrset.from_text('some.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '{prefix}.102'.format(prefix=self._PREFIX))]

    # we first expect any of the IPs as no check has been performed yet
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)

    # the first IP should not be up so only second should be returned
    expected = [expected[1]]
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
def testIfportupWithAllDown(self):
    """
    Basic ifportup() test with all ports DOWN
    """
    query = dns.message.make_query('none.ifportup.example.org', 'A')
    # Addresses mirror the none.ifportup zone entry; neither contains a
    # '{prefix}' placeholder, so no formatting is needed.
    expected = [
        dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '192.168.42.21'),
        dns.rrset.from_text('none.ifportup.example.org.', 0, dns.rdataclass.IN, 'A',
                            '192.168.21.42')]

    # we first expect any of the IPs as no check has been performed yet
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)

    # no port should be up so we expect any
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(res, expected)
268 def testIfportupWithAllDownAndAllBackupSelector(self
):
270 Basic ifportup() test with all ports DOWN, fallback to 'all' backup selector
272 name
= 'all.noneup.ifportup.example.org.'
273 query
= dns
.message
.make_query(name
, dns
.rdatatype
.A
)
274 expected
= [dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.168.42.21', '192.168.21.42')]
276 res
= self
.sendUDPQuery(query
)
277 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
280 res
= self
.sendUDPQuery(query
)
281 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
282 self
.assertEqual(res
.answer
, expected
)
284 def testIfurlup(self
):
289 '{prefix}.103'.format(prefix
=self
._PREFIX
)
291 unreachable
= ['192.168.42.101', '192.168.42.102']
292 ips
= reachable
+ unreachable
296 rr
= dns
.rrset
.from_text('usa.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
299 reachable_rrs
.append(rr
)
301 query
= dns
.message
.make_query('usa.example.org', 'A')
302 res
= self
.sendUDPQuery(query
)
303 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
304 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
307 res
= self
.sendUDPQuery(query
)
308 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
309 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
311 def testIfurlupSimplified(self
):
313 Basic ifurlup() test with the simplified list of ips
314 Also ensures the correct path is queried
317 '{prefix}.101'.format(prefix
=self
._PREFIX
)
319 unreachable
= ['192.168.42.101']
320 ips
= reachable
+ unreachable
324 rr
= dns
.rrset
.from_text('mix.ifurlup.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
327 reachable_rrs
.append(rr
)
329 query
= dns
.message
.make_query('mix.ifurlup.example.org', 'A')
330 res
= self
.sendUDPQuery(query
)
331 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
332 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
335 res
= self
.sendUDPQuery(query
)
336 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
337 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
def testLatlon(self):
    """
    Basic latlon() test
    """
    name = 'latlon.geo.example.org.'
    # The EDNS client subnet option carries the address used for the lookup.
    ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
    query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
    expected = dns.rrset.from_text(name, 0,
                                   dns.rdataclass.IN, 'TXT',
                                   '"47.913000 -122.304200"')

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(res, expected)
def testLatlonloc(self):
    """
    Basic latlonloc() test
    """
    name = 'latlonloc.geo.example.org.'
    expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT',
                                   '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
    # The EDNS client subnet option carries the address used for the lookup.
    ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
    query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(res, expected)
def testWildcardError(self):
    """
    Ensure errors coming from LUA wildcards are reported
    """
    # 'failure' is not a label closestMagic() can turn into an address.
    query = dns.message.make_query('failure.magic.example.org', 'A')

    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
    self.assertAnswerEmpty(res)
def testClosestMagic(self):
    """
    Basic closestMagic() test
    """
    name = 'www-balanced.example.org.'
    cname = '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
    # (client subnet, prefix length, address expected in the answer)
    queries = [
        ('1.1.1.0', 24, '1.1.1.3'),
        ('1.2.3.0', 24, '1.2.3.5'),
        ('17.1.0.0', 16, '17.1.2.4')
    ]

    for (subnet, mask, ip) in queries:
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])

        response = dns.message.make_response(query)

        # The answer is the CNAME followed by the A record of its target.
        response.answer.append(dns.rrset.from_text(name, 0, dns.rdataclass.IN, dns.rdatatype.CNAME, cname))
        response.answer.append(dns.rrset.from_text(cname, 0, dns.rdataclass.IN, 'A', ip))

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertEqual(res.answer, response.answer)
408 ('1.1.1.0', 24, '"true"'),
409 ('1.2.3.0', 24, '"false"'),
410 ('17.1.0.0', 16, '"false"')
412 name
= 'asnum.geo.example.org.'
413 for (subnet
, mask
, txt
) in queries
:
414 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
415 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
416 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
418 res
= self
.sendUDPQuery(query
)
419 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
420 self
.assertRRsetInAnswer(res
, expected
)
def testCountry(self):
    """
    Basic country() test
    """
    # (client subnet, prefix length, TXT content expected in the answer)
    queries = [
        ('1.1.1.0', 24, '"false"'),
        ('1.2.3.0', 24, '"true"'),
        ('17.1.0.0', 16, '"false"')
    ]
    name = 'country.geo.example.org.'
    for (subnet, mask, txt) in queries:
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
def testContinent(self):
    """
    Basic continent() test
    """
    # (client subnet, prefix length, TXT content expected in the answer)
    queries = [
        ('1.1.1.0', 24, '"false"'),
        ('1.2.3.0', 24, '"true"'),
        ('17.1.0.0', 16, '"false"')
    ]
    name = 'continent.geo.example.org.'
    for (subnet, mask, txt) in queries:
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096, options=[ecso])
        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'TXT', txt)

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
def testClosest(self):
    """
    Basic pickclosest() test
    """
    # (client subnet, prefix length, address expected in the answer)
    queries = [
        ('1.1.1.0', 24, '1.1.1.2'),
        ('1.2.3.0', 24, '1.2.3.4'),
        ('17.1.0.0', 16, '1.1.1.2')
    ]
    name = 'closest.geo.example.org.'
    for (subnet, mask, ip) in queries:
        ecso = clientsubnetoption.ClientSubnetOption(subnet, mask)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', ip)

        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
479 def testNetmask(self
):
485 'expected': dns
.rrset
.from_text('true.netmask.example.org.', 0,
486 dns
.rdataclass
.IN
, 'TXT',
488 'query': dns
.message
.make_query('true.netmask.example.org', 'TXT')
491 'expected': dns
.rrset
.from_text('false.netmask.example.org.', 0,
492 dns
.rdataclass
.IN
, 'TXT',
494 'query': dns
.message
.make_query('false.netmask.example.org', 'TXT')
497 for query
in queries
:
498 res
= self
.sendUDPQuery(query
['query'])
499 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
500 self
.assertRRsetInAnswer(res
, query
['expected'])
508 'expected': dns
.rrset
.from_text('view.example.org.', 0,
509 dns
.rdataclass
.IN
, 'A',
510 '{prefix}.54'.format(prefix
=self
._PREFIX
)),
511 'query': dns
.message
.make_query('view.example.org', 'A')
514 'expected': dns
.rrset
.from_text('txt.view.example.org.', 0,
515 dns
.rdataclass
.IN
, 'TXT',
517 'query': dns
.message
.make_query('txt.view.example.org', 'TXT')
520 for query
in queries
:
521 res
= self
.sendUDPQuery(query
['query'])
522 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
523 self
.assertRRsetInAnswer(res
, query
['expected'])
def testViewNoMatch(self):
    """
    view() test where no netmask match
    """
    query = dns.message.make_query('none.view.example.org', 'A')

    res = self.sendUDPQuery(query)
    # With no matching netmask the server must fail and return no records.
    self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
    self.assertAnswerEmpty(res)
def testWHashed(self):
    """
    Basic pickwhashed() test with a set of A records
    As the `bestwho` is hashed, we should always get the same answer
    """
    expected = [dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '1.2.3.4'),
                dns.rrset.from_text('whashed.example.org.', 0, dns.rdataclass.IN, 'A', '4.3.2.1')]
    query = dns.message.make_query('whashed.example.org', 'A')

    # The first answer may be either address.
    first = self.sendUDPQuery(query)
    self.assertRcodeEqual(first, dns.rcode.NOERROR)
    self.assertAnyRRsetInAnswer(first, expected)

    # A repeated query must return exactly what the first one returned.
    res = self.sendUDPQuery(query)
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(res, first.answer[0])
def testTimeout(self):
    """
    Test if LUA scripts are aborted if script execution takes too long
    """
    query = dns.message.make_query('timeout.example.org', 'A')

    first = self.sendUDPQuery(query)
    # An aborted script must surface to the client as SERVFAIL.
    self.assertRcodeEqual(first, dns.rcode.SERVFAIL)
564 Test A query against `any`
566 name
= 'any.example.org.'
568 query
= dns
.message
.make_query(name
, 'A')
570 response
= dns
.message
.make_response(query
)
572 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.0.2.1'))
574 res
= self
.sendUDPQuery(query
)
575 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
576 self
.assertEqual(res
.answer
, response
.answer
)
580 Test ANY query against `any`
583 name
= 'any.example.org.'
585 query
= dns
.message
.make_query(name
, 'ANY')
587 response
= dns
.message
.make_response(query
)
589 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.0.2.1'))
590 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', '"hello there"'))
592 res
= self
.sendUDPQuery(query
)
593 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
594 self
.assertEqual(self
.sortRRsets(res
.answer
), self
.sortRRsets(response
.answer
))
596 if __name__
== '__main__':