git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #10245 from omoerbeek/qclass
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
class TestSpoofingSpoof(DNSDistTest):
    """
    Tests for the SpoofAction, SpoofCNAMEAction and SpoofRawAction rules
    declared in the configuration template below.

    Every test sends a query over UDP and then over TCP and compares what
    dnsdist answers with a locally built expected response.
    """

    _config_template = """
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}))
    addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {aa=true}))
    addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ad=true}))
    addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=true}))
    addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=false}))
    addAction(makeRule("spoofaction-ttl.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ttl=1500}))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
    addAction(AndRule{makeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction({"\\003aaa\\004bbbb", "\\011ccccccccccc"}))
    addAction(AndRule{makeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction({"\\192\\000\\002\\001", "\\192\\000\\002\\002"}))
    newServer{address="127.0.0.1:%s"}
    """

    @staticmethod
    def _makeSpoofQuery(name, qtype, clearRD=True):
        """
        Build an IN query for `name` and `qtype`.

        When `clearRD` is true the RD bit is removed from the query, because
        dnsdist sets RA = RD for spoofed responses and most tests expect a
        response carrying neither flag.
        """
        query = dns.message.make_query(name, qtype, 'IN')
        if clearRD:
            query.flags &= ~dns.flags.RD
        return query

    @staticmethod
    def _makeSpoofRRSet(name, rdtype, rdatas, ttl=60):
        """
        Build an IN rrset owned by `name` from the textual rdatas in `rdatas`.
        """
        return dns.rrset.from_text(name, ttl, dns.rdataclass.IN, rdtype, *rdatas)

    @staticmethod
    def _makeSpoofResponse(query, *rrsets):
        """
        Build the response expected for `query`, holding `rrsets` (in the
        given order) in its answer section.
        """
        response = dns.message.make_response(query)
        response.answer.extend(rrsets)
        return response

    def _assertSpoofed(self, query, expectedResponse, expectedTTL=None):
        """
        Send `query` over UDP, then over TCP, and check both answers.

        Each answer must be truthy and equal to `expectedResponse`. When
        `expectedTTL` is given, the TTL of the first answer rrset is checked
        against it as well.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = self._makeSpoofQuery(name, 'A')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.A, ['192.0.2.1'])
        self._assertSpoofed(query, self._makeSpoofResponse(query, answer))

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = self._makeSpoofQuery(name, 'AAAA')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.AAAA, ['2001:DB8::1'])
        self._assertSpoofed(query, self._makeSpoofResponse(query, answer))

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = self._makeSpoofQuery(name, 'A')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.CNAME,
                                      ['cnameaction.spoofing.tests.powerdns.com.'])
        self._assertSpoofed(query, self._makeSpoofResponse(query, answer))

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via SpoofAction

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = self._makeSpoofQuery(name, 'A')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.A, ['192.0.2.2', '192.0.2.1'])
        self._assertSpoofed(query, self._makeSpoofResponse(query, answer))

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via SpoofAction

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = self._makeSpoofQuery(name, 'AAAA')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.AAAA, ['2001:DB8::1', '2001:DB8::2'])
        self._assertSpoofed(query, self._makeSpoofResponse(query, answer))

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via SpoofAction

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = self._makeSpoofQuery(name, 'ANY')
        # The ANY answer is expected to carry the A rrset first, then the AAAA one.
        v4Answer = self._makeSpoofRRSet(name, dns.rdatatype.A, ['192.0.2.2', '192.0.2.1'])
        v6Answer = self._makeSpoofRRSet(name, dns.rdatatype.AAAA, ['2001:DB8::1', '2001:DB8::2'])
        self._assertSpoofed(query, self._makeSpoofResponse(query, v4Answer, v6Answer))

    def testSpoofActionSetAA(self):
        """
        Spoofing: Spoof via Action, setting AA=1
        """
        name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
        query = self._makeSpoofQuery(name, 'AAAA')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.AAAA, ['2001:DB8::1'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags |= dns.flags.AA
        self._assertSpoofed(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetAD(self):
        """
        Spoofing: Spoof via Action, setting AD=1
        """
        name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
        query = self._makeSpoofQuery(name, 'AAAA')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.AAAA, ['2001:DB8::1'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags |= dns.flags.AD
        self._assertSpoofed(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetRA(self):
        """
        Spoofing: Spoof via Action, setting RA=1
        """
        name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
        # RD is cleared, so an RA bit in the answer can only come from {ra=true}.
        query = self._makeSpoofQuery(name, 'AAAA')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.AAAA, ['2001:DB8::1'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags |= dns.flags.RA
        self._assertSpoofed(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetNoRA(self):
        """
        Spoofing: Spoof via Action, setting RA=0
        """
        name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
        # RD is deliberately left set here: the response must still not have RA.
        query = self._makeSpoofQuery(name, 'AAAA', clearRD=False)
        answer = self._makeSpoofRRSet(name, dns.rdatatype.AAAA, ['2001:DB8::1'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags &= ~dns.flags.RA
        self._assertSpoofed(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetTTL(self):
        """
        Spoofing: Spoof via Action, setting the TTL to 1500
        """
        name = 'spoofaction-ttl.spoofing.tests.powerdns.com.'
        # RD is left set, so RA is expected in the spoofed response.
        query = self._makeSpoofQuery(name, 'AAAA', clearRD=False)
        # NOTE(review): the expected rrset is built with a TTL of 60 while the
        # received TTL is asserted to be 1500 below; this presumably relies on
        # the message comparison not taking TTLs into account -- confirm.
        answer = self._makeSpoofRRSet(name, dns.rdatatype.AAAA, ['2001:DB8::1'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags |= dns.flags.RA
        self._assertSpoofed(query, expectedResponse, expectedTTL=1500)

    def testSpoofRawAction(self):
        """
        Spoofing: Spoof a response from raw bytes
        """
        name = 'raw.spoofing.tests.powerdns.com.'

        # A
        query = self._makeSpoofQuery(name, 'A')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.A, ['192.0.2.1'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags &= ~dns.flags.AA
        self._assertSpoofed(query, expectedResponse, expectedTTL=60)

        # TXT
        query = self._makeSpoofQuery(name, 'TXT')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.TXT, ['"aaa" "bbbb" "ccccccccccc"'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags &= ~dns.flags.AA
        self._assertSpoofed(query, expectedResponse, expectedTTL=60)

        # SRV
        query = self._makeSpoofQuery(name, 'SRV')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.SRV,
                                      ['0 0 65535 srv.powerdns.com.'], ttl=3600)
        expectedResponse = self._makeSpoofResponse(query, answer)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        self._assertSpoofed(query, expectedResponse, expectedTTL=3600)

    def testSpoofRawActionMulti(self):
        """
        Spoofing: Spoof a response from several raw bytes
        """
        name = 'multiraw.spoofing.tests.powerdns.com.'

        # A
        query = self._makeSpoofQuery(name, 'A')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.A, ['192.0.2.1', '192.0.2.2'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags &= ~dns.flags.AA
        self._assertSpoofed(query, expectedResponse, expectedTTL=60)

        # TXT
        query = self._makeSpoofQuery(name, 'TXT')
        answer = self._makeSpoofRRSet(name, dns.rdatatype.TXT, ['"aaa" "bbbb"', '"ccccccccccc"'])
        expectedResponse = self._makeSpoofResponse(query, answer)
        expectedResponse.flags &= ~dns.flags.AA
        self._assertSpoofed(query, expectedResponse, expectedTTL=60)
405
class TestSpoofingLuaSpoof(DNSDistTest):
    """
    Tests for spoofing driven by Lua rules returning DNSAction.Spoof or
    DNSAction.SpoofRaw, as declared in the configuration template below.

    Every test sends a query over UDP and then over TCP and compares what
    dnsdist answers with a locally built expected response.
    """

    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype==1) -- A
        then
                return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
                return DNSAction.Spoof, "2001:DB8::1"
        else
                return DNSAction.None, ""
        end
    end

    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end

    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
    addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))

    function spoofrawrule(dq)
        if dq.qtype == DNSQType.A then
            return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
        elseif dq.qtype == DNSQType.TXT then
            return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
        elseif dq.qtype == DNSQType.SRV then
            dq.dh:setAA(true)
            return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
        end
        return DNSAction.None, ""
    end

    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
    newServer{address="127.0.0.1:%s"}
    """

    @staticmethod
    def _makeLuaSpoofQuery(name, qtype):
        """
        Build an IN query for `name` and `qtype` with the RD bit cleared.

        dnsdist sets RA = RD for spoofed responses, so clearing RD keeps both
        flags out of the expected response.
        """
        query = dns.message.make_query(name, qtype, 'IN')
        query.flags &= ~dns.flags.RD
        return query

    @staticmethod
    def _makeLuaSpoofResponse(query, name, rdtype, rdatas, ttl=60):
        """
        Build the response expected for `query`: a single IN rrset owned by
        `name`, made from the textual rdatas in `rdatas`.
        """
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name, ttl, dns.rdataclass.IN, rdtype, *rdatas))
        return response

    def _assertLuaSpoofed(self, query, expectedResponse, expectedTTL=None):
        """
        Send `query` over UDP, then over TCP, and check both answers.

        Each answer must be truthy and equal to `expectedResponse`. When
        `expectedTTL` is given, the TTL of the first answer rrset is checked
        against it as well.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = self._makeLuaSpoofQuery(name, 'A')
        expectedResponse = self._makeLuaSpoofResponse(query, name, dns.rdatatype.A,
                                                      ['192.0.2.1', '192.0.2.2'])
        self._assertLuaSpoofed(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = self._makeLuaSpoofQuery(name, 'AAAA')
        expectedResponse = self._makeLuaSpoofResponse(query, name, dns.rdatatype.AAAA,
                                                      ['2001:DB8::1'])
        self._assertLuaSpoofed(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = self._makeLuaSpoofQuery(name, 'A')
        expectedResponse = self._makeLuaSpoofResponse(query, name, dns.rdatatype.CNAME,
                                                      ['spoofedcname.spoofing.tests.powerdns.com.'])
        self._assertLuaSpoofed(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = self._makeLuaSpoofQuery(name, 'AAAA')
        expectedResponse = self._makeLuaSpoofResponse(query, name, dns.rdatatype.CNAME,
                                                      ['spoofedcname.spoofing.tests.powerdns.com.'])
        self._assertLuaSpoofed(query, expectedResponse)

    def testLuaSpoofRawAction(self):
        """
        Spoofing: Spoof a response from raw bytes via Lua
        """
        name = 'lua-raw.spoofing.tests.powerdns.com.'

        # A
        query = self._makeLuaSpoofQuery(name, 'A')
        expectedResponse = self._makeLuaSpoofResponse(query, name, dns.rdatatype.A,
                                                      ['192.0.2.1'])
        expectedResponse.flags &= ~dns.flags.AA
        self._assertLuaSpoofed(query, expectedResponse, expectedTTL=60)

        # TXT
        query = self._makeLuaSpoofQuery(name, 'TXT')
        expectedResponse = self._makeLuaSpoofResponse(query, name, dns.rdatatype.TXT,
                                                      ['"aaa" "bbbb" "ccccccccccc"'])
        expectedResponse.flags &= ~dns.flags.AA
        self._assertLuaSpoofed(query, expectedResponse, expectedTTL=60)

        # SRV
        query = self._makeLuaSpoofQuery(name, 'SRV')
        expectedResponse = self._makeLuaSpoofResponse(query, name, dns.rdatatype.SRV,
                                                      ['0 0 65535 srv.powerdns.com.'], ttl=3600)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        # sorry, we can't set the TTL from the Lua API right now, so the
        # received TTL is not compared against 3600 here
        self._assertLuaSpoofed(query, expectedResponse)
610
class TestSpoofingLuaSpoofMulti(DNSDistTest):
    """
    Tests for spoofing several records at once from Lua through dq:spoof(),
    as declared in the configuration template below.

    Every test sends a query over UDP and then over TCP and compares what
    dnsdist answers with a locally built expected response.
    """

    _config_template = """
    function spoof1multirule(dq)
        if(dq.qtype==1) -- A
        then
                dq:spoof({ newCA("192.0.2.1"), newCA("192.0.2.2") })
                return DNSAction.HeaderModify
        elseif(dq.qtype == 28) -- AAAA
        then
                dq:spoof({ newCA("2001:DB8::1"), newCA("2001:DB8::2") })
                return DNSAction.HeaderModify
        else
                return DNSAction.None, ""
        end
    end

    function spoofrawmultirule(dq)
        if dq.qtype == DNSQType.A then
            dq:spoof({ "\\192\\000\\002\\001", "\\192\\000\\002\\002" })
            return DNSAction.HeaderModify
        elseif dq.qtype == DNSQType.TXT then
            dq:spoof({ "\\003aaa\\004bbbb", "\\011ccccccccccc" })
            return DNSAction.HeaderModify
        elseif dq.qtype == DNSQType.SRV then
            dq.dh:setAA(true)
            dq:spoof({ "\\000\\000\\000\\000\\255\\255\\004srv1\\008powerdns\\003com\\000","\\000\\000\\000\\000\\255\\255\\004srv2\\008powerdns\\003com\\000" })
            return DNSAction.HeaderModify
        end
        return DNSAction.None, ""
    end

    addAction("luaspoof1multi.spoofing.tests.powerdns.com.", LuaAction(spoof1multirule))
    addAction("lua-raw-multi.spoofing.tests.powerdns.com.", LuaAction(spoofrawmultirule))
    newServer{address="127.0.0.1:%s"}
    """

    @staticmethod
    def _makeMultiSpoofQuery(name, qtype):
        """
        Build an IN query for `name` and `qtype` with the RD bit cleared.

        dnsdist sets RA = RD for spoofed responses, so clearing RD keeps both
        flags out of the expected response.
        """
        query = dns.message.make_query(name, qtype, 'IN')
        query.flags &= ~dns.flags.RD
        return query

    @staticmethod
    def _makeMultiSpoofResponse(query, name, rdtype, rdatas, ttl=60):
        """
        Build the response expected for `query`: a single IN rrset owned by
        `name`, made from the textual rdatas in `rdatas`.
        """
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name, ttl, dns.rdataclass.IN, rdtype, *rdatas))
        return response

    def _assertMultiSpoofed(self, query, expectedResponse, expectedTTL=None):
        """
        Send `query` over UDP, then over TCP, and check both answers.

        Each answer must be truthy and equal to `expectedResponse`. When
        `expectedTTL` is given, the TTL of the first answer rrset is checked
        against it as well.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLuaSpoofMultiA(self):
        """
        Spoofing: Spoofing multiple A via Lua dq:spoof

        Send an A query to "luaspoof1multi.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
        query = self._makeMultiSpoofQuery(name, 'A')
        expectedResponse = self._makeMultiSpoofResponse(query, name, dns.rdatatype.A,
                                                        ['192.0.2.1', '192.0.2.2'])
        self._assertMultiSpoofed(query, expectedResponse)

    def testLuaSpoofMultiAAAA(self):
        """
        Spoofing: Spoofing multiple AAAA via Lua dq:spoof

        Send an AAAA query to "luaspoof1multi.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
        query = self._makeMultiSpoofQuery(name, 'AAAA')
        expectedResponse = self._makeMultiSpoofResponse(query, name, dns.rdatatype.AAAA,
                                                        ['2001:DB8::1', '2001:DB8::2'])
        self._assertMultiSpoofed(query, expectedResponse)

    def testLuaSpoofMultiRawAction(self):
        """
        Spoofing: Spoof responses from raw bytes via Lua dq:spoof
        """
        name = 'lua-raw-multi.spoofing.tests.powerdns.com.'

        # A
        query = self._makeMultiSpoofQuery(name, 'A')
        expectedResponse = self._makeMultiSpoofResponse(query, name, dns.rdatatype.A,
                                                        ['192.0.2.1', '192.0.2.2'])
        expectedResponse.flags &= ~dns.flags.AA
        self._assertMultiSpoofed(query, expectedResponse, expectedTTL=60)

        # TXT
        query = self._makeMultiSpoofQuery(name, 'TXT')
        expectedResponse = self._makeMultiSpoofResponse(query, name, dns.rdatatype.TXT,
                                                        ['"aaa" "bbbb"', '"ccccccccccc"'])
        expectedResponse.flags &= ~dns.flags.AA
        self._assertMultiSpoofed(query, expectedResponse, expectedTTL=60)

        # SRV
        query = self._makeMultiSpoofQuery(name, 'SRV')
        expectedResponse = self._makeMultiSpoofResponse(query, name, dns.rdatatype.SRV,
                                                        ['0 0 65535 srv1.powerdns.com.',
                                                         '0 0 65535 srv2.powerdns.com.'],
                                                        ttl=3600)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        # sorry, we can't set the TTL from the Lua API right now, so the
        # received TTL is not compared against 3600 here
        self._assertMultiSpoofed(query, expectedResponse)
762
class TestSpoofingLuaWithStatistics(DNSDistTest):
    """
    Test for a Lua rule that chooses the spoofed address from the value of
    the 'queries' statistics counter (see the configuration template below).
    """

    _config_template = """
    function spoof1rule(dq)
            queriesCount = getStatisticsCounters()['queries']
            if(queriesCount == 1) then
                    return DNSAction.Spoof, "192.0.2.1"
            elseif(queriesCount == 2) then
                    return DNSAction.Spoof, "192.0.2.2"
            else
                    return DNSAction.Spoof, "192.0.2.0"
            end
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters

        """
        qname = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        question = dns.message.make_query(qname, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        question.flags &= ~dns.flags.RD

        def expectedFor(address):
            # Expected reply to `question` holding a single A record.
            reply = dns.message.make_response(question)
            reply.answer.append(dns.rrset.from_text(qname,
                                                    60,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    address))
            return reply

        expectedFirst = expectedFor('192.0.2.1')
        expectedSecond = expectedFor('192.0.2.2')
        expectedAfterwards = expectedFor('192.0.2.0')

        # The first two queries, both over UDP, each get their own address.
        for expected in (expectedFirst, expectedSecond):
            (_, received) = self.sendUDPQuery(question, response=None, useQueue=False)
            self.assertTrue(received)
            self.assertEqual(expected, received)

        # Every later query, over UDP and then TCP, gets the fallback address.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, received) = sender(question, response=None, useQueue=False)
            self.assertTrue(received)
            self.assertEqual(expectedAfterwards, received)