# Source: git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Advanced.py
from datetime import datetime, timedelta
import os
import time

import dns.flags
import dns.message
import dns.opcode
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset
from dnsdisttests import DNSDistTest
class TestAdvancedAllow(DNSDistTest):
    """AllowAction() for one qname placed before a catch-all DropAction()."""

    _config_template = """
    addAction(AllRule(), NoneAction())
    addAction(makeRule("allowed.advanced.tests.powerdns.com."), AllowAction())
    addAction(AllRule(), DropAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedAllow(self):
        """
        Advanced: Allowed qname is not dropped

        A query for allowed.advanced.tests.powerdns.com. should be allowed
        while others should be dropped.
        """
        name = 'allowed.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # Same checks over UDP first, then TCP.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The ID seen by the backend differs from ours; align it
            # before comparing the messages.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedAllowDropped(self):
        """
        Advanced: Not allowed qname is dropped

        A query for notallowed.advanced.tests.powerdns.com. should be dropped.
        """
        name = 'notallowed.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertIsNone(receivedResponse)
class TestAdvancedFixupCase(DNSDistTest):
    """Check that the case of the qname is restored in responses."""

    # NOTE(review): two configuration lines before newServer{} were lost in
    # extraction; truncateTC(true)/fixupCase(true) are assumed from the test's
    # intent - confirm against upstream.
    _config_template = """
    truncateTC(true)
    fixupCase(true)
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedFixupCase(self):
        """
        Advanced: Fixup Case

        Send a query with lower and upper chars,
        make the backend return a lowercase version,
        check that dnsdist fixes the response.
        """
        name = 'fiXuPCasE.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        lowercasequery = dns.message.make_query(name.lower(), 'A', 'IN')
        # The backend answers the lowercase question; the client is expected
        # to get an answer to the mixed-case one.
        response = dns.message.make_response(lowercasequery)
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)
class TestAdvancedRemoveRD(DNSDistTest):
    """NoRecurseAction() applied to a single qname given as a plain string."""

    _config_template = """
    addAction("norecurse.advanced.tests.powerdns.com.", NoRecurseAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedNoRD(self):
        """
        Advanced: No RD

        Send a query with RD,
        check that dnsdist clears the RD flag.
        """
        name = 'norecurse.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # What the backend should see: the same query with RD cleared.
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedKeepRD(self):
        """
        Advanced: No RD canary

        Send a query with RD for a canary domain,
        check that dnsdist does not clear the RD flag.
        """
        name = 'keeprecurse.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestAdvancedAddCD(DNSDistTest):
    """DisableValidationAction() selected by a string rule and by makeRule()."""

    _config_template = """
    addAction("setcd.advanced.tests.powerdns.com.", DisableValidationAction())
    addAction(makeRule("setcdviaaction.advanced.tests.powerdns.com."), DisableValidationAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedSetCD(self):
        """
        Advanced: Set CD

        Send a query with CD cleared,
        check that dnsdist set the CD flag.
        """
        name = 'setcd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # What the backend should see: the same query with CD set.
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags |= dns.flags.CD

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedSetCDViaAction(self):
        """
        Advanced: Set CD via Action

        Send a query with CD cleared,
        check that dnsdist set the CD flag.
        """
        name = 'setcdviaaction.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags |= dns.flags.CD

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedKeepNoCD(self):
        """
        Advanced: Preserve CD canary

        Send a query without CD for a canary domain,
        check that dnsdist does not set the CD flag.
        """
        name = 'keepnocd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestAdvancedClearRD(DNSDistTest):
    """NoRecurseAction() selected by a string rule and by makeRule()."""

    _config_template = """
    addAction("clearrd.advanced.tests.powerdns.com.", NoRecurseAction())
    addAction(makeRule("clearrdviaaction.advanced.tests.powerdns.com."), NoRecurseAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedClearRD(self):
        """
        Advanced: Clear RD

        Send a query with RD set,
        check that dnsdist clears the RD flag.
        """
        name = 'clearrd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # What the backend should see: the same query with RD cleared.
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedClearRDViaAction(self):
        """
        Advanced: Clear RD via Action

        Send a query with RD set,
        check that dnsdist clears the RD flag.
        """
        name = 'clearrdviaaction.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedKeepRD(self):
        """
        Advanced: Preserve RD canary

        Send a query with RD for a canary domain,
        check that dnsdist does not clear the RD flag.
        """
        name = 'keeprd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestAdvancedACL(DNSDistTest):
    """Queries from an address outside the configured ACL get no answer."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    """
    # Only 192.0.2.1 is listed, so the test client is not covered.
    _acl = ['192.0.2.1/32']

    def testACLBlocked(self):
        """
        Advanced: ACL blocked

        Send an A query to "tests.powerdns.com.",
        we expect no response since 127.0.0.1 is not on the ACL.
        """
        name = 'tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertIsNone(receivedResponse)
class TestAdvancedDelay(DNSDistTest):
    """DelayAction(1000) applied to every query."""

    _config_template = """
    addAction(AllRule(), DelayAction(1000))
    newServer{address="127.0.0.1:%s"}
    """

    def testDelayed(self):
        """
        Advanced: Delayed

        Send an A query to "tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        over UDP, less than that over TCP.
        """
        name = 'tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        oneSecond = timedelta(0, 1)

        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # `end` is read below but its assignment was lost in extraction;
        # restored right after the exchange it is meant to time.
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) > oneSecond)

        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < oneSecond)
class TestAdvancedTruncateAnyAndTCP(DNSDistTest):
    """TCAction() bound to an AndRule of qtype ANY and TCP transport."""

    # NOTE(review): one configuration line before addAction() was lost in
    # extraction; truncateTC(false) is assumed - confirm against upstream.
    _config_template = """
    truncateTC(false)
    addAction(AndRule({QTypeRule("ANY"), TCPRule(true)}), TCAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testTruncateAnyOverTCP(self):
        """
        Advanced: Truncate ANY over TCP

        Send an ANY query to "anytruncatetcp.advanced.tests.powerdns.com.",
        should be truncated over TCP, not over UDP (yes, it makes no sense).
        """
        name = 'anytruncatetcp.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # UDP: the query reaches the backend and the answer comes back as is.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

        # TCP: an empty response with the TC flag set is expected.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedAndNot(DNSDistTest):
    """RCodeAction(NOTIMP) bound to 'qtype is not A' AND 'not over TCP'."""

    _config_template = """
    addAction(AndRule({NotRule(QTypeRule("A")), TCPRule(false)}), RCodeAction(dnsdist.NOTIMP))
    newServer{address="127.0.0.1:%s"}
    """

    def testAOverUDPReturnsNotImplementedCanary(self):
        """
        Advanced: !A && UDP canary

        dnsdist is configured to reply 'not implemented' for query
        over UDP AND !qtype A.
        We send an A query over UDP and TCP, and check that the
        response is OK.
        """
        name = 'andnot.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

    def testAOverUDPReturnsNotImplemented(self):
        """
        Advanced: !A && UDP

        dnsdist is configured to reply 'not implemented' for query
        over UDP AND !qtype A.
        We send a TXT query over UDP and TCP, and check that the
        response is OK for TCP and 'not implemented' for UDP.
        """
        name = 'andnot.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        response = dns.message.make_response(query)
        # NOTE(review): the TTL/class/type arguments were lost in extraction
        # (only the TXT rdata survived); values are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
class TestAdvancedOr(DNSDistTest):
    """RCodeAction(NOTIMP) bound to 'qtype is A' OR 'not over TCP'."""

    _config_template = """
    addAction(OrRule({QTypeRule("A"), TCPRule(false)}), RCodeAction(dnsdist.NOTIMP))
    newServer{address="127.0.0.1:%s"}
    """

    def testAAAAOverUDPReturnsNotImplemented(self):
        """
        Advanced: A || UDP: AAAA

        dnsdist is configured to reply 'not implemented' for query
        over UDP OR qtype A.
        We send an AAAA query over UDP and TCP, and check that the
        response is 'not implemented' for UDP and OK for TCP.
        """
        name = 'aorudp.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '::1')
        response.answer.append(rrset)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testAOverUDPReturnsNotImplemented(self):
        """
        Advanced: A || UDP: A

        dnsdist is configured to reply 'not implemented' for query
        over UDP OR qtype A.
        We send an A query over UDP and TCP, and check that the
        response is 'not implemented' for both.
        """
        name = 'aorudp.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedLogAction(DNSDistTest):
    """LogAction() writing every query to 'dnsdist.log'."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(AllRule(), LogAction("dnsdist.log", false))
    """

    def testAdvancedLogAction(self):
        """
        Advanced: Log all queries

        Send a batch of queries, then check that the log file exists
        and is not empty.
        """
        # NOTE(review): `count` is read by the loop below but its assignment
        # was lost in extraction; 50 is assumed - confirm against upstream.
        count = 50
        name = 'logaction.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for _ in range(count):
            (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        self.assertTrue(os.path.isfile('dnsdist.log'))
        self.assertTrue(os.stat('dnsdist.log').st_size > 0)
class TestAdvancedDNSSEC(DNSDistTest):
    """DropAction() bound to DNSSECRule()."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(DNSSECRule(), DropAction())
    """

    def testAdvancedDNSSECDrop(self):
        """
        Advanced: DNSSEC Rule

        A plain query is answered; the same query built with
        want_dnssec=True gets no response.
        """
        name = 'dnssec.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        doquery = dns.message.make_query(name, 'A', 'IN', want_dnssec=True)
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # The DNSSEC-requesting variant: no response over either transport.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(doquery, response)
            self.assertIsNone(receivedResponse)
class TestAdvancedQClass(DNSDistTest):
    """DropAction() bound to QClassRule(DNSClass.CHAOS)."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(QClassRule(DNSClass.CHAOS), DropAction())
    """

    def testAdvancedQClassChaosDrop(self):
        """
        Advanced: Drop QClass CHAOS

        A TXT/CHAOS query gets no response over UDP or TCP.
        """
        name = 'qclasschaos.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'CHAOS')

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None)
            self.assertIsNone(receivedResponse)

    def testAdvancedQClassINAllow(self):
        """
        Advanced: Allow QClass IN

        An A/IN query is forwarded and answered over UDP and TCP.
        """
        name = 'qclassin.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestAdvancedOpcode(DNSDistTest):
    """DropAction() bound to OpcodeRule(DNSOpcode.Notify)."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(OpcodeRule(DNSOpcode.Notify), DropAction())
    """

    def testAdvancedOpcodeNotifyDrop(self):
        """
        Advanced: Drop Opcode NOTIFY

        A query with opcode NOTIFY gets no response over UDP or TCP.
        """
        name = 'opcodenotify.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.set_opcode(dns.opcode.NOTIFY)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None)
            self.assertIsNone(receivedResponse)

    def testAdvancedOpcodeUpdateINAllow(self):
        """
        Advanced: Allow Opcode UPDATE

        A query with opcode UPDATE is forwarded and answered over UDP and TCP.
        """
        name = 'opcodeupdate.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.set_opcode(dns.opcode.UPDATE)
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestAdvancedNonTerminalRule(DNSDistTest):
    """Chain of DisableValidationAction, PoolAction and DropAction on AllRule()."""

    _config_template = """
    newServer{address="127.0.0.1:%s", pool="real"}
    addAction(AllRule(), DisableValidationAction())
    addAction(AllRule(), PoolAction("real"))
    addAction(AllRule(), DropAction())
    """

    def testAdvancedNonTerminalRules(self):
        """
        Advanced: Non terminal rules

        We check that DisableValidationAction() is applied
        but does not stop the processing, then that
        PoolAction() is applied _and_ stop the processing.
        """
        name = 'nonterminal.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # What the backend should see: the same query with CD set.
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags |= dns.flags.CD
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestAdvancedStringOnlyServer(DNSDistTest):
    """Backend declared with the string-only form of newServer()."""

    _config_template = """
    newServer("127.0.0.1:%s")
    """

    def testAdvancedStringOnlyServer(self):
        """
        Advanced: "string-only" server is placed in the default pool
        """
        name = 'string-only-server.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestAdvancedRestoreFlagsOnSelfResponse(DNSDistTest):
    """DisableValidationAction() followed by SpoofAction("192.0.2.1") for all queries."""

    _config_template = """
    addAction(AllRule(), DisableValidationAction())
    addAction(AllRule(), SpoofAction("192.0.2.1"))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedRestoreFlagsOnSpoofResponse(self):
        """
        Advanced: Restore flags on spoofed response

        Send a query with CD flag cleared, dnsdist is
        instructed to set it, then to spoof the response,
        check that response has the flag cleared.
        """
        name = 'spoofed.restoreflags.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; the rdata is taken from SpoofAction() in the config,
        # TTL/class/type are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(response, receivedResponse)
class TestAdvancedQPS(DNSDistTest):
    """QPSAction(10) bound to a single qname."""

    _config_template = """
    addAction("qps.advanced.tests.powerdns.com", QPSAction(10))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedQPSLimit(self):
        """
        Advanced: QPS Limit

        Send queries to "qps.advanced.tests.powerdns.com."
        check that dnsdist drops queries when the max QPS has been reached.
        """
        # NOTE(review): `maxQPS` is read by the loops below but its assignment
        # was lost in extraction; 10 mirrors QPSAction(10) in the config.
        maxQPS = 10
        name = 'qps.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the from_text() arguments after `name` were lost in
        # extraction; TTL/class/type/rdata are assumed - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for _ in range(maxQPS):
            (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # we should now be dropped
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        # NOTE(review): lines between the UDP and TCP rounds were lost in
        # extraction; a one-second pause is assumed so the limiter can
        # recover before the second round - confirm against upstream.
        time.sleep(1)

        # again, over TCP this time
        for _ in range(maxQPS):
            (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)
class TestAdvancedQPSNone(DNSDistTest):
    """QPSAction(100) for one qname followed by a catch-all REFUSED."""

    _config_template = """
    addAction("qpsnone.advanced.tests.powerdns.com", QPSAction(100))
    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedQPSNone(self):
        """
        Advanced: Not matching QPS returns None, not Allow

        Send queries to "qpsnone.advanced.tests.powerdns.com."
        check that the rule returns None when the QPS has not been
        reached, so the following REFUSED rule still applies.
        """
        name = 'qpsnone.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedNMGRule(DNSDistTest):
    """REFUSED for every client outside a netmask group holding 192.0.2.1/32."""

    # NOTE(review): the line creating `allowed` was lost in extraction;
    # `allowed = newNMG()` is assumed from the addMask()/NetmaskGroupRule()
    # usage that follows - confirm against upstream.
    _config_template = """
    allowed = newNMG()
    allowed:addMask("192.0.2.1/32")
    addAction(NotRule(NetmaskGroupRule(allowed)), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedNMGRule(self):
        """
        Advanced: NMGRule should refuse our queries

        Send queries to "nmgrule.advanced.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'nmgrule.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestDSTPortRule(DNSDistTest):
    """RCodeAction(REFUSED) bound to DSTPortRule() on the dnsdist port."""

    # Attribute names substituted, in order, into the %d and %s placeholders
    # of the template below.
    _config_params = ['_dnsDistPort', '_testServerPort']
    _config_template = """
    addAction(DSTPortRule(%d), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testDSTPortRule(self):
        """
        Advanced: DSTPortRule should capture our queries

        Send queries to "dstportrule.advanced.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'dstportrule.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedLabelsCountRule(DNSDistTest):
    """Refuse qnames whose label count is outside the 5..6 range."""

    _config_template = """
    addAction(QNameLabelsCountRule(5,6), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedLabelsCountRule(self):
        """
        Advanced: QNameLabelsCountRule(5,6)
        """
        # 6 labels, we should be fine
        name = 'ok.labelscount.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and rdata arguments were missing
        # from the extracted listing; the values below are placeholder
        # fixture data (the test only needs the same record to come back) -
        # confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing the forwarded query.
            receivedQuery.id = query.id
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        refusedNames = (
            # more than 6 labels, the query should be refused
            'not.ok.labelscount.advanced.tests.powerdns.com.',
            # less than 5 labels, the query should be refused
            'labelscountadvanced.tests.powerdns.com.',
        )
        for name in refusedNames:
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            for method in ("sendUDPQuery", "sendTCPQuery"):
                sender = getattr(self, method)
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
                self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedWireLengthRule(DNSDistTest):
    """Refuse qnames whose wire length is outside the 54..56 range."""

    _config_template = """
    addAction(QNameWireLengthRule(54,56), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedWireLengthRule(self):
        """
        Advanced: QNameWireLengthRule(54,56)
        """
        # This name is expected to pass through to the backend.
        name = 'longenough.qnamewirelength.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and rdata arguments were missing
        # from the extracted listing; the values below are placeholder
        # fixture data (the test only needs the same record to come back) -
        # confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing the forwarded query.
            receivedQuery.id = query.id
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        refusedNames = (
            # too short, the query should be refused
            'short.qnamewirelength.advanced.tests.powerdns.com.',
            # too long, the query should be refused
            'toolongtobevalid.qnamewirelength.advanced.tests.powerdns.com.',
        )
        for name in refusedNames:
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            for method in ("sendUDPQuery", "sendTCPQuery"):
                sender = getattr(self, method)
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
                self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedIncludeDir(DNSDistTest):
    """Load additional configuration through includeDirectory()."""

    _config_template = """
    -- this directory contains a file allowing includedir.advanced.tests.powerdns.com.
    includeDirectory('test-include-dir')
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedIncludeDirAllowed(self):
        """
        Advanced: includeDirectory()
        """
        # The included configuration is expected to let this name through.
        name = 'includedir.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and rdata arguments were missing
        # from the extracted listing; the values below are placeholder
        # fixture data (the test only needs the same record to come back) -
        # confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing the forwarded query.
            receivedQuery.id = query.id
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # this one should be refused
        name = 'notincludedir.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedLuaDO(DNSDistTest):
    """Answer NXDOMAIN from a Lua action for queries carrying the DO bit."""

    # NOTE(review): the extracted listing only kept the two `return` lines
    # and the function header; the `if dq:getDO() then` guard and the `end`
    # lines are restored from the test's expectations - confirm upstream.
    _config_template = """
    function nxDOLua(dq)
        if dq:getDO() then
            return DNSAction.Nxdomain, ""
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(nxDOLua))
    newServer{address="127.0.0.1:%s"}
    """

    def testNxDOViaLua(self):
        """
        Advanced: Nx DO queries via Lua
        """
        name = 'nxdo.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and rdata arguments were missing
        # from the extracted listing; the values below are placeholder
        # fixture data (the test only needs the same record to come back) -
        # confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        queryWithDO = dns.message.make_query(name, 'A', 'IN', want_dnssec=True)
        doResponse = dns.message.make_response(queryWithDO)
        doResponse.set_rcode(dns.rcode.NXDOMAIN)

        # Plain query: forwarded to the backend and answered normally.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing the forwarded query.
            receivedQuery.id = query.id
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

        # DNSSEC-aware query: an NXDOMAIN answer is expected.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(queryWithDO, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # Only the payload is compared, so copy the received ID over.
            doResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, doResponse)
class TestAdvancedLuaRefused(DNSDistTest):
    """Answer REFUSED from a Lua action using the two-value return form."""

    # NOTE(review): the `function refuse(dq)` header and closing `end` were
    # missing from the extracted listing; they are restored from the
    # LuaAction(refuse) reference below - confirm against the upstream file.
    _config_template = """
    function refuse(dq)
        return DNSAction.Refused, ""
    end
    addAction(AllRule(), LuaAction(refuse))
    newServer{address="127.0.0.1:%s"}
    """

    def testRefusedViaLua(self):
        """
        Advanced: Refused via Lua
        """
        name = 'refused.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # No backend response is prepared: every query in this test is sent
        # with response=None.
        refusedResponse = dns.message.make_response(query)
        refusedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # Only the payload is compared, so copy the received ID over.
            refusedResponse.id = receivedResponse.id
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(receivedResponse, refusedResponse)
class TestAdvancedLuaActionReturnSyntax(DNSDistTest):
    """Answer REFUSED from a Lua action that returns only the action value."""

    # NOTE(review): the `function refuse(dq)` header and closing `end` were
    # missing from the extracted listing; they are restored from the
    # LuaAction(refuse) reference below - confirm against the upstream file.
    _config_template = """
    function refuse(dq)
        return DNSAction.Refused
    end
    addAction(AllRule(), LuaAction(refuse))
    newServer{address="127.0.0.1:%s"}
    """

    def testRefusedWithEmptyRule(self):
        """
        Advanced: Short syntax for LuaAction return values
        """
        name = 'short.refused.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # No backend response is prepared: every query in this test is sent
        # with response=None.
        refusedResponse = dns.message.make_response(query)
        refusedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # Only the payload is compared, so copy the received ID over.
            refusedResponse.id = receivedResponse.id
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(receivedResponse, refusedResponse)
class TestAdvancedLuaTruncated(DNSDistTest):
    """Answer with TC=1 from a Lua action over UDP, and normally over TCP."""

    # NOTE(review): only the two `return` lines of the Lua function were
    # present in the extracted listing. The header, the `if not dq.tcp then`
    # guard and the `end` lines are restored from the test's expectations
    # (truncated over UDP, forwarded over TCP) - confirm upstream.
    _config_template = """
    function trunc(dq)
        if not dq.tcp then
            return DNSAction.Truncate, ""
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(trunc))
    newServer{address="127.0.0.1:%s"}
    """

    def testTCViaLua(self):
        """
        Advanced: TC via Lua
        """
        name = 'tc.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and rdata arguments were missing
        # from the extracted listing; the values below are placeholder
        # fixture data (the test only needs the same record to come back) -
        # confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        truncatedResponse = dns.message.make_response(query)
        truncatedResponse.flags |= dns.flags.TC

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        # Only the payload is compared, so copy the received ID over.
        truncatedResponse.id = receivedResponse.id
        # assertEqual: the assertEquals alias was removed in Python 3.12.
        self.assertEqual(receivedResponse, truncatedResponse)

        # no truncation over TCP
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
class TestStatNodeRespRingSince(DNSDistTest):
    """Exercise statNodeRespRing() with and without its `since` argument."""

    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
    # NOTE(review): three parameters are substituted but the extracted
    # listing only showed two placeholders; `setKey("%s")`, the initial
    # `nodesSeen = {}` and the closing `end` of visitor() are restored here -
    # confirm against the upstream file.
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    s1 = newServer{address="127.0.0.1:%s"}
    nodesSeen = {}
    function visitor(node, self, childstat)
        table.insert(nodesSeen, node.fullname)
    end
    """

    def _visitedNodes(self, visitCommand):
        """Reset nodesSeen, run visitCommand on the console, return the names seen.

        The names are returned as one newline-separated string with leading
        and trailing newlines stripped.
        """
        self.sendConsoleCommand("nodesSeen = {}")
        self.sendConsoleCommand(visitCommand)
        nodes = self.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
        return nodes.strip("\n")

    def testStatNodeRespRingSince(self):
        """
        Advanced: StatNodeRespRing with optional since parameter
        """
        # Function-scope import: the file's import block is not visible from
        # this section of the listing.
        import time

        name = 'statnodesince.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL, class, type and rdata arguments were missing
        # from the extracted listing; the values below are placeholder
        # fixture data (the test only needs the same record to come back) -
        # confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        # assertEqual: the assertEquals alias was removed in Python 3.12.
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # NOTE(review): only the first two lines of the expected multi-line
        # string were present in the extracted listing; the remaining parent
        # names are reconstructed - confirm against the upstream file.
        allNodes = """statnodesince.advanced.tests.powerdns.com.
advanced.tests.powerdns.com.
tests.powerdns.com.
powerdns.com.
com."""

        self.assertEqual(self._visitedNodes("statNodeRespRing(visitor)"), allNodes)
        self.assertEqual(self._visitedNodes("statNodeRespRing(visitor, 0)"), allNodes)

        # NOTE(review): the listing has a gap here. The checks below expect
        # nothing for since=5 and everything for since=10, which needs the
        # response to be between 5 and 10 seconds old - presumably a
        # 5 second wait; confirm against the upstream file.
        time.sleep(5)

        self.assertEqual(self._visitedNodes("statNodeRespRing(visitor)"), allNodes)
        self.assertEqual(self._visitedNodes("statNodeRespRing(visitor, 5)"), """""")
        self.assertEqual(self._visitedNodes("statNodeRespRing(visitor, 10)"), allNodes)
class TestAdvancedRD(DNSDistTest):
    """Refuse queries that have the RD flag set, forward the others."""

    _config_template = """
    addAction(RDRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedRDRefused(self):
        """
        Advanced: RD query is refused
        """
        name = 'rd.advanced.tests.powerdns.com.'
        # The query is sent as built by make_query(); no flag is cleared.
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(receivedResponse, expectedResponse)

    def testAdvancedNoRDAllowed(self):
        """
        Advanced: No-RD query is allowed
        """
        name = 'no-rd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear RD so that RDRule() does not match.
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Align the ID before comparing the forwarded query.
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, response)
class TestAdvancedGetLocalPort(DNSDistTest):
    """Spoof a CNAME that embeds the local port the query was received on."""

    # NOTE(review): the closing `end` of the Lua function was missing from
    # the extracted listing and is restored here.
    _config_template = """
    function answerBasedOnLocalPort(dq)
      local port = dq.localaddr:getPort()
      return DNSAction.Spoof, "port-was-"..port..".local-port.advanced.tests.powerdns.com."
    end
    addAction("local-port.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedGetLocalPort(self):
        """
        Advanced: Return CNAME containing the local port
        """
        name = 'local-port.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # NOTE(review): the TTL and class arguments were missing from the
        # extracted listing. 60 / IN are assumed (presumably the TTL dnsdist
        # uses for spoofed answers) - confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'port-was-{}.local-port.advanced.tests.powerdns.com.'.format(self._dnsDistPort))
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(receivedResponse, response)
class TestAdvancedGetLocalPortOnAnyBind(DNSDistTest):
    """Spoof a CNAME embedding the local port when listening on 0.0.0.0."""

    # NOTE(review): the closing `end` of the Lua function was missing from
    # the extracted listing and is restored here.
    _config_template = """
    function answerBasedOnLocalPort(dq)
      local port = dq.localaddr:getPort()
      return DNSAction.Spoof, "port-was-"..port..".local-port-any.advanced.tests.powerdns.com."
    end
    addAction("local-port-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
    newServer{address="127.0.0.1:%s"}
    """
    _dnsDistListeningAddr = "0.0.0.0"

    def testAdvancedGetLocalPortOnAnyBind(self):
        """
        Advanced: Return CNAME containing the local port for an ANY bind
        """
        name = 'local-port-any.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # NOTE(review): the TTL and class arguments were missing from the
        # extracted listing. 60 / IN are assumed (presumably the TTL dnsdist
        # uses for spoofed answers) - confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'port-was-{}.local-port-any.advanced.tests.powerdns.com.'.format(self._dnsDistPort))
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(receivedResponse, response)
1584 class TestAdvancedLuaTempFailureTTL(DNSDistTest
):
1586 _config_template
= """
1587 function testAction(dq)
1588 if dq.tempFailureTTL ~= nil then
1589 return DNSAction.Spoof, "initially.not.nil.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1591 dq.tempFailureTTL = 30
1592 if dq.tempFailureTTL ~= 30 then
1593 return DNSAction.Spoof, "after.set.not.expected.value.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1595 dq.tempFailureTTL = nil
1596 if dq.tempFailureTTL ~= nil then
1597 return DNSAction.Spoof, "after.unset.not.nil.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1599 return DNSAction.None, ""
1601 addAction(AllRule(), LuaAction(testAction))
1602 newServer{address="127.0.0.1:%s"}
1605 def testTempFailureTTLBinding(self
):
1607 Exercise dq.tempFailureTTL Lua binding
1609 name
= 'tempfailurettlbinding.advanced.tests.powerdns.com.'
1610 query
= dns
.message
.make_query(name
, 'A', 'IN')
1611 response
= dns
.message
.make_response(query
)
1612 rrset
= dns
.rrset
.from_text(name
,
1617 response
.answer
.append(rrset
)
1619 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1620 self
.assertTrue(receivedQuery
)
1621 self
.assertTrue(receivedResponse
)
1622 receivedQuery
.id = query
.id
1623 self
.assertEquals(query
, receivedQuery
)
1624 self
.assertEquals(receivedResponse
, response
)