]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_LuaRecords.py
7 import clientsubnetoption
9 from authtests
import AuthTest
11 from BaseHTTPServer
import BaseHTTPRequestHandler
, HTTPServer
13 class FakeHTTPServer(BaseHTTPRequestHandler
):
14 def _set_headers(self
):
15 self
.send_response(200)
16 self
.send_header('Content-type', 'text/html')
21 if (self
.path
== '/ping.json'):
22 self
.wfile
.write('{"ping":"pong"}')
24 self
.wfile
.write("<html><body><h1>hi!</h1><h2>Programming in Lua !</h2></body></html>")
26 def log_message(self
, format
, *args
):
32 class TestLuaRecords(AuthTest
):
33 _config_template
= """
34 geoip-database-files=../modules/geoipbackend/regression-tests/GeoLiteCity.mmdb
35 edns-subnet-processing=yes
42 example.org. 3600 IN SOA {soa}
43 example.org. 3600 IN NS ns1.example.org.
44 example.org. 3600 IN NS ns2.example.org.
45 ns1.example.org. 3600 IN A {prefix}.10
46 ns2.example.org. 3600 IN A {prefix}.11
48 web1.example.org. 3600 IN A {prefix}.101
49 web2.example.org. 3600 IN A {prefix}.102
50 web3.example.org. 3600 IN A {prefix}.103
52 all.ifportup 3600 IN LUA A "ifportup(8080, {{'{prefix}.101', '{prefix}.102'}})"
53 some.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '{prefix}.102'}})"
54 none.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}})"
55 all.noneup.ifportup 3600 IN LUA A "ifportup(8080, {{'192.168.42.21', '192.168.21.42'}}, {{ backupSelector='all' }})"
57 whashed.example.org. 3600 IN LUA A "pickwhashed({{ {{15, '1.2.3.4'}}, {{42, '4.3.2.1'}} }})"
58 rand.example.org. 3600 IN LUA A "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
59 v6-bogus.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'{prefix}.101', '{prefix}.102'}})"
60 v6.rand.example.org. 3600 IN LUA AAAA "pickrandom({{'2001:db8:a0b:12f0::1', 'fe80::2a1:9bff:fe9b:f268'}})"
61 closest.geo 3600 IN LUA A "pickclosest({{'1.1.1.2','1.2.3.4'}})"
62 empty.rand.example.org. 3600 IN LUA A "pickrandom()"
63 timeout.example.org. 3600 IN LUA A "; local i = 0 ; while i < 1000 do pickrandom() ; i = i + 1 end return '1.2.3.4'"
64 wrand.example.org. 3600 IN LUA A "pickwrandom({{ {{30, '{prefix}.102'}}, {{15, '{prefix}.103'}} }})"
66 config IN LUA LUA ("settings={{stringmatch='Programming in Lua'}} "
67 "EUWips={{'{prefix}.101','{prefix}.102'}} "
68 "EUEips={{'192.168.42.101','192.168.42.102'}} "
69 "NLips={{'{prefix}.111', '{prefix}.112'}} "
70 "USAips={{'{prefix}.103'}} ")
72 usa IN LUA A ( ";include('config') "
73 "return ifurlup('http://www.lua.org:8080/', "
74 "{{USAips, EUEips}}, settings) ")
76 mix.ifurlup IN LUA A ("ifurlup('http://www.other.org:8080/ping.json', "
77 "{{ '192.168.42.101', '{prefix}.101' }}, "
78 "{{ stringmatch='pong' }}) ")
80 eu-west IN LUA A ( ";include('config') "
81 "return ifurlup('http://www.lua.org:8080/', "
82 "{{EUWips, EUEips, USAips}}, settings) ")
84 nl IN LUA A ( ";include('config') "
85 "return ifportup(8081, NLips) ")
86 latlon.geo IN LUA TXT "latlon()"
87 continent.geo IN LUA TXT ";if(continent('NA')) then return 'true' else return 'false' end"
88 asnum.geo IN LUA TXT ";if(asnum('4242')) then return 'true' else return 'false' end"
89 country.geo IN LUA TXT ";if(country('US')) then return 'true' else return 'false' end"
90 latlonloc.geo IN LUA TXT "latlonloc()"
92 true.netmask IN LUA TXT ( ";if(netmask({{ '{prefix}.0/24' }})) "
94 "else return 'false' end " )
95 false.netmask IN LUA TXT ( ";if(netmask({{ '1.2.3.4/8' }})) "
97 "else return 'false' end " )
99 view IN LUA A ("view({{ "
100 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
101 "{{ {{'{prefix}.0/16'}}, {{'{prefix}.54'}}}}, "
102 "{{ {{'0.0.0.0/0'}}, {{'192.0.2.1'}}}} "
104 txt.view IN LUA TXT ("view({{ "
105 "{{ {{'192.168.0.0/16'}}, {{'txt'}}}}, "
106 "{{ {{'0.0.0.0/0'}}, {{'else'}}}} "
108 none.view IN LUA A ("view({{ "
109 "{{ {{'192.168.0.0/16'}}, {{'192.168.1.54'}}}},"
110 "{{ {{'1.2.0.0/16'}}, {{'1.2.3.4'}}}}, "
112 *.magic IN LUA A "closestMagic()"
113 www-balanced IN CNAME 1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.
115 any IN LUA A "'192.0.2.1'"
116 any IN TXT "hello there"
118 resolve IN LUA A ";local r=resolve('localhost', 1) local t={{}} for _,v in ipairs(r) do table.insert(t, v:toString()) end return t"
125 def startResponders(cls
):
126 webserver
= threading
.Thread(name
='HTTP Listener',
127 target
=cls
.HTTPResponder
,
130 webserver
.setDaemon(True)
134 def HTTPResponder(cls
, port
):
135 server_address
= ('', port
)
136 httpd
= HTTPServer(server_address
, FakeHTTPServer
)
137 httpd
.serve_forever()
142 super(TestLuaRecords
, cls
).setUpClass()
144 cls
._web
_rrsets
= [dns
.rrset
.from_text('web1.example.org.', 0, dns
.rdataclass
.IN
, 'A',
145 '{prefix}.101'.format(prefix
=cls
._PREFIX
)),
146 dns
.rrset
.from_text('web2.example.org.', 0, dns
.rdataclass
.IN
, 'A',
147 '{prefix}.102'.format(prefix
=cls
._PREFIX
)),
148 dns
.rrset
.from_text('web3.example.org.', 0, dns
.rdataclass
.IN
, 'A',
149 '{prefix}.103'.format(prefix
=cls
._PREFIX
))
152 def testPickRandom(self
):
154 Basic pickrandom() test with a set of A records
156 expected
= [dns
.rrset
.from_text('rand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
157 '{prefix}.101'.format(prefix
=self
._PREFIX
)),
158 dns
.rrset
.from_text('rand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
159 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
160 query
= dns
.message
.make_query('rand.example.org', 'A')
162 res
= self
.sendUDPQuery(query
)
163 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
164 self
.assertAnyRRsetInAnswer(res
, expected
)
166 def testBogusV6PickRandom(self
):
168 Test a bogus AAAA pickrandom() record with a set of v4 addr
170 query
= dns
.message
.make_query('v6-bogus.rand.example.org', 'AAAA')
172 res
= self
.sendUDPQuery(query
)
173 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
175 def testV6PickRandom(self
):
177 Test pickrandom() AAAA record
179 expected
= [dns
.rrset
.from_text('v6.rand.example.org.', 0, dns
.rdataclass
.IN
, 'AAAA',
180 '2001:db8:a0b:12f0::1'),
181 dns
.rrset
.from_text('v6.rand.example.org.', 0, dns
.rdataclass
.IN
, 'AAAA',
182 'fe80::2a1:9bff:fe9b:f268')]
183 query
= dns
.message
.make_query('v6.rand.example.org', 'AAAA')
185 res
= self
.sendUDPQuery(query
)
186 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
187 self
.assertAnyRRsetInAnswer(res
, expected
)
189 def testEmptyRandom(self
):
191 Basic pickrandom() test with an empty set
193 query
= dns
.message
.make_query('empty.rand.example.org', 'A')
195 res
= self
.sendUDPQuery(query
)
196 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
198 def testWRandom(self
):
200 Basic pickwrandom() test with a set of A records
202 expected
= [dns
.rrset
.from_text('wrand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
203 '{prefix}.103'.format(prefix
=self
._PREFIX
)),
204 dns
.rrset
.from_text('wrand.example.org.', 0, dns
.rdataclass
.IN
, 'A',
205 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
206 query
= dns
.message
.make_query('wrand.example.org', 'A')
208 res
= self
.sendUDPQuery(query
)
209 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
210 self
.assertAnyRRsetInAnswer(res
, expected
)
212 def testIfportup(self
):
214 Basic ifportup() test
216 query
= dns
.message
.make_query('all.ifportup.example.org', 'A')
218 dns
.rrset
.from_text('all.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
219 '{prefix}.101'.format(prefix
=self
._PREFIX
)),
220 dns
.rrset
.from_text('all.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
221 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
223 res
= self
.sendUDPQuery(query
)
224 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
225 self
.assertAnyRRsetInAnswer(res
, expected
)
227 def testIfportupWithSomeDown(self
):
229 Basic ifportup() test with some ports DOWN
231 query
= dns
.message
.make_query('some.ifportup.example.org', 'A')
233 dns
.rrset
.from_text('some.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
235 dns
.rrset
.from_text('some.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
236 '{prefix}.102'.format(prefix
=self
._PREFIX
))]
238 # we first expect any of the IPs as no check has been performed yet
239 res
= self
.sendUDPQuery(query
)
240 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
241 self
.assertAnyRRsetInAnswer(res
, expected
)
243 # the first IP should not be up so only second should be returned
244 expected
= [expected
[1]]
245 res
= self
.sendUDPQuery(query
)
246 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
247 self
.assertAnyRRsetInAnswer(res
, expected
)
249 def testIfportupWithAllDown(self
):
251 Basic ifportup() test with all ports DOWN
253 query
= dns
.message
.make_query('none.ifportup.example.org', 'A')
255 dns
.rrset
.from_text('none.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
257 dns
.rrset
.from_text('none.ifportup.example.org.', 0, dns
.rdataclass
.IN
, 'A',
258 '192.168.21.42'.format(prefix
=self
._PREFIX
))]
260 # we first expect any of the IPs as no check has been performed yet
261 res
= self
.sendUDPQuery(query
)
262 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
263 self
.assertAnyRRsetInAnswer(res
, expected
)
265 # no port should be up so we expect any
266 res
= self
.sendUDPQuery(query
)
267 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
268 self
.assertAnyRRsetInAnswer(res
, expected
)
270 def testIfportupWithAllDownAndAllBackupSelector(self
):
272 Basic ifportup() test with all ports DOWN, fallback to 'all' backup selector
274 name
= 'all.noneup.ifportup.example.org.'
275 query
= dns
.message
.make_query(name
, dns
.rdatatype
.A
)
276 expected
= [dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.168.42.21', '192.168.21.42')]
278 res
= self
.sendUDPQuery(query
)
279 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
282 res
= self
.sendUDPQuery(query
)
283 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
284 self
.assertEqual(res
.answer
, expected
)
286 def testIfurlup(self
):
291 '{prefix}.103'.format(prefix
=self
._PREFIX
)
293 unreachable
= ['192.168.42.101', '192.168.42.102']
294 ips
= reachable
+ unreachable
298 rr
= dns
.rrset
.from_text('usa.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
301 reachable_rrs
.append(rr
)
303 query
= dns
.message
.make_query('usa.example.org', 'A')
304 res
= self
.sendUDPQuery(query
)
305 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
306 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
309 res
= self
.sendUDPQuery(query
)
310 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
311 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
313 def testIfurlupSimplified(self
):
315 Basic ifurlup() test with the simplified list of ips
316 Also ensures the correct path is queried
319 '{prefix}.101'.format(prefix
=self
._PREFIX
)
321 unreachable
= ['192.168.42.101']
322 ips
= reachable
+ unreachable
326 rr
= dns
.rrset
.from_text('mix.ifurlup.example.org.', 0, dns
.rdataclass
.IN
, 'A', ip
)
329 reachable_rrs
.append(rr
)
331 query
= dns
.message
.make_query('mix.ifurlup.example.org', 'A')
332 res
= self
.sendUDPQuery(query
)
333 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
334 self
.assertAnyRRsetInAnswer(res
, all_rrs
)
337 res
= self
.sendUDPQuery(query
)
338 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
339 self
.assertAnyRRsetInAnswer(res
, reachable_rrs
)
341 def testLatlon(self
):
345 name
= 'latlon.geo.example.org.'
346 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.0', 24)
347 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
348 expected
= dns
.rrset
.from_text(name
, 0,
349 dns
.rdataclass
.IN
, 'TXT',
350 '"47.913000 -122.304200"')
352 res
= self
.sendUDPQuery(query
)
353 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
354 self
.assertRRsetInAnswer(res
, expected
)
356 def testLatlonloc(self
):
358 Basic latlonloc() test
360 name
= 'latlonloc.geo.example.org.'
361 expected
= dns
.rrset
.from_text(name
, 0,dns
.rdataclass
.IN
, 'TXT',
362 '"47 54 46.8 N 122 18 15.12 W 0.00m 1.00m 10000.00m 10.00m"')
363 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.0', 24)
364 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
366 res
= self
.sendUDPQuery(query
)
367 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
368 self
.assertRRsetInAnswer(res
, expected
)
370 def testWildcardError(self
):
372 Ensure errors coming from LUA wildcards are reported
374 query
= dns
.message
.make_query('failure.magic.example.org', 'A')
376 res
= self
.sendUDPQuery(query
)
377 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
378 self
.assertAnswerEmpty(res
)
380 def testClosestMagic(self
):
382 Basic closestMagic() test
384 name
= 'www-balanced.example.org.'
385 cname
= '1-1-1-3.17-1-2-4.1-2-3-5.magic.example.org.'
387 ('1.1.1.0', 24, '1.1.1.3'),
388 ('1.2.3.0', 24, '1.2.3.5'),
389 ('17.1.0.0', 16, '17.1.2.4')
392 for (subnet
, mask
, ip
) in queries
:
393 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
394 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
396 response
= dns
.message
.make_response(query
)
398 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.CNAME
, cname
))
399 response
.answer
.append(dns
.rrset
.from_text(cname
, 0, dns
.rdataclass
.IN
, 'A', ip
))
401 res
= self
.sendUDPQuery(query
)
402 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
403 self
.assertEqual(res
.answer
, response
.answer
)
410 ('1.1.1.0', 24, '"true"'),
411 ('1.2.3.0', 24, '"false"'),
412 ('17.1.0.0', 16, '"false"')
414 name
= 'asnum.geo.example.org.'
415 for (subnet
, mask
, txt
) in queries
:
416 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
417 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
418 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
420 res
= self
.sendUDPQuery(query
)
421 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
422 self
.assertRRsetInAnswer(res
, expected
)
424 def testCountry(self
):
429 ('1.1.1.0', 24, '"false"'),
430 ('1.2.3.0', 24, '"true"'),
431 ('17.1.0.0', 16, '"false"')
433 name
= 'country.geo.example.org.'
434 for (subnet
, mask
, txt
) in queries
:
435 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
436 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
437 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
439 res
= self
.sendUDPQuery(query
)
440 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
441 self
.assertRRsetInAnswer(res
, expected
)
443 def testContinent(self
):
445 Basic continent() test
448 ('1.1.1.0', 24, '"false"'),
449 ('1.2.3.0', 24, '"true"'),
450 ('17.1.0.0', 16, '"false"')
452 name
= 'continent.geo.example.org.'
453 for (subnet
, mask
, txt
) in queries
:
454 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
455 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
456 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', txt
)
458 res
= self
.sendUDPQuery(query
)
459 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
460 self
.assertRRsetInAnswer(res
, expected
)
462 def testClosest(self
):
464 Basic pickclosest() test
467 ('1.1.1.0', 24, '1.1.1.2'),
468 ('1.2.3.0', 24, '1.2.3.4'),
469 ('17.1.0.0', 16, '1.1.1.2')
471 name
= 'closest.geo.example.org.'
472 for (subnet
, mask
, ip
) in queries
:
473 ecso
= clientsubnetoption
.ClientSubnetOption(subnet
, mask
)
474 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
475 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', ip
)
477 res
= self
.sendUDPQuery(query
)
478 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
479 self
.assertRRsetInAnswer(res
, expected
)
481 def testNetmask(self
):
487 'expected': dns
.rrset
.from_text('true.netmask.example.org.', 0,
488 dns
.rdataclass
.IN
, 'TXT',
490 'query': dns
.message
.make_query('true.netmask.example.org', 'TXT')
493 'expected': dns
.rrset
.from_text('false.netmask.example.org.', 0,
494 dns
.rdataclass
.IN
, 'TXT',
496 'query': dns
.message
.make_query('false.netmask.example.org', 'TXT')
499 for query
in queries
:
500 res
= self
.sendUDPQuery(query
['query'])
501 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
502 self
.assertRRsetInAnswer(res
, query
['expected'])
510 'expected': dns
.rrset
.from_text('view.example.org.', 0,
511 dns
.rdataclass
.IN
, 'A',
512 '{prefix}.54'.format(prefix
=self
._PREFIX
)),
513 'query': dns
.message
.make_query('view.example.org', 'A')
516 'expected': dns
.rrset
.from_text('txt.view.example.org.', 0,
517 dns
.rdataclass
.IN
, 'TXT',
519 'query': dns
.message
.make_query('txt.view.example.org', 'TXT')
522 for query
in queries
:
523 res
= self
.sendUDPQuery(query
['query'])
524 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
525 self
.assertRRsetInAnswer(res
, query
['expected'])
527 def testViewNoMatch(self
):
529 view() test where no netmask match
531 query
= dns
.message
.make_query('none.view.example.org', 'A')
533 res
= self
.sendUDPQuery(query
)
534 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
535 self
.assertAnswerEmpty(res
)
537 def testWHashed(self
):
539 Basic pickwhashed() test with a set of A records
540 As the `bestwho` is hashed, we should always get the same answer
542 expected
= [dns
.rrset
.from_text('whashed.example.org.', 0, dns
.rdataclass
.IN
, 'A', '1.2.3.4'),
543 dns
.rrset
.from_text('whashed.example.org.', 0, dns
.rdataclass
.IN
, 'A', '4.3.2.1')]
544 query
= dns
.message
.make_query('whashed.example.org', 'A')
546 first
= self
.sendUDPQuery(query
)
547 self
.assertRcodeEqual(first
, dns
.rcode
.NOERROR
)
548 self
.assertAnyRRsetInAnswer(first
, expected
)
550 res
= self
.sendUDPQuery(query
)
551 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
552 self
.assertRRsetInAnswer(res
, first
.answer
[0])
554 def testTimeout(self
):
556 Test if LUA scripts are aborted if script execution takes too long
558 query
= dns
.message
.make_query('timeout.example.org', 'A')
560 first
= self
.sendUDPQuery(query
)
561 self
.assertRcodeEqual(first
, dns
.rcode
.SERVFAIL
)
566 Test A query against `any`
568 name
= 'any.example.org.'
570 query
= dns
.message
.make_query(name
, 'A')
572 response
= dns
.message
.make_response(query
)
574 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.0.2.1'))
576 res
= self
.sendUDPQuery(query
)
577 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
578 self
.assertEqual(res
.answer
, response
.answer
)
582 Test ANY query against `any`
585 name
= 'any.example.org.'
587 query
= dns
.message
.make_query(name
, 'ANY')
589 response
= dns
.message
.make_response(query
)
591 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '192.0.2.1'))
592 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'TXT', '"hello there"'))
594 res
= self
.sendUDPQuery(query
)
595 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
596 self
.assertEqual(self
.sortRRsets(res
.answer
), self
.sortRRsets(response
.answer
))
598 def testResolve(self
):
600 Test resolve() function
602 name
= 'resolve.example.org.'
604 query
= dns
.message
.make_query(name
, 'A')
606 response
= dns
.message
.make_response(query
)
608 response
.answer
.append(dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, '127.0.0.1'))
610 res
= self
.sendUDPQuery(query
)
611 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
612 self
.assertEqual(res
.answer
, response
.answer
)
614 if __name__
== '__main__':