]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Advanced.py
3 from datetime
import datetime
, timedelta
8 import clientsubnetoption
9 from dnsdisttests
import DNSDistTest
11 class TestAdvancedAllow(DNSDistTest
):
13 _config_template
= """
14 addAction(AllRule(), NoneAction())
15 addAction(makeRule("allowed.advanced.tests.powerdns.com."), AllowAction())
16 addAction(AllRule(), DropAction())
17 newServer{address="127.0.0.1:%s"}
20 def testAdvancedAllow(self
):
22 Advanced: Allowed qname is not dropped
24 A query for allowed.advanced.tests.powerdns.com. should be allowed
25 while others should be dropped.
27 name
= 'allowed.advanced.tests.powerdns.com.'
28 query
= dns
.message
.make_query(name
, 'A', 'IN')
29 response
= dns
.message
.make_response(query
)
30 rrset
= dns
.rrset
.from_text(name
,
35 response
.answer
.append(rrset
)
37 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
38 self
.assertTrue(receivedQuery
)
39 self
.assertTrue(receivedResponse
)
40 receivedQuery
.id = query
.id
41 self
.assertEquals(query
, receivedQuery
)
42 self
.assertEquals(response
, receivedResponse
)
44 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
45 self
.assertTrue(receivedQuery
)
46 self
.assertTrue(receivedResponse
)
47 receivedQuery
.id = query
.id
48 self
.assertEquals(query
, receivedQuery
)
49 self
.assertEquals(response
, receivedResponse
)
def testAdvancedAllowDropped(self):
    """
    Advanced: Not allowed qname is dropped

    A query for notallowed.advanced.tests.powerdns.com. should be dropped.
    """
    name = 'notallowed.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # The same check is run over UDP, then over TCP. No backend response is
    # supplied (response=None, useQueue=False) and none must come back.
    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12);
        # assertIsNone states the intent directly.
        self.assertIsNone(receivedResponse)
65 class TestAdvancedFixupCase(DNSDistTest
):
67 _config_template
= """
70 newServer{address="127.0.0.1:%s"}
73 def testAdvancedFixupCase(self
):
77 Send a query with lower and upper chars,
78 make the backend return a lowercase version,
79 check that dnsdist fixes the response.
81 name
= 'fiXuPCasE.advanced.tests.powerdns.com.'
82 query
= dns
.message
.make_query(name
, 'A', 'IN')
83 lowercasequery
= dns
.message
.make_query(name
.lower(), 'A', 'IN')
84 response
= dns
.message
.make_response(lowercasequery
)
85 expectedResponse
= dns
.message
.make_response(query
)
86 rrset
= dns
.rrset
.from_text(name
,
91 response
.answer
.append(rrset
)
92 expectedResponse
.answer
.append(rrset
)
94 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
95 self
.assertTrue(receivedQuery
)
96 self
.assertTrue(receivedResponse
)
97 receivedQuery
.id = query
.id
98 self
.assertEquals(query
, receivedQuery
)
99 self
.assertEquals(expectedResponse
, receivedResponse
)
101 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
102 self
.assertTrue(receivedQuery
)
103 self
.assertTrue(receivedResponse
)
104 receivedQuery
.id = query
.id
105 self
.assertEquals(query
, receivedQuery
)
106 self
.assertEquals(expectedResponse
, receivedResponse
)
109 class TestAdvancedRemoveRD(DNSDistTest
):
111 _config_template
= """
112 addAction("norecurse.advanced.tests.powerdns.com.", NoRecurseAction())
113 newServer{address="127.0.0.1:%s"}
116 def testAdvancedNoRD(self
):
120 Send a query with RD,
121 check that dnsdist clears the RD flag.
123 name
= 'norecurse.advanced.tests.powerdns.com.'
124 query
= dns
.message
.make_query(name
, 'A', 'IN')
125 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
126 expectedQuery
.flags
&= ~dns
.flags
.RD
128 response
= dns
.message
.make_response(query
)
129 rrset
= dns
.rrset
.from_text(name
,
134 response
.answer
.append(rrset
)
136 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
137 self
.assertTrue(receivedQuery
)
138 self
.assertTrue(receivedResponse
)
139 receivedQuery
.id = expectedQuery
.id
140 self
.assertEquals(expectedQuery
, receivedQuery
)
141 self
.assertEquals(response
, receivedResponse
)
143 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
144 self
.assertTrue(receivedQuery
)
145 self
.assertTrue(receivedResponse
)
146 receivedQuery
.id = expectedQuery
.id
147 self
.assertEquals(expectedQuery
, receivedQuery
)
148 self
.assertEquals(response
, receivedResponse
)
150 def testAdvancedKeepRD(self
):
152 Advanced: No RD canary
154 Send a query with RD for a canary domain,
155 check that dnsdist does not clear the RD flag.
157 name
= 'keeprecurse.advanced.tests.powerdns.com.'
158 query
= dns
.message
.make_query(name
, 'A', 'IN')
160 response
= dns
.message
.make_response(query
)
161 rrset
= dns
.rrset
.from_text(name
,
166 response
.answer
.append(rrset
)
168 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
169 self
.assertTrue(receivedQuery
)
170 self
.assertTrue(receivedResponse
)
171 receivedQuery
.id = query
.id
172 self
.assertEquals(query
, receivedQuery
)
173 self
.assertEquals(response
, receivedResponse
)
175 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
176 self
.assertTrue(receivedQuery
)
177 self
.assertTrue(receivedResponse
)
178 receivedQuery
.id = query
.id
179 self
.assertEquals(query
, receivedQuery
)
180 self
.assertEquals(response
, receivedResponse
)
183 class TestAdvancedAddCD(DNSDistTest
):
185 _config_template
= """
186 addAction("setcd.advanced.tests.powerdns.com.", DisableValidationAction())
187 addAction(makeRule("setcdviaaction.advanced.tests.powerdns.com."), DisableValidationAction())
188 newServer{address="127.0.0.1:%s"}
191 def testAdvancedSetCD(self
):
195 Send a query with CD cleared,
196 check that dnsdist set the CD flag.
198 name
= 'setcd.advanced.tests.powerdns.com.'
199 query
= dns
.message
.make_query(name
, 'A', 'IN')
200 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
201 expectedQuery
.flags |
= dns
.flags
.CD
203 response
= dns
.message
.make_response(query
)
204 rrset
= dns
.rrset
.from_text(name
,
209 response
.answer
.append(rrset
)
211 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
212 self
.assertTrue(receivedQuery
)
213 self
.assertTrue(receivedResponse
)
214 receivedQuery
.id = expectedQuery
.id
215 self
.assertEquals(expectedQuery
, receivedQuery
)
216 self
.assertEquals(response
, receivedResponse
)
218 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
219 self
.assertTrue(receivedQuery
)
220 self
.assertTrue(receivedResponse
)
221 receivedQuery
.id = expectedQuery
.id
222 self
.assertEquals(expectedQuery
, receivedQuery
)
223 self
.assertEquals(response
, receivedResponse
)
225 def testAdvancedSetCDViaAction(self
):
227 Advanced: Set CD via Action
229 Send a query with CD cleared,
230 check that dnsdist set the CD flag.
232 name
= 'setcdviaaction.advanced.tests.powerdns.com.'
233 query
= dns
.message
.make_query(name
, 'A', 'IN')
234 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
235 expectedQuery
.flags |
= dns
.flags
.CD
237 response
= dns
.message
.make_response(query
)
238 rrset
= dns
.rrset
.from_text(name
,
243 response
.answer
.append(rrset
)
245 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
246 self
.assertTrue(receivedQuery
)
247 self
.assertTrue(receivedResponse
)
248 receivedQuery
.id = expectedQuery
.id
249 self
.assertEquals(expectedQuery
, receivedQuery
)
250 self
.assertEquals(response
, receivedResponse
)
252 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
253 self
.assertTrue(receivedQuery
)
254 self
.assertTrue(receivedResponse
)
255 receivedQuery
.id = expectedQuery
.id
256 self
.assertEquals(expectedQuery
, receivedQuery
)
257 self
.assertEquals(response
, receivedResponse
)
259 def testAdvancedKeepNoCD(self
):
261 Advanced: Preserve CD canary
263 Send a query without CD for a canary domain,
264 check that dnsdist does not set the CD flag.
266 name
= 'keepnocd.advanced.tests.powerdns.com.'
267 query
= dns
.message
.make_query(name
, 'A', 'IN')
269 response
= dns
.message
.make_response(query
)
270 rrset
= dns
.rrset
.from_text(name
,
275 response
.answer
.append(rrset
)
277 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
278 self
.assertTrue(receivedQuery
)
279 self
.assertTrue(receivedResponse
)
280 receivedQuery
.id = query
.id
281 self
.assertEquals(query
, receivedQuery
)
282 self
.assertEquals(response
, receivedResponse
)
284 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
285 self
.assertTrue(receivedQuery
)
286 self
.assertTrue(receivedResponse
)
287 receivedQuery
.id = query
.id
288 self
.assertEquals(query
, receivedQuery
)
289 self
.assertEquals(response
, receivedResponse
)
291 class TestAdvancedClearRD(DNSDistTest
):
293 _config_template
= """
294 addAction("clearrd.advanced.tests.powerdns.com.", NoRecurseAction())
295 addAction(makeRule("clearrdviaaction.advanced.tests.powerdns.com."), NoRecurseAction())
296 newServer{address="127.0.0.1:%s"}
299 def testAdvancedClearRD(self
):
303 Send a query with RD set,
304 check that dnsdist clears the RD flag.
306 name
= 'clearrd.advanced.tests.powerdns.com.'
307 query
= dns
.message
.make_query(name
, 'A', 'IN')
308 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
309 expectedQuery
.flags
&= ~dns
.flags
.RD
311 response
= dns
.message
.make_response(query
)
312 rrset
= dns
.rrset
.from_text(name
,
317 response
.answer
.append(rrset
)
319 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
320 self
.assertTrue(receivedQuery
)
321 self
.assertTrue(receivedResponse
)
322 receivedQuery
.id = expectedQuery
.id
323 self
.assertEquals(expectedQuery
, receivedQuery
)
324 self
.assertEquals(response
, receivedResponse
)
326 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
327 self
.assertTrue(receivedQuery
)
328 self
.assertTrue(receivedResponse
)
329 receivedQuery
.id = expectedQuery
.id
330 self
.assertEquals(expectedQuery
, receivedQuery
)
331 self
.assertEquals(response
, receivedResponse
)
333 def testAdvancedClearRDViaAction(self
):
335 Advanced: Clear RD via Action
337 Send a query with RD set,
338 check that dnsdist clears the RD flag.
340 name
= 'clearrdviaaction.advanced.tests.powerdns.com.'
341 query
= dns
.message
.make_query(name
, 'A', 'IN')
342 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
343 expectedQuery
.flags
&= ~dns
.flags
.RD
345 response
= dns
.message
.make_response(query
)
346 rrset
= dns
.rrset
.from_text(name
,
351 response
.answer
.append(rrset
)
353 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
354 self
.assertTrue(receivedQuery
)
355 self
.assertTrue(receivedResponse
)
356 receivedQuery
.id = expectedQuery
.id
357 self
.assertEquals(expectedQuery
, receivedQuery
)
358 self
.assertEquals(response
, receivedResponse
)
360 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
361 self
.assertTrue(receivedQuery
)
362 self
.assertTrue(receivedResponse
)
363 receivedQuery
.id = expectedQuery
.id
364 self
.assertEquals(expectedQuery
, receivedQuery
)
365 self
.assertEquals(response
, receivedResponse
)
367 def testAdvancedKeepRD(self
):
369 Advanced: Preserve RD canary
371 Send a query with RD for a canary domain,
372 check that dnsdist does not clear the RD flag.
374 name
= 'keeprd.advanced.tests.powerdns.com.'
375 query
= dns
.message
.make_query(name
, 'A', 'IN')
377 response
= dns
.message
.make_response(query
)
378 rrset
= dns
.rrset
.from_text(name
,
383 response
.answer
.append(rrset
)
385 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
386 self
.assertTrue(receivedQuery
)
387 self
.assertTrue(receivedResponse
)
388 receivedQuery
.id = query
.id
389 self
.assertEquals(query
, receivedQuery
)
390 self
.assertEquals(response
, receivedResponse
)
392 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
393 self
.assertTrue(receivedQuery
)
394 self
.assertTrue(receivedResponse
)
395 receivedQuery
.id = query
.id
396 self
.assertEquals(query
, receivedQuery
)
397 self
.assertEquals(response
, receivedResponse
)
class TestAdvancedACL(DNSDistTest):
    """
    Check that queries coming from an address outside of the ACL get no answer.
    """

    # dnsdist configuration. NOTE(review): the '%s' placeholder is presumably
    # substituted with the backend port by DNSDistTest -- confirm there.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    """

    # Only 192.0.2.1 is listed here, so 127.0.0.1 is not covered.
    _acl = ['192.0.2.1/32']

    def testACLBlocked(self):
        """
        Advanced: ACL blocked

        Send an A query to "tests.powerdns.com.",
        we expect no response since 127.0.0.1 is not on the ACL.
        """
        name = 'tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # Same expectation over UDP, then over TCP: no response at all.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            # assertEquals is a deprecated alias (removed in Python 3.12).
            self.assertIsNone(receivedResponse)
424 class TestAdvancedDelay(DNSDistTest
):
426 _config_template
= """
427 addAction(AllRule(), DelayAction(1000))
428 newServer{address="127.0.0.1:%s"}
431 def testDelayed(self
):
435 Send an A query to "tests.powerdns.com.",
436 check that the response delay is longer than 1000 ms
437 over UDP, less than that over TCP.
439 name
= 'tests.powerdns.com.'
440 query
= dns
.message
.make_query(name
, 'A', 'IN')
441 response
= dns
.message
.make_response(query
)
442 rrset
= dns
.rrset
.from_text(name
,
447 response
.answer
.append(rrset
)
449 begin
= datetime
.now()
450 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
452 receivedQuery
.id = query
.id
453 self
.assertEquals(query
, receivedQuery
)
454 self
.assertEquals(response
, receivedResponse
)
455 self
.assertTrue((end
- begin
) > timedelta(0, 1))
457 begin
= datetime
.now()
458 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
460 receivedQuery
.id = query
.id
461 self
.assertEquals(query
, receivedQuery
)
462 self
.assertEquals(response
, receivedResponse
)
463 self
.assertTrue((end
- begin
) < timedelta(0, 1))
466 class TestAdvancedTruncateAnyAndTCP(DNSDistTest
):
468 _config_template
= """
470 addAction(AndRule({QTypeRule("ANY"), TCPRule(true)}), TCAction())
471 newServer{address="127.0.0.1:%s"}
473 def testTruncateAnyOverTCP(self
):
475 Advanced: Truncate ANY over TCP
477 Send an ANY query to "anytruncatetcp.advanced.tests.powerdns.com.",
478 should be truncated over TCP, not over UDP (yes, it makes no sense,
481 name
= 'anytruncatetcp.advanced.tests.powerdns.com.'
482 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
484 response
= dns
.message
.make_response(query
)
485 rrset
= dns
.rrset
.from_text(name
,
491 response
.answer
.append(rrset
)
493 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
494 self
.assertTrue(receivedQuery
)
495 self
.assertTrue(receivedResponse
)
496 receivedQuery
.id = query
.id
497 self
.assertEquals(query
, receivedQuery
)
498 self
.assertEquals(receivedResponse
, response
)
500 expectedResponse
= dns
.message
.make_response(query
)
501 expectedResponse
.flags |
= dns
.flags
.TC
503 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
504 self
.assertEquals(receivedResponse
, expectedResponse
)
506 class TestAdvancedAndNot(DNSDistTest
):
508 _config_template
= """
509 addAction(AndRule({NotRule(QTypeRule("A")), TCPRule(false)}), RCodeAction(dnsdist.NOTIMP))
510 newServer{address="127.0.0.1:%s"}
512 def testAOverUDPReturnsNotImplementedCanary(self
):
514 Advanced: !A && UDP canary
516 dnsdist is configured to reply 'not implemented' for query
517 over UDP AND !qtype A.
518 We send an A query over UDP and TCP, and check that the
521 name
= 'andnot.advanced.tests.powerdns.com.'
522 query
= dns
.message
.make_query(name
, 'A', 'IN')
523 response
= dns
.message
.make_response(query
)
524 rrset
= dns
.rrset
.from_text(name
,
529 response
.answer
.append(rrset
)
531 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
532 self
.assertTrue(receivedQuery
)
533 self
.assertTrue(receivedResponse
)
534 receivedQuery
.id = query
.id
535 self
.assertEquals(query
, receivedQuery
)
536 self
.assertEquals(receivedResponse
, response
)
538 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
539 self
.assertTrue(receivedQuery
)
540 self
.assertTrue(receivedResponse
)
541 receivedQuery
.id = query
.id
542 self
.assertEquals(query
, receivedQuery
)
543 self
.assertEquals(receivedResponse
, response
)
545 def testAOverUDPReturnsNotImplemented(self
):
549 dnsdist is configured to reply 'not implemented' for query
550 over UDP AND !qtype A.
551 We send a TXT query over UDP and TCP, and check that the
552 response is OK for TCP and 'not implemented' for UDP.
554 name
= 'andnot.advanced.tests.powerdns.com.'
555 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
557 expectedResponse
= dns
.message
.make_response(query
)
558 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
560 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
561 self
.assertEquals(receivedResponse
, expectedResponse
)
563 response
= dns
.message
.make_response(query
)
564 rrset
= dns
.rrset
.from_text(name
,
568 'nothing to see here')
569 response
.answer
.append(rrset
)
571 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
572 self
.assertTrue(receivedQuery
)
573 self
.assertTrue(receivedResponse
)
574 receivedQuery
.id = query
.id
575 self
.assertEquals(query
, receivedQuery
)
576 self
.assertEquals(receivedResponse
, response
)
578 class TestAdvancedOr(DNSDistTest
):
580 _config_template
= """
581 addAction(OrRule({QTypeRule("A"), TCPRule(false)}), RCodeAction(dnsdist.NOTIMP))
582 newServer{address="127.0.0.1:%s"}
584 def testAAAAOverUDPReturnsNotImplemented(self
):
586 Advanced: A || UDP: AAAA
588 dnsdist is configured to reply 'not implemented' for query
590 We send an AAAA query over UDP and TCP, and check that the
591 response is 'not implemented' for UDP and OK for TCP.
593 name
= 'aorudp.advanced.tests.powerdns.com.'
594 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
595 response
= dns
.message
.make_response(query
)
596 rrset
= dns
.rrset
.from_text(name
,
601 response
.answer
.append(rrset
)
603 expectedResponse
= dns
.message
.make_response(query
)
604 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
606 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
607 self
.assertEquals(receivedResponse
, expectedResponse
)
609 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
610 self
.assertTrue(receivedQuery
)
611 self
.assertTrue(receivedResponse
)
612 receivedQuery
.id = query
.id
613 self
.assertEquals(query
, receivedQuery
)
614 self
.assertEquals(receivedResponse
, response
)
def testAOverUDPReturnsNotImplemented(self):
    """
    Advanced: A || UDP: A

    dnsdist is configured to reply 'not implemented' for query
    over UDP OR qtype A.
    We send an A query over UDP and TCP, and check that the
    response is 'not implemented' for both.
    """
    name = 'aorudp.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # The expected answer is an empty response to the query with rcode NOTIMP.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.NOTIMP)

    # No backend response is supplied (response=None, useQueue=False);
    # the same NOTIMP answer is expected over UDP, then over TCP.
    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(receivedResponse, expectedResponse)
638 class TestAdvancedLogAction(DNSDistTest
):
640 _config_template
= """
641 newServer{address="127.0.0.1:%s"}
642 addAction(AllRule(), LogAction("dnsdist.log", false))
644 def testAdvancedLogAction(self
):
646 Advanced: Log all queries
650 name
= 'logaction.advanced.tests.powerdns.com.'
651 query
= dns
.message
.make_query(name
, 'A', 'IN')
652 response
= dns
.message
.make_response(query
)
653 rrset
= dns
.rrset
.from_text(name
,
658 response
.answer
.append(rrset
)
660 for _
in range(count
):
661 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
662 self
.assertTrue(receivedQuery
)
663 self
.assertTrue(receivedResponse
)
664 receivedQuery
.id = query
.id
665 self
.assertEquals(query
, receivedQuery
)
666 self
.assertEquals(response
, receivedResponse
)
668 self
.assertTrue(os
.path
.isfile('dnsdist.log'))
669 self
.assertTrue(os
.stat('dnsdist.log').st_size
> 0)
671 class TestAdvancedDNSSEC(DNSDistTest
):
673 _config_template
= """
674 newServer{address="127.0.0.1:%s"}
675 addAction(DNSSECRule(), DropAction())
677 def testAdvancedDNSSECDrop(self
):
679 Advanced: DNSSEC Rule
682 name
= 'dnssec.advanced.tests.powerdns.com.'
683 query
= dns
.message
.make_query(name
, 'A', 'IN')
684 doquery
= dns
.message
.make_query(name
, 'A', 'IN', want_dnssec
=True)
685 response
= dns
.message
.make_response(query
)
686 rrset
= dns
.rrset
.from_text(name
,
691 response
.answer
.append(rrset
)
693 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
694 self
.assertTrue(receivedQuery
)
695 self
.assertTrue(receivedResponse
)
696 receivedQuery
.id = query
.id
697 self
.assertEquals(query
, receivedQuery
)
698 self
.assertEquals(response
, receivedResponse
)
700 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
701 self
.assertTrue(receivedQuery
)
702 self
.assertTrue(receivedResponse
)
703 receivedQuery
.id = query
.id
704 self
.assertEquals(query
, receivedQuery
)
705 self
.assertEquals(response
, receivedResponse
)
707 (_
, receivedResponse
) = self
.sendUDPQuery(doquery
, response
)
708 self
.assertEquals(receivedResponse
, None)
709 (_
, receivedResponse
) = self
.sendTCPQuery(doquery
, response
)
710 self
.assertEquals(receivedResponse
, None)
712 class TestAdvancedQClass(DNSDistTest
):
714 _config_template
= """
715 newServer{address="127.0.0.1:%s"}
716 addAction(QClassRule(DNSClass.CHAOS), DropAction())
def testAdvancedQClassChaosDrop(self):
    """
    Advanced: Drop QClass CHAOS
    """
    name = 'qclasschaos.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'CHAOS')

    # A dropped query gets no answer, over UDP or over TCP.
    # useQueue=False matches the other "dropped query" tests in this file.
    # NOTE(review): presumably this avoids queueing a response that nothing
    # will ever consume -- confirm against DNSDistTest.sendUDPQuery/sendTCPQuery.
    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertIsNone(receivedResponse)
731 def testAdvancedQClassINAllow(self
):
733 Advanced: Allow QClass IN
736 name
= 'qclassin.advanced.tests.powerdns.com.'
737 query
= dns
.message
.make_query(name
, 'A', 'IN')
738 response
= dns
.message
.make_response(query
)
739 rrset
= dns
.rrset
.from_text(name
,
744 response
.answer
.append(rrset
)
746 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
747 self
.assertTrue(receivedQuery
)
748 self
.assertTrue(receivedResponse
)
749 receivedQuery
.id = query
.id
750 self
.assertEquals(query
, receivedQuery
)
751 self
.assertEquals(response
, receivedResponse
)
753 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
754 self
.assertTrue(receivedQuery
)
755 self
.assertTrue(receivedResponse
)
756 receivedQuery
.id = query
.id
757 self
.assertEquals(query
, receivedQuery
)
758 self
.assertEquals(response
, receivedResponse
)
760 class TestAdvancedOpcode(DNSDistTest
):
762 _config_template
= """
763 newServer{address="127.0.0.1:%s"}
764 addAction(OpcodeRule(DNSOpcode.Notify), DropAction())
def testAdvancedOpcodeNotifyDrop(self):
    """
    Advanced: Drop Opcode NOTIFY
    """
    name = 'opcodenotify.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # Turn the plain query into a NOTIFY.
    query.set_opcode(dns.opcode.NOTIFY)

    # A dropped query gets no answer, over UDP or over TCP.
    # useQueue=False matches the other "dropped query" tests in this file.
    # NOTE(review): presumably this avoids queueing a response that nothing
    # will ever consume -- confirm against DNSDistTest.sendUDPQuery/sendTCPQuery.
    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertIsNone(receivedResponse)
780 def testAdvancedOpcodeUpdateINAllow(self
):
782 Advanced: Allow Opcode UPDATE
785 name
= 'opcodeupdate.advanced.tests.powerdns.com.'
786 query
= dns
.message
.make_query(name
, 'A', 'IN')
787 query
.set_opcode(dns
.opcode
.UPDATE
)
788 response
= dns
.message
.make_response(query
)
789 rrset
= dns
.rrset
.from_text(name
,
794 response
.answer
.append(rrset
)
796 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
797 self
.assertTrue(receivedQuery
)
798 self
.assertTrue(receivedResponse
)
799 receivedQuery
.id = query
.id
800 self
.assertEquals(query
, receivedQuery
)
801 self
.assertEquals(response
, receivedResponse
)
803 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
804 self
.assertTrue(receivedQuery
)
805 self
.assertTrue(receivedResponse
)
806 receivedQuery
.id = query
.id
807 self
.assertEquals(query
, receivedQuery
)
808 self
.assertEquals(response
, receivedResponse
)
810 class TestAdvancedNonTerminalRule(DNSDistTest
):
812 _config_template
= """
813 newServer{address="127.0.0.1:%s", pool="real"}
814 addAction(AllRule(), DisableValidationAction())
815 addAction(AllRule(), PoolAction("real"))
816 addAction(AllRule(), DropAction())
818 def testAdvancedNonTerminalRules(self
):
820 Advanced: Non terminal rules
822 We check that DisableValidationAction() is applied
823 but does not stop the processing, then that
824 PoolAction() is applied _and_ stop the processing.
826 name
= 'nonterminal.advanced.tests.powerdns.com.'
827 query
= dns
.message
.make_query(name
, 'A', 'IN')
828 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
829 expectedQuery
.flags |
= dns
.flags
.CD
830 response
= dns
.message
.make_response(query
)
831 rrset
= dns
.rrset
.from_text(name
,
836 response
.answer
.append(rrset
)
838 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
839 self
.assertTrue(receivedQuery
)
840 self
.assertTrue(receivedResponse
)
841 receivedQuery
.id = expectedQuery
.id
842 self
.assertEquals(expectedQuery
, receivedQuery
)
843 self
.assertEquals(response
, receivedResponse
)
845 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
846 self
.assertTrue(receivedQuery
)
847 self
.assertTrue(receivedResponse
)
848 receivedQuery
.id = expectedQuery
.id
849 self
.assertEquals(expectedQuery
, receivedQuery
)
850 self
.assertEquals(response
, receivedResponse
)
852 class TestAdvancedStringOnlyServer(DNSDistTest
):
854 _config_template
= """
855 newServer("127.0.0.1:%s")
858 def testAdvancedStringOnlyServer(self
):
860 Advanced: "string-only" server is placed in the default pool
862 name
= 'string-only-server.advanced.tests.powerdns.com.'
863 query
= dns
.message
.make_query(name
, 'A', 'IN')
864 response
= dns
.message
.make_response(query
)
865 rrset
= dns
.rrset
.from_text(name
,
870 response
.answer
.append(rrset
)
872 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
873 self
.assertTrue(receivedQuery
)
874 self
.assertTrue(receivedResponse
)
875 receivedQuery
.id = query
.id
876 self
.assertEquals(query
, receivedQuery
)
877 self
.assertEquals(response
, receivedResponse
)
879 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
880 self
.assertTrue(receivedQuery
)
881 self
.assertTrue(receivedResponse
)
882 receivedQuery
.id = query
.id
883 self
.assertEquals(query
, receivedQuery
)
884 self
.assertEquals(response
, receivedResponse
)
886 class TestAdvancedRestoreFlagsOnSelfResponse(DNSDistTest
):
888 _config_template
= """
889 addAction(AllRule(), DisableValidationAction())
890 addAction(AllRule(), SpoofAction("192.0.2.1"))
891 newServer{address="127.0.0.1:%s"}
894 def testAdvancedRestoreFlagsOnSpoofResponse(self
):
896 Advanced: Restore flags on spoofed response
898 Send a query with CD flag cleared, dnsdist is
899 instructed to set it, then to spoof the response,
900 check that response has the flag cleared.
902 name
= 'spoofed.restoreflags.advanced.tests.powerdns.com.'
903 query
= dns
.message
.make_query(name
, 'A', 'IN')
904 # dnsdist set RA = RD for spoofed responses
905 query
.flags
&= ~dns
.flags
.RD
907 response
= dns
.message
.make_response(query
)
908 rrset
= dns
.rrset
.from_text(name
,
913 response
.answer
.append(rrset
)
915 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
916 self
.assertTrue(receivedResponse
)
917 self
.assertEquals(response
, receivedResponse
)
919 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
920 self
.assertTrue(receivedResponse
)
921 self
.assertEquals(response
, receivedResponse
)
923 class TestAdvancedQPS(DNSDistTest
):
925 _config_template
= """
926 addAction("qps.advanced.tests.powerdns.com", QPSAction(10))
927 newServer{address="127.0.0.1:%s"}
930 def testAdvancedQPSLimit(self
):
934 Send queries to "qps.advanced.tests.powerdns.com."
935 check that dnsdist drops queries when the max QPS has been reached.
938 name
= 'qps.advanced.tests.powerdns.com.'
939 query
= dns
.message
.make_query(name
, 'A', 'IN')
940 response
= dns
.message
.make_response(query
)
941 rrset
= dns
.rrset
.from_text(name
,
946 response
.answer
.append(rrset
)
948 for _
in range(maxQPS
):
949 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
950 receivedQuery
.id = query
.id
951 self
.assertEquals(query
, receivedQuery
)
952 self
.assertEquals(response
, receivedResponse
)
954 # we should now be dropped
955 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
956 self
.assertEquals(receivedResponse
, None)
960 # again, over TCP this time
961 for _
in range(maxQPS
):
962 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
963 receivedQuery
.id = query
.id
964 self
.assertEquals(query
, receivedQuery
)
965 self
.assertEquals(response
, receivedResponse
)
968 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
969 self
.assertEquals(receivedResponse
, None)
class TestAdvancedQPSNone(DNSDistTest):
    """
    Check that a QPSAction whose limit is not reached lets later rules apply.
    """

    # dnsdist configuration: a QPS rule for the test name, followed by a
    # catch-all rule answering REFUSED.
    # NOTE(review): the '%s' placeholder is presumably substituted with the
    # backend port by DNSDistTest -- confirm there.
    _config_template = """
    addAction("qpsnone.advanced.tests.powerdns.com", QPSAction(100))
    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedQPSNone(self):
        """
        Advanced: Not matching QPS returns None, not Allow

        Send queries to "qpsnone.advanced.tests.powerdns.com."
        check that the rule returns None when the QPS has not been
        reached, not Allow, so that the REFUSED rule still applies.
        """
        name = 'qpsnone.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # The expected answer is an empty response to the query with rcode REFUSED.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # No backend response is supplied (response=None, useQueue=False);
        # the same REFUSED answer is expected over UDP, then over TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            # assertEquals is a deprecated alias (removed in Python 3.12).
            self.assertEqual(receivedResponse, expectedResponse)
998 class TestAdvancedNMGRule(DNSDistTest
):
1000 _config_template
= """
1002 allowed:addMask("192.0.2.1/32")
1003 addAction(NotRule(NetmaskGroupRule(allowed)), RCodeAction(dnsdist.REFUSED))
1004 newServer{address="127.0.0.1:%s"}
def testAdvancedNMGRule(self):
    """
    Advanced: NMGRule should refuse our queries

    Send queries to "nmgrule.advanced.tests.powerdns.com.",
    check that we are getting a REFUSED response.
    """
    name = 'nmgrule.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # The expected answer is an empty response to the query with rcode REFUSED.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    # No backend response is supplied (response=None, useQueue=False);
    # the same REFUSED answer is expected over UDP, then over TCP.
    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(receivedResponse, expectedResponse)
class TestDSTPortRule(DNSDistTest):
    """Check DSTPortRule: queries sent to the dnsdist listening port are refused."""

    # '_dnsDistPort' fills the %d of DSTPortRule(), '_testServerPort' the %s
    # of the backend address.
    _config_params = ['_dnsDistPort', '_testServerPort']
    _config_template = """
    addAction(DSTPortRule(%d), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testDSTPortRule(self):
        """
        Advanced: DSTPortRule should capture our queries

        Send queries to "dstportrule.advanced.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'dstportrule.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedLabelsCountRule(DNSDistTest):
    """Check QNameLabelsCountRule(5,6): only qnames with 5 or 6 labels pass."""

    _config_template = """
    addAction(QNameLabelsCountRule(5,6), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedLabelsCountRule(self):
        """
        Advanced: QNameLabelsCountRule(5,6)
        """
        # 6 labels, we should be fine
        name = 'ok.labelscount.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and address lines were dropped by
        # the source extraction; the values below are reconstructed -- confirm
        # against upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # the ID is rewritten on the way to the backend
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        refusedNames = (
            # more than 6 labels, the query should be refused
            'not.ok.labelscount.advanced.tests.powerdns.com.',
            # less than 5 labels, the query should be refused
            'labelscountadvanced.tests.powerdns.com.',
        )
        for name in refusedNames:
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            for method in ("sendUDPQuery", "sendTCPQuery"):
                sender = getattr(self, method)
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
                self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedWireLengthRule(DNSDistTest):
    """Check QNameWireLengthRule(54,56): qnames outside that wire size are refused."""

    _config_template = """
    addAction(QNameWireLengthRule(54,56), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedWireLengthRule(self):
        """
        Advanced: QNameWireLengthRule(54,56)
        """
        # within the accepted range, the query should reach the backend
        name = 'longenough.qnamewirelength.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and address lines were dropped by
        # the source extraction; the values below are reconstructed -- confirm
        # against upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # the ID is rewritten on the way to the backend
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        refusedNames = (
            # too short, the query should be refused
            'short.qnamewirelength.advanced.tests.powerdns.com.',
            # too long, the query should be refused
            'toolongtobevalid.qnamewirelength.advanced.tests.powerdns.com.',
        )
        for name in refusedNames:
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            for method in ("sendUDPQuery", "sendTCPQuery"):
                sender = getattr(self, method)
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
                self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedIncludeDir(DNSDistTest):
    """Check that rules loaded through includeDirectory() are applied."""

    _config_template = """
    -- this directory contains a file allowing includedir.advanced.tests.powerdns.com.
    includeDirectory('test-include-dir')
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedIncludeDirAllowed(self):
        """
        Advanced: includeDirectory()
        """
        # this one is allowed by the included configuration
        name = 'includedir.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and address lines were dropped by
        # the source extraction; the values below are reconstructed -- confirm
        # against upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # the ID is rewritten on the way to the backend
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # this one should be refused
        name = 'notincludedir.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedLuaDO(DNSDistTest):
    """Check a LuaAction answering NXDOMAIN to queries carrying the DO bit."""

    # NOTE(review): the `if ... then` / `end` lines of the Lua function were
    # dropped by the source extraction; they are reconstructed from the two
    # visible return statements -- confirm against upstream.
    _config_template = """
    function nxDOLua(dq)
        if dq:getDO() then
            return DNSAction.Nxdomain, ""
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(nxDOLua))
    newServer{address="127.0.0.1:%s"}
    """

    def testNxDOViaLua(self):
        """
        Advanced: Nx DO queries via Lua
        """
        name = 'nxdo.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and address lines were dropped by
        # the source extraction; the values below are reconstructed -- confirm
        # against upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)
        queryWithDO = dns.message.make_query(name, 'A', 'IN', want_dnssec=True)
        doResponse = dns.message.make_response(queryWithDO)
        doResponse.set_rcode(dns.rcode.NXDOMAIN)

        # without DO: the query goes to the backend and its answer comes back
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

        # with DO: dnsdist answers NXDOMAIN by itself
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(queryWithDO, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            doResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, doResponse)
class TestAdvancedLuaRefused(DNSDistTest):
    """Check a LuaAction returning DNSAction.Refused with an explicit empty rule string."""

    # NOTE(review): the `function refuse(dq)` / `end` lines were dropped by
    # the source extraction; they are reconstructed from LuaAction(refuse)
    # -- confirm against upstream.
    _config_template = """
    function refuse(dq)
        return DNSAction.Refused, ""
    end
    addAction(AllRule(), LuaAction(refuse))
    newServer{address="127.0.0.1:%s"}
    """

    def testRefusedViaLua(self):
        """
        Advanced: Refused via Lua
        """
        name = 'refused.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # No backend answer is prepared: the query is never forwarded, so the
        # answer record the test used to build was dead code.
        refusedResponse = dns.message.make_response(query)
        refusedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            refusedResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, refusedResponse)
class TestAdvancedLuaActionReturnSyntax(DNSDistTest):
    """Check that a LuaAction may return only the action, without a rule string."""

    # NOTE(review): the `function refuse(dq)` / `end` lines were dropped by
    # the source extraction; they are reconstructed from LuaAction(refuse)
    # -- confirm against upstream.
    _config_template = """
    function refuse(dq)
        return DNSAction.Refused
    end
    addAction(AllRule(), LuaAction(refuse))
    newServer{address="127.0.0.1:%s"}
    """

    def testRefusedWithEmptyRule(self):
        """
        Advanced: Short syntax for LuaAction return values
        """
        name = 'short.refused.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # No backend answer is prepared: the query is never forwarded, so the
        # answer record the test used to build was dead code.
        refusedResponse = dns.message.make_response(query)
        refusedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            refusedResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, refusedResponse)
class TestAdvancedLuaTruncated(DNSDistTest):
    """Check a LuaAction returning DNSAction.Truncate for UDP queries only."""

    # NOTE(review): the function header, the `if` condition and the `end`
    # lines were dropped by the source extraction; they are reconstructed
    # from LuaAction(trunc) and from the test expecting no truncation over
    # TCP -- confirm against upstream.
    _config_template = """
    function trunc(dq)
        if not dq.tcp then
            return DNSAction.Truncate, ""
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(trunc))
    newServer{address="127.0.0.1:%s"}
    """

    def testTCViaLua(self):
        """
        Advanced: TC via Lua
        """
        name = 'tc.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and address lines were dropped by
        # the source extraction; the values below are reconstructed -- confirm
        # against upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        truncatedResponse = dns.message.make_response(query)
        truncatedResponse.flags |= dns.flags.TC

        # over UDP, dnsdist answers by itself with an empty, TC=1 response
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        truncatedResponse.id = receivedResponse.id
        self.assertEqual(receivedResponse, truncatedResponse)

        # no truncation over TCP
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
class TestStatNodeRespRingSince(DNSDistTest):
    """Check statNodeRespRing() with and without its optional 'since' argument."""

    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    # one entry per '%s' placeholder of the template, in order
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
    # NOTE(review): the source extraction dropped several short lines of this
    # template. setKey("%s") is implied by the three _config_params and the
    # closing `end` by the Lua syntax; `s1:setUp()` is a best guess for the
    # remaining missing line -- confirm against upstream.
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    s1 = newServer{address="127.0.0.1:%s"}
    s1:setUp()
    function visitor(node, self, childstat)
        table.insert(nodesSeen, node.fullname)
    end
    """

    def _getVisitedNodes(self, since=None):
        """
        Run statNodeRespRing() over the console and return the visited nodes.

        The Lua-side 'nodesSeen' table is reset first, then filled by the
        'visitor' callback of the configuration. 'since' is passed as the
        second argument of statNodeRespRing() when it is not None.
        Returns the node names, one per line, without trailing newline.
        """
        if since is None:
            command = "statNodeRespRing(visitor)"
        else:
            command = "statNodeRespRing(visitor, %d)" % since

        self.sendConsoleCommand("nodesSeen = {}")
        self.sendConsoleCommand(command)
        nodes = self.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
        return nodes.strip("\n")

    def testStatNodeRespRingSince(self):
        """
        Advanced: StatNodeRespRing with optional since parameter

        """
        name = 'statnodesince.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and address lines were dropped by
        # the source extraction; the values below are reconstructed -- confirm
        # against upstream.
        rrset = dns.rrset.from_text(name,
                                    1,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # NOTE(review): only the first two lines of each expected listing
        # survived the source extraction; the three parent nodes are
        # reconstructed from the qname -- confirm against upstream.
        allNodes = """statnodesince.advanced.tests.powerdns.com.
advanced.tests.powerdns.com.
tests.powerdns.com.
powerdns.com.
com."""

        # right after the query: visible with no 'since' and with since=0
        self.assertEqual(self._getVisitedNodes(), allNodes)
        self.assertEqual(self._getVisitedNodes(0), allNodes)

        # NOTE(review): this pause was dropped by the source extraction; it
        # is inferred from the since=5 / since=10 expectations below.
        time.sleep(5)

        # still visible when no time limit is given
        self.assertEqual(self._getVisitedNodes(), allNodes)
        # the response is now older than 5 seconds...
        self.assertEqual(self._getVisitedNodes(5), """""")
        # ...but still younger than 10 seconds
        self.assertEqual(self._getVisitedNodes(10), allNodes)
class TestAdvancedRD(DNSDistTest):
    """Check RDRule: queries with the RD flag set are refused, others pass."""

    _config_template = """
    addAction(RDRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedRDRefused(self):
        """
        Advanced: RD query is refused
        """
        name = 'rd.advanced.tests.powerdns.com.'
        # the query is sent as built; the RD flag is not cleared here, unlike
        # in testAdvancedNoRDAllowed()
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAdvancedNoRDAllowed(self):
        """
        Advanced: No-RD query is allowed
        """
        name = 'no-rd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # clear the RD flag so that RDRule() does not match
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # fail with a clear assertion instead of an AttributeError on
            # 'receivedQuery.id' when nothing reached the backend
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, response)
class TestAdvancedGetLocalPort(DNSDistTest):
    """Check that dq.localaddr:getPort() exposes the port the query was received on."""

    # The Lua action spoofs a CNAME whose first label embeds the local port.
    _config_template = """
    function answerBasedOnLocalPort(dq)
        local port = dq.localaddr:getPort()
        return DNSAction.Spoof, "port-was-"..port..".local-port.advanced.tests.powerdns.com."
    end
    addAction("local-port.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedGetLocalPort(self):
        """
        Advanced: Return CNAME containing the local port
        """
        name = 'local-port.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        expectedTarget = 'port-was-{}.local-port.advanced.tests.powerdns.com.'.format(self._dnsDistPort)
        # NOTE(review): the TTL and class lines were dropped by the source
        # extraction; the values below are reconstructed -- confirm against
        # upstream.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    expectedTarget)
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, response)
class TestAdvancedGetLocalPortOnAnyBind(DNSDistTest):
    """Check dq.localaddr:getPort() when dnsdist listens on 0.0.0.0."""

    # The Lua action spoofs a CNAME whose first label embeds the local port.
    _config_template = """
    function answerBasedOnLocalPort(dq)
        local port = dq.localaddr:getPort()
        return DNSAction.Spoof, "port-was-"..port..".local-port-any.advanced.tests.powerdns.com."
    end
    addAction("local-port-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
    newServer{address="127.0.0.1:%s"}
    """
    # listen on the wildcard address instead of a specific one
    _dnsDistListeningAddr = "0.0.0.0"

    def testAdvancedGetLocalPortOnAnyBind(self):
        """
        Advanced: Return CNAME containing the local port for an ANY bind
        """
        name = 'local-port-any.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        expectedTarget = 'port-was-{}.local-port-any.advanced.tests.powerdns.com.'.format(self._dnsDistPort)
        # NOTE(review): the TTL and class lines were dropped by the source
        # extraction; the values below are reconstructed -- confirm against
        # upstream.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    expectedTarget)
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, response)
class TestAdvancedGetLocalAddressOnAnyBind(DNSDistTest):
    """Check dq.localaddr:toString() when dnsdist listens on 0.0.0.0."""

    # The Lua action keeps the first run of digits and dots of the local
    # address string, replaces the dots with dashes and spoofs a CNAME
    # embedding the result.
    _config_template = """
    function answerBasedOnLocalAddress(dq)
        local dest = dq.localaddr:toString()
        local i, j = string.find(dest, "[0-9.]+")
        local addr = string.sub(dest, i, j)
        local dashAddr = string.gsub(addr, "[.]", "-")
        return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
    end
    addAction("local-address-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
    newServer{address="127.0.0.1:%s"}
    """
    # listen on the wildcard address instead of a specific one
    _dnsDistListeningAddr = "0.0.0.0"

    def testAdvancedGetLocalAddressOnAnyBind(self):
        """
        Advanced: Return CNAME containing the local address for an ANY bind
        """
        name = 'local-address-any.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # The expected target carries 127.0.0.1, not the 0.0.0.0 wildcard.
        # NOTE(review): the TTL and class lines were dropped by the source
        # extraction; the values below are reconstructed -- confirm against
        # upstream.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'address-was-127-0-0-1.local-address-any.advanced.tests.powerdns.com.')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, response)
class TestAdvancedLuaTempFailureTTL(DNSDistTest):
    """Exercise reading, setting and unsetting dq.tempFailureTTL from Lua."""

    # Each check spoofs a diagnostic name when it fails, so that the Python
    # side sees an unexpected answer; on success the query is passed through.
    # The diagnostic names are built with '..' (Lua string concatenation):
    # '+' is arithmetic in Lua and would raise on these strings instead of
    # producing the spoofed name.
    # NOTE(review): the `end` lines were dropped by the source extraction and
    # are reconstructed from the Lua syntax -- confirm against upstream.
    _config_template = """
    function testAction(dq)
        if dq.tempFailureTTL ~= nil then
            return DNSAction.Spoof, "initially.not.nil.but." .. dq.tempFailureTTL .. ".tests.powerdns.com."
        end
        dq.tempFailureTTL = 30
        if dq.tempFailureTTL ~= 30 then
            return DNSAction.Spoof, "after.set.not.expected.value.but." .. dq.tempFailureTTL .. ".tests.powerdns.com."
        end
        dq.tempFailureTTL = nil
        if dq.tempFailureTTL ~= nil then
            return DNSAction.Spoof, "after.unset.not.nil.but." .. dq.tempFailureTTL .. ".tests.powerdns.com."
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(testAction))
    newServer{address="127.0.0.1:%s"}
    """

    def testTempFailureTTLBinding(self):
        """
        Advanced: Exercise dq.tempFailureTTL Lua binding
        """
        name = 'tempfailurettlbinding.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and address lines were dropped by
        # the source extraction; the values below are reconstructed -- confirm
        # against upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        # If every Lua check passed, the query reaches the backend and its
        # answer comes back untouched.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
1665 class TestAdvancedEDNSOptionRule(DNSDistTest
):
1667 _config_template
= """
1668 newServer{address="127.0.0.1:%s"}
1669 addAction(EDNSOptionRule(EDNSOptionCode.ECS), DropAction())
1672 def testDropped(self
):
1674 Advanced: A question with ECS is dropped
1677 name
= 'ednsoptionrule.advanced.tests.powerdns.com.'
1679 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
1680 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
1682 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None)
1683 self
.assertEquals(receivedResponse
, None)
1684 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None)
1685 self
.assertEquals(receivedResponse
, None)
1687 def testReplied(self
):
1689 Advanced: A question without ECS is answered
1692 name
= 'ednsoptionrule.advanced.tests.powerdns.com.'
1695 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[], payload
=512)
1696 response
= dns
.message
.make_response(query
)
1698 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1699 self
.assertTrue(receivedQuery
)
1700 self
.assertTrue(receivedResponse
)
1702 receivedQuery
.id = query
.id
1703 self
.assertEquals(query
, receivedQuery
)
1704 self
.assertEquals(receivedResponse
, response
)
1706 # and with no EDNS at all
1707 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
1708 response
= dns
.message
.make_response(query
)
1710 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1711 self
.assertTrue(receivedQuery
)
1712 self
.assertTrue(receivedResponse
)
1714 receivedQuery
.id = query
.id
1715 self
.assertEquals(query
, receivedQuery
)
1716 self
.assertEquals(receivedResponse
, response
)