]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #12776 from jacobbunk/tsig-qtype
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
5 class TestSpoofingSpoof(DNSDistTest):
6
7 _config_template = """
8 addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}))
9 addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {aa=true}))
10 addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ad=true}))
11 addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=true}))
12 addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=false}))
13 addAction(makeRule("spoofaction-ttl.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ttl=1500}))
14 addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
15 addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
16 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
17 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
18 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
19 addAction(AndRule{makeRule("rawchaos.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT), QClassRule(DNSClass.CHAOS)}, SpoofRawAction("\\005chaos"))
20 addAction(AndRule{makeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction({"\\003aaa\\004bbbb", "\\011ccccccccccc"}))
21 addAction(AndRule{makeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction({"\\192\\000\\002\\001", "\\192\\000\\002\\002"}))
22 newServer{address="127.0.0.1:%s"}
23 """
24
25 def testSpoofActionA(self):
26 """
27 Spoofing: Spoof A via Action
28
29 Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
30 check that dnsdist sends a spoofed result.
31 """
32 name = 'spoofaction.spoofing.tests.powerdns.com.'
33 query = dns.message.make_query(name, 'A', 'IN')
34 # dnsdist set RA = RD for spoofed responses
35 query.flags &= ~dns.flags.RD
36 expectedResponse = dns.message.make_response(query)
37 rrset = dns.rrset.from_text(name,
38 60,
39 dns.rdataclass.IN,
40 dns.rdatatype.A,
41 '192.0.2.1')
42 expectedResponse.answer.append(rrset)
43
44 for method in ("sendUDPQuery", "sendTCPQuery"):
45 sender = getattr(self, method)
46 (_, receivedResponse) = sender(query, response=None, useQueue=False)
47 self.assertTrue(receivedResponse)
48 self.assertEqual(expectedResponse, receivedResponse)
49
50 def testSpoofActionAAAA(self):
51 """
52 Spoofing: Spoof AAAA via Action
53
54 Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
55 check that dnsdist sends a spoofed result.
56 """
57 name = 'spoofaction.spoofing.tests.powerdns.com.'
58 query = dns.message.make_query(name, 'AAAA', 'IN')
59 # dnsdist set RA = RD for spoofed responses
60 query.flags &= ~dns.flags.RD
61 expectedResponse = dns.message.make_response(query)
62 rrset = dns.rrset.from_text(name,
63 60,
64 dns.rdataclass.IN,
65 dns.rdatatype.AAAA,
66 '2001:DB8::1')
67 expectedResponse.answer.append(rrset)
68
69 for method in ("sendUDPQuery", "sendTCPQuery"):
70 sender = getattr(self, method)
71 (_, receivedResponse) = sender(query, response=None, useQueue=False)
72 self.assertTrue(receivedResponse)
73 self.assertEqual(expectedResponse, receivedResponse)
74
75 def testSpoofActionCNAME(self):
76 """
77 Spoofing: Spoof CNAME via Action
78
79 Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
80 check that dnsdist sends a spoofed result.
81 """
82 name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
83 query = dns.message.make_query(name, 'A', 'IN')
84 # dnsdist set RA = RD for spoofed responses
85 query.flags &= ~dns.flags.RD
86 expectedResponse = dns.message.make_response(query)
87 rrset = dns.rrset.from_text(name,
88 60,
89 dns.rdataclass.IN,
90 dns.rdatatype.CNAME,
91 'cnameaction.spoofing.tests.powerdns.com.')
92 expectedResponse.answer.append(rrset)
93
94 for method in ("sendUDPQuery", "sendTCPQuery"):
95 sender = getattr(self, method)
96 (_, receivedResponse) = sender(query, response=None, useQueue=False)
97 self.assertTrue(receivedResponse)
98 self.assertEqual(expectedResponse, receivedResponse)
99
100 def testSpoofActionMultiA(self):
101 """
102 Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
103
104 Send an A query for "multispoof.spoofing.tests.powerdns.com.",
105 check that dnsdist sends a spoofed result.
106 """
107 name = 'multispoof.spoofing.tests.powerdns.com.'
108 query = dns.message.make_query(name, 'A', 'IN')
109 # dnsdist set RA = RD for spoofed responses
110 query.flags &= ~dns.flags.RD
111 expectedResponse = dns.message.make_response(query)
112 rrset = dns.rrset.from_text(name,
113 60,
114 dns.rdataclass.IN,
115 dns.rdatatype.A,
116 '192.0.2.2', '192.0.2.1')
117 expectedResponse.answer.append(rrset)
118
119 for method in ("sendUDPQuery", "sendTCPQuery"):
120 sender = getattr(self, method)
121 (_, receivedResponse) = sender(query, response=None, useQueue=False)
122 self.assertTrue(receivedResponse)
123 self.assertEqual(expectedResponse, receivedResponse)
124
125 def testSpoofActionMultiAAAA(self):
126 """
127 Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
128
129 Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
130 check that dnsdist sends a spoofed result.
131 """
132 name = 'multispoof.spoofing.tests.powerdns.com.'
133 query = dns.message.make_query(name, 'AAAA', 'IN')
134 # dnsdist set RA = RD for spoofed responses
135 query.flags &= ~dns.flags.RD
136 expectedResponse = dns.message.make_response(query)
137 rrset = dns.rrset.from_text(name,
138 60,
139 dns.rdataclass.IN,
140 dns.rdatatype.AAAA,
141 '2001:DB8::1', '2001:DB8::2')
142 expectedResponse.answer.append(rrset)
143
144 for method in ("sendUDPQuery", "sendTCPQuery"):
145 sender = getattr(self, method)
146 (_, receivedResponse) = sender(query, response=None, useQueue=False)
147 self.assertTrue(receivedResponse)
148 self.assertEqual(expectedResponse, receivedResponse)
149
150 def testSpoofActionMultiANY(self):
151 """
152 Spoofing: Spoof multiple addresses via AddDomainSpoof
153
154 Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
155 check that dnsdist sends a spoofed result.
156 """
157 name = 'multispoof.spoofing.tests.powerdns.com.'
158 query = dns.message.make_query(name, 'ANY', 'IN')
159 # dnsdist set RA = RD for spoofed responses
160 query.flags &= ~dns.flags.RD
161 expectedResponse = dns.message.make_response(query)
162
163 rrset = dns.rrset.from_text(name,
164 60,
165 dns.rdataclass.IN,
166 dns.rdatatype.A,
167 '192.0.2.2', '192.0.2.1')
168 expectedResponse.answer.append(rrset)
169
170 rrset = dns.rrset.from_text(name,
171 60,
172 dns.rdataclass.IN,
173 dns.rdatatype.AAAA,
174 '2001:DB8::1', '2001:DB8::2')
175 expectedResponse.answer.append(rrset)
176
177 for method in ("sendUDPQuery", "sendTCPQuery"):
178 sender = getattr(self, method)
179 (_, receivedResponse) = sender(query, response=None, useQueue=False)
180 self.assertTrue(receivedResponse)
181 self.assertEqual(expectedResponse, receivedResponse)
182
183 def testSpoofActionSetAA(self):
184 """
185 Spoofing: Spoof via Action, setting AA=1
186 """
187 name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
188 query = dns.message.make_query(name, 'AAAA', 'IN')
189 # dnsdist set RA = RD for spoofed responses
190 query.flags &= ~dns.flags.RD
191 expectedResponse = dns.message.make_response(query)
192 expectedResponse.flags |= dns.flags.AA
193 rrset = dns.rrset.from_text(name,
194 60,
195 dns.rdataclass.IN,
196 dns.rdatatype.AAAA,
197 '2001:DB8::1')
198 expectedResponse.answer.append(rrset)
199
200 for method in ("sendUDPQuery", "sendTCPQuery"):
201 sender = getattr(self, method)
202 (_, receivedResponse) = sender(query, response=None, useQueue=False)
203 self.assertTrue(receivedResponse)
204 self.assertEqual(expectedResponse, receivedResponse)
205 self.assertEqual(receivedResponse.answer[0].ttl, 60)
206
207 def testSpoofActionSetAD(self):
208 """
209 Spoofing: Spoof via Action, setting AD=1
210 """
211 name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
212 query = dns.message.make_query(name, 'AAAA', 'IN')
213 # dnsdist set RA = RD for spoofed responses
214 query.flags &= ~dns.flags.RD
215 expectedResponse = dns.message.make_response(query)
216 expectedResponse.flags |= dns.flags.AD
217 rrset = dns.rrset.from_text(name,
218 60,
219 dns.rdataclass.IN,
220 dns.rdatatype.AAAA,
221 '2001:DB8::1')
222 expectedResponse.answer.append(rrset)
223
224 for method in ("sendUDPQuery", "sendTCPQuery"):
225 sender = getattr(self, method)
226 (_, receivedResponse) = sender(query, response=None, useQueue=False)
227 self.assertTrue(receivedResponse)
228 self.assertEqual(expectedResponse, receivedResponse)
229 self.assertEqual(receivedResponse.answer[0].ttl, 60)
230
231 def testSpoofActionSetRA(self):
232 """
233 Spoofing: Spoof via Action, setting RA=1
234 """
235 name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
236 query = dns.message.make_query(name, 'AAAA', 'IN')
237 # dnsdist set RA = RD for spoofed responses
238 query.flags &= ~dns.flags.RD
239 expectedResponse = dns.message.make_response(query)
240 expectedResponse.flags |= dns.flags.RA
241 rrset = dns.rrset.from_text(name,
242 60,
243 dns.rdataclass.IN,
244 dns.rdatatype.AAAA,
245 '2001:DB8::1')
246 expectedResponse.answer.append(rrset)
247
248 for method in ("sendUDPQuery", "sendTCPQuery"):
249 sender = getattr(self, method)
250 (_, receivedResponse) = sender(query, response=None, useQueue=False)
251 self.assertTrue(receivedResponse)
252 self.assertEqual(expectedResponse, receivedResponse)
253 self.assertEqual(receivedResponse.answer[0].ttl, 60)
254
255 def testSpoofActionSetNoRA(self):
256 """
257 Spoofing: Spoof via Action, setting RA=0
258 """
259 name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
260 query = dns.message.make_query(name, 'AAAA', 'IN')
261 expectedResponse = dns.message.make_response(query)
262 expectedResponse.flags &= ~dns.flags.RA
263 rrset = dns.rrset.from_text(name,
264 60,
265 dns.rdataclass.IN,
266 dns.rdatatype.AAAA,
267 '2001:DB8::1')
268 expectedResponse.answer.append(rrset)
269
270 for method in ("sendUDPQuery", "sendTCPQuery"):
271 sender = getattr(self, method)
272 (_, receivedResponse) = sender(query, response=None, useQueue=False)
273 self.assertTrue(receivedResponse)
274 self.assertEqual(expectedResponse, receivedResponse)
275 self.assertEqual(receivedResponse.answer[0].ttl, 60)
276
277 def testSpoofActionSetTTL(self):
278 """
279 Spoofing: Spoof via Action, setting the TTL to 1500
280 """
281 name = 'spoofaction-ttl.spoofing.tests.powerdns.com.'
282 query = dns.message.make_query(name, 'AAAA', 'IN')
283 expectedResponse = dns.message.make_response(query)
284 expectedResponse.flags |= dns.flags.RA
285 rrset = dns.rrset.from_text(name,
286 60,
287 dns.rdataclass.IN,
288 dns.rdatatype.AAAA,
289 '2001:DB8::1')
290 expectedResponse.answer.append(rrset)
291
292 for method in ("sendUDPQuery", "sendTCPQuery"):
293 sender = getattr(self, method)
294 (_, receivedResponse) = sender(query, response=None, useQueue=False)
295 self.assertTrue(receivedResponse)
296 self.assertEqual(expectedResponse, receivedResponse)
297 self.assertEqual(receivedResponse.answer[0].ttl, 1500)
298
299 def testSpoofRawAction(self):
300 """
301 Spoofing: Spoof a response from raw bytes
302 """
303 name = 'raw.spoofing.tests.powerdns.com.'
304
305 # A
306 query = dns.message.make_query(name, 'A', 'IN')
307 query.flags &= ~dns.flags.RD
308 expectedResponse = dns.message.make_response(query)
309 expectedResponse.flags &= ~dns.flags.AA
310 rrset = dns.rrset.from_text(name,
311 60,
312 dns.rdataclass.IN,
313 dns.rdatatype.A,
314 '192.0.2.1')
315 expectedResponse.answer.append(rrset)
316
317 for method in ("sendUDPQuery", "sendTCPQuery"):
318 sender = getattr(self, method)
319 (_, receivedResponse) = sender(query, response=None, useQueue=False)
320 self.assertTrue(receivedResponse)
321 self.assertEqual(expectedResponse, receivedResponse)
322 self.assertEqual(receivedResponse.answer[0].ttl, 60)
323
324 # TXT
325 query = dns.message.make_query(name, 'TXT', 'IN')
326 query.flags &= ~dns.flags.RD
327 expectedResponse = dns.message.make_response(query)
328 expectedResponse.flags &= ~dns.flags.AA
329 rrset = dns.rrset.from_text(name,
330 60,
331 dns.rdataclass.IN,
332 dns.rdatatype.TXT,
333 '"aaa" "bbbb" "ccccccccccc"')
334 expectedResponse.answer.append(rrset)
335
336 for method in ("sendUDPQuery", "sendTCPQuery"):
337 sender = getattr(self, method)
338 (_, receivedResponse) = sender(query, response=None, useQueue=False)
339 self.assertTrue(receivedResponse)
340 self.assertEqual(expectedResponse, receivedResponse)
341 self.assertEqual(receivedResponse.answer[0].ttl, 60)
342
343 # SRV
344 query = dns.message.make_query(name, 'SRV', 'IN')
345 query.flags &= ~dns.flags.RD
346 expectedResponse = dns.message.make_response(query)
347 # this one should have the AA flag set
348 expectedResponse.flags |= dns.flags.AA
349 rrset = dns.rrset.from_text(name,
350 3600,
351 dns.rdataclass.IN,
352 dns.rdatatype.SRV,
353 '0 0 65535 srv.powerdns.com.')
354 expectedResponse.answer.append(rrset)
355
356 for method in ("sendUDPQuery", "sendTCPQuery"):
357 sender = getattr(self, method)
358 (_, receivedResponse) = sender(query, response=None, useQueue=False)
359 self.assertTrue(receivedResponse)
360 self.assertEqual(expectedResponse, receivedResponse)
361 self.assertEqual(receivedResponse.answer[0].ttl, 3600)
362
363 def testSpoofRawChaosAction(self):
364 """
365 Spoofing: Spoof a response from several raw bytes in QCLass CH
366 """
367 name = 'rawchaos.spoofing.tests.powerdns.com.'
368
369 # TXT CH
370 query = dns.message.make_query(name, 'TXT', 'CH')
371 query.flags &= ~dns.flags.RD
372 expectedResponse = dns.message.make_response(query)
373 expectedResponse.flags &= ~dns.flags.AA
374 rrset = dns.rrset.from_text(name,
375 60,
376 dns.rdataclass.CH,
377 dns.rdatatype.TXT,
378 '"chaos"')
379 expectedResponse.answer.append(rrset)
380
381 for method in ("sendUDPQuery", "sendTCPQuery"):
382 sender = getattr(self, method)
383 (_, receivedResponse) = sender(query, response=None, useQueue=False)
384 self.assertTrue(receivedResponse)
385 self.assertEqual(expectedResponse, receivedResponse)
386 self.assertEqual(receivedResponse.answer[0].ttl, 60)
387
388
389 def testSpoofRawActionMulti(self):
390 """
391 Spoofing: Spoof a response from several raw bytes
392 """
393 name = 'multiraw.spoofing.tests.powerdns.com.'
394
395 # A
396 query = dns.message.make_query(name, 'A', 'IN')
397 query.flags &= ~dns.flags.RD
398 expectedResponse = dns.message.make_response(query)
399 expectedResponse.flags &= ~dns.flags.AA
400 rrset = dns.rrset.from_text(name,
401 60,
402 dns.rdataclass.IN,
403 dns.rdatatype.A,
404 '192.0.2.1', '192.0.2.2')
405 expectedResponse.answer.append(rrset)
406
407 for method in ("sendUDPQuery", "sendTCPQuery"):
408 sender = getattr(self, method)
409 (_, receivedResponse) = sender(query, response=None, useQueue=False)
410 self.assertTrue(receivedResponse)
411 self.assertEqual(expectedResponse, receivedResponse)
412 self.assertEqual(receivedResponse.answer[0].ttl, 60)
413
414 # TXT
415 query = dns.message.make_query(name, 'TXT', 'IN')
416 query.flags &= ~dns.flags.RD
417 expectedResponse = dns.message.make_response(query)
418 expectedResponse.flags &= ~dns.flags.AA
419 rrset = dns.rrset.from_text(name,
420 60,
421 dns.rdataclass.IN,
422 dns.rdatatype.TXT,
423 '"aaa" "bbbb"', '"ccccccccccc"')
424 expectedResponse.answer.append(rrset)
425
426 for method in ("sendUDPQuery", "sendTCPQuery"):
427 sender = getattr(self, method)
428 (_, receivedResponse) = sender(query, response=None, useQueue=False)
429 self.assertTrue(receivedResponse)
430 self.assertEqual(expectedResponse, receivedResponse)
431 self.assertEqual(receivedResponse.answer[0].ttl, 60)
432
433 class TestSpoofingLuaSpoof(DNSDistTest):
434
435 _config_template = """
436 function spoof1rule(dq)
437 if(dq.qtype==1) -- A
438 then
439 return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
440 elseif(dq.qtype == 28) -- AAAA
441 then
442 return DNSAction.Spoof, "2001:DB8::1"
443 else
444 return DNSAction.None, ""
445 end
446 end
447
448 function spoof2rule(dq)
449 return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
450 end
451
452 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
453 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
454
455 function spoofrawrule(dq)
456 if dq.qtype == DNSQType.A then
457 return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
458 elseif dq.qtype == DNSQType.TXT then
459 return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
460 elseif dq.qtype == DNSQType.SRV then
461 dq.dh:setAA(true)
462 return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
463 end
464 return DNSAction.None, ""
465 end
466
467 addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
468 addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
469 addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
470 newServer{address="127.0.0.1:%s"}
471 """
472
473 def testLuaSpoofA(self):
474 """
475 Spoofing: Spoofing an A via Lua
476
477 Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
478 check that dnsdist sends a spoofed result.
479 """
480 name = 'luaspoof1.spoofing.tests.powerdns.com.'
481 query = dns.message.make_query(name, 'A', 'IN')
482 # dnsdist set RA = RD for spoofed responses
483 query.flags &= ~dns.flags.RD
484 expectedResponse = dns.message.make_response(query)
485 rrset = dns.rrset.from_text(name,
486 60,
487 dns.rdataclass.IN,
488 dns.rdatatype.A,
489 '192.0.2.1', '192.0.2.2')
490 expectedResponse.answer.append(rrset)
491
492 for method in ("sendUDPQuery", "sendTCPQuery"):
493 sender = getattr(self, method)
494 (_, receivedResponse) = sender(query, response=None, useQueue=False)
495 self.assertTrue(receivedResponse)
496 self.assertEqual(expectedResponse, receivedResponse)
497
498 def testLuaSpoofAAAA(self):
499 """
500 Spoofing: Spoofing an AAAA via Lua
501
502 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
503 check that dnsdist sends a spoofed result.
504 """
505 name = 'luaspoof1.spoofing.tests.powerdns.com.'
506 query = dns.message.make_query(name, 'AAAA', 'IN')
507 # dnsdist set RA = RD for spoofed responses
508 query.flags &= ~dns.flags.RD
509 expectedResponse = dns.message.make_response(query)
510 rrset = dns.rrset.from_text(name,
511 60,
512 dns.rdataclass.IN,
513 dns.rdatatype.AAAA,
514 '2001:DB8::1')
515 expectedResponse.answer.append(rrset)
516
517 for method in ("sendUDPQuery", "sendTCPQuery"):
518 sender = getattr(self, method)
519 (_, receivedResponse) = sender(query, response=None, useQueue=False)
520 self.assertTrue(receivedResponse)
521 self.assertEqual(expectedResponse, receivedResponse)
522
523 def testLuaSpoofAWithCNAME(self):
524 """
525 Spoofing: Spoofing an A with a CNAME via Lua
526
527 Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
528 check that dnsdist sends a spoofed result.
529 """
530 name = 'luaspoof2.spoofing.tests.powerdns.com.'
531 query = dns.message.make_query(name, 'A', 'IN')
532 # dnsdist set RA = RD for spoofed responses
533 query.flags &= ~dns.flags.RD
534 expectedResponse = dns.message.make_response(query)
535 rrset = dns.rrset.from_text(name,
536 60,
537 dns.rdataclass.IN,
538 dns.rdatatype.CNAME,
539 'spoofedcname.spoofing.tests.powerdns.com.')
540 expectedResponse.answer.append(rrset)
541
542 for method in ("sendUDPQuery", "sendTCPQuery"):
543 sender = getattr(self, method)
544 (_, receivedResponse) = sender(query, response=None, useQueue=False)
545 self.assertTrue(receivedResponse)
546 self.assertEqual(expectedResponse, receivedResponse)
547
548 def testLuaSpoofAAAAWithCNAME(self):
549 """
550 Spoofing: Spoofing an AAAA with a CNAME via Lua
551
552 Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
553 check that dnsdist sends a spoofed result.
554 """
555 name = 'luaspoof2.spoofing.tests.powerdns.com.'
556 query = dns.message.make_query(name, 'AAAA', 'IN')
557 # dnsdist set RA = RD for spoofed responses
558 query.flags &= ~dns.flags.RD
559 expectedResponse = dns.message.make_response(query)
560 rrset = dns.rrset.from_text(name,
561 60,
562 dns.rdataclass.IN,
563 dns.rdatatype.CNAME,
564 'spoofedcname.spoofing.tests.powerdns.com.')
565 expectedResponse.answer.append(rrset)
566
567 for method in ("sendUDPQuery", "sendTCPQuery"):
568 sender = getattr(self, method)
569 (_, receivedResponse) = sender(query, response=None, useQueue=False)
570 self.assertTrue(receivedResponse)
571 self.assertEqual(expectedResponse, receivedResponse)
572
573 def testLuaSpoofRawAction(self):
574 """
575 Spoofing: Spoof a response from raw bytes via Lua
576 """
577 name = 'lua-raw.spoofing.tests.powerdns.com.'
578
579 # A
580 query = dns.message.make_query(name, 'A', 'IN')
581 query.flags &= ~dns.flags.RD
582 expectedResponse = dns.message.make_response(query)
583 expectedResponse.flags &= ~dns.flags.AA
584 rrset = dns.rrset.from_text(name,
585 60,
586 dns.rdataclass.IN,
587 dns.rdatatype.A,
588 '192.0.2.1')
589 expectedResponse.answer.append(rrset)
590
591 for method in ("sendUDPQuery", "sendTCPQuery"):
592 sender = getattr(self, method)
593 (_, receivedResponse) = sender(query, response=None, useQueue=False)
594 self.assertTrue(receivedResponse)
595 self.assertEqual(expectedResponse, receivedResponse)
596 self.assertEqual(receivedResponse.answer[0].ttl, 60)
597
598 # TXT
599 query = dns.message.make_query(name, 'TXT', 'IN')
600 query.flags &= ~dns.flags.RD
601 expectedResponse = dns.message.make_response(query)
602 expectedResponse.flags &= ~dns.flags.AA
603 rrset = dns.rrset.from_text(name,
604 60,
605 dns.rdataclass.IN,
606 dns.rdatatype.TXT,
607 '"aaa" "bbbb" "ccccccccccc"')
608 expectedResponse.answer.append(rrset)
609
610 for method in ("sendUDPQuery", "sendTCPQuery"):
611 sender = getattr(self, method)
612 (_, receivedResponse) = sender(query, response=None, useQueue=False)
613 self.assertTrue(receivedResponse)
614 self.assertEqual(expectedResponse, receivedResponse)
615 self.assertEqual(receivedResponse.answer[0].ttl, 60)
616
617 # SRV
618 query = dns.message.make_query(name, 'SRV', 'IN')
619 query.flags &= ~dns.flags.RD
620 expectedResponse = dns.message.make_response(query)
621 # this one should have the AA flag set
622 expectedResponse.flags |= dns.flags.AA
623 rrset = dns.rrset.from_text(name,
624 3600,
625 dns.rdataclass.IN,
626 dns.rdatatype.SRV,
627 '0 0 65535 srv.powerdns.com.')
628 expectedResponse.answer.append(rrset)
629
630 for method in ("sendUDPQuery", "sendTCPQuery"):
631 sender = getattr(self, method)
632 (_, receivedResponse) = sender(query, response=None, useQueue=False)
633 self.assertTrue(receivedResponse)
634 self.assertEqual(expectedResponse, receivedResponse)
635 # sorry, we can't set the TTL from the Lua API right now
636 #self.assertEqual(receivedResponse.answer[0].ttl, 3600)
637
638 class TestSpoofingLuaSpoofMulti(DNSDistTest):
639
640 _config_template = """
641 function spoof1multirule(dq)
642 if(dq.qtype==1) -- A
643 then
644 dq:spoof({ newCA("192.0.2.1"), newCA("192.0.2.2") })
645 return DNSAction.HeaderModify
646 elseif(dq.qtype == 28) -- AAAA
647 then
648 dq:spoof({ newCA("2001:DB8::1"), newCA("2001:DB8::2") })
649 return DNSAction.HeaderModify
650 else
651 return DNSAction.None, ""
652 end
653 end
654
655 function spoofrawmultirule(dq)
656 if dq.qtype == DNSQType.A then
657 dq:spoof({ "\\192\\000\\002\\001", "\\192\\000\\002\\002" })
658 return DNSAction.HeaderModify
659 elseif dq.qtype == DNSQType.TXT then
660 dq:spoof({ "\\003aaa\\004bbbb", "\\011ccccccccccc" })
661 return DNSAction.HeaderModify
662 elseif dq.qtype == DNSQType.SRV then
663 dq.dh:setAA(true)
664 dq:spoof({ "\\000\\000\\000\\000\\255\\255\\004srv1\\008powerdns\\003com\\000","\\000\\000\\000\\000\\255\\255\\004srv2\\008powerdns\\003com\\000" })
665 return DNSAction.HeaderModify
666 end
667 return DNSAction.None, ""
668 end
669
670 addAction("luaspoof1multi.spoofing.tests.powerdns.com.", LuaAction(spoof1multirule))
671 addAction("lua-raw-multi.spoofing.tests.powerdns.com.", LuaAction(spoofrawmultirule))
672 newServer{address="127.0.0.1:%s"}
673 """
674
675 def testLuaSpoofMultiA(self):
676 """
677 Spoofing: Spoofing multiple A via Lua dq:spoof
678
679 Send an A query to "luaspoof1multi.spoofing.tests.powerdns.com.",
680 check that dnsdist sends a spoofed result.
681 """
682 name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
683 query = dns.message.make_query(name, 'A', 'IN')
684 # dnsdist set RA = RD for spoofed responses
685 query.flags &= ~dns.flags.RD
686 expectedResponse = dns.message.make_response(query)
687 rrset = dns.rrset.from_text(name,
688 60,
689 dns.rdataclass.IN,
690 dns.rdatatype.A,
691 '192.0.2.1', '192.0.2.2')
692 expectedResponse.answer.append(rrset)
693
694 for method in ("sendUDPQuery", "sendTCPQuery"):
695 sender = getattr(self, method)
696 (_, receivedResponse) = sender(query, response=None, useQueue=False)
697 self.assertTrue(receivedResponse)
698 self.assertEqual(expectedResponse, receivedResponse)
699
700 def testLuaSpoofMultiAAAA(self):
701 """
702 Spoofing: Spoofing multiple AAAA via Lua dq:spoof
703
704 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
705 check that dnsdist sends a spoofed result.
706 """
707 name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
708 query = dns.message.make_query(name, 'AAAA', 'IN')
709 # dnsdist set RA = RD for spoofed responses
710 query.flags &= ~dns.flags.RD
711 expectedResponse = dns.message.make_response(query)
712 rrset = dns.rrset.from_text(name,
713 60,
714 dns.rdataclass.IN,
715 dns.rdatatype.AAAA,
716 '2001:DB8::1', '2001:DB8::2')
717 expectedResponse.answer.append(rrset)
718
719 for method in ("sendUDPQuery", "sendTCPQuery"):
720 sender = getattr(self, method)
721 (_, receivedResponse) = sender(query, response=None, useQueue=False)
722 self.assertTrue(receivedResponse)
723 self.assertEqual(expectedResponse, receivedResponse)
724
725 def testLuaSpoofMultiRawAction(self):
726 """
727 Spoofing: Spoof responses from raw bytes via Lua dq:spoof
728 """
729 name = 'lua-raw-multi.spoofing.tests.powerdns.com.'
730
731 # A
732 query = dns.message.make_query(name, 'A', 'IN')
733 query.flags &= ~dns.flags.RD
734 expectedResponse = dns.message.make_response(query)
735 expectedResponse.flags &= ~dns.flags.AA
736 rrset = dns.rrset.from_text(name,
737 60,
738 dns.rdataclass.IN,
739 dns.rdatatype.A,
740 '192.0.2.1', '192.0.2.2')
741 expectedResponse.answer.append(rrset)
742
743 for method in ("sendUDPQuery", "sendTCPQuery"):
744 sender = getattr(self, method)
745 (_, receivedResponse) = sender(query, response=None, useQueue=False)
746 self.assertTrue(receivedResponse)
747 self.assertEqual(expectedResponse, receivedResponse)
748 self.assertEqual(receivedResponse.answer[0].ttl, 60)
749
750 # TXT
751 query = dns.message.make_query(name, 'TXT', 'IN')
752 query.flags &= ~dns.flags.RD
753 expectedResponse = dns.message.make_response(query)
754 expectedResponse.flags &= ~dns.flags.AA
755 rrset = dns.rrset.from_text(name,
756 60,
757 dns.rdataclass.IN,
758 dns.rdatatype.TXT,
759 '"aaa" "bbbb"', '"ccccccccccc"')
760 expectedResponse.answer.append(rrset)
761
762 for method in ("sendUDPQuery", "sendTCPQuery"):
763 sender = getattr(self, method)
764 (_, receivedResponse) = sender(query, response=None, useQueue=False)
765 self.assertTrue(receivedResponse)
766 self.assertEqual(expectedResponse, receivedResponse)
767 self.assertEqual(receivedResponse.answer[0].ttl, 60)
768
769 # SRV
770 query = dns.message.make_query(name, 'SRV', 'IN')
771 query.flags &= ~dns.flags.RD
772 expectedResponse = dns.message.make_response(query)
773 # this one should have the AA flag set
774 expectedResponse.flags |= dns.flags.AA
775 rrset = dns.rrset.from_text(name,
776 3600,
777 dns.rdataclass.IN,
778 dns.rdatatype.SRV,
779 '0 0 65535 srv1.powerdns.com.', '0 0 65535 srv2.powerdns.com.')
780 expectedResponse.answer.append(rrset)
781
782 for method in ("sendUDPQuery", "sendTCPQuery"):
783 sender = getattr(self, method)
784 (_, receivedResponse) = sender(query, response=None, useQueue=False)
785 self.assertTrue(receivedResponse)
786 self.assertEqual(expectedResponse, receivedResponse)
787 # sorry, we can't set the TTL from the Lua API right now
788 #self.assertEqual(receivedResponse.answer[0].ttl, 3600)
789
790 class TestSpoofingLuaFFISpoofMulti(DNSDistTest):
791
792 _config_template = """
793 local ffi = require("ffi")
794
795 function spoofrawmultirule(dq)
796 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
797
798 if qtype == DNSQType.A then
799 local records = ffi.new("dnsdist_ffi_raw_value_t [2]")
800
801 local str = "\\192\\000\\002\\001"
802 records[0].size = #str
803 records[0].value = str
804
805 local str = "\\192\\000\\002\\255"
806 records[1].value = str
807 records[1].size = #str
808
809 ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
810 return DNSAction.HeaderModify
811 elseif qtype == DNSQType.TXT then
812 local records = ffi.new("dnsdist_ffi_raw_value_t [2]")
813
814 local str = "\\033this text has a comma at the end,"
815 records[0].size = #str
816 records[0].value = str
817
818 local str = "\\003aaa\\004bbbb"
819 records[1].size = #str
820 records[1].value = str
821
822 ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
823 return DNSAction.HeaderModify
824 end
825
826 return DNSAction.None, ""
827 end
828
829 addAction("lua-raw-multi.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofrawmultirule))
830 newServer{address="127.0.0.1:%s"}
831 """
832 _verboseMode = True
833
834 def testLuaSpoofMultiRawAction(self):
835 """
836 Spoofing via Lua FFI: Spoof responses from raw bytes via Lua FFI
837 """
838 name = 'lua-raw-multi.ffi-spoofing.tests.powerdns.com.'
839
840 # A
841 query = dns.message.make_query(name, 'A', 'IN')
842 query.flags &= ~dns.flags.RD
843 expectedResponse = dns.message.make_response(query)
844 expectedResponse.flags &= ~dns.flags.AA
845 rrset = dns.rrset.from_text(name,
846 60,
847 dns.rdataclass.IN,
848 dns.rdatatype.A,
849 '192.0.2.1', '192.0.2.255')
850 expectedResponse.answer.append(rrset)
851
852 for method in ("sendUDPQuery", "sendTCPQuery"):
853 sender = getattr(self, method)
854 (_, receivedResponse) = sender(query, response=None, useQueue=False)
855 self.assertTrue(receivedResponse)
856 self.assertEqual(expectedResponse, receivedResponse)
857 self.assertEqual(receivedResponse.answer[0].ttl, 60)
858
859 # TXT
860 query = dns.message.make_query(name, 'TXT', 'IN')
861 query.flags &= ~dns.flags.RD
862 expectedResponse = dns.message.make_response(query)
863 expectedResponse.flags &= ~dns.flags.AA
864 rrset = dns.rrset.from_text(name,
865 60,
866 dns.rdataclass.IN,
867 dns.rdatatype.TXT,
868 '"this text has a comma at the end,"', '"aaa" "bbbb"')
869 expectedResponse.answer.append(rrset)
870
871 for method in ("sendUDPQuery", "sendTCPQuery"):
872 sender = getattr(self, method)
873 (_, receivedResponse) = sender(query, response=None, useQueue=False)
874 self.assertTrue(receivedResponse)
875 self.assertEqual(expectedResponse, receivedResponse)
876 self.assertEqual(receivedResponse.answer[0].ttl, 60)
877
878 class TestSpoofingLuaWithStatistics(DNSDistTest):
879
880 _config_template = """
881 function spoof1rule(dq)
882 queriesCount = getStatisticsCounters()['queries']
883 if(queriesCount == 1) then
884 return DNSAction.Spoof, "192.0.2.1"
885 elseif(queriesCount == 2) then
886 return DNSAction.Spoof, "192.0.2.2"
887 else
888 return DNSAction.Spoof, "192.0.2.0"
889 end
890 end
891 addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
892 newServer{address="127.0.0.1:%s"}
893 """
894
895 def testLuaSpoofBasedOnStatistics(self):
896 """
897 Spoofing: Spoofing an A via Lua based on statistics counters
898
899 """
900 name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
901 query = dns.message.make_query(name, 'A', 'IN')
902 # dnsdist set RA = RD for spoofed responses
903 query.flags &= ~dns.flags.RD
904 expectedResponse1 = dns.message.make_response(query)
905 rrset = dns.rrset.from_text(name,
906 60,
907 dns.rdataclass.IN,
908 dns.rdatatype.A,
909 '192.0.2.1')
910 expectedResponse1.answer.append(rrset)
911 expectedResponse2 = dns.message.make_response(query)
912 rrset = dns.rrset.from_text(name,
913 60,
914 dns.rdataclass.IN,
915 dns.rdatatype.A,
916 '192.0.2.2')
917 expectedResponse2.answer.append(rrset)
918 expectedResponseAfterwards = dns.message.make_response(query)
919 rrset = dns.rrset.from_text(name,
920 60,
921 dns.rdataclass.IN,
922 dns.rdatatype.A,
923 '192.0.2.0')
924 expectedResponseAfterwards.answer.append(rrset)
925
926 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
927 self.assertTrue(receivedResponse)
928 self.assertEqual(expectedResponse1, receivedResponse)
929
930 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
931 self.assertTrue(receivedResponse)
932 self.assertEqual(expectedResponse2, receivedResponse)
933
934 for method in ("sendUDPQuery", "sendTCPQuery"):
935 sender = getattr(self, method)
936 (_, receivedResponse) = sender(query, response=None, useQueue=False)
937 self.assertTrue(receivedResponse)
938 self.assertEqual(expectedResponseAfterwards, receivedResponse)
939
940 class TestSpoofingLuaSpoofPacket(DNSDistTest):
941
942 _config_template = """
943
944 function spoofpacket(dq)
945 if dq.qtype == DNSQType.A then
946 return DNSAction.SpoofPacket, "\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
947 end
948 return DNSAction.None, ""
949 end
950
951 addAction("lua-raw-packet.spoofing.tests.powerdns.com.", LuaAction(spoofpacket))
952 local rawResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\019rule\\045lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
953 addAction(AndRule{QTypeRule(DNSQType.A), makeRule("rule-lua-raw-packet.spoofing.tests.powerdns.com.")}, SpoofPacketAction(rawResponse, string.len(rawResponse)))
954
955 local ffi = require("ffi")
956
957 function spoofpacketffi(dq)
958 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
959 if qtype == DNSQType.A then
960 -- REFUSED answer
961 local refusedResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\012ffi\\045spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
962
963 ffi.C.dnsdist_ffi_dnsquestion_spoof_packet(dq, refusedResponse, string.len(refusedResponse))
964 return DNSAction.HeaderModify
965 end
966 return DNSAction.None, ""
967 end
968
969 addAction("lua-raw-packet.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofpacketffi))
970 newServer{address="127.0.0.1:%s"}
971 """
972 _verboseMode = True
973
974 def testLuaSpoofPacket(self):
975 """
976 Spoofing via Lua FFI: Spoof raw response via Lua FFI
977 """
978 for name in ('lua-raw-packet.spoofing.tests.powerdns.com.', 'rule-lua-raw-packet.spoofing.tests.powerdns.com.'):
979
980 query = dns.message.make_query(name, 'A', 'IN')
981 expectedResponse = dns.message.make_response(query)
982 expectedResponse.flags |= dns.flags.RA
983 expectedResponse.set_rcode(dns.rcode.REFUSED)
984
985 for method in ("sendUDPQuery", "sendTCPQuery"):
986 sender = getattr(self, method)
987 (_, receivedResponse) = sender(query, response=None, useQueue=False)
988 self.assertTrue(receivedResponse)
989 self.assertEqual(expectedResponse, receivedResponse)
990
991 def testLuaFFISpoofPacket(self):
992 """
993 Spoofing via Lua FFI: Spoof raw response via Lua FFI
994 """
995 name = 'lua-raw-packet.ffi-spoofing.tests.powerdns.com.'
996
997 #
998 query = dns.message.make_query(name, 'A', 'IN')
999 expectedResponse = dns.message.make_response(query)
1000 expectedResponse.flags |= dns.flags.RA
1001 expectedResponse.set_rcode(dns.rcode.REFUSED)
1002
1003 for method in ("sendUDPQuery", "sendTCPQuery"):
1004 sender = getattr(self, method)
1005 (_, receivedResponse) = sender(query, response=None, useQueue=False)
1006 self.assertTrue(receivedResponse)
1007 self.assertEqual(expectedResponse, receivedResponse)