import dns
from dnsdisttests import DNSDistTest
class TestSpoofingSpoof(DNSDistTest):
    """Check the answers forged by SpoofAction, SpoofCNAMEAction and SpoofRawAction.

    NOTE(review): this class was restored from a damaged excerpt. The
    template delimiters and the TTL / class / type arguments of the
    expected rrsets were not visible; they were filled in from the rules
    below, the query types and the TTL assertions. Confirm against the
    pristine file.
    """

    _config_template = """
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}))
    addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {aa=true}))
    addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ad=true}))
    addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=true}))
    addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=false}))
    addAction(makeRule("spoofaction-ttl.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ttl=1500}))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
    addAction(AndRule{makeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction({"\\003aaa\\004bbbb", "\\011ccccccccccc"}))
    addAction(AndRule{makeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction({"\\192\\000\\002\\001", "\\192\\000\\002\\002"}))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """Send `query` over UDP and TCP and compare each answer to `expectedResponse`.

        The query is sent with response=None and useQueue=False, i.e. no
        backend answer is prepared for it. When `expectedTTL` is given, the
        TTL of the first answer rrset is asserted as well, since the TTL is
        checked separately from the message comparison.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                    'cnameaction.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via SpoofAction

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via SpoofAction

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via SpoofAction

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # An ANY query is expected to get both address families back.
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionSetAA(self):
        """
        Spoofing: Spoof via Action, setting AA=1
        """
        name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetAD(self):
        """
        Spoofing: Spoof via Action, setting AD=1
        """
        name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AD
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetRA(self):
        """
        Spoofing: Spoof via Action, setting RA=1
        """
        name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetNoRA(self):
        """
        Spoofing: Spoof via Action, setting RA=0
        """
        name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
        # RD is deliberately left set here: the rule must force RA off anyway.
        query = dns.message.make_query(name, 'AAAA', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.RA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetTTL(self):
        """
        Spoofing: Spoof via Action, setting the TTL to 1500
        """
        name = 'spoofaction-ttl.spoofing.tests.powerdns.com.'
        # RD stays set on this query, so RA is expected in the answer.
        query = dns.message.make_query(name, 'AAAA', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        rrset = dns.rrset.from_text(name, 1500, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=1500)

    def testSpoofRawAction(self):
        """
        Spoofing: Spoof a response from raw bytes
        """
        name = 'raw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                    '"aaa" "bbbb" "ccccccccccc"')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.SRV,
                                    '0 0 65535 srv.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=3600)

    def testSpoofRawActionMulti(self):
        """
        Spoofing: Spoof a response from several raw bytes
        """
        name = 'multiraw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                    '"aaa" "bbbb"', '"ccccccccccc"')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)
class TestSpoofingLuaSpoof(DNSDistTest):
    """Check answers spoofed from Lua rules returning DNSAction.Spoof / DNSAction.SpoofRaw.

    NOTE(review): this class was restored from a damaged excerpt. In the
    Lua template the `if ... then` opener of spoof1rule, the `else` / `end`
    lines and the `dq.dh:setAA(true)` call in the SRV branch were not
    visible. They were filled in from the surviving branches and from the
    SRV test expecting the AA flag. The TTL / class / type arguments of the
    expected rrsets were filled in the same way. Confirm against the
    pristine file.
    """

    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype == 1) -- A
        then
                return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
                return DNSAction.Spoof, "2001:DB8::1"
        else
                return DNSAction.None, ""
        end
    end
    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end

    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))

    function spoofrawrule(dq)
        if dq.qtype == DNSQType.A then
            return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
        elseif dq.qtype == DNSQType.TXT then
            return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
        elseif dq.qtype == DNSQType.SRV then
            dq.dh:setAA(true)
            return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
        end
        return DNSAction.None, ""
    end

    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """Send `query` over UDP and TCP and compare each answer to `expectedResponse`.

        The query is sent with response=None and useQueue=False, i.e. no
        backend answer is prepared for it. When `expectedTTL` is given, the
        TTL of the first answer rrset is asserted as well.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofRawAction(self):
        """
        Spoofing: Spoof a response from raw bytes via Lua
        """
        name = 'lua-raw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                    '"aaa" "bbbb" "ccccccccccc"')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.SRV,
                                    '0 0 65535 srv.powerdns.com.')
        expectedResponse.answer.append(rrset)

        # The TTL cannot be set from the Lua API right now, so the 3600
        # value is intentionally not asserted for this record.
        self._checkSpoofedResponse(query, expectedResponse)
class TestSpoofingLuaSpoofMulti(DNSDistTest):
    """Check answers carrying several records spoofed through Lua `dq:spoof`.

    NOTE(review): this class was restored from a damaged excerpt. In the
    Lua template the `if ... then` opener of spoof1multirule, the `else` /
    `end` lines and the `dq.dh:setAA(true)` call in the SRV branch were not
    visible. They were filled in from the surviving branches and from the
    SRV test expecting the AA flag. The TTL / class / type arguments of the
    expected rrsets were filled in the same way. Confirm against the
    pristine file.
    """

    _config_template = """
    function spoof1multirule(dq)
        if(dq.qtype == 1) -- A
        then
                dq:spoof({ newCA("192.0.2.1"), newCA("192.0.2.2") })
                return DNSAction.HeaderModify
        elseif(dq.qtype == 28) -- AAAA
        then
                dq:spoof({ newCA("2001:DB8::1"), newCA("2001:DB8::2") })
                return DNSAction.HeaderModify
        else
                return DNSAction.None, ""
        end
    end

    function spoofrawmultirule(dq)
        if dq.qtype == DNSQType.A then
            dq:spoof({ "\\192\\000\\002\\001", "\\192\\000\\002\\002" })
            return DNSAction.HeaderModify
        elseif dq.qtype == DNSQType.TXT then
            dq:spoof({ "\\003aaa\\004bbbb", "\\011ccccccccccc" })
            return DNSAction.HeaderModify
        elseif dq.qtype == DNSQType.SRV then
            dq.dh:setAA(true)
            dq:spoof({ "\\000\\000\\000\\000\\255\\255\\004srv1\\008powerdns\\003com\\000","\\000\\000\\000\\000\\255\\255\\004srv2\\008powerdns\\003com\\000" })
            return DNSAction.HeaderModify
        end
        return DNSAction.None, ""
    end

    addAction("luaspoof1multi.spoofing.tests.powerdns.com.", LuaAction(spoof1multirule))
    addAction("lua-raw-multi.spoofing.tests.powerdns.com.", LuaAction(spoofrawmultirule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """Send `query` over UDP and TCP and compare each answer to `expectedResponse`.

        The query is sent with response=None and useQueue=False, i.e. no
        backend answer is prepared for it. When `expectedTTL` is given, the
        TTL of the first answer rrset is asserted as well.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLuaSpoofMultiA(self):
        """
        Spoofing: Spoofing multiple A via Lua dq:spoof

        Send an A query to "luaspoof1multi.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofMultiAAAA(self):
        """
        Spoofing: Spoofing multiple AAAA via Lua dq:spoof

        Send an AAAA query to "luaspoof1multi.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofMultiRawAction(self):
        """
        Spoofing: Spoof responses from raw bytes via Lua dq:spoof
        """
        name = 'lua-raw-multi.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                    '"aaa" "bbbb"', '"ccccccccccc"')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.SRV,
                                    '0 0 65535 srv1.powerdns.com.', '0 0 65535 srv2.powerdns.com.')
        expectedResponse.answer.append(rrset)

        # The TTL cannot be set from the Lua API right now, so the 3600
        # value is intentionally not asserted for these records.
        self._checkSpoofedResponse(query, expectedResponse)
class TestSpoofingLuaWithStatistics(DNSDistTest):
    """Check a Lua rule that picks the spoofed address from the 'queries' counter.

    NOTE(review): this class was restored from a damaged excerpt. The
    `else` / `end` lines of the Lua function and the TTL / class / type /
    address arguments of the expected rrsets were not visible. They were
    filled in from the three `DNSAction.Spoof` branches of the rule.
    Confirm against the pristine file.
    """

    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
                return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
                return DNSAction.Spoof, "192.0.2.2"
        else
                return DNSAction.Spoof, "192.0.2.0"
        end
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters
        """
        name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        def expectedFor(address):
            # Build the answer expected when the rule spoofs `address`.
            response = dns.message.make_response(query)
            response.answer.append(dns.rrset.from_text(name,
                                                       60,
                                                       dns.rdataclass.IN,
                                                       dns.rdatatype.A,
                                                       address))
            return response

        expectedResponse1 = expectedFor('192.0.2.1')
        expectedResponse2 = expectedFor('192.0.2.2')
        expectedResponseAfterwards = expectedFor('192.0.2.0')

        # The first two queries each bump the counter, so each one must hit
        # its own branch of the rule; they are sent exactly once, over UDP.
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse1, receivedResponse)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse2, receivedResponse)

        # Every later query, over either transport, takes the fallback branch.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponseAfterwards, receivedResponse)