]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #8787 from rgacogne/ddist-tls-key-log-file
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
5 class TestSpoofingSpoof(DNSDistTest):
6
7 _config_template = """
8 addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
9 addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {aa=true}))
10 addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ad=true}))
11 addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=true}))
12 addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=false}))
13 addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
14 addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
15 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
16 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
17 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
18 newServer{address="127.0.0.1:%s"}
19 """
20
21 def testSpoofActionA(self):
22 """
23 Spoofing: Spoof A via Action
24
25 Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
26 check that dnsdist sends a spoofed result.
27 """
28 name = 'spoofaction.spoofing.tests.powerdns.com.'
29 query = dns.message.make_query(name, 'A', 'IN')
30 # dnsdist set RA = RD for spoofed responses
31 query.flags &= ~dns.flags.RD
32 expectedResponse = dns.message.make_response(query)
33 rrset = dns.rrset.from_text(name,
34 60,
35 dns.rdataclass.IN,
36 dns.rdatatype.A,
37 '192.0.2.1')
38 expectedResponse.answer.append(rrset)
39
40 for method in ("sendUDPQuery", "sendTCPQuery"):
41 sender = getattr(self, method)
42 (_, receivedResponse) = sender(query, response=None, useQueue=False)
43 self.assertTrue(receivedResponse)
44 self.assertEquals(expectedResponse, receivedResponse)
45
46 def testSpoofActionAAAA(self):
47 """
48 Spoofing: Spoof AAAA via Action
49
50 Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
51 check that dnsdist sends a spoofed result.
52 """
53 name = 'spoofaction.spoofing.tests.powerdns.com.'
54 query = dns.message.make_query(name, 'AAAA', 'IN')
55 # dnsdist set RA = RD for spoofed responses
56 query.flags &= ~dns.flags.RD
57 expectedResponse = dns.message.make_response(query)
58 rrset = dns.rrset.from_text(name,
59 60,
60 dns.rdataclass.IN,
61 dns.rdatatype.AAAA,
62 '2001:DB8::1')
63 expectedResponse.answer.append(rrset)
64
65 for method in ("sendUDPQuery", "sendTCPQuery"):
66 sender = getattr(self, method)
67 (_, receivedResponse) = sender(query, response=None, useQueue=False)
68 self.assertTrue(receivedResponse)
69 self.assertEquals(expectedResponse, receivedResponse)
70
71 def testSpoofActionCNAME(self):
72 """
73 Spoofing: Spoof CNAME via Action
74
75 Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
76 check that dnsdist sends a spoofed result.
77 """
78 name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
79 query = dns.message.make_query(name, 'A', 'IN')
80 # dnsdist set RA = RD for spoofed responses
81 query.flags &= ~dns.flags.RD
82 expectedResponse = dns.message.make_response(query)
83 rrset = dns.rrset.from_text(name,
84 60,
85 dns.rdataclass.IN,
86 dns.rdatatype.CNAME,
87 'cnameaction.spoofing.tests.powerdns.com.')
88 expectedResponse.answer.append(rrset)
89
90 for method in ("sendUDPQuery", "sendTCPQuery"):
91 sender = getattr(self, method)
92 (_, receivedResponse) = sender(query, response=None, useQueue=False)
93 self.assertTrue(receivedResponse)
94 self.assertEquals(expectedResponse, receivedResponse)
95
96 def testSpoofActionMultiA(self):
97 """
98 Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
99
100 Send an A query for "multispoof.spoofing.tests.powerdns.com.",
101 check that dnsdist sends a spoofed result.
102 """
103 name = 'multispoof.spoofing.tests.powerdns.com.'
104 query = dns.message.make_query(name, 'A', 'IN')
105 # dnsdist set RA = RD for spoofed responses
106 query.flags &= ~dns.flags.RD
107 expectedResponse = dns.message.make_response(query)
108 rrset = dns.rrset.from_text(name,
109 60,
110 dns.rdataclass.IN,
111 dns.rdatatype.A,
112 '192.0.2.2', '192.0.2.1')
113 expectedResponse.answer.append(rrset)
114
115 for method in ("sendUDPQuery", "sendTCPQuery"):
116 sender = getattr(self, method)
117 (_, receivedResponse) = sender(query, response=None, useQueue=False)
118 self.assertTrue(receivedResponse)
119 self.assertEquals(expectedResponse, receivedResponse)
120
121 def testSpoofActionMultiAAAA(self):
122 """
123 Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
124
125 Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
126 check that dnsdist sends a spoofed result.
127 """
128 name = 'multispoof.spoofing.tests.powerdns.com.'
129 query = dns.message.make_query(name, 'AAAA', 'IN')
130 # dnsdist set RA = RD for spoofed responses
131 query.flags &= ~dns.flags.RD
132 expectedResponse = dns.message.make_response(query)
133 rrset = dns.rrset.from_text(name,
134 60,
135 dns.rdataclass.IN,
136 dns.rdatatype.AAAA,
137 '2001:DB8::1', '2001:DB8::2')
138 expectedResponse.answer.append(rrset)
139
140 for method in ("sendUDPQuery", "sendTCPQuery"):
141 sender = getattr(self, method)
142 (_, receivedResponse) = sender(query, response=None, useQueue=False)
143 self.assertTrue(receivedResponse)
144 self.assertEquals(expectedResponse, receivedResponse)
145
146 def testSpoofActionMultiANY(self):
147 """
148 Spoofing: Spoof multiple addresses via AddDomainSpoof
149
150 Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
151 check that dnsdist sends a spoofed result.
152 """
153 name = 'multispoof.spoofing.tests.powerdns.com.'
154 query = dns.message.make_query(name, 'ANY', 'IN')
155 # dnsdist set RA = RD for spoofed responses
156 query.flags &= ~dns.flags.RD
157 expectedResponse = dns.message.make_response(query)
158
159 rrset = dns.rrset.from_text(name,
160 60,
161 dns.rdataclass.IN,
162 dns.rdatatype.A,
163 '192.0.2.2', '192.0.2.1')
164 expectedResponse.answer.append(rrset)
165
166 rrset = dns.rrset.from_text(name,
167 60,
168 dns.rdataclass.IN,
169 dns.rdatatype.AAAA,
170 '2001:DB8::1', '2001:DB8::2')
171 expectedResponse.answer.append(rrset)
172
173 for method in ("sendUDPQuery", "sendTCPQuery"):
174 sender = getattr(self, method)
175 (_, receivedResponse) = sender(query, response=None, useQueue=False)
176 self.assertTrue(receivedResponse)
177 self.assertEquals(expectedResponse, receivedResponse)
178
179 def testSpoofActionSetAA(self):
180 """
181 Spoofing: Spoof via Action, setting AA=1
182 """
183 name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
184 query = dns.message.make_query(name, 'AAAA', 'IN')
185 # dnsdist set RA = RD for spoofed responses
186 query.flags &= ~dns.flags.RD
187 expectedResponse = dns.message.make_response(query)
188 expectedResponse.flags |= dns.flags.AA
189 rrset = dns.rrset.from_text(name,
190 60,
191 dns.rdataclass.IN,
192 dns.rdatatype.AAAA,
193 '2001:DB8::1')
194 expectedResponse.answer.append(rrset)
195
196 for method in ("sendUDPQuery", "sendTCPQuery"):
197 sender = getattr(self, method)
198 (_, receivedResponse) = sender(query, response=None, useQueue=False)
199 self.assertTrue(receivedResponse)
200 self.assertEquals(expectedResponse, receivedResponse)
201 self.assertEquals(receivedResponse.answer[0].ttl, 60)
202
203 def testSpoofActionSetAD(self):
204 """
205 Spoofing: Spoof via Action, setting AD=1
206 """
207 name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
208 query = dns.message.make_query(name, 'AAAA', 'IN')
209 # dnsdist set RA = RD for spoofed responses
210 query.flags &= ~dns.flags.RD
211 expectedResponse = dns.message.make_response(query)
212 expectedResponse.flags |= dns.flags.AD
213 rrset = dns.rrset.from_text(name,
214 60,
215 dns.rdataclass.IN,
216 dns.rdatatype.AAAA,
217 '2001:DB8::1')
218 expectedResponse.answer.append(rrset)
219
220 for method in ("sendUDPQuery", "sendTCPQuery"):
221 sender = getattr(self, method)
222 (_, receivedResponse) = sender(query, response=None, useQueue=False)
223 self.assertTrue(receivedResponse)
224 self.assertEquals(expectedResponse, receivedResponse)
225 self.assertEquals(receivedResponse.answer[0].ttl, 60)
226
227 def testSpoofActionSetRA(self):
228 """
229 Spoofing: Spoof via Action, setting RA=1
230 """
231 name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
232 query = dns.message.make_query(name, 'AAAA', 'IN')
233 # dnsdist set RA = RD for spoofed responses
234 query.flags &= ~dns.flags.RD
235 expectedResponse = dns.message.make_response(query)
236 expectedResponse.flags |= dns.flags.RA
237 rrset = dns.rrset.from_text(name,
238 60,
239 dns.rdataclass.IN,
240 dns.rdatatype.AAAA,
241 '2001:DB8::1')
242 expectedResponse.answer.append(rrset)
243
244 for method in ("sendUDPQuery", "sendTCPQuery"):
245 sender = getattr(self, method)
246 (_, receivedResponse) = sender(query, response=None, useQueue=False)
247 self.assertTrue(receivedResponse)
248 self.assertEquals(expectedResponse, receivedResponse)
249 self.assertEquals(receivedResponse.answer[0].ttl, 60)
250
251 def testSpoofActionSetNoRA(self):
252 """
253 Spoofing: Spoof via Action, setting RA=0
254 """
255 name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
256 query = dns.message.make_query(name, 'AAAA', 'IN')
257 expectedResponse = dns.message.make_response(query)
258 expectedResponse.flags &= ~dns.flags.RA
259 rrset = dns.rrset.from_text(name,
260 60,
261 dns.rdataclass.IN,
262 dns.rdatatype.AAAA,
263 '2001:DB8::1')
264 expectedResponse.answer.append(rrset)
265
266 for method in ("sendUDPQuery", "sendTCPQuery"):
267 sender = getattr(self, method)
268 (_, receivedResponse) = sender(query, response=None, useQueue=False)
269 self.assertTrue(receivedResponse)
270 self.assertEquals(expectedResponse, receivedResponse)
271 self.assertEquals(receivedResponse.answer[0].ttl, 60)
272
273 def testSpoofRawAction(self):
274 """
275 Spoofing: Spoof a response from raw bytes
276 """
277 name = 'raw.spoofing.tests.powerdns.com.'
278
279 # A
280 query = dns.message.make_query(name, 'A', 'IN')
281 query.flags &= ~dns.flags.RD
282 expectedResponse = dns.message.make_response(query)
283 expectedResponse.flags &= ~dns.flags.AA
284 rrset = dns.rrset.from_text(name,
285 60,
286 dns.rdataclass.IN,
287 dns.rdatatype.A,
288 '192.0.2.1')
289 expectedResponse.answer.append(rrset)
290
291 for method in ("sendUDPQuery", "sendTCPQuery"):
292 sender = getattr(self, method)
293 (_, receivedResponse) = sender(query, response=None, useQueue=False)
294 self.assertTrue(receivedResponse)
295 self.assertEquals(expectedResponse, receivedResponse)
296 self.assertEquals(receivedResponse.answer[0].ttl, 60)
297
298 # TXT
299 query = dns.message.make_query(name, 'TXT', 'IN')
300 query.flags &= ~dns.flags.RD
301 expectedResponse = dns.message.make_response(query)
302 expectedResponse.flags &= ~dns.flags.AA
303 rrset = dns.rrset.from_text(name,
304 60,
305 dns.rdataclass.IN,
306 dns.rdatatype.TXT,
307 '"aaa" "bbbb" "ccccccccccc"')
308 expectedResponse.answer.append(rrset)
309
310 for method in ("sendUDPQuery", "sendTCPQuery"):
311 sender = getattr(self, method)
312 (_, receivedResponse) = sender(query, response=None, useQueue=False)
313 self.assertTrue(receivedResponse)
314 self.assertEquals(expectedResponse, receivedResponse)
315 self.assertEquals(receivedResponse.answer[0].ttl, 60)
316
317 # SRV
318 query = dns.message.make_query(name, 'SRV', 'IN')
319 query.flags &= ~dns.flags.RD
320 expectedResponse = dns.message.make_response(query)
321 # this one should have the AA flag set
322 expectedResponse.flags |= dns.flags.AA
323 rrset = dns.rrset.from_text(name,
324 3600,
325 dns.rdataclass.IN,
326 dns.rdatatype.SRV,
327 '0 0 65535 srv.powerdns.com.')
328 expectedResponse.answer.append(rrset)
329
330 for method in ("sendUDPQuery", "sendTCPQuery"):
331 sender = getattr(self, method)
332 (_, receivedResponse) = sender(query, response=None, useQueue=False)
333 self.assertTrue(receivedResponse)
334 self.assertEquals(expectedResponse, receivedResponse)
335 self.assertEquals(receivedResponse.answer[0].ttl, 3600)
336
337 class TestSpoofingLuaSpoof(DNSDistTest):
338
339 _config_template = """
340 function spoof1rule(dq)
341 if(dq.qtype==1) -- A
342 then
343 return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
344 elseif(dq.qtype == 28) -- AAAA
345 then
346 return DNSAction.Spoof, "2001:DB8::1"
347 else
348 return DNSAction.None, ""
349 end
350 end
351
352 function spoof2rule(dq)
353 return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
354 end
355
356 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
357 addAction(AndRule{makeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
358
359 function spoofrawrule(dq)
360 if dq.qtype == DNSQType.A then
361 return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
362 elseif dq.qtype == DNSQType.TXT then
363 return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
364 elseif dq.qtype == DNSQType.SRV then
365 dq.dh:setAA(true)
366 return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
367 end
368 return DNSAction.None, ""
369 end
370
371 addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
372 addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
373 addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
374 newServer{address="127.0.0.1:%s"}
375 """
376
377 def testLuaSpoofA(self):
378 """
379 Spoofing: Spoofing an A via Lua
380
381 Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
382 check that dnsdist sends a spoofed result.
383 """
384 name = 'luaspoof1.spoofing.tests.powerdns.com.'
385 query = dns.message.make_query(name, 'A', 'IN')
386 # dnsdist set RA = RD for spoofed responses
387 query.flags &= ~dns.flags.RD
388 expectedResponse = dns.message.make_response(query)
389 rrset = dns.rrset.from_text(name,
390 60,
391 dns.rdataclass.IN,
392 dns.rdatatype.A,
393 '192.0.2.1', '192.0.2.2')
394 expectedResponse.answer.append(rrset)
395
396 for method in ("sendUDPQuery", "sendTCPQuery"):
397 sender = getattr(self, method)
398 (_, receivedResponse) = sender(query, response=None, useQueue=False)
399 self.assertTrue(receivedResponse)
400 self.assertEquals(expectedResponse, receivedResponse)
401
402 def testLuaSpoofAAAA(self):
403 """
404 Spoofing: Spoofing an AAAA via Lua
405
406 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
407 check that dnsdist sends a spoofed result.
408 """
409 name = 'luaspoof1.spoofing.tests.powerdns.com.'
410 query = dns.message.make_query(name, 'AAAA', 'IN')
411 # dnsdist set RA = RD for spoofed responses
412 query.flags &= ~dns.flags.RD
413 expectedResponse = dns.message.make_response(query)
414 rrset = dns.rrset.from_text(name,
415 60,
416 dns.rdataclass.IN,
417 dns.rdatatype.AAAA,
418 '2001:DB8::1')
419 expectedResponse.answer.append(rrset)
420
421 for method in ("sendUDPQuery", "sendTCPQuery"):
422 sender = getattr(self, method)
423 (_, receivedResponse) = sender(query, response=None, useQueue=False)
424 self.assertTrue(receivedResponse)
425 self.assertEquals(expectedResponse, receivedResponse)
426
427 def testLuaSpoofAWithCNAME(self):
428 """
429 Spoofing: Spoofing an A with a CNAME via Lua
430
431 Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
432 check that dnsdist sends a spoofed result.
433 """
434 name = 'luaspoof2.spoofing.tests.powerdns.com.'
435 query = dns.message.make_query(name, 'A', 'IN')
436 # dnsdist set RA = RD for spoofed responses
437 query.flags &= ~dns.flags.RD
438 expectedResponse = dns.message.make_response(query)
439 rrset = dns.rrset.from_text(name,
440 60,
441 dns.rdataclass.IN,
442 dns.rdatatype.CNAME,
443 'spoofedcname.spoofing.tests.powerdns.com.')
444 expectedResponse.answer.append(rrset)
445
446 for method in ("sendUDPQuery", "sendTCPQuery"):
447 sender = getattr(self, method)
448 (_, receivedResponse) = sender(query, response=None, useQueue=False)
449 self.assertTrue(receivedResponse)
450 self.assertEquals(expectedResponse, receivedResponse)
451
452 def testLuaSpoofAAAAWithCNAME(self):
453 """
454 Spoofing: Spoofing an AAAA with a CNAME via Lua
455
456 Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
457 check that dnsdist sends a spoofed result.
458 """
459 name = 'luaspoof2.spoofing.tests.powerdns.com.'
460 query = dns.message.make_query(name, 'AAAA', 'IN')
461 # dnsdist set RA = RD for spoofed responses
462 query.flags &= ~dns.flags.RD
463 expectedResponse = dns.message.make_response(query)
464 rrset = dns.rrset.from_text(name,
465 60,
466 dns.rdataclass.IN,
467 dns.rdatatype.CNAME,
468 'spoofedcname.spoofing.tests.powerdns.com.')
469 expectedResponse.answer.append(rrset)
470
471 for method in ("sendUDPQuery", "sendTCPQuery"):
472 sender = getattr(self, method)
473 (_, receivedResponse) = sender(query, response=None, useQueue=False)
474 self.assertTrue(receivedResponse)
475 self.assertEquals(expectedResponse, receivedResponse)
476
477 def testLuaSpoofRawAction(self):
478 """
479 Spoofing: Spoof a response from raw bytes via Lua
480 """
481 name = 'lua-raw.spoofing.tests.powerdns.com.'
482
483 # A
484 query = dns.message.make_query(name, 'A', 'IN')
485 query.flags &= ~dns.flags.RD
486 expectedResponse = dns.message.make_response(query)
487 expectedResponse.flags &= ~dns.flags.AA
488 rrset = dns.rrset.from_text(name,
489 60,
490 dns.rdataclass.IN,
491 dns.rdatatype.A,
492 '192.0.2.1')
493 expectedResponse.answer.append(rrset)
494
495 for method in ("sendUDPQuery", "sendTCPQuery"):
496 sender = getattr(self, method)
497 (_, receivedResponse) = sender(query, response=None, useQueue=False)
498 self.assertTrue(receivedResponse)
499 self.assertEquals(expectedResponse, receivedResponse)
500 self.assertEquals(receivedResponse.answer[0].ttl, 60)
501
502 # TXT
503 query = dns.message.make_query(name, 'TXT', 'IN')
504 query.flags &= ~dns.flags.RD
505 expectedResponse = dns.message.make_response(query)
506 expectedResponse.flags &= ~dns.flags.AA
507 rrset = dns.rrset.from_text(name,
508 60,
509 dns.rdataclass.IN,
510 dns.rdatatype.TXT,
511 '"aaa" "bbbb" "ccccccccccc"')
512 expectedResponse.answer.append(rrset)
513
514 for method in ("sendUDPQuery", "sendTCPQuery"):
515 sender = getattr(self, method)
516 (_, receivedResponse) = sender(query, response=None, useQueue=False)
517 self.assertTrue(receivedResponse)
518 self.assertEquals(expectedResponse, receivedResponse)
519 self.assertEquals(receivedResponse.answer[0].ttl, 60)
520
521 # SRV
522 query = dns.message.make_query(name, 'SRV', 'IN')
523 query.flags &= ~dns.flags.RD
524 expectedResponse = dns.message.make_response(query)
525 # this one should have the AA flag set
526 expectedResponse.flags |= dns.flags.AA
527 rrset = dns.rrset.from_text(name,
528 3600,
529 dns.rdataclass.IN,
530 dns.rdatatype.SRV,
531 '0 0 65535 srv.powerdns.com.')
532 expectedResponse.answer.append(rrset)
533
534 for method in ("sendUDPQuery", "sendTCPQuery"):
535 sender = getattr(self, method)
536 (_, receivedResponse) = sender(query, response=None, useQueue=False)
537 self.assertTrue(receivedResponse)
538 self.assertEquals(expectedResponse, receivedResponse)
539 # sorry, we can't set the TTL from the Lua API right now
540 #self.assertEquals(receivedResponse.answer[0].ttl, 3600)
541
542 class TestSpoofingLuaWithStatistics(DNSDistTest):
543
544 _config_template = """
545 function spoof1rule(dq)
546 queriesCount = getStatisticsCounters()['queries']
547 if(queriesCount == 1) then
548 return DNSAction.Spoof, "192.0.2.1"
549 elseif(queriesCount == 2) then
550 return DNSAction.Spoof, "192.0.2.2"
551 else
552 return DNSAction.Spoof, "192.0.2.0"
553 end
554 end
555 addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
556 newServer{address="127.0.0.1:%s"}
557 """
558
559 def testLuaSpoofBasedOnStatistics(self):
560 """
561 Spoofing: Spoofing an A via Lua based on statistics counters
562
563 """
564 name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
565 query = dns.message.make_query(name, 'A', 'IN')
566 # dnsdist set RA = RD for spoofed responses
567 query.flags &= ~dns.flags.RD
568 expectedResponse1 = dns.message.make_response(query)
569 rrset = dns.rrset.from_text(name,
570 60,
571 dns.rdataclass.IN,
572 dns.rdatatype.A,
573 '192.0.2.1')
574 expectedResponse1.answer.append(rrset)
575 expectedResponse2 = dns.message.make_response(query)
576 rrset = dns.rrset.from_text(name,
577 60,
578 dns.rdataclass.IN,
579 dns.rdatatype.A,
580 '192.0.2.2')
581 expectedResponse2.answer.append(rrset)
582 expectedResponseAfterwards = dns.message.make_response(query)
583 rrset = dns.rrset.from_text(name,
584 60,
585 dns.rdataclass.IN,
586 dns.rdatatype.A,
587 '192.0.2.0')
588 expectedResponseAfterwards.answer.append(rrset)
589
590 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
591 self.assertTrue(receivedResponse)
592 self.assertEquals(expectedResponse1, receivedResponse)
593
594 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
595 self.assertTrue(receivedResponse)
596 self.assertEquals(expectedResponse2, receivedResponse)
597
598 for method in ("sendUDPQuery", "sendTCPQuery"):
599 sender = getattr(self, method)
600 (_, receivedResponse) = sender(query, response=None, useQueue=False)
601 self.assertTrue(receivedResponse)
602 self.assertEquals(expectedResponseAfterwards, receivedResponse)