]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #14182 from rgacogne/ddist-dynblock-tag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
5 class TestSpoofingSpoof(DNSDistTest):
6
7 _config_template = """
8 addAction(SuffixMatchNodeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}))
9 addAction(SuffixMatchNodeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {aa=true}))
10 addAction(SuffixMatchNodeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ad=true}))
11 addAction(SuffixMatchNodeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=true}))
12 addAction(SuffixMatchNodeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=false}))
13 addAction(SuffixMatchNodeRule("spoofaction-ttl.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ttl=1500}))
14 addAction(SuffixMatchNodeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
15 addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
16 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
17 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
18 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
19 addAction(AndRule{SuffixMatchNodeRule("rawchaos.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT), QClassRule(DNSClass.CHAOS)}, SpoofRawAction("\\005chaos"))
20 addAction(AndRule{SuffixMatchNodeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction({"\\003aaa\\004bbbb", "\\011ccccccccccc"}))
21 addAction(AndRule{SuffixMatchNodeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction({"\\192\\000\\002\\001", "\\192\\000\\002\\002"}))
22 -- rfc8482
23 addAction(AndRule{SuffixMatchNodeRule("raw-any.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.ANY)}, SpoofRawAction("\\007rfc\\056\\052\\056\\050\\000", { typeForAny=DNSQType.HINFO }))
24 newServer{address="127.0.0.1:%s"}
25 """
26
27 def testSpoofActionA(self):
28 """
29 Spoofing: Spoof A via Action
30
31 Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
32 check that dnsdist sends a spoofed result.
33 """
34 name = 'spoofaction.spoofing.tests.powerdns.com.'
35 query = dns.message.make_query(name, 'A', 'IN')
36 # dnsdist set RA = RD for spoofed responses
37 query.flags &= ~dns.flags.RD
38 expectedResponse = dns.message.make_response(query)
39 rrset = dns.rrset.from_text(name,
40 60,
41 dns.rdataclass.IN,
42 dns.rdatatype.A,
43 '192.0.2.1')
44 expectedResponse.answer.append(rrset)
45
46 for method in ("sendUDPQuery", "sendTCPQuery"):
47 sender = getattr(self, method)
48 (_, receivedResponse) = sender(query, response=None, useQueue=False)
49 self.assertTrue(receivedResponse)
50 self.assertEqual(expectedResponse, receivedResponse)
51
52 def testSpoofActionAAAA(self):
53 """
54 Spoofing: Spoof AAAA via Action
55
56 Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
57 check that dnsdist sends a spoofed result.
58 """
59 name = 'spoofaction.spoofing.tests.powerdns.com.'
60 query = dns.message.make_query(name, 'AAAA', 'IN')
61 # dnsdist set RA = RD for spoofed responses
62 query.flags &= ~dns.flags.RD
63 expectedResponse = dns.message.make_response(query)
64 rrset = dns.rrset.from_text(name,
65 60,
66 dns.rdataclass.IN,
67 dns.rdatatype.AAAA,
68 '2001:DB8::1')
69 expectedResponse.answer.append(rrset)
70
71 for method in ("sendUDPQuery", "sendTCPQuery"):
72 sender = getattr(self, method)
73 (_, receivedResponse) = sender(query, response=None, useQueue=False)
74 self.assertTrue(receivedResponse)
75 self.assertEqual(expectedResponse, receivedResponse)
76
77 def testSpoofActionCNAME(self):
78 """
79 Spoofing: Spoof CNAME via Action
80
81 Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
82 check that dnsdist sends a spoofed result.
83 """
84 name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
85 query = dns.message.make_query(name, 'A', 'IN')
86 # dnsdist set RA = RD for spoofed responses
87 query.flags &= ~dns.flags.RD
88 expectedResponse = dns.message.make_response(query)
89 rrset = dns.rrset.from_text(name,
90 60,
91 dns.rdataclass.IN,
92 dns.rdatatype.CNAME,
93 'cnameaction.spoofing.tests.powerdns.com.')
94 expectedResponse.answer.append(rrset)
95
96 for method in ("sendUDPQuery", "sendTCPQuery"):
97 sender = getattr(self, method)
98 (_, receivedResponse) = sender(query, response=None, useQueue=False)
99 self.assertTrue(receivedResponse)
100 self.assertEqual(expectedResponse, receivedResponse)
101
102 def testSpoofActionMultiA(self):
103 """
104 Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
105
106 Send an A query for "multispoof.spoofing.tests.powerdns.com.",
107 check that dnsdist sends a spoofed result.
108 """
109 name = 'multispoof.spoofing.tests.powerdns.com.'
110 query = dns.message.make_query(name, 'A', 'IN')
111 # dnsdist set RA = RD for spoofed responses
112 query.flags &= ~dns.flags.RD
113 expectedResponse = dns.message.make_response(query)
114 rrset = dns.rrset.from_text(name,
115 60,
116 dns.rdataclass.IN,
117 dns.rdatatype.A,
118 '192.0.2.2', '192.0.2.1')
119 expectedResponse.answer.append(rrset)
120
121 for method in ("sendUDPQuery", "sendTCPQuery"):
122 sender = getattr(self, method)
123 (_, receivedResponse) = sender(query, response=None, useQueue=False)
124 self.assertTrue(receivedResponse)
125 self.assertEqual(expectedResponse, receivedResponse)
126
127 def testSpoofActionMultiAAAA(self):
128 """
129 Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
130
131 Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
132 check that dnsdist sends a spoofed result.
133 """
134 name = 'multispoof.spoofing.tests.powerdns.com.'
135 query = dns.message.make_query(name, 'AAAA', 'IN')
136 # dnsdist set RA = RD for spoofed responses
137 query.flags &= ~dns.flags.RD
138 expectedResponse = dns.message.make_response(query)
139 rrset = dns.rrset.from_text(name,
140 60,
141 dns.rdataclass.IN,
142 dns.rdatatype.AAAA,
143 '2001:DB8::1', '2001:DB8::2')
144 expectedResponse.answer.append(rrset)
145
146 for method in ("sendUDPQuery", "sendTCPQuery"):
147 sender = getattr(self, method)
148 (_, receivedResponse) = sender(query, response=None, useQueue=False)
149 self.assertTrue(receivedResponse)
150 self.assertEqual(expectedResponse, receivedResponse)
151
152 def testSpoofActionMultiANY(self):
153 """
154 Spoofing: Spoof multiple addresses via AddDomainSpoof
155
156 Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
157 check that dnsdist sends a spoofed result.
158 """
159 name = 'multispoof.spoofing.tests.powerdns.com.'
160 query = dns.message.make_query(name, 'ANY', 'IN')
161 # dnsdist set RA = RD for spoofed responses
162 query.flags &= ~dns.flags.RD
163 expectedResponse = dns.message.make_response(query)
164
165 rrset = dns.rrset.from_text(name,
166 60,
167 dns.rdataclass.IN,
168 dns.rdatatype.A,
169 '192.0.2.2', '192.0.2.1')
170 expectedResponse.answer.append(rrset)
171
172 rrset = dns.rrset.from_text(name,
173 60,
174 dns.rdataclass.IN,
175 dns.rdatatype.AAAA,
176 '2001:DB8::1', '2001:DB8::2')
177 expectedResponse.answer.append(rrset)
178
179 for method in ("sendUDPQuery", "sendTCPQuery"):
180 sender = getattr(self, method)
181 (_, receivedResponse) = sender(query, response=None, useQueue=False)
182 self.assertTrue(receivedResponse)
183 self.assertEqual(expectedResponse, receivedResponse)
184
185 def testSpoofActionSetAA(self):
186 """
187 Spoofing: Spoof via Action, setting AA=1
188 """
189 name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
190 query = dns.message.make_query(name, 'AAAA', 'IN')
191 # dnsdist set RA = RD for spoofed responses
192 query.flags &= ~dns.flags.RD
193 expectedResponse = dns.message.make_response(query)
194 expectedResponse.flags |= dns.flags.AA
195 rrset = dns.rrset.from_text(name,
196 60,
197 dns.rdataclass.IN,
198 dns.rdatatype.AAAA,
199 '2001:DB8::1')
200 expectedResponse.answer.append(rrset)
201
202 for method in ("sendUDPQuery", "sendTCPQuery"):
203 sender = getattr(self, method)
204 (_, receivedResponse) = sender(query, response=None, useQueue=False)
205 self.assertTrue(receivedResponse)
206 self.assertEqual(expectedResponse, receivedResponse)
207 self.assertEqual(receivedResponse.answer[0].ttl, 60)
208
209 def testSpoofActionSetAD(self):
210 """
211 Spoofing: Spoof via Action, setting AD=1
212 """
213 name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
214 query = dns.message.make_query(name, 'AAAA', 'IN')
215 # dnsdist set RA = RD for spoofed responses
216 query.flags &= ~dns.flags.RD
217 expectedResponse = dns.message.make_response(query)
218 expectedResponse.flags |= dns.flags.AD
219 rrset = dns.rrset.from_text(name,
220 60,
221 dns.rdataclass.IN,
222 dns.rdatatype.AAAA,
223 '2001:DB8::1')
224 expectedResponse.answer.append(rrset)
225
226 for method in ("sendUDPQuery", "sendTCPQuery"):
227 sender = getattr(self, method)
228 (_, receivedResponse) = sender(query, response=None, useQueue=False)
229 self.assertTrue(receivedResponse)
230 self.assertEqual(expectedResponse, receivedResponse)
231 self.assertEqual(receivedResponse.answer[0].ttl, 60)
232
233 def testSpoofActionSetRA(self):
234 """
235 Spoofing: Spoof via Action, setting RA=1
236 """
237 name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
238 query = dns.message.make_query(name, 'AAAA', 'IN')
239 # dnsdist set RA = RD for spoofed responses
240 query.flags &= ~dns.flags.RD
241 expectedResponse = dns.message.make_response(query)
242 expectedResponse.flags |= dns.flags.RA
243 rrset = dns.rrset.from_text(name,
244 60,
245 dns.rdataclass.IN,
246 dns.rdatatype.AAAA,
247 '2001:DB8::1')
248 expectedResponse.answer.append(rrset)
249
250 for method in ("sendUDPQuery", "sendTCPQuery"):
251 sender = getattr(self, method)
252 (_, receivedResponse) = sender(query, response=None, useQueue=False)
253 self.assertTrue(receivedResponse)
254 self.assertEqual(expectedResponse, receivedResponse)
255 self.assertEqual(receivedResponse.answer[0].ttl, 60)
256
257 def testSpoofActionSetNoRA(self):
258 """
259 Spoofing: Spoof via Action, setting RA=0
260 """
261 name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
262 query = dns.message.make_query(name, 'AAAA', 'IN')
263 expectedResponse = dns.message.make_response(query)
264 expectedResponse.flags &= ~dns.flags.RA
265 rrset = dns.rrset.from_text(name,
266 60,
267 dns.rdataclass.IN,
268 dns.rdatatype.AAAA,
269 '2001:DB8::1')
270 expectedResponse.answer.append(rrset)
271
272 for method in ("sendUDPQuery", "sendTCPQuery"):
273 sender = getattr(self, method)
274 (_, receivedResponse) = sender(query, response=None, useQueue=False)
275 self.assertTrue(receivedResponse)
276 self.assertEqual(expectedResponse, receivedResponse)
277 self.assertEqual(receivedResponse.answer[0].ttl, 60)
278
279 def testSpoofActionSetTTL(self):
280 """
281 Spoofing: Spoof via Action, setting the TTL to 1500
282 """
283 name = 'spoofaction-ttl.spoofing.tests.powerdns.com.'
284 query = dns.message.make_query(name, 'AAAA', 'IN')
285 expectedResponse = dns.message.make_response(query)
286 expectedResponse.flags |= dns.flags.RA
287 rrset = dns.rrset.from_text(name,
288 60,
289 dns.rdataclass.IN,
290 dns.rdatatype.AAAA,
291 '2001:DB8::1')
292 expectedResponse.answer.append(rrset)
293
294 for method in ("sendUDPQuery", "sendTCPQuery"):
295 sender = getattr(self, method)
296 (_, receivedResponse) = sender(query, response=None, useQueue=False)
297 self.assertTrue(receivedResponse)
298 self.assertEqual(expectedResponse, receivedResponse)
299 self.assertEqual(receivedResponse.answer[0].ttl, 1500)
300
301 def testSpoofRawAction(self):
302 """
303 Spoofing: Spoof a response from raw bytes
304 """
305 name = 'raw.spoofing.tests.powerdns.com.'
306
307 # A
308 query = dns.message.make_query(name, 'A', 'IN')
309 query.flags &= ~dns.flags.RD
310 expectedResponse = dns.message.make_response(query)
311 expectedResponse.flags &= ~dns.flags.AA
312 rrset = dns.rrset.from_text(name,
313 60,
314 dns.rdataclass.IN,
315 dns.rdatatype.A,
316 '192.0.2.1')
317 expectedResponse.answer.append(rrset)
318
319 for method in ("sendUDPQuery", "sendTCPQuery"):
320 sender = getattr(self, method)
321 (_, receivedResponse) = sender(query, response=None, useQueue=False)
322 self.assertTrue(receivedResponse)
323 self.assertEqual(expectedResponse, receivedResponse)
324 self.assertEqual(receivedResponse.answer[0].ttl, 60)
325
326 # TXT
327 query = dns.message.make_query(name, 'TXT', 'IN')
328 query.flags &= ~dns.flags.RD
329 expectedResponse = dns.message.make_response(query)
330 expectedResponse.flags &= ~dns.flags.AA
331 rrset = dns.rrset.from_text(name,
332 60,
333 dns.rdataclass.IN,
334 dns.rdatatype.TXT,
335 '"aaa" "bbbb" "ccccccccccc"')
336 expectedResponse.answer.append(rrset)
337
338 for method in ("sendUDPQuery", "sendTCPQuery"):
339 sender = getattr(self, method)
340 (_, receivedResponse) = sender(query, response=None, useQueue=False)
341 self.assertTrue(receivedResponse)
342 self.assertEqual(expectedResponse, receivedResponse)
343 self.assertEqual(receivedResponse.answer[0].ttl, 60)
344
345 # SRV
346 query = dns.message.make_query(name, 'SRV', 'IN')
347 query.flags &= ~dns.flags.RD
348 expectedResponse = dns.message.make_response(query)
349 # this one should have the AA flag set
350 expectedResponse.flags |= dns.flags.AA
351 rrset = dns.rrset.from_text(name,
352 3600,
353 dns.rdataclass.IN,
354 dns.rdatatype.SRV,
355 '0 0 65535 srv.powerdns.com.')
356 expectedResponse.answer.append(rrset)
357
358 for method in ("sendUDPQuery", "sendTCPQuery"):
359 sender = getattr(self, method)
360 (_, receivedResponse) = sender(query, response=None, useQueue=False)
361 self.assertTrue(receivedResponse)
362 self.assertEqual(expectedResponse, receivedResponse)
363 self.assertEqual(receivedResponse.answer[0].ttl, 3600)
364
365 def testSpoofRawChaosAction(self):
366 """
367 Spoofing: Spoof a response from several raw bytes in QCLass CH
368 """
369 name = 'rawchaos.spoofing.tests.powerdns.com.'
370
371 # TXT CH
372 query = dns.message.make_query(name, 'TXT', 'CH')
373 query.flags &= ~dns.flags.RD
374 expectedResponse = dns.message.make_response(query)
375 expectedResponse.flags &= ~dns.flags.AA
376 rrset = dns.rrset.from_text(name,
377 60,
378 dns.rdataclass.CH,
379 dns.rdatatype.TXT,
380 '"chaos"')
381 expectedResponse.answer.append(rrset)
382
383 for method in ("sendUDPQuery", "sendTCPQuery"):
384 sender = getattr(self, method)
385 (_, receivedResponse) = sender(query, response=None, useQueue=False)
386 self.assertTrue(receivedResponse)
387 self.assertEqual(expectedResponse, receivedResponse)
388 self.assertEqual(receivedResponse.answer[0].ttl, 60)
389
390 def testSpoofRawANYAction(self):
391 """
392 Spoofing: Spoof a HINFO response for ANY queries
393 """
394 name = 'raw-any.spoofing.tests.powerdns.com.'
395
396 query = dns.message.make_query(name, 'ANY', 'IN')
397 query.flags &= ~dns.flags.RD
398 expectedResponse = dns.message.make_response(query)
399 expectedResponse.flags &= ~dns.flags.AA
400 rrset = dns.rrset.from_text(name,
401 60,
402 dns.rdataclass.IN,
403 dns.rdatatype.HINFO,
404 '"rfc8482" ""')
405 expectedResponse.answer.append(rrset)
406
407 for method in ("sendUDPQuery", "sendTCPQuery"):
408 sender = getattr(self, method)
409 (_, receivedResponse) = sender(query, response=None, useQueue=False)
410 self.assertTrue(receivedResponse)
411 self.assertEqual(expectedResponse, receivedResponse)
412 self.assertEqual(receivedResponse.answer[0].ttl, 60)
413
414 def testSpoofRawActionMulti(self):
415 """
416 Spoofing: Spoof a response from several raw bytes
417 """
418 name = 'multiraw.spoofing.tests.powerdns.com.'
419
420 # A
421 query = dns.message.make_query(name, 'A', 'IN')
422 query.flags &= ~dns.flags.RD
423 expectedResponse = dns.message.make_response(query)
424 expectedResponse.flags &= ~dns.flags.AA
425 rrset = dns.rrset.from_text(name,
426 60,
427 dns.rdataclass.IN,
428 dns.rdatatype.A,
429 '192.0.2.1', '192.0.2.2')
430 expectedResponse.answer.append(rrset)
431
432 for method in ("sendUDPQuery", "sendTCPQuery"):
433 sender = getattr(self, method)
434 (_, receivedResponse) = sender(query, response=None, useQueue=False)
435 self.assertTrue(receivedResponse)
436 self.assertEqual(expectedResponse, receivedResponse)
437 self.assertEqual(receivedResponse.answer[0].ttl, 60)
438
439 # TXT
440 query = dns.message.make_query(name, 'TXT', 'IN')
441 query.flags &= ~dns.flags.RD
442 expectedResponse = dns.message.make_response(query)
443 expectedResponse.flags &= ~dns.flags.AA
444 rrset = dns.rrset.from_text(name,
445 60,
446 dns.rdataclass.IN,
447 dns.rdatatype.TXT,
448 '"aaa" "bbbb"', '"ccccccccccc"')
449 expectedResponse.answer.append(rrset)
450
451 for method in ("sendUDPQuery", "sendTCPQuery"):
452 sender = getattr(self, method)
453 (_, receivedResponse) = sender(query, response=None, useQueue=False)
454 self.assertTrue(receivedResponse)
455 self.assertEqual(expectedResponse, receivedResponse)
456 self.assertEqual(receivedResponse.answer[0].ttl, 60)
457
458 class TestSpoofingLuaSpoof(DNSDistTest):
459
460 _config_template = """
461 function spoof1rule(dq)
462 if(dq.qtype==1) -- A
463 then
464 return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
465 elseif(dq.qtype == 28) -- AAAA
466 then
467 return DNSAction.Spoof, "2001:DB8::1"
468 else
469 return DNSAction.None, ""
470 end
471 end
472
473 function spoof2rule(dq)
474 return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
475 end
476
477 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
478 addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
479
480 function spoofrawrule(dq)
481 if dq.qtype == DNSQType.A then
482 return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
483 elseif dq.qtype == DNSQType.TXT then
484 return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
485 elseif dq.qtype == DNSQType.SRV then
486 dq.dh:setAA(true)
487 return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
488 end
489 return DNSAction.None, ""
490 end
491
492 addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
493 addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
494 addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
495 newServer{address="127.0.0.1:%s"}
496 """
497
498 def testLuaSpoofA(self):
499 """
500 Spoofing: Spoofing an A via Lua
501
502 Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
503 check that dnsdist sends a spoofed result.
504 """
505 name = 'luaspoof1.spoofing.tests.powerdns.com.'
506 query = dns.message.make_query(name, 'A', 'IN')
507 # dnsdist set RA = RD for spoofed responses
508 query.flags &= ~dns.flags.RD
509 expectedResponse = dns.message.make_response(query)
510 rrset = dns.rrset.from_text(name,
511 60,
512 dns.rdataclass.IN,
513 dns.rdatatype.A,
514 '192.0.2.1', '192.0.2.2')
515 expectedResponse.answer.append(rrset)
516
517 for method in ("sendUDPQuery", "sendTCPQuery"):
518 sender = getattr(self, method)
519 (_, receivedResponse) = sender(query, response=None, useQueue=False)
520 self.assertTrue(receivedResponse)
521 self.assertEqual(expectedResponse, receivedResponse)
522
523 def testLuaSpoofAAAA(self):
524 """
525 Spoofing: Spoofing an AAAA via Lua
526
527 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
528 check that dnsdist sends a spoofed result.
529 """
530 name = 'luaspoof1.spoofing.tests.powerdns.com.'
531 query = dns.message.make_query(name, 'AAAA', 'IN')
532 # dnsdist set RA = RD for spoofed responses
533 query.flags &= ~dns.flags.RD
534 expectedResponse = dns.message.make_response(query)
535 rrset = dns.rrset.from_text(name,
536 60,
537 dns.rdataclass.IN,
538 dns.rdatatype.AAAA,
539 '2001:DB8::1')
540 expectedResponse.answer.append(rrset)
541
542 for method in ("sendUDPQuery", "sendTCPQuery"):
543 sender = getattr(self, method)
544 (_, receivedResponse) = sender(query, response=None, useQueue=False)
545 self.assertTrue(receivedResponse)
546 self.assertEqual(expectedResponse, receivedResponse)
547
548 def testLuaSpoofAWithCNAME(self):
549 """
550 Spoofing: Spoofing an A with a CNAME via Lua
551
552 Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
553 check that dnsdist sends a spoofed result.
554 """
555 name = 'luaspoof2.spoofing.tests.powerdns.com.'
556 query = dns.message.make_query(name, 'A', 'IN')
557 # dnsdist set RA = RD for spoofed responses
558 query.flags &= ~dns.flags.RD
559 expectedResponse = dns.message.make_response(query)
560 rrset = dns.rrset.from_text(name,
561 60,
562 dns.rdataclass.IN,
563 dns.rdatatype.CNAME,
564 'spoofedcname.spoofing.tests.powerdns.com.')
565 expectedResponse.answer.append(rrset)
566
567 for method in ("sendUDPQuery", "sendTCPQuery"):
568 sender = getattr(self, method)
569 (_, receivedResponse) = sender(query, response=None, useQueue=False)
570 self.assertTrue(receivedResponse)
571 self.assertEqual(expectedResponse, receivedResponse)
572
573 def testLuaSpoofAAAAWithCNAME(self):
574 """
575 Spoofing: Spoofing an AAAA with a CNAME via Lua
576
577 Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
578 check that dnsdist sends a spoofed result.
579 """
580 name = 'luaspoof2.spoofing.tests.powerdns.com.'
581 query = dns.message.make_query(name, 'AAAA', 'IN')
582 # dnsdist set RA = RD for spoofed responses
583 query.flags &= ~dns.flags.RD
584 expectedResponse = dns.message.make_response(query)
585 rrset = dns.rrset.from_text(name,
586 60,
587 dns.rdataclass.IN,
588 dns.rdatatype.CNAME,
589 'spoofedcname.spoofing.tests.powerdns.com.')
590 expectedResponse.answer.append(rrset)
591
592 for method in ("sendUDPQuery", "sendTCPQuery"):
593 sender = getattr(self, method)
594 (_, receivedResponse) = sender(query, response=None, useQueue=False)
595 self.assertTrue(receivedResponse)
596 self.assertEqual(expectedResponse, receivedResponse)
597
598 def testLuaSpoofRawAction(self):
599 """
600 Spoofing: Spoof a response from raw bytes via Lua
601 """
602 name = 'lua-raw.spoofing.tests.powerdns.com.'
603
604 # A
605 query = dns.message.make_query(name, 'A', 'IN')
606 query.flags &= ~dns.flags.RD
607 expectedResponse = dns.message.make_response(query)
608 expectedResponse.flags &= ~dns.flags.AA
609 rrset = dns.rrset.from_text(name,
610 60,
611 dns.rdataclass.IN,
612 dns.rdatatype.A,
613 '192.0.2.1')
614 expectedResponse.answer.append(rrset)
615
616 for method in ("sendUDPQuery", "sendTCPQuery"):
617 sender = getattr(self, method)
618 (_, receivedResponse) = sender(query, response=None, useQueue=False)
619 self.assertTrue(receivedResponse)
620 self.assertEqual(expectedResponse, receivedResponse)
621 self.assertEqual(receivedResponse.answer[0].ttl, 60)
622
623 # TXT
624 query = dns.message.make_query(name, 'TXT', 'IN')
625 query.flags &= ~dns.flags.RD
626 expectedResponse = dns.message.make_response(query)
627 expectedResponse.flags &= ~dns.flags.AA
628 rrset = dns.rrset.from_text(name,
629 60,
630 dns.rdataclass.IN,
631 dns.rdatatype.TXT,
632 '"aaa" "bbbb" "ccccccccccc"')
633 expectedResponse.answer.append(rrset)
634
635 for method in ("sendUDPQuery", "sendTCPQuery"):
636 sender = getattr(self, method)
637 (_, receivedResponse) = sender(query, response=None, useQueue=False)
638 self.assertTrue(receivedResponse)
639 self.assertEqual(expectedResponse, receivedResponse)
640 self.assertEqual(receivedResponse.answer[0].ttl, 60)
641
642 # SRV
643 query = dns.message.make_query(name, 'SRV', 'IN')
644 query.flags &= ~dns.flags.RD
645 expectedResponse = dns.message.make_response(query)
646 # this one should have the AA flag set
647 expectedResponse.flags |= dns.flags.AA
648 rrset = dns.rrset.from_text(name,
649 3600,
650 dns.rdataclass.IN,
651 dns.rdatatype.SRV,
652 '0 0 65535 srv.powerdns.com.')
653 expectedResponse.answer.append(rrset)
654
655 for method in ("sendUDPQuery", "sendTCPQuery"):
656 sender = getattr(self, method)
657 (_, receivedResponse) = sender(query, response=None, useQueue=False)
658 self.assertTrue(receivedResponse)
659 self.assertEqual(expectedResponse, receivedResponse)
660 # sorry, we can't set the TTL from the Lua API right now
661 #self.assertEqual(receivedResponse.answer[0].ttl, 3600)
662
663 class TestSpoofingLuaSpoofMulti(DNSDistTest):
664
665 _config_template = """
666 function spoof1multirule(dq)
667 if(dq.qtype==1) -- A
668 then
669 dq:spoof({ newCA("192.0.2.1"), newCA("192.0.2.2") })
670 return DNSAction.HeaderModify
671 elseif(dq.qtype == 28) -- AAAA
672 then
673 dq:spoof({ newCA("2001:DB8::1"), newCA("2001:DB8::2") })
674 return DNSAction.HeaderModify
675 else
676 return DNSAction.None, ""
677 end
678 end
679
680 function spoofrawmultirule(dq)
681 if dq.qtype == DNSQType.A then
682 dq:spoof({ "\\192\\000\\002\\001", "\\192\\000\\002\\002" })
683 return DNSAction.HeaderModify
684 elseif dq.qtype == DNSQType.TXT then
685 dq:spoof({ "\\003aaa\\004bbbb", "\\011ccccccccccc" })
686 return DNSAction.HeaderModify
687 elseif dq.qtype == DNSQType.SRV then
688 dq.dh:setAA(true)
689 dq:spoof({ "\\000\\000\\000\\000\\255\\255\\004srv1\\008powerdns\\003com\\000","\\000\\000\\000\\000\\255\\255\\004srv2\\008powerdns\\003com\\000" })
690 return DNSAction.HeaderModify
691 end
692 return DNSAction.None, ""
693 end
694
695 addAction("luaspoof1multi.spoofing.tests.powerdns.com.", LuaAction(spoof1multirule))
696 addAction("lua-raw-multi.spoofing.tests.powerdns.com.", LuaAction(spoofrawmultirule))
697 newServer{address="127.0.0.1:%s"}
698 """
699
700 def testLuaSpoofMultiA(self):
701 """
702 Spoofing: Spoofing multiple A via Lua dq:spoof
703
704 Send an A query to "luaspoof1multi.spoofing.tests.powerdns.com.",
705 check that dnsdist sends a spoofed result.
706 """
707 name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
708 query = dns.message.make_query(name, 'A', 'IN')
709 # dnsdist set RA = RD for spoofed responses
710 query.flags &= ~dns.flags.RD
711 expectedResponse = dns.message.make_response(query)
712 rrset = dns.rrset.from_text(name,
713 60,
714 dns.rdataclass.IN,
715 dns.rdatatype.A,
716 '192.0.2.1', '192.0.2.2')
717 expectedResponse.answer.append(rrset)
718
719 for method in ("sendUDPQuery", "sendTCPQuery"):
720 sender = getattr(self, method)
721 (_, receivedResponse) = sender(query, response=None, useQueue=False)
722 self.assertTrue(receivedResponse)
723 self.assertEqual(expectedResponse, receivedResponse)
724
725 def testLuaSpoofMultiAAAA(self):
726 """
727 Spoofing: Spoofing multiple AAAA via Lua dq:spoof
728
729 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
730 check that dnsdist sends a spoofed result.
731 """
732 name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
733 query = dns.message.make_query(name, 'AAAA', 'IN')
734 # dnsdist set RA = RD for spoofed responses
735 query.flags &= ~dns.flags.RD
736 expectedResponse = dns.message.make_response(query)
737 rrset = dns.rrset.from_text(name,
738 60,
739 dns.rdataclass.IN,
740 dns.rdatatype.AAAA,
741 '2001:DB8::1', '2001:DB8::2')
742 expectedResponse.answer.append(rrset)
743
744 for method in ("sendUDPQuery", "sendTCPQuery"):
745 sender = getattr(self, method)
746 (_, receivedResponse) = sender(query, response=None, useQueue=False)
747 self.assertTrue(receivedResponse)
748 self.assertEqual(expectedResponse, receivedResponse)
749
750 def testLuaSpoofMultiRawAction(self):
751 """
752 Spoofing: Spoof responses from raw bytes via Lua dq:spoof
753 """
754 name = 'lua-raw-multi.spoofing.tests.powerdns.com.'
755
756 # A
757 query = dns.message.make_query(name, 'A', 'IN')
758 query.flags &= ~dns.flags.RD
759 expectedResponse = dns.message.make_response(query)
760 expectedResponse.flags &= ~dns.flags.AA
761 rrset = dns.rrset.from_text(name,
762 60,
763 dns.rdataclass.IN,
764 dns.rdatatype.A,
765 '192.0.2.1', '192.0.2.2')
766 expectedResponse.answer.append(rrset)
767
768 for method in ("sendUDPQuery", "sendTCPQuery"):
769 sender = getattr(self, method)
770 (_, receivedResponse) = sender(query, response=None, useQueue=False)
771 self.assertTrue(receivedResponse)
772 self.assertEqual(expectedResponse, receivedResponse)
773 self.assertEqual(receivedResponse.answer[0].ttl, 60)
774
775 # TXT
776 query = dns.message.make_query(name, 'TXT', 'IN')
777 query.flags &= ~dns.flags.RD
778 expectedResponse = dns.message.make_response(query)
779 expectedResponse.flags &= ~dns.flags.AA
780 rrset = dns.rrset.from_text(name,
781 60,
782 dns.rdataclass.IN,
783 dns.rdatatype.TXT,
784 '"aaa" "bbbb"', '"ccccccccccc"')
785 expectedResponse.answer.append(rrset)
786
787 for method in ("sendUDPQuery", "sendTCPQuery"):
788 sender = getattr(self, method)
789 (_, receivedResponse) = sender(query, response=None, useQueue=False)
790 self.assertTrue(receivedResponse)
791 self.assertEqual(expectedResponse, receivedResponse)
792 self.assertEqual(receivedResponse.answer[0].ttl, 60)
793
794 # SRV
795 query = dns.message.make_query(name, 'SRV', 'IN')
796 query.flags &= ~dns.flags.RD
797 expectedResponse = dns.message.make_response(query)
798 # this one should have the AA flag set
799 expectedResponse.flags |= dns.flags.AA
800 rrset = dns.rrset.from_text(name,
801 3600,
802 dns.rdataclass.IN,
803 dns.rdatatype.SRV,
804 '0 0 65535 srv1.powerdns.com.', '0 0 65535 srv2.powerdns.com.')
805 expectedResponse.answer.append(rrset)
806
807 for method in ("sendUDPQuery", "sendTCPQuery"):
808 sender = getattr(self, method)
809 (_, receivedResponse) = sender(query, response=None, useQueue=False)
810 self.assertTrue(receivedResponse)
811 self.assertEqual(expectedResponse, receivedResponse)
812 # sorry, we can't set the TTL from the Lua API right now
813 #self.assertEqual(receivedResponse.answer[0].ttl, 3600)
814
815 class TestSpoofingLuaFFISpoofMulti(DNSDistTest):
816
817 _config_template = """
818 local ffi = require("ffi")
819
820 function spoofrawmultirule(dq)
821 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
822
823 if qtype == DNSQType.A then
824 local records = ffi.new("dnsdist_ffi_raw_value_t [2]")
825
826 local str = "\\192\\000\\002\\001"
827 records[0].size = #str
828 records[0].value = str
829
830 local str = "\\192\\000\\002\\255"
831 records[1].value = str
832 records[1].size = #str
833
834 ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
835 return DNSAction.HeaderModify
836 elseif qtype == DNSQType.TXT then
837 local records = ffi.new("dnsdist_ffi_raw_value_t [2]")
838
839 local str = "\\033this text has a comma at the end,"
840 records[0].size = #str
841 records[0].value = str
842
843 local str = "\\003aaa\\004bbbb"
844 records[1].size = #str
845 records[1].value = str
846
847 ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
848 return DNSAction.HeaderModify
849 end
850
851 return DNSAction.None, ""
852 end
853
854 addAction("lua-raw-multi.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofrawmultirule))
855 newServer{address="127.0.0.1:%s"}
856 """
857 _verboseMode = True
858
859 def testLuaSpoofMultiRawAction(self):
860 """
861 Spoofing via Lua FFI: Spoof responses from raw bytes via Lua FFI
862 """
863 name = 'lua-raw-multi.ffi-spoofing.tests.powerdns.com.'
864
865 # A
866 query = dns.message.make_query(name, 'A', 'IN')
867 query.flags &= ~dns.flags.RD
868 expectedResponse = dns.message.make_response(query)
869 expectedResponse.flags &= ~dns.flags.AA
870 rrset = dns.rrset.from_text(name,
871 60,
872 dns.rdataclass.IN,
873 dns.rdatatype.A,
874 '192.0.2.1', '192.0.2.255')
875 expectedResponse.answer.append(rrset)
876
877 for method in ("sendUDPQuery", "sendTCPQuery"):
878 sender = getattr(self, method)
879 (_, receivedResponse) = sender(query, response=None, useQueue=False)
880 self.assertTrue(receivedResponse)
881 self.assertEqual(expectedResponse, receivedResponse)
882 self.assertEqual(receivedResponse.answer[0].ttl, 60)
883
884 # TXT
885 query = dns.message.make_query(name, 'TXT', 'IN')
886 query.flags &= ~dns.flags.RD
887 expectedResponse = dns.message.make_response(query)
888 expectedResponse.flags &= ~dns.flags.AA
889 rrset = dns.rrset.from_text(name,
890 60,
891 dns.rdataclass.IN,
892 dns.rdatatype.TXT,
893 '"this text has a comma at the end,"', '"aaa" "bbbb"')
894 expectedResponse.answer.append(rrset)
895
896 for method in ("sendUDPQuery", "sendTCPQuery"):
897 sender = getattr(self, method)
898 (_, receivedResponse) = sender(query, response=None, useQueue=False)
899 self.assertTrue(receivedResponse)
900 self.assertEqual(expectedResponse, receivedResponse)
901 self.assertEqual(receivedResponse.answer[0].ttl, 60)
902
903 class TestSpoofingLuaWithStatistics(DNSDistTest):
904
905 _config_template = """
906 function spoof1rule(dq)
907 queriesCount = getStatisticsCounters()['queries']
908 if(queriesCount == 1) then
909 return DNSAction.Spoof, "192.0.2.1"
910 elseif(queriesCount == 2) then
911 return DNSAction.Spoof, "192.0.2.2"
912 else
913 return DNSAction.Spoof, "192.0.2.0"
914 end
915 end
916 addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
917 newServer{address="127.0.0.1:%s"}
918 """
919
920 def testLuaSpoofBasedOnStatistics(self):
921 """
922 Spoofing: Spoofing an A via Lua based on statistics counters
923
924 """
925 name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
926 query = dns.message.make_query(name, 'A', 'IN')
927 # dnsdist set RA = RD for spoofed responses
928 query.flags &= ~dns.flags.RD
929 expectedResponse1 = dns.message.make_response(query)
930 rrset = dns.rrset.from_text(name,
931 60,
932 dns.rdataclass.IN,
933 dns.rdatatype.A,
934 '192.0.2.1')
935 expectedResponse1.answer.append(rrset)
936 expectedResponse2 = dns.message.make_response(query)
937 rrset = dns.rrset.from_text(name,
938 60,
939 dns.rdataclass.IN,
940 dns.rdatatype.A,
941 '192.0.2.2')
942 expectedResponse2.answer.append(rrset)
943 expectedResponseAfterwards = dns.message.make_response(query)
944 rrset = dns.rrset.from_text(name,
945 60,
946 dns.rdataclass.IN,
947 dns.rdatatype.A,
948 '192.0.2.0')
949 expectedResponseAfterwards.answer.append(rrset)
950
951 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
952 self.assertTrue(receivedResponse)
953 self.assertEqual(expectedResponse1, receivedResponse)
954
955 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
956 self.assertTrue(receivedResponse)
957 self.assertEqual(expectedResponse2, receivedResponse)
958
959 for method in ("sendUDPQuery", "sendTCPQuery"):
960 sender = getattr(self, method)
961 (_, receivedResponse) = sender(query, response=None, useQueue=False)
962 self.assertTrue(receivedResponse)
963 self.assertEqual(expectedResponseAfterwards, receivedResponse)
964
965 class TestSpoofingLuaSpoofPacket(DNSDistTest):
966
967 _config_template = """
968
969 function spoofpacket(dq)
970 if dq.qtype == DNSQType.A then
971 return DNSAction.SpoofPacket, "\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
972 end
973 return DNSAction.None, ""
974 end
975
976 addAction("lua-raw-packet.spoofing.tests.powerdns.com.", LuaAction(spoofpacket))
977 local rawResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\019rule\\045lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
978 addAction(AndRule{QTypeRule(DNSQType.A), SuffixMatchNodeRule("rule-lua-raw-packet.spoofing.tests.powerdns.com.")}, SpoofPacketAction(rawResponse, string.len(rawResponse)))
979
980 local ffi = require("ffi")
981
982 function spoofpacketffi(dq)
983 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
984 if qtype == DNSQType.A then
985 -- REFUSED answer
986 local refusedResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\012ffi\\045spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
987
988 ffi.C.dnsdist_ffi_dnsquestion_spoof_packet(dq, refusedResponse, string.len(refusedResponse))
989 return DNSAction.HeaderModify
990 end
991 return DNSAction.None, ""
992 end
993
994 addAction("lua-raw-packet.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofpacketffi))
995 newServer{address="127.0.0.1:%s"}
996 """
997 _verboseMode = True
998
999 def testLuaSpoofPacket(self):
1000 """
1001 Spoofing via Lua FFI: Spoof raw response via Lua FFI
1002 """
1003 for name in ('lua-raw-packet.spoofing.tests.powerdns.com.', 'rule-lua-raw-packet.spoofing.tests.powerdns.com.'):
1004
1005 query = dns.message.make_query(name, 'A', 'IN')
1006 expectedResponse = dns.message.make_response(query)
1007 expectedResponse.flags |= dns.flags.RA
1008 expectedResponse.set_rcode(dns.rcode.REFUSED)
1009
1010 for method in ("sendUDPQuery", "sendTCPQuery"):
1011 sender = getattr(self, method)
1012 (_, receivedResponse) = sender(query, response=None, useQueue=False)
1013 self.assertTrue(receivedResponse)
1014 self.assertEqual(expectedResponse, receivedResponse)
1015
1016 def testLuaFFISpoofPacket(self):
1017 """
1018 Spoofing via Lua FFI: Spoof raw response via Lua FFI
1019 """
1020 name = 'lua-raw-packet.ffi-spoofing.tests.powerdns.com.'
1021
1022 #
1023 query = dns.message.make_query(name, 'A', 'IN')
1024 expectedResponse = dns.message.make_response(query)
1025 expectedResponse.flags |= dns.flags.RA
1026 expectedResponse.set_rcode(dns.rcode.REFUSED)
1027
1028 for method in ("sendUDPQuery", "sendTCPQuery"):
1029 sender = getattr(self, method)
1030 (_, receivedResponse) = sender(query, response=None, useQueue=False)
1031 self.assertTrue(receivedResponse)
1032 self.assertEqual(expectedResponse, receivedResponse)