3 from dnsdisttests
import DNSDistTest
5 class TestSpoofingSpoof(DNSDistTest
):
8 addAction(SuffixMatchNodeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}))
9 addAction(SuffixMatchNodeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {aa=true}))
10 addAction(SuffixMatchNodeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ad=true}))
11 addAction(SuffixMatchNodeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=true}))
12 addAction(SuffixMatchNodeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=false}))
13 addAction(SuffixMatchNodeRule("spoofaction-ttl.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ttl=1500}))
14 addAction(SuffixMatchNodeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
15 addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
16 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
17 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
18 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
19 addAction(AndRule{SuffixMatchNodeRule("rawchaos.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT), QClassRule(DNSClass.CHAOS)}, SpoofRawAction("\\005chaos"))
20 addAction(AndRule{SuffixMatchNodeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction({"\\003aaa\\004bbbb", "\\011ccccccccccc"}))
21 addAction(AndRule{SuffixMatchNodeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction({"\\192\\000\\002\\001", "\\192\\000\\002\\002"}))
23 addAction(AndRule{SuffixMatchNodeRule("raw-any.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.ANY)}, SpoofRawAction("\\007rfc\\056\\052\\056\\050\\000", { typeForAny=DNSQType.HINFO }))
24 newServer{address="127.0.0.1:%s"}
27 def testSpoofActionA(self
):
29 Spoofing: Spoof A via Action
31 Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
32 check that dnsdist sends a spoofed result.
34 name
= 'spoofaction.spoofing.tests.powerdns.com.'
35 query
= dns
.message
.make_query(name
, 'A', 'IN')
36 # dnsdist set RA = RD for spoofed responses
37 query
.flags
&= ~dns
.flags
.RD
38 expectedResponse
= dns
.message
.make_response(query
)
39 rrset
= dns
.rrset
.from_text(name
,
44 expectedResponse
.answer
.append(rrset
)
46 for method
in ("sendUDPQuery", "sendTCPQuery"):
47 sender
= getattr(self
, method
)
48 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
49 self
.assertTrue(receivedResponse
)
50 self
.assertEqual(expectedResponse
, receivedResponse
)
52 def testSpoofActionAAAA(self
):
54 Spoofing: Spoof AAAA via Action
56 Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
57 check that dnsdist sends a spoofed result.
59 name
= 'spoofaction.spoofing.tests.powerdns.com.'
60 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
61 # dnsdist set RA = RD for spoofed responses
62 query
.flags
&= ~dns
.flags
.RD
63 expectedResponse
= dns
.message
.make_response(query
)
64 rrset
= dns
.rrset
.from_text(name
,
69 expectedResponse
.answer
.append(rrset
)
71 for method
in ("sendUDPQuery", "sendTCPQuery"):
72 sender
= getattr(self
, method
)
73 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
74 self
.assertTrue(receivedResponse
)
75 self
.assertEqual(expectedResponse
, receivedResponse
)
77 def testSpoofActionCNAME(self
):
79 Spoofing: Spoof CNAME via Action
81 Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
82 check that dnsdist sends a spoofed result.
84 name
= 'cnamespoofaction.spoofing.tests.powerdns.com.'
85 query
= dns
.message
.make_query(name
, 'A', 'IN')
86 # dnsdist set RA = RD for spoofed responses
87 query
.flags
&= ~dns
.flags
.RD
88 expectedResponse
= dns
.message
.make_response(query
)
89 rrset
= dns
.rrset
.from_text(name
,
93 'cnameaction.spoofing.tests.powerdns.com.')
94 expectedResponse
.answer
.append(rrset
)
96 for method
in ("sendUDPQuery", "sendTCPQuery"):
97 sender
= getattr(self
, method
)
98 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
99 self
.assertTrue(receivedResponse
)
100 self
.assertEqual(expectedResponse
, receivedResponse
)
102 def testSpoofActionMultiA(self
):
104 Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
106 Send an A query for "multispoof.spoofing.tests.powerdns.com.",
107 check that dnsdist sends a spoofed result.
109 name
= 'multispoof.spoofing.tests.powerdns.com.'
110 query
= dns
.message
.make_query(name
, 'A', 'IN')
111 # dnsdist set RA = RD for spoofed responses
112 query
.flags
&= ~dns
.flags
.RD
113 expectedResponse
= dns
.message
.make_response(query
)
114 rrset
= dns
.rrset
.from_text(name
,
118 '192.0.2.2', '192.0.2.1')
119 expectedResponse
.answer
.append(rrset
)
121 for method
in ("sendUDPQuery", "sendTCPQuery"):
122 sender
= getattr(self
, method
)
123 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
124 self
.assertTrue(receivedResponse
)
125 self
.assertEqual(expectedResponse
, receivedResponse
)
127 def testSpoofActionMultiAAAA(self
):
129 Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
131 Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
132 check that dnsdist sends a spoofed result.
134 name
= 'multispoof.spoofing.tests.powerdns.com.'
135 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
136 # dnsdist set RA = RD for spoofed responses
137 query
.flags
&= ~dns
.flags
.RD
138 expectedResponse
= dns
.message
.make_response(query
)
139 rrset
= dns
.rrset
.from_text(name
,
143 '2001:DB8::1', '2001:DB8::2')
144 expectedResponse
.answer
.append(rrset
)
146 for method
in ("sendUDPQuery", "sendTCPQuery"):
147 sender
= getattr(self
, method
)
148 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
149 self
.assertTrue(receivedResponse
)
150 self
.assertEqual(expectedResponse
, receivedResponse
)
152 def testSpoofActionMultiANY(self
):
154 Spoofing: Spoof multiple addresses via AddDomainSpoof
156 Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
157 check that dnsdist sends a spoofed result.
159 name
= 'multispoof.spoofing.tests.powerdns.com.'
160 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
161 # dnsdist set RA = RD for spoofed responses
162 query
.flags
&= ~dns
.flags
.RD
163 expectedResponse
= dns
.message
.make_response(query
)
165 rrset
= dns
.rrset
.from_text(name
,
169 '192.0.2.2', '192.0.2.1')
170 expectedResponse
.answer
.append(rrset
)
172 rrset
= dns
.rrset
.from_text(name
,
176 '2001:DB8::1', '2001:DB8::2')
177 expectedResponse
.answer
.append(rrset
)
179 for method
in ("sendUDPQuery", "sendTCPQuery"):
180 sender
= getattr(self
, method
)
181 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
182 self
.assertTrue(receivedResponse
)
183 self
.assertEqual(expectedResponse
, receivedResponse
)
185 def testSpoofActionSetAA(self
):
187 Spoofing: Spoof via Action, setting AA=1
189 name
= 'spoofaction-aa.spoofing.tests.powerdns.com.'
190 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
191 # dnsdist set RA = RD for spoofed responses
192 query
.flags
&= ~dns
.flags
.RD
193 expectedResponse
= dns
.message
.make_response(query
)
194 expectedResponse
.flags |
= dns
.flags
.AA
195 rrset
= dns
.rrset
.from_text(name
,
200 expectedResponse
.answer
.append(rrset
)
202 for method
in ("sendUDPQuery", "sendTCPQuery"):
203 sender
= getattr(self
, method
)
204 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
205 self
.assertTrue(receivedResponse
)
206 self
.assertEqual(expectedResponse
, receivedResponse
)
207 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
209 def testSpoofActionSetAD(self
):
211 Spoofing: Spoof via Action, setting AD=1
213 name
= 'spoofaction-ad.spoofing.tests.powerdns.com.'
214 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
215 # dnsdist set RA = RD for spoofed responses
216 query
.flags
&= ~dns
.flags
.RD
217 expectedResponse
= dns
.message
.make_response(query
)
218 expectedResponse
.flags |
= dns
.flags
.AD
219 rrset
= dns
.rrset
.from_text(name
,
224 expectedResponse
.answer
.append(rrset
)
226 for method
in ("sendUDPQuery", "sendTCPQuery"):
227 sender
= getattr(self
, method
)
228 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
229 self
.assertTrue(receivedResponse
)
230 self
.assertEqual(expectedResponse
, receivedResponse
)
231 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
233 def testSpoofActionSetRA(self
):
235 Spoofing: Spoof via Action, setting RA=1
237 name
= 'spoofaction-ra.spoofing.tests.powerdns.com.'
238 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
239 # dnsdist set RA = RD for spoofed responses
240 query
.flags
&= ~dns
.flags
.RD
241 expectedResponse
= dns
.message
.make_response(query
)
242 expectedResponse
.flags |
= dns
.flags
.RA
243 rrset
= dns
.rrset
.from_text(name
,
248 expectedResponse
.answer
.append(rrset
)
250 for method
in ("sendUDPQuery", "sendTCPQuery"):
251 sender
= getattr(self
, method
)
252 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
253 self
.assertTrue(receivedResponse
)
254 self
.assertEqual(expectedResponse
, receivedResponse
)
255 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
257 def testSpoofActionSetNoRA(self
):
259 Spoofing: Spoof via Action, setting RA=0
261 name
= 'spoofaction-nora.spoofing.tests.powerdns.com.'
262 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
263 expectedResponse
= dns
.message
.make_response(query
)
264 expectedResponse
.flags
&= ~dns
.flags
.RA
265 rrset
= dns
.rrset
.from_text(name
,
270 expectedResponse
.answer
.append(rrset
)
272 for method
in ("sendUDPQuery", "sendTCPQuery"):
273 sender
= getattr(self
, method
)
274 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
275 self
.assertTrue(receivedResponse
)
276 self
.assertEqual(expectedResponse
, receivedResponse
)
277 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
279 def testSpoofActionSetTTL(self
):
281 Spoofing: Spoof via Action, setting the TTL to 1500
283 name
= 'spoofaction-ttl.spoofing.tests.powerdns.com.'
284 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
285 expectedResponse
= dns
.message
.make_response(query
)
286 expectedResponse
.flags |
= dns
.flags
.RA
287 rrset
= dns
.rrset
.from_text(name
,
292 expectedResponse
.answer
.append(rrset
)
294 for method
in ("sendUDPQuery", "sendTCPQuery"):
295 sender
= getattr(self
, method
)
296 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
297 self
.assertTrue(receivedResponse
)
298 self
.assertEqual(expectedResponse
, receivedResponse
)
299 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 1500)
301 def testSpoofRawAction(self
):
303 Spoofing: Spoof a response from raw bytes
305 name
= 'raw.spoofing.tests.powerdns.com.'
308 query
= dns
.message
.make_query(name
, 'A', 'IN')
309 query
.flags
&= ~dns
.flags
.RD
310 expectedResponse
= dns
.message
.make_response(query
)
311 expectedResponse
.flags
&= ~dns
.flags
.AA
312 rrset
= dns
.rrset
.from_text(name
,
317 expectedResponse
.answer
.append(rrset
)
319 for method
in ("sendUDPQuery", "sendTCPQuery"):
320 sender
= getattr(self
, method
)
321 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
322 self
.assertTrue(receivedResponse
)
323 self
.assertEqual(expectedResponse
, receivedResponse
)
324 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
327 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
328 query
.flags
&= ~dns
.flags
.RD
329 expectedResponse
= dns
.message
.make_response(query
)
330 expectedResponse
.flags
&= ~dns
.flags
.AA
331 rrset
= dns
.rrset
.from_text(name
,
335 '"aaa" "bbbb" "ccccccccccc"')
336 expectedResponse
.answer
.append(rrset
)
338 for method
in ("sendUDPQuery", "sendTCPQuery"):
339 sender
= getattr(self
, method
)
340 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
341 self
.assertTrue(receivedResponse
)
342 self
.assertEqual(expectedResponse
, receivedResponse
)
343 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
346 query
= dns
.message
.make_query(name
, 'SRV', 'IN')
347 query
.flags
&= ~dns
.flags
.RD
348 expectedResponse
= dns
.message
.make_response(query
)
349 # this one should have the AA flag set
350 expectedResponse
.flags |
= dns
.flags
.AA
351 rrset
= dns
.rrset
.from_text(name
,
355 '0 0 65535 srv.powerdns.com.')
356 expectedResponse
.answer
.append(rrset
)
358 for method
in ("sendUDPQuery", "sendTCPQuery"):
359 sender
= getattr(self
, method
)
360 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
361 self
.assertTrue(receivedResponse
)
362 self
.assertEqual(expectedResponse
, receivedResponse
)
363 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 3600)
365 def testSpoofRawChaosAction(self
):
367 Spoofing: Spoof a response from several raw bytes in QCLass CH
369 name
= 'rawchaos.spoofing.tests.powerdns.com.'
372 query
= dns
.message
.make_query(name
, 'TXT', 'CH')
373 query
.flags
&= ~dns
.flags
.RD
374 expectedResponse
= dns
.message
.make_response(query
)
375 expectedResponse
.flags
&= ~dns
.flags
.AA
376 rrset
= dns
.rrset
.from_text(name
,
381 expectedResponse
.answer
.append(rrset
)
383 for method
in ("sendUDPQuery", "sendTCPQuery"):
384 sender
= getattr(self
, method
)
385 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
386 self
.assertTrue(receivedResponse
)
387 self
.assertEqual(expectedResponse
, receivedResponse
)
388 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
390 def testSpoofRawANYAction(self
):
392 Spoofing: Spoof a HINFO response for ANY queries
394 name
= 'raw-any.spoofing.tests.powerdns.com.'
396 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
397 query
.flags
&= ~dns
.flags
.RD
398 expectedResponse
= dns
.message
.make_response(query
)
399 expectedResponse
.flags
&= ~dns
.flags
.AA
400 rrset
= dns
.rrset
.from_text(name
,
405 expectedResponse
.answer
.append(rrset
)
407 for method
in ("sendUDPQuery", "sendTCPQuery"):
408 sender
= getattr(self
, method
)
409 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
410 self
.assertTrue(receivedResponse
)
411 self
.assertEqual(expectedResponse
, receivedResponse
)
412 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
414 def testSpoofRawActionMulti(self
):
416 Spoofing: Spoof a response from several raw bytes
418 name
= 'multiraw.spoofing.tests.powerdns.com.'
421 query
= dns
.message
.make_query(name
, 'A', 'IN')
422 query
.flags
&= ~dns
.flags
.RD
423 expectedResponse
= dns
.message
.make_response(query
)
424 expectedResponse
.flags
&= ~dns
.flags
.AA
425 rrset
= dns
.rrset
.from_text(name
,
429 '192.0.2.1', '192.0.2.2')
430 expectedResponse
.answer
.append(rrset
)
432 for method
in ("sendUDPQuery", "sendTCPQuery"):
433 sender
= getattr(self
, method
)
434 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
435 self
.assertTrue(receivedResponse
)
436 self
.assertEqual(expectedResponse
, receivedResponse
)
437 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
440 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
441 query
.flags
&= ~dns
.flags
.RD
442 expectedResponse
= dns
.message
.make_response(query
)
443 expectedResponse
.flags
&= ~dns
.flags
.AA
444 rrset
= dns
.rrset
.from_text(name
,
448 '"aaa" "bbbb"', '"ccccccccccc"')
449 expectedResponse
.answer
.append(rrset
)
451 for method
in ("sendUDPQuery", "sendTCPQuery"):
452 sender
= getattr(self
, method
)
453 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
454 self
.assertTrue(receivedResponse
)
455 self
.assertEqual(expectedResponse
, receivedResponse
)
456 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
458 class TestSpoofingLuaSpoof(DNSDistTest
):
460 _config_template
= """
461 function spoof1rule(dq)
464 return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
465 elseif(dq.qtype == 28) -- AAAA
467 return DNSAction.Spoof, "2001:DB8::1"
469 return DNSAction.None, ""
473 function spoof2rule(dq)
474 return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
477 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
478 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
480 function spoofrawrule(dq)
481 if dq.qtype == DNSQType.A then
482 return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
483 elseif dq.qtype == DNSQType.TXT then
484 return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
485 elseif dq.qtype == DNSQType.SRV then
487 return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
489 return DNSAction.None, ""
492 addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
493 addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
494 addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
495 newServer{address="127.0.0.1:%s"}
498 def testLuaSpoofA(self
):
500 Spoofing: Spoofing an A via Lua
502 Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
503 check that dnsdist sends a spoofed result.
505 name
= 'luaspoof1.spoofing.tests.powerdns.com.'
506 query
= dns
.message
.make_query(name
, 'A', 'IN')
507 # dnsdist set RA = RD for spoofed responses
508 query
.flags
&= ~dns
.flags
.RD
509 expectedResponse
= dns
.message
.make_response(query
)
510 rrset
= dns
.rrset
.from_text(name
,
514 '192.0.2.1', '192.0.2.2')
515 expectedResponse
.answer
.append(rrset
)
517 for method
in ("sendUDPQuery", "sendTCPQuery"):
518 sender
= getattr(self
, method
)
519 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
520 self
.assertTrue(receivedResponse
)
521 self
.assertEqual(expectedResponse
, receivedResponse
)
523 def testLuaSpoofAAAA(self
):
525 Spoofing: Spoofing an AAAA via Lua
527 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
528 check that dnsdist sends a spoofed result.
530 name
= 'luaspoof1.spoofing.tests.powerdns.com.'
531 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
532 # dnsdist set RA = RD for spoofed responses
533 query
.flags
&= ~dns
.flags
.RD
534 expectedResponse
= dns
.message
.make_response(query
)
535 rrset
= dns
.rrset
.from_text(name
,
540 expectedResponse
.answer
.append(rrset
)
542 for method
in ("sendUDPQuery", "sendTCPQuery"):
543 sender
= getattr(self
, method
)
544 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
545 self
.assertTrue(receivedResponse
)
546 self
.assertEqual(expectedResponse
, receivedResponse
)
548 def testLuaSpoofAWithCNAME(self
):
550 Spoofing: Spoofing an A with a CNAME via Lua
552 Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
553 check that dnsdist sends a spoofed result.
555 name
= 'luaspoof2.spoofing.tests.powerdns.com.'
556 query
= dns
.message
.make_query(name
, 'A', 'IN')
557 # dnsdist set RA = RD for spoofed responses
558 query
.flags
&= ~dns
.flags
.RD
559 expectedResponse
= dns
.message
.make_response(query
)
560 rrset
= dns
.rrset
.from_text(name
,
564 'spoofedcname.spoofing.tests.powerdns.com.')
565 expectedResponse
.answer
.append(rrset
)
567 for method
in ("sendUDPQuery", "sendTCPQuery"):
568 sender
= getattr(self
, method
)
569 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
570 self
.assertTrue(receivedResponse
)
571 self
.assertEqual(expectedResponse
, receivedResponse
)
573 def testLuaSpoofAAAAWithCNAME(self
):
575 Spoofing: Spoofing an AAAA with a CNAME via Lua
577 Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
578 check that dnsdist sends a spoofed result.
580 name
= 'luaspoof2.spoofing.tests.powerdns.com.'
581 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
582 # dnsdist set RA = RD for spoofed responses
583 query
.flags
&= ~dns
.flags
.RD
584 expectedResponse
= dns
.message
.make_response(query
)
585 rrset
= dns
.rrset
.from_text(name
,
589 'spoofedcname.spoofing.tests.powerdns.com.')
590 expectedResponse
.answer
.append(rrset
)
592 for method
in ("sendUDPQuery", "sendTCPQuery"):
593 sender
= getattr(self
, method
)
594 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
595 self
.assertTrue(receivedResponse
)
596 self
.assertEqual(expectedResponse
, receivedResponse
)
598 def testLuaSpoofRawAction(self
):
600 Spoofing: Spoof a response from raw bytes via Lua
602 name
= 'lua-raw.spoofing.tests.powerdns.com.'
605 query
= dns
.message
.make_query(name
, 'A', 'IN')
606 query
.flags
&= ~dns
.flags
.RD
607 expectedResponse
= dns
.message
.make_response(query
)
608 expectedResponse
.flags
&= ~dns
.flags
.AA
609 rrset
= dns
.rrset
.from_text(name
,
614 expectedResponse
.answer
.append(rrset
)
616 for method
in ("sendUDPQuery", "sendTCPQuery"):
617 sender
= getattr(self
, method
)
618 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
619 self
.assertTrue(receivedResponse
)
620 self
.assertEqual(expectedResponse
, receivedResponse
)
621 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
624 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
625 query
.flags
&= ~dns
.flags
.RD
626 expectedResponse
= dns
.message
.make_response(query
)
627 expectedResponse
.flags
&= ~dns
.flags
.AA
628 rrset
= dns
.rrset
.from_text(name
,
632 '"aaa" "bbbb" "ccccccccccc"')
633 expectedResponse
.answer
.append(rrset
)
635 for method
in ("sendUDPQuery", "sendTCPQuery"):
636 sender
= getattr(self
, method
)
637 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
638 self
.assertTrue(receivedResponse
)
639 self
.assertEqual(expectedResponse
, receivedResponse
)
640 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
643 query
= dns
.message
.make_query(name
, 'SRV', 'IN')
644 query
.flags
&= ~dns
.flags
.RD
645 expectedResponse
= dns
.message
.make_response(query
)
646 # this one should have the AA flag set
647 expectedResponse
.flags |
= dns
.flags
.AA
648 rrset
= dns
.rrset
.from_text(name
,
652 '0 0 65535 srv.powerdns.com.')
653 expectedResponse
.answer
.append(rrset
)
655 for method
in ("sendUDPQuery", "sendTCPQuery"):
656 sender
= getattr(self
, method
)
657 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
658 self
.assertTrue(receivedResponse
)
659 self
.assertEqual(expectedResponse
, receivedResponse
)
660 # sorry, we can't set the TTL from the Lua API right now
661 #self.assertEqual(receivedResponse.answer[0].ttl, 3600)
663 class TestSpoofingLuaSpoofMulti(DNSDistTest
):
665 _config_template
= """
666 function spoof1multirule(dq)
669 dq:spoof({ newCA("192.0.2.1"), newCA("192.0.2.2") })
670 return DNSAction.HeaderModify
671 elseif(dq.qtype == 28) -- AAAA
673 dq:spoof({ newCA("2001:DB8::1"), newCA("2001:DB8::2") })
674 return DNSAction.HeaderModify
676 return DNSAction.None, ""
680 function spoofrawmultirule(dq)
681 if dq.qtype == DNSQType.A then
682 dq:spoof({ "\\192\\000\\002\\001", "\\192\\000\\002\\002" })
683 return DNSAction.HeaderModify
684 elseif dq.qtype == DNSQType.TXT then
685 dq:spoof({ "\\003aaa\\004bbbb", "\\011ccccccccccc" })
686 return DNSAction.HeaderModify
687 elseif dq.qtype == DNSQType.SRV then
689 dq:spoof({ "\\000\\000\\000\\000\\255\\255\\004srv1\\008powerdns\\003com\\000","\\000\\000\\000\\000\\255\\255\\004srv2\\008powerdns\\003com\\000" })
690 return DNSAction.HeaderModify
692 return DNSAction.None, ""
695 addAction("luaspoof1multi.spoofing.tests.powerdns.com.", LuaAction(spoof1multirule))
696 addAction("lua-raw-multi.spoofing.tests.powerdns.com.", LuaAction(spoofrawmultirule))
697 newServer{address="127.0.0.1:%s"}
700 def testLuaSpoofMultiA(self
):
702 Spoofing: Spoofing multiple A via Lua dq:spoof
704 Send an A query to "luaspoof1multi.spoofing.tests.powerdns.com.",
705 check that dnsdist sends a spoofed result.
707 name
= 'luaspoof1multi.spoofing.tests.powerdns.com.'
708 query
= dns
.message
.make_query(name
, 'A', 'IN')
709 # dnsdist set RA = RD for spoofed responses
710 query
.flags
&= ~dns
.flags
.RD
711 expectedResponse
= dns
.message
.make_response(query
)
712 rrset
= dns
.rrset
.from_text(name
,
716 '192.0.2.1', '192.0.2.2')
717 expectedResponse
.answer
.append(rrset
)
719 for method
in ("sendUDPQuery", "sendTCPQuery"):
720 sender
= getattr(self
, method
)
721 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
722 self
.assertTrue(receivedResponse
)
723 self
.assertEqual(expectedResponse
, receivedResponse
)
725 def testLuaSpoofMultiAAAA(self
):
727 Spoofing: Spoofing multiple AAAA via Lua dq:spoof
729 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
730 check that dnsdist sends a spoofed result.
732 name
= 'luaspoof1multi.spoofing.tests.powerdns.com.'
733 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
734 # dnsdist set RA = RD for spoofed responses
735 query
.flags
&= ~dns
.flags
.RD
736 expectedResponse
= dns
.message
.make_response(query
)
737 rrset
= dns
.rrset
.from_text(name
,
741 '2001:DB8::1', '2001:DB8::2')
742 expectedResponse
.answer
.append(rrset
)
744 for method
in ("sendUDPQuery", "sendTCPQuery"):
745 sender
= getattr(self
, method
)
746 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
747 self
.assertTrue(receivedResponse
)
748 self
.assertEqual(expectedResponse
, receivedResponse
)
750 def testLuaSpoofMultiRawAction(self
):
752 Spoofing: Spoof responses from raw bytes via Lua dq:spoof
754 name
= 'lua-raw-multi.spoofing.tests.powerdns.com.'
757 query
= dns
.message
.make_query(name
, 'A', 'IN')
758 query
.flags
&= ~dns
.flags
.RD
759 expectedResponse
= dns
.message
.make_response(query
)
760 expectedResponse
.flags
&= ~dns
.flags
.AA
761 rrset
= dns
.rrset
.from_text(name
,
765 '192.0.2.1', '192.0.2.2')
766 expectedResponse
.answer
.append(rrset
)
768 for method
in ("sendUDPQuery", "sendTCPQuery"):
769 sender
= getattr(self
, method
)
770 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
771 self
.assertTrue(receivedResponse
)
772 self
.assertEqual(expectedResponse
, receivedResponse
)
773 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
776 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
777 query
.flags
&= ~dns
.flags
.RD
778 expectedResponse
= dns
.message
.make_response(query
)
779 expectedResponse
.flags
&= ~dns
.flags
.AA
780 rrset
= dns
.rrset
.from_text(name
,
784 '"aaa" "bbbb"', '"ccccccccccc"')
785 expectedResponse
.answer
.append(rrset
)
787 for method
in ("sendUDPQuery", "sendTCPQuery"):
788 sender
= getattr(self
, method
)
789 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
790 self
.assertTrue(receivedResponse
)
791 self
.assertEqual(expectedResponse
, receivedResponse
)
792 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
795 query
= dns
.message
.make_query(name
, 'SRV', 'IN')
796 query
.flags
&= ~dns
.flags
.RD
797 expectedResponse
= dns
.message
.make_response(query
)
798 # this one should have the AA flag set
799 expectedResponse
.flags |
= dns
.flags
.AA
800 rrset
= dns
.rrset
.from_text(name
,
804 '0 0 65535 srv1.powerdns.com.', '0 0 65535 srv2.powerdns.com.')
805 expectedResponse
.answer
.append(rrset
)
807 for method
in ("sendUDPQuery", "sendTCPQuery"):
808 sender
= getattr(self
, method
)
809 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
810 self
.assertTrue(receivedResponse
)
811 self
.assertEqual(expectedResponse
, receivedResponse
)
812 # sorry, we can't set the TTL from the Lua API right now
813 #self.assertEqual(receivedResponse.answer[0].ttl, 3600)
815 class TestSpoofingLuaFFISpoofMulti(DNSDistTest
):
817 _config_template
= """
818 local ffi = require("ffi")
820 function spoofrawmultirule(dq)
821 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
823 if qtype == DNSQType.A then
824 local records = ffi.new("dnsdist_ffi_raw_value_t [2]")
826 local str = "\\192\\000\\002\\001"
827 records[0].size = #str
828 records[0].value = str
830 local str = "\\192\\000\\002\\255"
831 records[1].value = str
832 records[1].size = #str
834 ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
835 return DNSAction.HeaderModify
836 elseif qtype == DNSQType.TXT then
837 local records = ffi.new("dnsdist_ffi_raw_value_t [2]")
839 local str = "\\033this text has a comma at the end,"
840 records[0].size = #str
841 records[0].value = str
843 local str = "\\003aaa\\004bbbb"
844 records[1].size = #str
845 records[1].value = str
847 ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
848 return DNSAction.HeaderModify
851 return DNSAction.None, ""
854 addAction("lua-raw-multi.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofrawmultirule))
855 newServer{address="127.0.0.1:%s"}
859 def testLuaSpoofMultiRawAction(self
):
861 Spoofing via Lua FFI: Spoof responses from raw bytes via Lua FFI
863 name
= 'lua-raw-multi.ffi-spoofing.tests.powerdns.com.'
866 query
= dns
.message
.make_query(name
, 'A', 'IN')
867 query
.flags
&= ~dns
.flags
.RD
868 expectedResponse
= dns
.message
.make_response(query
)
869 expectedResponse
.flags
&= ~dns
.flags
.AA
870 rrset
= dns
.rrset
.from_text(name
,
874 '192.0.2.1', '192.0.2.255')
875 expectedResponse
.answer
.append(rrset
)
877 for method
in ("sendUDPQuery", "sendTCPQuery"):
878 sender
= getattr(self
, method
)
879 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
880 self
.assertTrue(receivedResponse
)
881 self
.assertEqual(expectedResponse
, receivedResponse
)
882 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
885 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
886 query
.flags
&= ~dns
.flags
.RD
887 expectedResponse
= dns
.message
.make_response(query
)
888 expectedResponse
.flags
&= ~dns
.flags
.AA
889 rrset
= dns
.rrset
.from_text(name
,
893 '"this text has a comma at the end,"', '"aaa" "bbbb"')
894 expectedResponse
.answer
.append(rrset
)
896 for method
in ("sendUDPQuery", "sendTCPQuery"):
897 sender
= getattr(self
, method
)
898 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
899 self
.assertTrue(receivedResponse
)
900 self
.assertEqual(expectedResponse
, receivedResponse
)
901 self
.assertEqual(receivedResponse
.answer
[0].ttl
, 60)
903 class TestSpoofingLuaWithStatistics(DNSDistTest
):
905 _config_template
= """
906 function spoof1rule(dq)
907 queriesCount = getStatisticsCounters()['queries']
908 if(queriesCount == 1) then
909 return DNSAction.Spoof, "192.0.2.1"
910 elseif(queriesCount == 2) then
911 return DNSAction.Spoof, "192.0.2.2"
913 return DNSAction.Spoof, "192.0.2.0"
916 addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
917 newServer{address="127.0.0.1:%s"}
920 def testLuaSpoofBasedOnStatistics(self
):
922 Spoofing: Spoofing an A via Lua based on statistics counters
925 name
= 'luaspoofwithstats.spoofing.tests.powerdns.com.'
926 query
= dns
.message
.make_query(name
, 'A', 'IN')
927 # dnsdist set RA = RD for spoofed responses
928 query
.flags
&= ~dns
.flags
.RD
929 expectedResponse1
= dns
.message
.make_response(query
)
930 rrset
= dns
.rrset
.from_text(name
,
935 expectedResponse1
.answer
.append(rrset
)
936 expectedResponse2
= dns
.message
.make_response(query
)
937 rrset
= dns
.rrset
.from_text(name
,
942 expectedResponse2
.answer
.append(rrset
)
943 expectedResponseAfterwards
= dns
.message
.make_response(query
)
944 rrset
= dns
.rrset
.from_text(name
,
949 expectedResponseAfterwards
.answer
.append(rrset
)
951 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
952 self
.assertTrue(receivedResponse
)
953 self
.assertEqual(expectedResponse1
, receivedResponse
)
955 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
956 self
.assertTrue(receivedResponse
)
957 self
.assertEqual(expectedResponse2
, receivedResponse
)
959 for method
in ("sendUDPQuery", "sendTCPQuery"):
960 sender
= getattr(self
, method
)
961 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
962 self
.assertTrue(receivedResponse
)
963 self
.assertEqual(expectedResponseAfterwards
, receivedResponse
)
965 class TestSpoofingLuaSpoofPacket(DNSDistTest
):
967 _config_template
= """
969 function spoofpacket(dq)
970 if dq.qtype == DNSQType.A then
971 return DNSAction.SpoofPacket, "\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
973 return DNSAction.None, ""
976 addAction("lua-raw-packet.spoofing.tests.powerdns.com.", LuaAction(spoofpacket))
977 local rawResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\019rule\\045lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
978 addAction(AndRule{QTypeRule(DNSQType.A), SuffixMatchNodeRule("rule-lua-raw-packet.spoofing.tests.powerdns.com.")}, SpoofPacketAction(rawResponse, string.len(rawResponse)))
980 local ffi = require("ffi")
982 function spoofpacketffi(dq)
983 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
984 if qtype == DNSQType.A then
986 local refusedResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\012ffi\\045spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
988 ffi.C.dnsdist_ffi_dnsquestion_spoof_packet(dq, refusedResponse, string.len(refusedResponse))
989 return DNSAction.HeaderModify
991 return DNSAction.None, ""
994 addAction("lua-raw-packet.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofpacketffi))
995 newServer{address="127.0.0.1:%s"}
999 def testLuaSpoofPacket(self
):
1001 Spoofing via Lua FFI: Spoof raw response via Lua FFI
1003 for name
in ('lua-raw-packet.spoofing.tests.powerdns.com.', 'rule-lua-raw-packet.spoofing.tests.powerdns.com.'):
1005 query
= dns
.message
.make_query(name
, 'A', 'IN')
1006 expectedResponse
= dns
.message
.make_response(query
)
1007 expectedResponse
.flags |
= dns
.flags
.RA
1008 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1010 for method
in ("sendUDPQuery", "sendTCPQuery"):
1011 sender
= getattr(self
, method
)
1012 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
1013 self
.assertTrue(receivedResponse
)
1014 self
.assertEqual(expectedResponse
, receivedResponse
)
1016 def testLuaFFISpoofPacket(self
):
1018 Spoofing via Lua FFI: Spoof raw response via Lua FFI
1020 name
= 'lua-raw-packet.ffi-spoofing.tests.powerdns.com.'
1023 query
= dns
.message
.make_query(name
, 'A', 'IN')
1024 expectedResponse
= dns
.message
.make_response(query
)
1025 expectedResponse
.flags |
= dns
.flags
.RA
1026 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1028 for method
in ("sendUDPQuery", "sendTCPQuery"):
1029 sender
= getattr(self
, method
)
1030 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
1031 self
.assertTrue(receivedResponse
)
1032 self
.assertEqual(expectedResponse
, receivedResponse
)