import dns

from dnsdisttests import DNSDistTest
class TestSpoofingSpoof(DNSDistTest):
    """Spoofing through the SpoofAction, SpoofCNAMEAction and SpoofRawAction rules.

    Every test builds the response dnsdist is expected to synthesize and
    checks it over both UDP and TCP without involving the backend.
    """

    # NOTE(review): the TTL / class / type arguments of the expected rrsets
    # below were restored from the TTL assertions and from the rules in this
    # template -- confirm against the upstream copy of this test.
    _config_template = """
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}))
    addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {aa=true}))
    addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ad=true}))
    addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=true}))
    addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=false}))
    addAction(makeRule("spoofaction-ttl.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ttl=1500}))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
    addAction(AndRule{makeRule("rawchaos.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT), QClassRule(DNSClass.CHAOS)}, SpoofRawAction("\\005chaos"))
    addAction(AndRule{makeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction({"\\003aaa\\004bbbb", "\\011ccccccccccc"}))
    addAction(AndRule{makeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction({"\\192\\000\\002\\001", "\\192\\000\\002\\002"}))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """Send `query` over UDP then TCP and compare the answer with `expectedResponse`.

        No backend response is queued (`response=None, useQueue=False`), so the
        answer has to come from dnsdist itself. When `expectedTTL` is given,
        the TTL of the first answer rrset is asserted as well.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                'cnameaction.spoofing.tests.powerdns.com.'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via SpoofAction

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.2', '192.0.2.1'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via SpoofAction

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1', '2001:DB8::2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via SpoofAction

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # An ANY query is answered with both address families: A first, then AAAA.
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.2', '192.0.2.1'))
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1', '2001:DB8::2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionSetAA(self):
        """
        Spoofing: Spoof via Action, setting AA=1
        """
        name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetAD(self):
        """
        Spoofing: Spoof via Action, setting AD=1
        """
        name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AD
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetRA(self):
        """
        Spoofing: Spoof via Action, setting RA=1
        """
        name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses; RD is cleared here so
        # that a set RA can only come from the ra=true option of the rule
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetNoRA(self):
        """
        Spoofing: Spoof via Action, setting RA=0
        """
        name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
        # RD is deliberately left set in the query: the rule (ra=false) must
        # still produce a response without RA.
        query = dns.message.make_query(name, 'AAAA', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.RA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetTTL(self):
        """
        Spoofing: Spoof via Action, setting the TTL to 1500
        """
        name = 'spoofaction-ttl.spoofing.tests.powerdns.com.'
        # RD stays set in the query, hence RA is expected in the response.
        query = dns.message.make_query(name, 'AAAA', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 1500, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=1500)

    def testSpoofRawAction(self):
        """
        Spoofing: Spoof a response from raw bytes
        """
        name = 'raw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"aaa" "bbbb" "ccccccccccc"'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.SRV,
                                '0 0 65535 srv.powerdns.com.'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=3600)

    def testSpoofRawChaosAction(self):
        """
        Spoofing: Spoof a response from raw bytes in QClass CH
        """
        name = 'rawchaos.spoofing.tests.powerdns.com.'

        # TXT CH
        query = dns.message.make_query(name, 'TXT', 'CH')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.CH, dns.rdatatype.TXT,
                                '"chaos"'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofRawActionMulti(self):
        """
        Spoofing: Spoof a response from several raw bytes
        """
        name = 'multiraw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.2'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"aaa" "bbbb"', '"ccccccccccc"'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)
class TestSpoofingLuaSpoof(DNSDistTest):
    """Spoofing from Lua rules returning DNSAction.Spoof / DNSAction.SpoofRaw."""

    # NOTE(review): the `if ... then` / `else` / `end` lines of the Lua
    # functions and the `dq.dh:setAA(true)` call in the SRV branch of
    # spoofrawrule were restored (the SRV test expects the AA flag) --
    # confirm against the upstream copy of this test.
    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype == 1) -- A
        then
            return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
            return DNSAction.Spoof, "2001:DB8::1"
        else
            return DNSAction.None, ""
        end
    end

    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end

    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))

    function spoofrawrule(dq)
        if dq.qtype == DNSQType.A then
            return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
        elseif dq.qtype == DNSQType.TXT then
            return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
        elseif dq.qtype == DNSQType.SRV then
            dq.dh:setAA(true)
            return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
        end

        return DNSAction.None, ""
    end

    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """Send `query` over UDP then TCP and compare the answer with `expectedResponse`.

        No backend response is queued (`response=None, useQueue=False`), so the
        answer has to come from dnsdist itself. When `expectedTTL` is given,
        the TTL of the first answer rrset is asserted as well.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                'spoofedcname.spoofing.tests.powerdns.com.'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                'spoofedcname.spoofing.tests.powerdns.com.'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofRawAction(self):
        """
        Spoofing: Spoof a response from raw bytes via Lua
        """
        name = 'lua-raw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"aaa" "bbbb" "ccccccccccc"'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.SRV,
                                '0 0 65535 srv.powerdns.com.'))

        # sorry, we can't set the TTL from the Lua API right now, so a TTL
        # of 3600 is not asserted here
        self._checkSpoofedResponse(query, expectedResponse)
class TestSpoofingLuaSpoofMulti(DNSDistTest):
    """Spoofing several records at once from Lua through dq:spoof()."""

    # NOTE(review): the `if ... then` / `else` / `end` lines of the Lua
    # functions and the `dq.dh:setAA(true)` call in the SRV branch of
    # spoofrawmultirule were restored (the SRV test expects the AA flag) --
    # confirm against the upstream copy of this test.
    _config_template = """
    function spoof1multirule(dq)
        if(dq.qtype == 1) -- A
        then
            dq:spoof({ newCA("192.0.2.1"), newCA("192.0.2.2") })
            return DNSAction.HeaderModify
        elseif(dq.qtype == 28) -- AAAA
        then
            dq:spoof({ newCA("2001:DB8::1"), newCA("2001:DB8::2") })
            return DNSAction.HeaderModify
        else
            return DNSAction.None, ""
        end
    end

    function spoofrawmultirule(dq)
        if dq.qtype == DNSQType.A then
            dq:spoof({ "\\192\\000\\002\\001", "\\192\\000\\002\\002" })
            return DNSAction.HeaderModify
        elseif dq.qtype == DNSQType.TXT then
            dq:spoof({ "\\003aaa\\004bbbb", "\\011ccccccccccc" })
            return DNSAction.HeaderModify
        elseif dq.qtype == DNSQType.SRV then
            dq.dh:setAA(true)
            dq:spoof({ "\\000\\000\\000\\000\\255\\255\\004srv1\\008powerdns\\003com\\000","\\000\\000\\000\\000\\255\\255\\004srv2\\008powerdns\\003com\\000" })
            return DNSAction.HeaderModify
        end

        return DNSAction.None, ""
    end

    addAction("luaspoof1multi.spoofing.tests.powerdns.com.", LuaAction(spoof1multirule))
    addAction("lua-raw-multi.spoofing.tests.powerdns.com.", LuaAction(spoofrawmultirule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """Send `query` over UDP then TCP and compare the answer with `expectedResponse`.

        No backend response is queued (`response=None, useQueue=False`), so the
        answer has to come from dnsdist itself. When `expectedTTL` is given,
        the TTL of the first answer rrset is asserted as well.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLuaSpoofMultiA(self):
        """
        Spoofing: Spoofing multiple A via Lua dq:spoof

        Send an A query to "luaspoof1multi.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofMultiAAAA(self):
        """
        Spoofing: Spoofing multiple AAAA via Lua dq:spoof

        Send an AAAA query to "luaspoof1multi.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1', '2001:DB8::2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofMultiRawAction(self):
        """
        Spoofing: Spoof responses from raw bytes via Lua dq:spoof
        """
        name = 'lua-raw-multi.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.2'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"aaa" "bbbb"', '"ccccccccccc"'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.SRV,
                                '0 0 65535 srv1.powerdns.com.',
                                '0 0 65535 srv2.powerdns.com.'))

        # sorry, we can't set the TTL from the Lua API right now, so a TTL
        # of 3600 is not asserted here
        self._checkSpoofedResponse(query, expectedResponse)
class TestSpoofingLuaFFISpoofMulti(DNSDistTest):
    """Spoofing several raw records through the Lua FFI spoof_raw binding."""

    # NOTE(review): the closing `end` lines of the Lua function were
    # restored -- confirm against the upstream copy of this test.
    _config_template = """
    local ffi = require("ffi")

    function spoofrawmultirule(dq)
        local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)

        if qtype == DNSQType.A then
            local records = ffi.new("dnsdist_ffi_raw_value_t [2]")

            local str = "\\192\\000\\002\\001"
            records[0].size = #str
            records[0].value = str

            local str = "\\192\\000\\002\\255"
            records[1].value = str
            records[1].size = #str

            ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
            return DNSAction.HeaderModify
        elseif qtype == DNSQType.TXT then
            local records = ffi.new("dnsdist_ffi_raw_value_t [2]")

            local str = "\\033this text has a comma at the end,"
            records[0].size = #str
            records[0].value = str

            local str = "\\003aaa\\004bbbb"
            records[1].size = #str
            records[1].value = str

            ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
            return DNSAction.HeaderModify
        end

        return DNSAction.None, ""
    end

    addAction("lua-raw-multi.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofrawmultirule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofMultiRawAction(self):
        """
        Spoofing via Lua FFI: Spoof responses from raw bytes via Lua FFI
        """
        name = 'lua-raw-multi.ffi-spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.255'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # no backend response is queued: the answer must come from dnsdist
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            self.assertEqual(receivedResponse.answer[0].ttl, 60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        # the first record ends with a comma, the second holds two strings
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"this text has a comma at the end,"',
                                '"aaa" "bbbb"'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            self.assertEqual(receivedResponse.answer[0].ttl, 60)
class TestSpoofingLuaWithStatistics(DNSDistTest):
    """Spoofing from a Lua rule whose answer depends on the 'queries' counter."""

    # NOTE(review): the `else` / `end` lines of the Lua function were
    # restored -- confirm against the upstream copy of this test.
    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
            return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
            return DNSAction.Spoof, "192.0.2.2"
        else
            return DNSAction.Spoof, "192.0.2.0"
        end
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters

        The first and second queries get dedicated addresses, every later
        query gets the fallback one.
        """
        name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        def makeExpectedResponse(address):
            """Build the response to `query` holding a single A record."""
            response = dns.message.make_response(query)
            response.answer.append(
                dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                    address))
            return response

        expectedResponse1 = makeExpectedResponse('192.0.2.1')
        expectedResponse2 = makeExpectedResponse('192.0.2.2')
        expectedResponseAfterwards = makeExpectedResponse('192.0.2.0')

        # the order matters: the rule answers according to how many queries
        # have been counted so far
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse1, receivedResponse)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse2, receivedResponse)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponseAfterwards, receivedResponse)
class TestSpoofingLuaSpoofPacket(DNSDistTest):
    """Spoofing a complete raw DNS packet from Lua, a rule action and Lua FFI.

    The raw packets in the configuration carry the header bytes 129 133
    (0x81 0x85), i.e. QR + RD, then RA + rcode 5 (REFUSED), followed by the
    question only.
    """

    # NOTE(review): the closing `end` lines of the Lua functions were
    # restored -- confirm against the upstream copy of this test.
    _config_template = """
    function spoofpacket(dq)
        if dq.qtype == DNSQType.A then
            return DNSAction.SpoofPacket, "\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
        end
        return DNSAction.None, ""
    end

    addAction("lua-raw-packet.spoofing.tests.powerdns.com.", LuaAction(spoofpacket))
    local rawResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\019rule\\045lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
    addAction(AndRule{QTypeRule(DNSQType.A), makeRule("rule-lua-raw-packet.spoofing.tests.powerdns.com.")}, SpoofPacketAction(rawResponse, string.len(rawResponse)))

    local ffi = require("ffi")

    function spoofpacketffi(dq)
        local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
        if qtype == DNSQType.A then
            local refusedResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\012ffi\\045spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"

            ffi.C.dnsdist_ffi_dnsquestion_spoof_packet(dq, refusedResponse, string.len(refusedResponse))
            return DNSAction.HeaderModify
        end
        return DNSAction.None, ""
    end

    addAction("lua-raw-packet.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofpacketffi))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofPacket(self):
        """
        Spoofing via Lua: Spoof raw response via Lua and SpoofPacketAction
        """
        for name in ('lua-raw-packet.spoofing.tests.powerdns.com.', 'rule-lua-raw-packet.spoofing.tests.powerdns.com.'):

            # RD stays set in the query; the spoofed packet has RD and RA set
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.flags |= dns.flags.RA
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            for method in ("sendUDPQuery", "sendTCPQuery"):
                sender = getattr(self, method)
                # no backend response is queued: the answer must come from dnsdist
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
                self.assertTrue(receivedResponse)
                self.assertEqual(expectedResponse, receivedResponse)

    def testLuaFFISpoofPacket(self):
        """
        Spoofing via Lua FFI: Spoof raw response via Lua FFI
        """
        name = 'lua-raw-packet.ffi-spoofing.tests.powerdns.com.'

        # RD stays set in the query; the spoofed packet has RD and RA set
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)