3 from dnsdisttests
import DNSDistTest
5 class TestSpoofingSpoof(DNSDistTest
):
8 addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
9 addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {aa=true}))
10 addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ad=true}))
11 addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=true}))
12 addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=false}))
13 addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
14 addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
15 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
16 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
17 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
18 newServer{address="127.0.0.1:%s"}
21 def testSpoofActionA(self
):
23 Spoofing: Spoof A via Action
25 Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
26 check that dnsdist sends a spoofed result.
28 name
= 'spoofaction.spoofing.tests.powerdns.com.'
29 query
= dns
.message
.make_query(name
, 'A', 'IN')
30 # dnsdist set RA = RD for spoofed responses
31 query
.flags
&= ~dns
.flags
.RD
32 expectedResponse
= dns
.message
.make_response(query
)
33 rrset
= dns
.rrset
.from_text(name
,
38 expectedResponse
.answer
.append(rrset
)
40 for method
in ("sendUDPQuery", "sendTCPQuery"):
41 sender
= getattr(self
, method
)
42 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
43 self
.assertTrue(receivedResponse
)
44 self
.assertEquals(expectedResponse
, receivedResponse
)
46 def testSpoofActionAAAA(self
):
48 Spoofing: Spoof AAAA via Action
50 Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
51 check that dnsdist sends a spoofed result.
53 name
= 'spoofaction.spoofing.tests.powerdns.com.'
54 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
55 # dnsdist set RA = RD for spoofed responses
56 query
.flags
&= ~dns
.flags
.RD
57 expectedResponse
= dns
.message
.make_response(query
)
58 rrset
= dns
.rrset
.from_text(name
,
63 expectedResponse
.answer
.append(rrset
)
65 for method
in ("sendUDPQuery", "sendTCPQuery"):
66 sender
= getattr(self
, method
)
67 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
68 self
.assertTrue(receivedResponse
)
69 self
.assertEquals(expectedResponse
, receivedResponse
)
71 def testSpoofActionCNAME(self
):
73 Spoofing: Spoof CNAME via Action
75 Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
76 check that dnsdist sends a spoofed result.
78 name
= 'cnamespoofaction.spoofing.tests.powerdns.com.'
79 query
= dns
.message
.make_query(name
, 'A', 'IN')
80 # dnsdist set RA = RD for spoofed responses
81 query
.flags
&= ~dns
.flags
.RD
82 expectedResponse
= dns
.message
.make_response(query
)
83 rrset
= dns
.rrset
.from_text(name
,
87 'cnameaction.spoofing.tests.powerdns.com.')
88 expectedResponse
.answer
.append(rrset
)
90 for method
in ("sendUDPQuery", "sendTCPQuery"):
91 sender
= getattr(self
, method
)
92 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
93 self
.assertTrue(receivedResponse
)
94 self
.assertEquals(expectedResponse
, receivedResponse
)
96 def testSpoofActionMultiA(self
):
98 Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
100 Send an A query for "multispoof.spoofing.tests.powerdns.com.",
101 check that dnsdist sends a spoofed result.
103 name
= 'multispoof.spoofing.tests.powerdns.com.'
104 query
= dns
.message
.make_query(name
, 'A', 'IN')
105 # dnsdist set RA = RD for spoofed responses
106 query
.flags
&= ~dns
.flags
.RD
107 expectedResponse
= dns
.message
.make_response(query
)
108 rrset
= dns
.rrset
.from_text(name
,
112 '192.0.2.2', '192.0.2.1')
113 expectedResponse
.answer
.append(rrset
)
115 for method
in ("sendUDPQuery", "sendTCPQuery"):
116 sender
= getattr(self
, method
)
117 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
118 self
.assertTrue(receivedResponse
)
119 self
.assertEquals(expectedResponse
, receivedResponse
)
121 def testSpoofActionMultiAAAA(self
):
123 Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
125 Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
126 check that dnsdist sends a spoofed result.
128 name
= 'multispoof.spoofing.tests.powerdns.com.'
129 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
130 # dnsdist set RA = RD for spoofed responses
131 query
.flags
&= ~dns
.flags
.RD
132 expectedResponse
= dns
.message
.make_response(query
)
133 rrset
= dns
.rrset
.from_text(name
,
137 '2001:DB8::1', '2001:DB8::2')
138 expectedResponse
.answer
.append(rrset
)
140 for method
in ("sendUDPQuery", "sendTCPQuery"):
141 sender
= getattr(self
, method
)
142 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
143 self
.assertTrue(receivedResponse
)
144 self
.assertEquals(expectedResponse
, receivedResponse
)
146 def testSpoofActionMultiANY(self
):
148 Spoofing: Spoof multiple addresses via AddDomainSpoof
150 Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
151 check that dnsdist sends a spoofed result.
153 name
= 'multispoof.spoofing.tests.powerdns.com.'
154 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
155 # dnsdist set RA = RD for spoofed responses
156 query
.flags
&= ~dns
.flags
.RD
157 expectedResponse
= dns
.message
.make_response(query
)
159 rrset
= dns
.rrset
.from_text(name
,
163 '192.0.2.2', '192.0.2.1')
164 expectedResponse
.answer
.append(rrset
)
166 rrset
= dns
.rrset
.from_text(name
,
170 '2001:DB8::1', '2001:DB8::2')
171 expectedResponse
.answer
.append(rrset
)
173 for method
in ("sendUDPQuery", "sendTCPQuery"):
174 sender
= getattr(self
, method
)
175 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
176 self
.assertTrue(receivedResponse
)
177 self
.assertEquals(expectedResponse
, receivedResponse
)
179 def testSpoofActionSetAA(self
):
181 Spoofing: Spoof via Action, setting AA=1
183 name
= 'spoofaction-aa.spoofing.tests.powerdns.com.'
184 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
185 # dnsdist set RA = RD for spoofed responses
186 query
.flags
&= ~dns
.flags
.RD
187 expectedResponse
= dns
.message
.make_response(query
)
188 expectedResponse
.flags |
= dns
.flags
.AA
189 rrset
= dns
.rrset
.from_text(name
,
194 expectedResponse
.answer
.append(rrset
)
196 for method
in ("sendUDPQuery", "sendTCPQuery"):
197 sender
= getattr(self
, method
)
198 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
199 self
.assertTrue(receivedResponse
)
200 self
.assertEquals(expectedResponse
, receivedResponse
)
201 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 60)
203 def testSpoofActionSetAD(self
):
205 Spoofing: Spoof via Action, setting AD=1
207 name
= 'spoofaction-ad.spoofing.tests.powerdns.com.'
208 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
209 # dnsdist set RA = RD for spoofed responses
210 query
.flags
&= ~dns
.flags
.RD
211 expectedResponse
= dns
.message
.make_response(query
)
212 expectedResponse
.flags |
= dns
.flags
.AD
213 rrset
= dns
.rrset
.from_text(name
,
218 expectedResponse
.answer
.append(rrset
)
220 for method
in ("sendUDPQuery", "sendTCPQuery"):
221 sender
= getattr(self
, method
)
222 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
223 self
.assertTrue(receivedResponse
)
224 self
.assertEquals(expectedResponse
, receivedResponse
)
225 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 60)
227 def testSpoofActionSetRA(self
):
229 Spoofing: Spoof via Action, setting RA=1
231 name
= 'spoofaction-ra.spoofing.tests.powerdns.com.'
232 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
233 # dnsdist set RA = RD for spoofed responses
234 query
.flags
&= ~dns
.flags
.RD
235 expectedResponse
= dns
.message
.make_response(query
)
236 expectedResponse
.flags |
= dns
.flags
.RA
237 rrset
= dns
.rrset
.from_text(name
,
242 expectedResponse
.answer
.append(rrset
)
244 for method
in ("sendUDPQuery", "sendTCPQuery"):
245 sender
= getattr(self
, method
)
246 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
247 self
.assertTrue(receivedResponse
)
248 self
.assertEquals(expectedResponse
, receivedResponse
)
249 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 60)
251 def testSpoofActionSetNoRA(self
):
253 Spoofing: Spoof via Action, setting RA=0
255 name
= 'spoofaction-nora.spoofing.tests.powerdns.com.'
256 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
257 expectedResponse
= dns
.message
.make_response(query
)
258 expectedResponse
.flags
&= ~dns
.flags
.RA
259 rrset
= dns
.rrset
.from_text(name
,
264 expectedResponse
.answer
.append(rrset
)
266 for method
in ("sendUDPQuery", "sendTCPQuery"):
267 sender
= getattr(self
, method
)
268 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
269 self
.assertTrue(receivedResponse
)
270 self
.assertEquals(expectedResponse
, receivedResponse
)
271 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 60)
273 def testSpoofRawAction(self
):
275 Spoofing: Spoof a response from raw bytes
277 name
= 'raw.spoofing.tests.powerdns.com.'
280 query
= dns
.message
.make_query(name
, 'A', 'IN')
281 query
.flags
&= ~dns
.flags
.RD
282 expectedResponse
= dns
.message
.make_response(query
)
283 expectedResponse
.flags
&= ~dns
.flags
.AA
284 rrset
= dns
.rrset
.from_text(name
,
289 expectedResponse
.answer
.append(rrset
)
291 for method
in ("sendUDPQuery", "sendTCPQuery"):
292 sender
= getattr(self
, method
)
293 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
294 self
.assertTrue(receivedResponse
)
295 self
.assertEquals(expectedResponse
, receivedResponse
)
296 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 60)
299 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
300 query
.flags
&= ~dns
.flags
.RD
301 expectedResponse
= dns
.message
.make_response(query
)
302 expectedResponse
.flags
&= ~dns
.flags
.AA
303 rrset
= dns
.rrset
.from_text(name
,
307 '"aaa" "bbbb" "ccccccccccc"')
308 expectedResponse
.answer
.append(rrset
)
310 for method
in ("sendUDPQuery", "sendTCPQuery"):
311 sender
= getattr(self
, method
)
312 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
313 self
.assertTrue(receivedResponse
)
314 self
.assertEquals(expectedResponse
, receivedResponse
)
315 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 60)
318 query
= dns
.message
.make_query(name
, 'SRV', 'IN')
319 query
.flags
&= ~dns
.flags
.RD
320 expectedResponse
= dns
.message
.make_response(query
)
321 # this one should have the AA flag set
322 expectedResponse
.flags |
= dns
.flags
.AA
323 rrset
= dns
.rrset
.from_text(name
,
327 '0 0 65535 srv.powerdns.com.')
328 expectedResponse
.answer
.append(rrset
)
330 for method
in ("sendUDPQuery", "sendTCPQuery"):
331 sender
= getattr(self
, method
)
332 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
333 self
.assertTrue(receivedResponse
)
334 self
.assertEquals(expectedResponse
, receivedResponse
)
335 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 3600)
337 class TestSpoofingLuaSpoof(DNSDistTest
):
339 _config_template
= """
340 function spoof1rule(dq)
343 return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
344 elseif(dq.qtype == 28) -- AAAA
346 return DNSAction.Spoof, "2001:DB8::1"
348 return DNSAction.None, ""
352 function spoof2rule(dq)
353 return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
356 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
357 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
359 function spoofrawrule(dq)
360 if dq.qtype == DNSQType.A then
361 return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
362 elseif dq.qtype == DNSQType.TXT then
363 return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
364 elseif dq.qtype == DNSQType.SRV then
366 return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
368 return DNSAction.None, ""
371 addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
372 addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
373 addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
374 newServer{address="127.0.0.1:%s"}
377 def testLuaSpoofA(self
):
379 Spoofing: Spoofing an A via Lua
381 Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
382 check that dnsdist sends a spoofed result.
384 name
= 'luaspoof1.spoofing.tests.powerdns.com.'
385 query
= dns
.message
.make_query(name
, 'A', 'IN')
386 # dnsdist set RA = RD for spoofed responses
387 query
.flags
&= ~dns
.flags
.RD
388 expectedResponse
= dns
.message
.make_response(query
)
389 rrset
= dns
.rrset
.from_text(name
,
393 '192.0.2.1', '192.0.2.2')
394 expectedResponse
.answer
.append(rrset
)
396 for method
in ("sendUDPQuery", "sendTCPQuery"):
397 sender
= getattr(self
, method
)
398 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
399 self
.assertTrue(receivedResponse
)
400 self
.assertEquals(expectedResponse
, receivedResponse
)
402 def testLuaSpoofAAAA(self
):
404 Spoofing: Spoofing an AAAA via Lua
406 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
407 check that dnsdist sends a spoofed result.
409 name
= 'luaspoof1.spoofing.tests.powerdns.com.'
410 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
411 # dnsdist set RA = RD for spoofed responses
412 query
.flags
&= ~dns
.flags
.RD
413 expectedResponse
= dns
.message
.make_response(query
)
414 rrset
= dns
.rrset
.from_text(name
,
419 expectedResponse
.answer
.append(rrset
)
421 for method
in ("sendUDPQuery", "sendTCPQuery"):
422 sender
= getattr(self
, method
)
423 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
424 self
.assertTrue(receivedResponse
)
425 self
.assertEquals(expectedResponse
, receivedResponse
)
427 def testLuaSpoofAWithCNAME(self
):
429 Spoofing: Spoofing an A with a CNAME via Lua
431 Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
432 check that dnsdist sends a spoofed result.
434 name
= 'luaspoof2.spoofing.tests.powerdns.com.'
435 query
= dns
.message
.make_query(name
, 'A', 'IN')
436 # dnsdist set RA = RD for spoofed responses
437 query
.flags
&= ~dns
.flags
.RD
438 expectedResponse
= dns
.message
.make_response(query
)
439 rrset
= dns
.rrset
.from_text(name
,
443 'spoofedcname.spoofing.tests.powerdns.com.')
444 expectedResponse
.answer
.append(rrset
)
446 for method
in ("sendUDPQuery", "sendTCPQuery"):
447 sender
= getattr(self
, method
)
448 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
449 self
.assertTrue(receivedResponse
)
450 self
.assertEquals(expectedResponse
, receivedResponse
)
452 def testLuaSpoofAAAAWithCNAME(self
):
454 Spoofing: Spoofing an AAAA with a CNAME via Lua
456 Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
457 check that dnsdist sends a spoofed result.
459 name
= 'luaspoof2.spoofing.tests.powerdns.com.'
460 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
461 # dnsdist set RA = RD for spoofed responses
462 query
.flags
&= ~dns
.flags
.RD
463 expectedResponse
= dns
.message
.make_response(query
)
464 rrset
= dns
.rrset
.from_text(name
,
468 'spoofedcname.spoofing.tests.powerdns.com.')
469 expectedResponse
.answer
.append(rrset
)
471 for method
in ("sendUDPQuery", "sendTCPQuery"):
472 sender
= getattr(self
, method
)
473 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
474 self
.assertTrue(receivedResponse
)
475 self
.assertEquals(expectedResponse
, receivedResponse
)
477 def testLuaSpoofRawAction(self
):
479 Spoofing: Spoof a response from raw bytes via Lua
481 name
= 'lua-raw.spoofing.tests.powerdns.com.'
484 query
= dns
.message
.make_query(name
, 'A', 'IN')
485 query
.flags
&= ~dns
.flags
.RD
486 expectedResponse
= dns
.message
.make_response(query
)
487 expectedResponse
.flags
&= ~dns
.flags
.AA
488 rrset
= dns
.rrset
.from_text(name
,
493 expectedResponse
.answer
.append(rrset
)
495 for method
in ("sendUDPQuery", "sendTCPQuery"):
496 sender
= getattr(self
, method
)
497 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
498 self
.assertTrue(receivedResponse
)
499 self
.assertEquals(expectedResponse
, receivedResponse
)
500 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 60)
503 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
504 query
.flags
&= ~dns
.flags
.RD
505 expectedResponse
= dns
.message
.make_response(query
)
506 expectedResponse
.flags
&= ~dns
.flags
.AA
507 rrset
= dns
.rrset
.from_text(name
,
511 '"aaa" "bbbb" "ccccccccccc"')
512 expectedResponse
.answer
.append(rrset
)
514 for method
in ("sendUDPQuery", "sendTCPQuery"):
515 sender
= getattr(self
, method
)
516 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
517 self
.assertTrue(receivedResponse
)
518 self
.assertEquals(expectedResponse
, receivedResponse
)
519 self
.assertEquals(receivedResponse
.answer
[0].ttl
, 60)
522 query
= dns
.message
.make_query(name
, 'SRV', 'IN')
523 query
.flags
&= ~dns
.flags
.RD
524 expectedResponse
= dns
.message
.make_response(query
)
525 # this one should have the AA flag set
526 expectedResponse
.flags |
= dns
.flags
.AA
527 rrset
= dns
.rrset
.from_text(name
,
531 '0 0 65535 srv.powerdns.com.')
532 expectedResponse
.answer
.append(rrset
)
534 for method
in ("sendUDPQuery", "sendTCPQuery"):
535 sender
= getattr(self
, method
)
536 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
537 self
.assertTrue(receivedResponse
)
538 self
.assertEquals(expectedResponse
, receivedResponse
)
539 # sorry, we can't set the TTL from the Lua API right now
540 #self.assertEquals(receivedResponse.answer[0].ttl, 3600)
542 class TestSpoofingLuaWithStatistics(DNSDistTest
):
544 _config_template
= """
545 function spoof1rule(dq)
546 queriesCount = getStatisticsCounters()['queries']
547 if(queriesCount == 1) then
548 return DNSAction.Spoof, "192.0.2.1"
549 elseif(queriesCount == 2) then
550 return DNSAction.Spoof, "192.0.2.2"
552 return DNSAction.Spoof, "192.0.2.0"
555 addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
556 newServer{address="127.0.0.1:%s"}
559 def testLuaSpoofBasedOnStatistics(self
):
561 Spoofing: Spoofing an A via Lua based on statistics counters
564 name
= 'luaspoofwithstats.spoofing.tests.powerdns.com.'
565 query
= dns
.message
.make_query(name
, 'A', 'IN')
566 # dnsdist set RA = RD for spoofed responses
567 query
.flags
&= ~dns
.flags
.RD
568 expectedResponse1
= dns
.message
.make_response(query
)
569 rrset
= dns
.rrset
.from_text(name
,
574 expectedResponse1
.answer
.append(rrset
)
575 expectedResponse2
= dns
.message
.make_response(query
)
576 rrset
= dns
.rrset
.from_text(name
,
581 expectedResponse2
.answer
.append(rrset
)
582 expectedResponseAfterwards
= dns
.message
.make_response(query
)
583 rrset
= dns
.rrset
.from_text(name
,
588 expectedResponseAfterwards
.answer
.append(rrset
)
590 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
591 self
.assertTrue(receivedResponse
)
592 self
.assertEquals(expectedResponse1
, receivedResponse
)
594 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
595 self
.assertTrue(receivedResponse
)
596 self
.assertEquals(expectedResponse2
, receivedResponse
)
598 for method
in ("sendUDPQuery", "sendTCPQuery"):
599 sender
= getattr(self
, method
)
600 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
601 self
.assertTrue(receivedResponse
)
602 self
.assertEquals(expectedResponseAfterwards
, receivedResponse
)