]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Advanced.py
3 from datetime
import datetime
, timedelta
8 from dnsdisttests
import DNSDistTest
10 class TestAdvancedAllow(DNSDistTest
):
12 _config_template
= """
13 addAction(AllRule(), NoneAction())
14 addAction(makeRule("allowed.advanced.tests.powerdns.com."), AllowAction())
15 addAction(AllRule(), DropAction())
16 newServer{address="127.0.0.1:%s"}
19 def testAdvancedAllow(self
):
21 Advanced: Allowed qname is not dropped
23 A query for allowed.advanced.tests.powerdns.com. should be allowed
24 while others should be dropped.
26 name
= 'allowed.advanced.tests.powerdns.com.'
27 query
= dns
.message
.make_query(name
, 'A', 'IN')
28 response
= dns
.message
.make_response(query
)
29 rrset
= dns
.rrset
.from_text(name
,
34 response
.answer
.append(rrset
)
36 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
37 self
.assertTrue(receivedQuery
)
38 self
.assertTrue(receivedResponse
)
39 receivedQuery
.id = query
.id
40 self
.assertEquals(query
, receivedQuery
)
41 self
.assertEquals(response
, receivedResponse
)
43 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
44 self
.assertTrue(receivedQuery
)
45 self
.assertTrue(receivedResponse
)
46 receivedQuery
.id = query
.id
47 self
.assertEquals(query
, receivedQuery
)
48 self
.assertEquals(response
, receivedResponse
)
50 def testAdvancedAllowDropped(self
):
52 Advanced: Not allowed qname is dropped
54 A query for notallowed.advanced.tests.powerdns.com. should be dropped.
56 name
= 'notallowed.advanced.tests.powerdns.com.'
57 query
= dns
.message
.make_query(name
, 'A', 'IN')
58 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
59 self
.assertEquals(receivedResponse
, None)
61 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
62 self
.assertEquals(receivedResponse
, None)
64 class TestAdvancedFixupCase(DNSDistTest
):
66 _config_template
= """
69 newServer{address="127.0.0.1:%s"}
72 def testAdvancedFixupCase(self
):
76 Send a query with lower and upper chars,
77 make the backend return a lowercase version,
78 check that dnsdist fixes the response.
80 name
= 'fiXuPCasE.advanced.tests.powerdns.com.'
81 query
= dns
.message
.make_query(name
, 'A', 'IN')
82 lowercasequery
= dns
.message
.make_query(name
.lower(), 'A', 'IN')
83 response
= dns
.message
.make_response(lowercasequery
)
84 expectedResponse
= dns
.message
.make_response(query
)
85 rrset
= dns
.rrset
.from_text(name
,
90 response
.answer
.append(rrset
)
91 expectedResponse
.answer
.append(rrset
)
93 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
94 self
.assertTrue(receivedQuery
)
95 self
.assertTrue(receivedResponse
)
96 receivedQuery
.id = query
.id
97 self
.assertEquals(query
, receivedQuery
)
98 self
.assertEquals(expectedResponse
, receivedResponse
)
100 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
101 self
.assertTrue(receivedQuery
)
102 self
.assertTrue(receivedResponse
)
103 receivedQuery
.id = query
.id
104 self
.assertEquals(query
, receivedQuery
)
105 self
.assertEquals(expectedResponse
, receivedResponse
)
108 class TestAdvancedRemoveRD(DNSDistTest
):
110 _config_template
= """
111 addAction("norecurse.advanced.tests.powerdns.com.", NoRecurseAction())
112 newServer{address="127.0.0.1:%s"}
115 def testAdvancedNoRD(self
):
119 Send a query with RD,
120 check that dnsdist clears the RD flag.
122 name
= 'norecurse.advanced.tests.powerdns.com.'
123 query
= dns
.message
.make_query(name
, 'A', 'IN')
124 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
125 expectedQuery
.flags
&= ~dns
.flags
.RD
127 response
= dns
.message
.make_response(query
)
128 rrset
= dns
.rrset
.from_text(name
,
133 response
.answer
.append(rrset
)
135 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
136 self
.assertTrue(receivedQuery
)
137 self
.assertTrue(receivedResponse
)
138 receivedQuery
.id = expectedQuery
.id
139 self
.assertEquals(expectedQuery
, receivedQuery
)
140 self
.assertEquals(response
, receivedResponse
)
142 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
143 self
.assertTrue(receivedQuery
)
144 self
.assertTrue(receivedResponse
)
145 receivedQuery
.id = expectedQuery
.id
146 self
.assertEquals(expectedQuery
, receivedQuery
)
147 self
.assertEquals(response
, receivedResponse
)
149 def testAdvancedKeepRD(self
):
151 Advanced: No RD canary
153 Send a query with RD for a canary domain,
154 check that dnsdist does not clear the RD flag.
156 name
= 'keeprecurse.advanced.tests.powerdns.com.'
157 query
= dns
.message
.make_query(name
, 'A', 'IN')
159 response
= dns
.message
.make_response(query
)
160 rrset
= dns
.rrset
.from_text(name
,
165 response
.answer
.append(rrset
)
167 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
168 self
.assertTrue(receivedQuery
)
169 self
.assertTrue(receivedResponse
)
170 receivedQuery
.id = query
.id
171 self
.assertEquals(query
, receivedQuery
)
172 self
.assertEquals(response
, receivedResponse
)
174 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
175 self
.assertTrue(receivedQuery
)
176 self
.assertTrue(receivedResponse
)
177 receivedQuery
.id = query
.id
178 self
.assertEquals(query
, receivedQuery
)
179 self
.assertEquals(response
, receivedResponse
)
182 class TestAdvancedAddCD(DNSDistTest
):
184 _config_template
= """
185 addAction("setcd.advanced.tests.powerdns.com.", DisableValidationAction())
186 addAction(makeRule("setcdviaaction.advanced.tests.powerdns.com."), DisableValidationAction())
187 newServer{address="127.0.0.1:%s"}
190 def testAdvancedSetCD(self
):
194 Send a query with CD cleared,
195 check that dnsdist set the CD flag.
197 name
= 'setcd.advanced.tests.powerdns.com.'
198 query
= dns
.message
.make_query(name
, 'A', 'IN')
199 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
200 expectedQuery
.flags |
= dns
.flags
.CD
202 response
= dns
.message
.make_response(query
)
203 rrset
= dns
.rrset
.from_text(name
,
208 response
.answer
.append(rrset
)
210 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
211 self
.assertTrue(receivedQuery
)
212 self
.assertTrue(receivedResponse
)
213 receivedQuery
.id = expectedQuery
.id
214 self
.assertEquals(expectedQuery
, receivedQuery
)
215 self
.assertEquals(response
, receivedResponse
)
217 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
218 self
.assertTrue(receivedQuery
)
219 self
.assertTrue(receivedResponse
)
220 receivedQuery
.id = expectedQuery
.id
221 self
.assertEquals(expectedQuery
, receivedQuery
)
222 self
.assertEquals(response
, receivedResponse
)
224 def testAdvancedSetCDViaAction(self
):
226 Advanced: Set CD via Action
228 Send a query with CD cleared,
229 check that dnsdist set the CD flag.
231 name
= 'setcdviaaction.advanced.tests.powerdns.com.'
232 query
= dns
.message
.make_query(name
, 'A', 'IN')
233 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
234 expectedQuery
.flags |
= dns
.flags
.CD
236 response
= dns
.message
.make_response(query
)
237 rrset
= dns
.rrset
.from_text(name
,
242 response
.answer
.append(rrset
)
244 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
245 self
.assertTrue(receivedQuery
)
246 self
.assertTrue(receivedResponse
)
247 receivedQuery
.id = expectedQuery
.id
248 self
.assertEquals(expectedQuery
, receivedQuery
)
249 self
.assertEquals(response
, receivedResponse
)
251 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
252 self
.assertTrue(receivedQuery
)
253 self
.assertTrue(receivedResponse
)
254 receivedQuery
.id = expectedQuery
.id
255 self
.assertEquals(expectedQuery
, receivedQuery
)
256 self
.assertEquals(response
, receivedResponse
)
258 def testAdvancedKeepNoCD(self
):
260 Advanced: Preserve CD canary
262 Send a query without CD for a canary domain,
263 check that dnsdist does not set the CD flag.
265 name
= 'keepnocd.advanced.tests.powerdns.com.'
266 query
= dns
.message
.make_query(name
, 'A', 'IN')
268 response
= dns
.message
.make_response(query
)
269 rrset
= dns
.rrset
.from_text(name
,
274 response
.answer
.append(rrset
)
276 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
277 self
.assertTrue(receivedQuery
)
278 self
.assertTrue(receivedResponse
)
279 receivedQuery
.id = query
.id
280 self
.assertEquals(query
, receivedQuery
)
281 self
.assertEquals(response
, receivedResponse
)
283 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
284 self
.assertTrue(receivedQuery
)
285 self
.assertTrue(receivedResponse
)
286 receivedQuery
.id = query
.id
287 self
.assertEquals(query
, receivedQuery
)
288 self
.assertEquals(response
, receivedResponse
)
290 class TestAdvancedClearRD(DNSDistTest
):
292 _config_template
= """
293 addAction("clearrd.advanced.tests.powerdns.com.", NoRecurseAction())
294 addAction(makeRule("clearrdviaaction.advanced.tests.powerdns.com."), NoRecurseAction())
295 newServer{address="127.0.0.1:%s"}
298 def testAdvancedClearRD(self
):
302 Send a query with RD set,
303 check that dnsdist clears the RD flag.
305 name
= 'clearrd.advanced.tests.powerdns.com.'
306 query
= dns
.message
.make_query(name
, 'A', 'IN')
307 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
308 expectedQuery
.flags
&= ~dns
.flags
.RD
310 response
= dns
.message
.make_response(query
)
311 rrset
= dns
.rrset
.from_text(name
,
316 response
.answer
.append(rrset
)
318 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
319 self
.assertTrue(receivedQuery
)
320 self
.assertTrue(receivedResponse
)
321 receivedQuery
.id = expectedQuery
.id
322 self
.assertEquals(expectedQuery
, receivedQuery
)
323 self
.assertEquals(response
, receivedResponse
)
325 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
326 self
.assertTrue(receivedQuery
)
327 self
.assertTrue(receivedResponse
)
328 receivedQuery
.id = expectedQuery
.id
329 self
.assertEquals(expectedQuery
, receivedQuery
)
330 self
.assertEquals(response
, receivedResponse
)
332 def testAdvancedClearRDViaAction(self
):
334 Advanced: Clear RD via Action
336 Send a query with RD set,
337 check that dnsdist clears the RD flag.
339 name
= 'clearrdviaaction.advanced.tests.powerdns.com.'
340 query
= dns
.message
.make_query(name
, 'A', 'IN')
341 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
342 expectedQuery
.flags
&= ~dns
.flags
.RD
344 response
= dns
.message
.make_response(query
)
345 rrset
= dns
.rrset
.from_text(name
,
350 response
.answer
.append(rrset
)
352 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
353 self
.assertTrue(receivedQuery
)
354 self
.assertTrue(receivedResponse
)
355 receivedQuery
.id = expectedQuery
.id
356 self
.assertEquals(expectedQuery
, receivedQuery
)
357 self
.assertEquals(response
, receivedResponse
)
359 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
360 self
.assertTrue(receivedQuery
)
361 self
.assertTrue(receivedResponse
)
362 receivedQuery
.id = expectedQuery
.id
363 self
.assertEquals(expectedQuery
, receivedQuery
)
364 self
.assertEquals(response
, receivedResponse
)
366 def testAdvancedKeepRD(self
):
368 Advanced: Preserve RD canary
370 Send a query with RD for a canary domain,
371 check that dnsdist does not clear the RD flag.
373 name
= 'keeprd.advanced.tests.powerdns.com.'
374 query
= dns
.message
.make_query(name
, 'A', 'IN')
376 response
= dns
.message
.make_response(query
)
377 rrset
= dns
.rrset
.from_text(name
,
382 response
.answer
.append(rrset
)
384 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
385 self
.assertTrue(receivedQuery
)
386 self
.assertTrue(receivedResponse
)
387 receivedQuery
.id = query
.id
388 self
.assertEquals(query
, receivedQuery
)
389 self
.assertEquals(response
, receivedResponse
)
391 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
392 self
.assertTrue(receivedQuery
)
393 self
.assertTrue(receivedResponse
)
394 receivedQuery
.id = query
.id
395 self
.assertEquals(query
, receivedQuery
)
396 self
.assertEquals(response
, receivedResponse
)
399 class TestAdvancedACL(DNSDistTest
):
401 _config_template
= """
402 newServer{address="127.0.0.1:%s"}
404 _acl
= ['192.0.2.1/32']
406 def testACLBlocked(self
):
408 Advanced: ACL blocked
410 Send an A query to "tests.powerdns.com.",
411 we expect no response since 127.0.0.1 is not on the
414 name
= 'tests.powerdns.com.'
415 query
= dns
.message
.make_query(name
, 'A', 'IN')
417 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
418 self
.assertEquals(receivedResponse
, None)
420 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
421 self
.assertEquals(receivedResponse
, None)
423 class TestAdvancedDelay(DNSDistTest
):
425 _config_template
= """
426 addAction(AllRule(), DelayAction(1000))
427 newServer{address="127.0.0.1:%s"}
430 def testDelayed(self
):
434 Send an A query to "tests.powerdns.com.",
435 check that the response delay is longer than 1000 ms
436 over UDP, less than that over TCP.
438 name
= 'tests.powerdns.com.'
439 query
= dns
.message
.make_query(name
, 'A', 'IN')
440 response
= dns
.message
.make_response(query
)
441 rrset
= dns
.rrset
.from_text(name
,
446 response
.answer
.append(rrset
)
448 begin
= datetime
.now()
449 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
451 receivedQuery
.id = query
.id
452 self
.assertEquals(query
, receivedQuery
)
453 self
.assertEquals(response
, receivedResponse
)
454 self
.assertTrue((end
- begin
) > timedelta(0, 1))
456 begin
= datetime
.now()
457 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
459 receivedQuery
.id = query
.id
460 self
.assertEquals(query
, receivedQuery
)
461 self
.assertEquals(response
, receivedResponse
)
462 self
.assertTrue((end
- begin
) < timedelta(0, 1))
465 class TestAdvancedTruncateAnyAndTCP(DNSDistTest
):
467 _config_template
= """
469 addAction(AndRule({QTypeRule("ANY"), TCPRule(true)}), TCAction())
470 newServer{address="127.0.0.1:%s"}
472 def testTruncateAnyOverTCP(self
):
474 Advanced: Truncate ANY over TCP
476 Send an ANY query to "anytruncatetcp.advanced.tests.powerdns.com.",
477 should be truncated over TCP, not over UDP (yes, it makes no sense,
480 name
= 'anytruncatetcp.advanced.tests.powerdns.com.'
481 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
483 response
= dns
.message
.make_response(query
)
484 rrset
= dns
.rrset
.from_text(name
,
490 response
.answer
.append(rrset
)
492 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
493 self
.assertTrue(receivedQuery
)
494 self
.assertTrue(receivedResponse
)
495 receivedQuery
.id = query
.id
496 self
.assertEquals(query
, receivedQuery
)
497 self
.assertEquals(receivedResponse
, response
)
499 expectedResponse
= dns
.message
.make_response(query
)
500 expectedResponse
.flags |
= dns
.flags
.TC
502 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
503 self
.assertEquals(receivedResponse
, expectedResponse
)
505 class TestAdvancedAndNot(DNSDistTest
):
507 _config_template
= """
508 addAction(AndRule({NotRule(QTypeRule("A")), TCPRule(false)}), RCodeAction(dnsdist.NOTIMP))
509 newServer{address="127.0.0.1:%s"}
511 def testAOverUDPReturnsNotImplementedCanary(self
):
513 Advanced: !A && UDP canary
515 dnsdist is configured to reply 'not implemented' for query
516 over UDP AND !qtype A.
517 We send an A query over UDP and TCP, and check that the
520 name
= 'andnot.advanced.tests.powerdns.com.'
521 query
= dns
.message
.make_query(name
, 'A', 'IN')
522 response
= dns
.message
.make_response(query
)
523 rrset
= dns
.rrset
.from_text(name
,
528 response
.answer
.append(rrset
)
530 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
531 self
.assertTrue(receivedQuery
)
532 self
.assertTrue(receivedResponse
)
533 receivedQuery
.id = query
.id
534 self
.assertEquals(query
, receivedQuery
)
535 self
.assertEquals(receivedResponse
, response
)
537 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
538 self
.assertTrue(receivedQuery
)
539 self
.assertTrue(receivedResponse
)
540 receivedQuery
.id = query
.id
541 self
.assertEquals(query
, receivedQuery
)
542 self
.assertEquals(receivedResponse
, response
)
544 def testAOverUDPReturnsNotImplemented(self
):
548 dnsdist is configured to reply 'not implemented' for query
549 over UDP AND !qtype A.
550 We send a TXT query over UDP and TCP, and check that the
551 response is OK for TCP and 'not implemented' for UDP.
553 name
= 'andnot.advanced.tests.powerdns.com.'
554 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
556 expectedResponse
= dns
.message
.make_response(query
)
557 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
559 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
560 self
.assertEquals(receivedResponse
, expectedResponse
)
562 response
= dns
.message
.make_response(query
)
563 rrset
= dns
.rrset
.from_text(name
,
567 'nothing to see here')
568 response
.answer
.append(rrset
)
570 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
571 self
.assertTrue(receivedQuery
)
572 self
.assertTrue(receivedResponse
)
573 receivedQuery
.id = query
.id
574 self
.assertEquals(query
, receivedQuery
)
575 self
.assertEquals(receivedResponse
, response
)
577 class TestAdvancedOr(DNSDistTest
):
579 _config_template
= """
580 addAction(OrRule({QTypeRule("A"), TCPRule(false)}), RCodeAction(dnsdist.NOTIMP))
581 newServer{address="127.0.0.1:%s"}
583 def testAAAAOverUDPReturnsNotImplemented(self
):
585 Advanced: A || UDP: AAAA
587 dnsdist is configured to reply 'not implemented' for query
589 We send an AAAA query over UDP and TCP, and check that the
590 response is 'not implemented' for UDP and OK for TCP.
592 name
= 'aorudp.advanced.tests.powerdns.com.'
593 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
594 response
= dns
.message
.make_response(query
)
595 rrset
= dns
.rrset
.from_text(name
,
600 response
.answer
.append(rrset
)
602 expectedResponse
= dns
.message
.make_response(query
)
603 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
605 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
606 self
.assertEquals(receivedResponse
, expectedResponse
)
608 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
609 self
.assertTrue(receivedQuery
)
610 self
.assertTrue(receivedResponse
)
611 receivedQuery
.id = query
.id
612 self
.assertEquals(query
, receivedQuery
)
613 self
.assertEquals(receivedResponse
, response
)
615 def testAOverUDPReturnsNotImplemented(self
):
617 Advanced: A || UDP: A
619 dnsdist is configured to reply 'not implemented' for query
621 We send an A query over UDP and TCP, and check that the
622 response is 'not implemented' for both.
624 name
= 'aorudp.advanced.tests.powerdns.com.'
625 query
= dns
.message
.make_query(name
, 'A', 'IN')
627 expectedResponse
= dns
.message
.make_response(query
)
628 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
630 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
631 self
.assertEquals(receivedResponse
, expectedResponse
)
633 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
634 self
.assertEquals(receivedResponse
, expectedResponse
)
637 class TestAdvancedLogAction(DNSDistTest
):
639 _config_template
= """
640 newServer{address="127.0.0.1:%s"}
641 addAction(AllRule(), LogAction("dnsdist.log", false))
643 def testAdvancedLogAction(self
):
645 Advanced: Log all queries
649 name
= 'logaction.advanced.tests.powerdns.com.'
650 query
= dns
.message
.make_query(name
, 'A', 'IN')
651 response
= dns
.message
.make_response(query
)
652 rrset
= dns
.rrset
.from_text(name
,
657 response
.answer
.append(rrset
)
659 for _
in range(count
):
660 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
661 self
.assertTrue(receivedQuery
)
662 self
.assertTrue(receivedResponse
)
663 receivedQuery
.id = query
.id
664 self
.assertEquals(query
, receivedQuery
)
665 self
.assertEquals(response
, receivedResponse
)
667 self
.assertTrue(os
.path
.isfile('dnsdist.log'))
668 self
.assertTrue(os
.stat('dnsdist.log').st_size
> 0)
670 class TestAdvancedDNSSEC(DNSDistTest
):
672 _config_template
= """
673 newServer{address="127.0.0.1:%s"}
674 addAction(DNSSECRule(), DropAction())
676 def testAdvancedDNSSECDrop(self
):
678 Advanced: DNSSEC Rule
681 name
= 'dnssec.advanced.tests.powerdns.com.'
682 query
= dns
.message
.make_query(name
, 'A', 'IN')
683 doquery
= dns
.message
.make_query(name
, 'A', 'IN', want_dnssec
=True)
684 response
= dns
.message
.make_response(query
)
685 rrset
= dns
.rrset
.from_text(name
,
690 response
.answer
.append(rrset
)
692 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
693 self
.assertTrue(receivedQuery
)
694 self
.assertTrue(receivedResponse
)
695 receivedQuery
.id = query
.id
696 self
.assertEquals(query
, receivedQuery
)
697 self
.assertEquals(response
, receivedResponse
)
699 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
700 self
.assertTrue(receivedQuery
)
701 self
.assertTrue(receivedResponse
)
702 receivedQuery
.id = query
.id
703 self
.assertEquals(query
, receivedQuery
)
704 self
.assertEquals(response
, receivedResponse
)
706 (_
, receivedResponse
) = self
.sendUDPQuery(doquery
, response
)
707 self
.assertEquals(receivedResponse
, None)
708 (_
, receivedResponse
) = self
.sendTCPQuery(doquery
, response
)
709 self
.assertEquals(receivedResponse
, None)
711 class TestAdvancedQClass(DNSDistTest
):
713 _config_template
= """
714 newServer{address="127.0.0.1:%s"}
715 addAction(QClassRule(DNSClass.CHAOS), DropAction())
717 def testAdvancedQClassChaosDrop(self
):
719 Advanced: Drop QClass CHAOS
722 name
= 'qclasschaos.advanced.tests.powerdns.com.'
723 query
= dns
.message
.make_query(name
, 'TXT', 'CHAOS')
725 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None)
726 self
.assertEquals(receivedResponse
, None)
727 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None)
728 self
.assertEquals(receivedResponse
, None)
730 def testAdvancedQClassINAllow(self
):
732 Advanced: Allow QClass IN
735 name
= 'qclassin.advanced.tests.powerdns.com.'
736 query
= dns
.message
.make_query(name
, 'A', 'IN')
737 response
= dns
.message
.make_response(query
)
738 rrset
= dns
.rrset
.from_text(name
,
743 response
.answer
.append(rrset
)
745 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
746 self
.assertTrue(receivedQuery
)
747 self
.assertTrue(receivedResponse
)
748 receivedQuery
.id = query
.id
749 self
.assertEquals(query
, receivedQuery
)
750 self
.assertEquals(response
, receivedResponse
)
752 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
753 self
.assertTrue(receivedQuery
)
754 self
.assertTrue(receivedResponse
)
755 receivedQuery
.id = query
.id
756 self
.assertEquals(query
, receivedQuery
)
757 self
.assertEquals(response
, receivedResponse
)
759 class TestAdvancedOpcode(DNSDistTest
):
761 _config_template
= """
762 newServer{address="127.0.0.1:%s"}
763 addAction(OpcodeRule(DNSOpcode.Notify), DropAction())
765 def testAdvancedOpcodeNotifyDrop(self
):
767 Advanced: Drop Opcode NOTIFY
770 name
= 'opcodenotify.advanced.tests.powerdns.com.'
771 query
= dns
.message
.make_query(name
, 'A', 'IN')
772 query
.set_opcode(dns
.opcode
.NOTIFY
)
774 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None)
775 self
.assertEquals(receivedResponse
, None)
776 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None)
777 self
.assertEquals(receivedResponse
, None)
779 def testAdvancedOpcodeUpdateINAllow(self
):
781 Advanced: Allow Opcode UPDATE
784 name
= 'opcodeupdate.advanced.tests.powerdns.com.'
785 query
= dns
.message
.make_query(name
, 'A', 'IN')
786 query
.set_opcode(dns
.opcode
.UPDATE
)
787 response
= dns
.message
.make_response(query
)
788 rrset
= dns
.rrset
.from_text(name
,
793 response
.answer
.append(rrset
)
795 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
796 self
.assertTrue(receivedQuery
)
797 self
.assertTrue(receivedResponse
)
798 receivedQuery
.id = query
.id
799 self
.assertEquals(query
, receivedQuery
)
800 self
.assertEquals(response
, receivedResponse
)
802 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
803 self
.assertTrue(receivedQuery
)
804 self
.assertTrue(receivedResponse
)
805 receivedQuery
.id = query
.id
806 self
.assertEquals(query
, receivedQuery
)
807 self
.assertEquals(response
, receivedResponse
)
809 class TestAdvancedNonTerminalRule(DNSDistTest
):
811 _config_template
= """
812 newServer{address="127.0.0.1:%s", pool="real"}
813 addAction(AllRule(), DisableValidationAction())
814 addAction(AllRule(), PoolAction("real"))
815 addAction(AllRule(), DropAction())
817 def testAdvancedNonTerminalRules(self
):
819 Advanced: Non terminal rules
821 We check that DisableValidationAction() is applied
822 but does not stop the processing, then that
823 PoolAction() is applied _and_ stop the processing.
825 name
= 'nonterminal.advanced.tests.powerdns.com.'
826 query
= dns
.message
.make_query(name
, 'A', 'IN')
827 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
828 expectedQuery
.flags |
= dns
.flags
.CD
829 response
= dns
.message
.make_response(query
)
830 rrset
= dns
.rrset
.from_text(name
,
835 response
.answer
.append(rrset
)
837 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
838 self
.assertTrue(receivedQuery
)
839 self
.assertTrue(receivedResponse
)
840 receivedQuery
.id = expectedQuery
.id
841 self
.assertEquals(expectedQuery
, receivedQuery
)
842 self
.assertEquals(response
, receivedResponse
)
844 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
845 self
.assertTrue(receivedQuery
)
846 self
.assertTrue(receivedResponse
)
847 receivedQuery
.id = expectedQuery
.id
848 self
.assertEquals(expectedQuery
, receivedQuery
)
849 self
.assertEquals(response
, receivedResponse
)
851 class TestAdvancedStringOnlyServer(DNSDistTest
):
853 _config_template
= """
854 newServer("127.0.0.1:%s")
857 def testAdvancedStringOnlyServer(self
):
859 Advanced: "string-only" server is placed in the default pool
861 name
= 'string-only-server.advanced.tests.powerdns.com.'
862 query
= dns
.message
.make_query(name
, 'A', 'IN')
863 response
= dns
.message
.make_response(query
)
864 rrset
= dns
.rrset
.from_text(name
,
869 response
.answer
.append(rrset
)
871 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
872 self
.assertTrue(receivedQuery
)
873 self
.assertTrue(receivedResponse
)
874 receivedQuery
.id = query
.id
875 self
.assertEquals(query
, receivedQuery
)
876 self
.assertEquals(response
, receivedResponse
)
878 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
879 self
.assertTrue(receivedQuery
)
880 self
.assertTrue(receivedResponse
)
881 receivedQuery
.id = query
.id
882 self
.assertEquals(query
, receivedQuery
)
883 self
.assertEquals(response
, receivedResponse
)
885 class TestAdvancedRestoreFlagsOnSelfResponse(DNSDistTest
):
887 _config_template
= """
888 addAction(AllRule(), DisableValidationAction())
889 addAction(AllRule(), SpoofAction("192.0.2.1"))
890 newServer{address="127.0.0.1:%s"}
893 def testAdvancedRestoreFlagsOnSpoofResponse(self
):
895 Advanced: Restore flags on spoofed response
897 Send a query with CD flag cleared, dnsdist is
898 instructed to set it, then to spoof the response,
899 check that response has the flag cleared.
901 name
= 'spoofed.restoreflags.advanced.tests.powerdns.com.'
902 query
= dns
.message
.make_query(name
, 'A', 'IN')
903 # dnsdist set RA = RD for spoofed responses
904 query
.flags
&= ~dns
.flags
.RD
906 response
= dns
.message
.make_response(query
)
907 rrset
= dns
.rrset
.from_text(name
,
912 response
.answer
.append(rrset
)
914 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
915 self
.assertTrue(receivedResponse
)
916 self
.assertEquals(response
, receivedResponse
)
918 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
919 self
.assertTrue(receivedResponse
)
920 self
.assertEquals(response
, receivedResponse
)
922 class TestAdvancedQPS(DNSDistTest
):
924 _config_template
= """
925 addAction("qps.advanced.tests.powerdns.com", QPSAction(10))
926 newServer{address="127.0.0.1:%s"}
929 def testAdvancedQPSLimit(self
):
933 Send queries to "qps.advanced.tests.powerdns.com."
934 check that dnsdist drops queries when the max QPS has been reached.
937 name
= 'qps.advanced.tests.powerdns.com.'
938 query
= dns
.message
.make_query(name
, 'A', 'IN')
939 response
= dns
.message
.make_response(query
)
940 rrset
= dns
.rrset
.from_text(name
,
945 response
.answer
.append(rrset
)
947 for _
in range(maxQPS
):
948 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
949 receivedQuery
.id = query
.id
950 self
.assertEquals(query
, receivedQuery
)
951 self
.assertEquals(response
, receivedResponse
)
953 # we should now be dropped
954 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
955 self
.assertEquals(receivedResponse
, None)
959 # again, over TCP this time
960 for _
in range(maxQPS
):
961 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
962 receivedQuery
.id = query
.id
963 self
.assertEquals(query
, receivedQuery
)
964 self
.assertEquals(response
, receivedResponse
)
967 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
968 self
.assertEquals(receivedResponse
, None)
970 class TestAdvancedQPSNone(DNSDistTest
):
972 _config_template
= """
973 addAction("qpsnone.advanced.tests.powerdns.com", QPSAction(100))
974 addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
975 newServer{address="127.0.0.1:%s"}
978 def testAdvancedQPSNone(self
):
980 Advanced: Not matching QPS returns None, not Allow
982 Send queries to "qps.advanced.tests.powerdns.com."
983 check that the rule returns None when the QPS has not been
986 name
= 'qpsnone.advanced.tests.powerdns.com.'
987 query
= dns
.message
.make_query(name
, 'A', 'IN')
988 expectedResponse
= dns
.message
.make_response(query
)
989 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
991 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
992 self
.assertEquals(receivedResponse
, expectedResponse
)
994 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
995 self
.assertEquals(receivedResponse
, expectedResponse
)
997 class TestAdvancedNMGRule(DNSDistTest
):
999 _config_template
= """
1001 allowed:addMask("192.0.2.1/32")
1002 addAction(NotRule(NetmaskGroupRule(allowed)), RCodeAction(dnsdist.REFUSED))
1003 newServer{address="127.0.0.1:%s"}
1006 def testAdvancedNMGRule(self
):
1008 Advanced: NMGRule should refuse our queries
1010 Send queries to "nmgrule.advanced.tests.powerdns.com.",
1011 check that we are getting a REFUSED response.
1013 name
= 'nmgrule.advanced.tests.powerdns.com.'
1014 query
= dns
.message
.make_query(name
, 'A', 'IN')
1015 expectedResponse
= dns
.message
.make_response(query
)
1016 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1018 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1019 self
.assertEquals(receivedResponse
, expectedResponse
)
1021 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1022 self
.assertEquals(receivedResponse
, expectedResponse
)
1024 class TestAdvancedLabelsCountRule(DNSDistTest
):
1026 _config_template
= """
1027 addAction(QNameLabelsCountRule(5,6), RCodeAction(dnsdist.REFUSED))
1028 newServer{address="127.0.0.1:%s"}
1031 def testAdvancedLabelsCountRule(self
):
1033 Advanced: QNameLabelsCountRule(5,6)
1035 # 6 labels, we should be fine
1036 name
= 'ok.labelscount.advanced.tests.powerdns.com.'
1037 query
= dns
.message
.make_query(name
, 'A', 'IN')
1038 response
= dns
.message
.make_response(query
)
1039 rrset
= dns
.rrset
.from_text(name
,
1044 response
.answer
.append(rrset
)
1046 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1047 self
.assertTrue(receivedQuery
)
1048 self
.assertTrue(receivedResponse
)
1049 receivedQuery
.id = query
.id
1050 self
.assertEquals(query
, receivedQuery
)
1051 self
.assertEquals(response
, receivedResponse
)
1053 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
1054 self
.assertTrue(receivedQuery
)
1055 self
.assertTrue(receivedResponse
)
1056 receivedQuery
.id = query
.id
1057 self
.assertEquals(query
, receivedQuery
)
1058 self
.assertEquals(response
, receivedResponse
)
1060 # more than 6 labels, the query should be refused
1061 name
= 'not.ok.labelscount.advanced.tests.powerdns.com.'
1062 query
= dns
.message
.make_query(name
, 'A', 'IN')
1063 expectedResponse
= dns
.message
.make_response(query
)
1064 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1066 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1067 self
.assertEquals(receivedResponse
, expectedResponse
)
1069 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1070 self
.assertEquals(receivedResponse
, expectedResponse
)
1072 # less than 5 labels, the query should be refused
1073 name
= 'labelscountadvanced.tests.powerdns.com.'
1074 query
= dns
.message
.make_query(name
, 'A', 'IN')
1075 expectedResponse
= dns
.message
.make_response(query
)
1076 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1078 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1079 self
.assertEquals(receivedResponse
, expectedResponse
)
1081 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1082 self
.assertEquals(receivedResponse
, expectedResponse
)
1084 class TestAdvancedWireLengthRule(DNSDistTest
):
1086 _config_template
= """
1087 addAction(QNameWireLengthRule(54,56), RCodeAction(dnsdist.REFUSED))
1088 newServer{address="127.0.0.1:%s"}
1091 def testAdvancedWireLengthRule(self
):
1093 Advanced: QNameWireLengthRule(54,56)
1095 name
= 'longenough.qnamewirelength.advanced.tests.powerdns.com.'
1096 query
= dns
.message
.make_query(name
, 'A', 'IN')
1097 response
= dns
.message
.make_response(query
)
1098 rrset
= dns
.rrset
.from_text(name
,
1103 response
.answer
.append(rrset
)
1105 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1106 self
.assertTrue(receivedQuery
)
1107 self
.assertTrue(receivedResponse
)
1108 receivedQuery
.id = query
.id
1109 self
.assertEquals(query
, receivedQuery
)
1110 self
.assertEquals(response
, receivedResponse
)
1112 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
1113 self
.assertTrue(receivedQuery
)
1114 self
.assertTrue(receivedResponse
)
1115 receivedQuery
.id = query
.id
1116 self
.assertEquals(query
, receivedQuery
)
1117 self
.assertEquals(response
, receivedResponse
)
1119 # too short, the query should be refused
1120 name
= 'short.qnamewirelength.advanced.tests.powerdns.com.'
1121 query
= dns
.message
.make_query(name
, 'A', 'IN')
1122 expectedResponse
= dns
.message
.make_response(query
)
1123 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1125 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1126 self
.assertEquals(receivedResponse
, expectedResponse
)
1128 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1129 self
.assertEquals(receivedResponse
, expectedResponse
)
1131 # too long, the query should be refused
1132 name
= 'toolongtobevalid.qnamewirelength.advanced.tests.powerdns.com.'
1133 query
= dns
.message
.make_query(name
, 'A', 'IN')
1134 expectedResponse
= dns
.message
.make_response(query
)
1135 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1137 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1138 self
.assertEquals(receivedResponse
, expectedResponse
)
1140 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1141 self
.assertEquals(receivedResponse
, expectedResponse
)
1143 class TestAdvancedIncludeDir(DNSDistTest
):
1145 _config_template
= """
1146 -- this directory contains a file allowing includedir.advanced.tests.powerdns.com.
1147 includeDirectory('test-include-dir')
1148 newServer{address="127.0.0.1:%s"}
1151 def testAdvancedIncludeDirAllowed(self
):
1153 Advanced: includeDirectory()
1155 name
= 'includedir.advanced.tests.powerdns.com.'
1156 query
= dns
.message
.make_query(name
, 'A', 'IN')
1157 response
= dns
.message
.make_response(query
)
1158 rrset
= dns
.rrset
.from_text(name
,
1163 response
.answer
.append(rrset
)
1165 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1166 self
.assertTrue(receivedQuery
)
1167 self
.assertTrue(receivedResponse
)
1168 receivedQuery
.id = query
.id
1169 self
.assertEquals(query
, receivedQuery
)
1170 self
.assertEquals(response
, receivedResponse
)
1172 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
1173 self
.assertTrue(receivedQuery
)
1174 self
.assertTrue(receivedResponse
)
1175 receivedQuery
.id = query
.id
1176 self
.assertEquals(query
, receivedQuery
)
1177 self
.assertEquals(response
, receivedResponse
)
1179 # this one should be refused
1180 name
= 'notincludedir.advanced.tests.powerdns.com.'
1181 query
= dns
.message
.make_query(name
, 'A', 'IN')
1182 expectedResponse
= dns
.message
.make_response(query
)
1183 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1185 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1186 self
.assertEquals(receivedResponse
, expectedResponse
)
1188 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1189 self
.assertEquals(receivedResponse
, expectedResponse
)
1191 class TestAdvancedLuaDO(DNSDistTest
):
1193 _config_template
= """
1194 function nxDOLua(dq)
1196 return DNSAction.Nxdomain, ""
1198 return DNSAction.None, ""
1200 addAction(AllRule(), LuaAction(nxDOLua))
1201 newServer{address="127.0.0.1:%s"}
1204 def testNxDOViaLua(self
):
1206 Advanced: Nx DO queries via Lua
1208 name
= 'nxdo.advanced.tests.powerdns.com.'
1209 query
= dns
.message
.make_query(name
, 'A', 'IN')
1210 response
= dns
.message
.make_response(query
)
1211 rrset
= dns
.rrset
.from_text(name
,
1216 response
.answer
.append(rrset
)
1217 queryWithDO
= dns
.message
.make_query(name
, 'A', 'IN', want_dnssec
=True)
1218 doResponse
= dns
.message
.make_response(queryWithDO
)
1219 doResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
1222 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1223 self
.assertTrue(receivedQuery
)
1224 self
.assertTrue(receivedResponse
)
1225 receivedQuery
.id = query
.id
1226 self
.assertEquals(query
, receivedQuery
)
1227 self
.assertEquals(receivedResponse
, response
)
1229 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
1230 self
.assertTrue(receivedQuery
)
1231 self
.assertTrue(receivedResponse
)
1232 receivedQuery
.id = query
.id
1233 self
.assertEquals(query
, receivedQuery
)
1234 self
.assertEquals(receivedResponse
, response
)
1237 (_
, receivedResponse
) = self
.sendUDPQuery(queryWithDO
, response
=None, useQueue
=False)
1238 self
.assertTrue(receivedResponse
)
1239 doResponse
.id = receivedResponse
.id
1240 self
.assertEquals(receivedResponse
, doResponse
)
1242 (_
, receivedResponse
) = self
.sendTCPQuery(queryWithDO
, response
=None, useQueue
=False)
1243 self
.assertTrue(receivedResponse
)
1244 doResponse
.id = receivedResponse
.id
1245 self
.assertEquals(receivedResponse
, doResponse
)
1247 class TestAdvancedLuaRefused(DNSDistTest
):
1249 _config_template
= """
1251 return DNSAction.Refused, ""
1253 addAction(AllRule(), LuaAction(refuse))
1254 newServer{address="127.0.0.1:%s"}
1257 def testRefusedViaLua(self
):
1259 Advanced: Refused via Lua
1261 name
= 'refused.advanced.tests.powerdns.com.'
1262 query
= dns
.message
.make_query(name
, 'A', 'IN')
1263 response
= dns
.message
.make_response(query
)
1264 rrset
= dns
.rrset
.from_text(name
,
1269 response
.answer
.append(rrset
)
1270 refusedResponse
= dns
.message
.make_response(query
)
1271 refusedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1273 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1274 self
.assertTrue(receivedResponse
)
1275 refusedResponse
.id = receivedResponse
.id
1276 self
.assertEquals(receivedResponse
, refusedResponse
)
1278 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1279 self
.assertTrue(receivedResponse
)
1280 refusedResponse
.id = receivedResponse
.id
1281 self
.assertEquals(receivedResponse
, refusedResponse
)
1283 class TestAdvancedLuaActionReturnSyntax(DNSDistTest
):
1285 _config_template
= """
1287 return DNSAction.Refused
1289 addAction(AllRule(), LuaAction(refuse))
1290 newServer{address="127.0.0.1:%s"}
1293 def testRefusedWithEmptyRule(self
):
1295 Advanced: Short syntax for LuaAction return values
1297 name
= 'short.refused.advanced.tests.powerdns.com.'
1298 query
= dns
.message
.make_query(name
, 'A', 'IN')
1299 response
= dns
.message
.make_response(query
)
1300 rrset
= dns
.rrset
.from_text(name
,
1305 response
.answer
.append(rrset
)
1306 refusedResponse
= dns
.message
.make_response(query
)
1307 refusedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1309 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1310 self
.assertTrue(receivedResponse
)
1311 refusedResponse
.id = receivedResponse
.id
1312 self
.assertEquals(receivedResponse
, refusedResponse
)
1314 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1315 self
.assertTrue(receivedResponse
)
1316 refusedResponse
.id = receivedResponse
.id
1317 self
.assertEquals(receivedResponse
, refusedResponse
)
1319 class TestAdvancedLuaTruncated(DNSDistTest
):
1321 _config_template
= """
1324 return DNSAction.Truncate, ""
1326 return DNSAction.None, ""
1328 addAction(AllRule(), LuaAction(trunc))
1329 newServer{address="127.0.0.1:%s"}
1332 def testTCViaLua(self
):
1334 Advanced: TC via Lua
1336 name
= 'tc.advanced.tests.powerdns.com.'
1337 query
= dns
.message
.make_query(name
, 'A', 'IN')
1338 response
= dns
.message
.make_response(query
)
1339 rrset
= dns
.rrset
.from_text(name
,
1344 response
.answer
.append(rrset
)
1346 truncatedResponse
= dns
.message
.make_response(query
)
1347 truncatedResponse
.flags |
= dns
.flags
.TC
1349 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1350 self
.assertTrue(receivedResponse
)
1351 truncatedResponse
.id = receivedResponse
.id
1352 self
.assertEquals(receivedResponse
, truncatedResponse
)
1354 # no truncation over TCP
1355 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
1356 self
.assertTrue(receivedQuery
)
1357 self
.assertTrue(receivedResponse
)
1358 receivedQuery
.id = query
.id
1359 self
.assertEquals(query
, receivedQuery
)
1360 self
.assertEquals(receivedResponse
, response
)
1362 class TestStatNodeRespRingSince(DNSDistTest
):
1364 _consoleKey
= DNSDistTest
.generateConsoleKey()
1365 _consoleKeyB64
= base64
.b64encode(_consoleKey
).decode('ascii')
1366 _config_params
= ['_consoleKeyB64', '_consolePort', '_testServerPort']
1367 _config_template
= """
1369 controlSocket("127.0.0.1:%s")
1370 s1 = newServer{address="127.0.0.1:%s"}
1372 function visitor(node, self, childstat)
1373 table.insert(nodesSeen, node.fullname)
1377 def testStatNodeRespRingSince(self
):
1379 Advanced: StatNodeRespRing with optional since parameter
1382 name
= 'statnodesince.advanced.tests.powerdns.com.'
1383 query
= dns
.message
.make_query(name
, 'A', 'IN')
1384 response
= dns
.message
.make_response(query
)
1385 rrset
= dns
.rrset
.from_text(name
,
1390 response
.answer
.append(rrset
)
1392 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1393 self
.assertTrue(receivedQuery
)
1394 self
.assertTrue(receivedResponse
)
1395 receivedQuery
.id = query
.id
1396 self
.assertEquals(query
, receivedQuery
)
1397 self
.assertEquals(response
, receivedResponse
)
1399 self
.sendConsoleCommand("nodesSeen = {}")
1400 self
.sendConsoleCommand("statNodeRespRing(visitor)")
1401 nodes
= self
.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
1402 nodes
= nodes
.strip("\n")
1403 self
.assertEquals(nodes
, """statnodesince.advanced.tests.powerdns.com.
1404 advanced.tests.powerdns.com.
1409 self
.sendConsoleCommand("nodesSeen = {}")
1410 self
.sendConsoleCommand("statNodeRespRing(visitor, 0)")
1411 nodes
= self
.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
1412 nodes
= nodes
.strip("\n")
1413 self
.assertEquals(nodes
, """statnodesince.advanced.tests.powerdns.com.
1414 advanced.tests.powerdns.com.
1421 self
.sendConsoleCommand("nodesSeen = {}")
1422 self
.sendConsoleCommand("statNodeRespRing(visitor)")
1423 nodes
= self
.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
1424 nodes
= nodes
.strip("\n")
1425 self
.assertEquals(nodes
, """statnodesince.advanced.tests.powerdns.com.
1426 advanced.tests.powerdns.com.
1431 self
.sendConsoleCommand("nodesSeen = {}")
1432 self
.sendConsoleCommand("statNodeRespRing(visitor, 5)")
1433 nodes
= self
.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
1434 nodes
= nodes
.strip("\n")
1435 self
.assertEquals(nodes
, """""")
1437 self
.sendConsoleCommand("nodesSeen = {}")
1438 self
.sendConsoleCommand("statNodeRespRing(visitor, 10)")
1439 nodes
= self
.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
1440 nodes
= nodes
.strip("\n")
1441 self
.assertEquals(nodes
, """statnodesince.advanced.tests.powerdns.com.
1442 advanced.tests.powerdns.com.
1447 class TestAdvancedRD(DNSDistTest
):
1449 _config_template
= """
1450 addAction(RDRule(), RCodeAction(dnsdist.REFUSED))
1451 newServer{address="127.0.0.1:%s"}
1454 def testAdvancedRDRefused(self
):
1456 Advanced: RD query is refused
1458 name
= 'rd.advanced.tests.powerdns.com.'
1459 query
= dns
.message
.make_query(name
, 'A', 'IN')
1460 expectedResponse
= dns
.message
.make_response(query
)
1461 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
1463 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1464 self
.assertEquals(receivedResponse
, expectedResponse
)
1466 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1467 self
.assertEquals(receivedResponse
, expectedResponse
)
1469 def testAdvancedNoRDAllowed(self
):
1471 Advanced: No-RD query is allowed
1473 name
= 'no-rd.advanced.tests.powerdns.com.'
1474 query
= dns
.message
.make_query(name
, 'A', 'IN')
1475 query
.flags
&= ~dns
.flags
.RD
1476 response
= dns
.message
.make_response(query
)
1478 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1479 receivedQuery
.id = query
.id
1480 self
.assertEquals(receivedQuery
, query
)
1481 self
.assertEquals(receivedResponse
, response
)
1483 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
1484 receivedQuery
.id = query
.id
1485 self
.assertEquals(receivedQuery
, query
)
1486 self
.assertEquals(receivedResponse
, response
)
1488 class TestAdvancedGetLocalPort(DNSDistTest
):
1490 _config_template
= """
1491 function answerBasedOnLocalPort(dq)
1492 local port = dq.localaddr:getPort()
1493 return DNSAction.Spoof, "port-was-"..port..".local-port.advanced.tests.powerdns.com."
1495 addAction("local-port.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
1496 newServer{address="127.0.0.1:%s"}
1499 def testAdvancedGetLocalPort(self
):
1501 Advanced: Return CNAME containing the local port
1503 name
= 'local-port.advanced.tests.powerdns.com.'
1504 query
= dns
.message
.make_query(name
, 'A', 'IN')
1505 # dnsdist set RA = RD for spoofed responses
1506 query
.flags
&= ~dns
.flags
.RD
1508 response
= dns
.message
.make_response(query
)
1509 rrset
= dns
.rrset
.from_text(name
,
1512 dns
.rdatatype
.CNAME
,
1513 'port-was-{}.local-port.advanced.tests.powerdns.com.'.format(self
._dnsDistPort
))
1514 response
.answer
.append(rrset
)
1516 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1517 self
.assertEquals(receivedResponse
, response
)
1519 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1520 self
.assertEquals(receivedResponse
, response
)
1522 class TestAdvancedGetLocalPortOnAnyBind(DNSDistTest
):
1524 _config_template
= """
1525 function answerBasedOnLocalPort(dq)
1526 local port = dq.localaddr:getPort()
1527 return DNSAction.Spoof, "port-was-"..port..".local-port-any.advanced.tests.powerdns.com."
1529 addAction("local-port-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
1530 newServer{address="127.0.0.1:%s"}
1532 _dnsDistListeningAddr
= "0.0.0.0"
1534 def testAdvancedGetLocalPortOnAnyBind(self
):
1536 Advanced: Return CNAME containing the local port for an ANY bind
1538 name
= 'local-port-any.advanced.tests.powerdns.com.'
1539 query
= dns
.message
.make_query(name
, 'A', 'IN')
1540 # dnsdist set RA = RD for spoofed responses
1541 query
.flags
&= ~dns
.flags
.RD
1543 response
= dns
.message
.make_response(query
)
1544 rrset
= dns
.rrset
.from_text(name
,
1547 dns
.rdatatype
.CNAME
,
1548 'port-was-{}.local-port-any.advanced.tests.powerdns.com.'.format(self
._dnsDistPort
))
1549 response
.answer
.append(rrset
)
1551 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
1552 self
.assertEquals(receivedResponse
, response
)
1554 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
1555 self
.assertEquals(receivedResponse
, response
)
1557 class TestAdvancedLuaTempFailureTTL(DNSDistTest
):
1559 _config_template
= """
1560 function testAction(dq)
1561 if dq.tempFailureTTL ~= nil then
1562 return DNSAction.Spoof, "initially.not.nil.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1564 dq.tempFailureTTL = 30
1565 if dq.tempFailureTTL ~= 30 then
1566 return DNSAction.Spoof, "after.set.not.expected.value.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1568 dq.tempFailureTTL = nil
1569 if dq.tempFailureTTL ~= nil then
1570 return DNSAction.Spoof, "after.unset.not.nil.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1572 return DNSAction.None, ""
1574 addAction(AllRule(), LuaAction(testAction))
1575 newServer{address="127.0.0.1:%s"}
1578 def testTempFailureTTLBinding(self
):
1580 Exercise dq.tempFailureTTL Lua binding
1582 name
= 'tempfailurettlbinding.advanced.tests.powerdns.com.'
1583 query
= dns
.message
.make_query(name
, 'A', 'IN')
1584 response
= dns
.message
.make_response(query
)
1585 rrset
= dns
.rrset
.from_text(name
,
1590 response
.answer
.append(rrset
)
1592 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1593 self
.assertTrue(receivedQuery
)
1594 self
.assertTrue(receivedResponse
)
1595 receivedQuery
.id = query
.id
1596 self
.assertEquals(query
, receivedQuery
)
1597 self
.assertEquals(receivedResponse
, response
)