]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Advanced.py
3 from datetime
import datetime
, timedelta
8 from dnsdisttests
import DNSDistTest
10 class TestAdvancedAllow(DNSDistTest
):
12 _config_template
= """
13 addAction(makeRule("allowed.advanced.tests.powerdns.com."), AllowAction())
14 addAction(AllRule(), DropAction())
15 newServer{address="127.0.0.1:%s"}
18 def testAdvancedAllow(self
):
20 Advanced: Allowed qname is not dropped
22 A query for allowed.advanced.tests.powerdns.com. should be allowed
23 while others should be dropped.
25 name
= 'allowed.advanced.tests.powerdns.com.'
26 query
= dns
.message
.make_query(name
, 'A', 'IN')
27 response
= dns
.message
.make_response(query
)
28 rrset
= dns
.rrset
.from_text(name
,
33 response
.answer
.append(rrset
)
35 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
36 self
.assertTrue(receivedQuery
)
37 self
.assertTrue(receivedResponse
)
38 receivedQuery
.id = query
.id
39 self
.assertEquals(query
, receivedQuery
)
40 self
.assertEquals(response
, receivedResponse
)
42 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
43 self
.assertTrue(receivedQuery
)
44 self
.assertTrue(receivedResponse
)
45 receivedQuery
.id = query
.id
46 self
.assertEquals(query
, receivedQuery
)
47 self
.assertEquals(response
, receivedResponse
)
def testAdvancedAllowDropped(self):
    """
    Advanced: Not allowed qname is dropped

    A query for notallowed.advanced.tests.powerdns.com. should be dropped,
    so no response is expected over either UDP or TCP.
    """
    name = 'notallowed.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # assertIsNone() replaces assertEquals(x, None): the assertEquals alias
    # was deprecated for years and no longer exists on Python 3.12+.
    (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
    self.assertIsNone(receivedResponse)

    (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
    self.assertIsNone(receivedResponse)
63 class TestAdvancedFixupCase(DNSDistTest
):
65 _config_template
= """
68 newServer{address="127.0.0.1:%s"}
71 def testAdvancedFixupCase(self
):
75 Send a query with lower and upper chars,
76 make the backend return a lowercase version,
77 check that dnsdist fixes the response.
79 name
= 'fiXuPCasE.advanced.tests.powerdns.com.'
80 query
= dns
.message
.make_query(name
, 'A', 'IN')
81 lowercasequery
= dns
.message
.make_query(name
.lower(), 'A', 'IN')
82 response
= dns
.message
.make_response(lowercasequery
)
83 expectedResponse
= dns
.message
.make_response(query
)
84 rrset
= dns
.rrset
.from_text(name
,
89 response
.answer
.append(rrset
)
90 expectedResponse
.answer
.append(rrset
)
92 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
93 self
.assertTrue(receivedQuery
)
94 self
.assertTrue(receivedResponse
)
95 receivedQuery
.id = query
.id
96 self
.assertEquals(query
, receivedQuery
)
97 self
.assertEquals(expectedResponse
, receivedResponse
)
99 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
100 self
.assertTrue(receivedQuery
)
101 self
.assertTrue(receivedResponse
)
102 receivedQuery
.id = query
.id
103 self
.assertEquals(query
, receivedQuery
)
104 self
.assertEquals(expectedResponse
, receivedResponse
)
107 class TestAdvancedRemoveRD(DNSDistTest
):
109 _config_template
= """
110 addAction("norecurse.advanced.tests.powerdns.com.", NoRecurseAction())
111 newServer{address="127.0.0.1:%s"}
114 def testAdvancedNoRD(self
):
118 Send a query with RD,
119 check that dnsdist clears the RD flag.
121 name
= 'norecurse.advanced.tests.powerdns.com.'
122 query
= dns
.message
.make_query(name
, 'A', 'IN')
123 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
124 expectedQuery
.flags
&= ~dns
.flags
.RD
126 response
= dns
.message
.make_response(query
)
127 rrset
= dns
.rrset
.from_text(name
,
132 response
.answer
.append(rrset
)
134 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
135 self
.assertTrue(receivedQuery
)
136 self
.assertTrue(receivedResponse
)
137 receivedQuery
.id = expectedQuery
.id
138 self
.assertEquals(expectedQuery
, receivedQuery
)
139 self
.assertEquals(response
, receivedResponse
)
141 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
142 self
.assertTrue(receivedQuery
)
143 self
.assertTrue(receivedResponse
)
144 receivedQuery
.id = expectedQuery
.id
145 self
.assertEquals(expectedQuery
, receivedQuery
)
146 self
.assertEquals(response
, receivedResponse
)
148 def testAdvancedKeepRD(self
):
150 Advanced: No RD canary
152 Send a query with RD for a canary domain,
153 check that dnsdist does not clear the RD flag.
155 name
= 'keeprecurse.advanced.tests.powerdns.com.'
156 query
= dns
.message
.make_query(name
, 'A', 'IN')
158 response
= dns
.message
.make_response(query
)
159 rrset
= dns
.rrset
.from_text(name
,
164 response
.answer
.append(rrset
)
166 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
167 self
.assertTrue(receivedQuery
)
168 self
.assertTrue(receivedResponse
)
169 receivedQuery
.id = query
.id
170 self
.assertEquals(query
, receivedQuery
)
171 self
.assertEquals(response
, receivedResponse
)
173 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
174 self
.assertTrue(receivedQuery
)
175 self
.assertTrue(receivedResponse
)
176 receivedQuery
.id = query
.id
177 self
.assertEquals(query
, receivedQuery
)
178 self
.assertEquals(response
, receivedResponse
)
181 class TestAdvancedAddCD(DNSDistTest
):
183 _config_template
= """
184 addAction("setcd.advanced.tests.powerdns.com.", DisableValidationAction())
185 addAction(makeRule("setcdviaaction.advanced.tests.powerdns.com."), DisableValidationAction())
186 newServer{address="127.0.0.1:%s"}
189 def testAdvancedSetCD(self
):
193 Send a query with CD cleared,
194 check that dnsdist set the CD flag.
196 name
= 'setcd.advanced.tests.powerdns.com.'
197 query
= dns
.message
.make_query(name
, 'A', 'IN')
198 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
199 expectedQuery
.flags |
= dns
.flags
.CD
201 response
= dns
.message
.make_response(query
)
202 rrset
= dns
.rrset
.from_text(name
,
207 response
.answer
.append(rrset
)
209 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
210 self
.assertTrue(receivedQuery
)
211 self
.assertTrue(receivedResponse
)
212 receivedQuery
.id = expectedQuery
.id
213 self
.assertEquals(expectedQuery
, receivedQuery
)
214 self
.assertEquals(response
, receivedResponse
)
216 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
217 self
.assertTrue(receivedQuery
)
218 self
.assertTrue(receivedResponse
)
219 receivedQuery
.id = expectedQuery
.id
220 self
.assertEquals(expectedQuery
, receivedQuery
)
221 self
.assertEquals(response
, receivedResponse
)
223 def testAdvancedSetCDViaAction(self
):
225 Advanced: Set CD via Action
227 Send a query with CD cleared,
228 check that dnsdist set the CD flag.
230 name
= 'setcdviaaction.advanced.tests.powerdns.com.'
231 query
= dns
.message
.make_query(name
, 'A', 'IN')
232 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
233 expectedQuery
.flags |
= dns
.flags
.CD
235 response
= dns
.message
.make_response(query
)
236 rrset
= dns
.rrset
.from_text(name
,
241 response
.answer
.append(rrset
)
243 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
244 self
.assertTrue(receivedQuery
)
245 self
.assertTrue(receivedResponse
)
246 receivedQuery
.id = expectedQuery
.id
247 self
.assertEquals(expectedQuery
, receivedQuery
)
248 self
.assertEquals(response
, receivedResponse
)
250 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
251 self
.assertTrue(receivedQuery
)
252 self
.assertTrue(receivedResponse
)
253 receivedQuery
.id = expectedQuery
.id
254 self
.assertEquals(expectedQuery
, receivedQuery
)
255 self
.assertEquals(response
, receivedResponse
)
257 def testAdvancedKeepNoCD(self
):
259 Advanced: Preserve CD canary
261 Send a query without CD for a canary domain,
262 check that dnsdist does not set the CD flag.
264 name
= 'keepnocd.advanced.tests.powerdns.com.'
265 query
= dns
.message
.make_query(name
, 'A', 'IN')
267 response
= dns
.message
.make_response(query
)
268 rrset
= dns
.rrset
.from_text(name
,
273 response
.answer
.append(rrset
)
275 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
276 self
.assertTrue(receivedQuery
)
277 self
.assertTrue(receivedResponse
)
278 receivedQuery
.id = query
.id
279 self
.assertEquals(query
, receivedQuery
)
280 self
.assertEquals(response
, receivedResponse
)
282 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
283 self
.assertTrue(receivedQuery
)
284 self
.assertTrue(receivedResponse
)
285 receivedQuery
.id = query
.id
286 self
.assertEquals(query
, receivedQuery
)
287 self
.assertEquals(response
, receivedResponse
)
289 class TestAdvancedClearRD(DNSDistTest
):
291 _config_template
= """
292 addAction("clearrd.advanced.tests.powerdns.com.", NoRecurseAction())
293 addAction(makeRule("clearrdviaaction.advanced.tests.powerdns.com."), NoRecurseAction())
294 newServer{address="127.0.0.1:%s"}
297 def testAdvancedClearRD(self
):
301 Send a query with RD set,
302 check that dnsdist clears the RD flag.
304 name
= 'clearrd.advanced.tests.powerdns.com.'
305 query
= dns
.message
.make_query(name
, 'A', 'IN')
306 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
307 expectedQuery
.flags
&= ~dns
.flags
.RD
309 response
= dns
.message
.make_response(query
)
310 rrset
= dns
.rrset
.from_text(name
,
315 response
.answer
.append(rrset
)
317 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
318 self
.assertTrue(receivedQuery
)
319 self
.assertTrue(receivedResponse
)
320 receivedQuery
.id = expectedQuery
.id
321 self
.assertEquals(expectedQuery
, receivedQuery
)
322 self
.assertEquals(response
, receivedResponse
)
324 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
325 self
.assertTrue(receivedQuery
)
326 self
.assertTrue(receivedResponse
)
327 receivedQuery
.id = expectedQuery
.id
328 self
.assertEquals(expectedQuery
, receivedQuery
)
329 self
.assertEquals(response
, receivedResponse
)
331 def testAdvancedClearRDViaAction(self
):
333 Advanced: Clear RD via Action
335 Send a query with RD set,
336 check that dnsdist clears the RD flag.
338 name
= 'clearrdviaaction.advanced.tests.powerdns.com.'
339 query
= dns
.message
.make_query(name
, 'A', 'IN')
340 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
341 expectedQuery
.flags
&= ~dns
.flags
.RD
343 response
= dns
.message
.make_response(query
)
344 rrset
= dns
.rrset
.from_text(name
,
349 response
.answer
.append(rrset
)
351 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
352 self
.assertTrue(receivedQuery
)
353 self
.assertTrue(receivedResponse
)
354 receivedQuery
.id = expectedQuery
.id
355 self
.assertEquals(expectedQuery
, receivedQuery
)
356 self
.assertEquals(response
, receivedResponse
)
358 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
359 self
.assertTrue(receivedQuery
)
360 self
.assertTrue(receivedResponse
)
361 receivedQuery
.id = expectedQuery
.id
362 self
.assertEquals(expectedQuery
, receivedQuery
)
363 self
.assertEquals(response
, receivedResponse
)
365 def testAdvancedKeepRD(self
):
367 Advanced: Preserve RD canary
369 Send a query with RD for a canary domain,
370 check that dnsdist does not clear the RD flag.
372 name
= 'keeprd.advanced.tests.powerdns.com.'
373 query
= dns
.message
.make_query(name
, 'A', 'IN')
375 response
= dns
.message
.make_response(query
)
376 rrset
= dns
.rrset
.from_text(name
,
381 response
.answer
.append(rrset
)
383 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
384 self
.assertTrue(receivedQuery
)
385 self
.assertTrue(receivedResponse
)
386 receivedQuery
.id = query
.id
387 self
.assertEquals(query
, receivedQuery
)
388 self
.assertEquals(response
, receivedResponse
)
390 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
391 self
.assertTrue(receivedQuery
)
392 self
.assertTrue(receivedResponse
)
393 receivedQuery
.id = query
.id
394 self
.assertEquals(query
, receivedQuery
)
395 self
.assertEquals(response
, receivedResponse
)
class TestAdvancedACL(DNSDistTest):
    """Check that a client outside the configured ACL gets no answer."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    """

    # Only 192.0.2.1 is allowed; the test queries are expected to be
    # refused service because they do not originate from that address.
    _acl = ['192.0.2.1/32']

    def testACLBlocked(self):
        """
        Advanced: ACL blocked

        Send an A query to "tests.powerdns.com.",
        we expect no response since 127.0.0.1 is not on the
        ACL.
        """
        name = 'tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # assertIsNone() replaces assertEquals(x, None): the assertEquals
        # alias no longer exists on Python 3.12+.
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)
422 class TestAdvancedDelay(DNSDistTest
):
424 _config_template
= """
425 addAction(AllRule(), DelayAction(1000))
426 newServer{address="127.0.0.1:%s"}
429 def testDelayed(self
):
433 Send an A query to "tests.powerdns.com.",
434 check that the response delay is longer than 1000 ms
435 over UDP, less than that over TCP.
437 name
= 'tests.powerdns.com.'
438 query
= dns
.message
.make_query(name
, 'A', 'IN')
439 response
= dns
.message
.make_response(query
)
440 rrset
= dns
.rrset
.from_text(name
,
445 response
.answer
.append(rrset
)
447 begin
= datetime
.now()
448 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
450 receivedQuery
.id = query
.id
451 self
.assertEquals(query
, receivedQuery
)
452 self
.assertEquals(response
, receivedResponse
)
453 self
.assertTrue((end
- begin
) > timedelta(0, 1))
455 begin
= datetime
.now()
456 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
458 receivedQuery
.id = query
.id
459 self
.assertEquals(query
, receivedQuery
)
460 self
.assertEquals(response
, receivedResponse
)
461 self
.assertTrue((end
- begin
) < timedelta(0, 1))
464 class TestAdvancedTruncateAnyAndTCP(DNSDistTest
):
466 _config_template
= """
468 addAction(AndRule({QTypeRule("ANY"), TCPRule(true)}), TCAction())
469 newServer{address="127.0.0.1:%s"}
471 def testTruncateAnyOverTCP(self
):
473 Advanced: Truncate ANY over TCP
475 Send an ANY query to "anytruncatetcp.advanced.tests.powerdns.com.",
476 should be truncated over TCP, not over UDP (yes, it makes no sense,
479 name
= 'anytruncatetcp.advanced.tests.powerdns.com.'
480 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
482 response
= dns
.message
.make_response(query
)
483 rrset
= dns
.rrset
.from_text(name
,
489 response
.answer
.append(rrset
)
491 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
492 self
.assertTrue(receivedQuery
)
493 self
.assertTrue(receivedResponse
)
494 receivedQuery
.id = query
.id
495 self
.assertEquals(query
, receivedQuery
)
496 self
.assertEquals(receivedResponse
, response
)
498 expectedResponse
= dns
.message
.make_response(query
)
499 expectedResponse
.flags |
= dns
.flags
.TC
501 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
502 self
.assertEquals(receivedResponse
, expectedResponse
)
504 class TestAdvancedAndNot(DNSDistTest
):
506 _config_template
= """
507 addAction(AndRule({NotRule(QTypeRule("A")), TCPRule(false)}), RCodeAction(dnsdist.NOTIMP))
508 newServer{address="127.0.0.1:%s"}
510 def testAOverUDPReturnsNotImplementedCanary(self
):
512 Advanced: !A && UDP canary
514 dnsdist is configured to reply 'not implemented' for query
515 over UDP AND !qtype A.
516 We send an A query over UDP and TCP, and check that the
519 name
= 'andnot.advanced.tests.powerdns.com.'
520 query
= dns
.message
.make_query(name
, 'A', 'IN')
521 response
= dns
.message
.make_response(query
)
522 rrset
= dns
.rrset
.from_text(name
,
527 response
.answer
.append(rrset
)
529 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
530 self
.assertTrue(receivedQuery
)
531 self
.assertTrue(receivedResponse
)
532 receivedQuery
.id = query
.id
533 self
.assertEquals(query
, receivedQuery
)
534 self
.assertEquals(receivedResponse
, response
)
536 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
537 self
.assertTrue(receivedQuery
)
538 self
.assertTrue(receivedResponse
)
539 receivedQuery
.id = query
.id
540 self
.assertEquals(query
, receivedQuery
)
541 self
.assertEquals(receivedResponse
, response
)
543 def testAOverUDPReturnsNotImplemented(self
):
547 dnsdist is configured to reply 'not implemented' for query
548 over UDP AND !qtype A.
549 We send a TXT query over UDP and TCP, and check that the
550 response is OK for TCP and 'not implemented' for UDP.
552 name
= 'andnot.advanced.tests.powerdns.com.'
553 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
555 expectedResponse
= dns
.message
.make_response(query
)
556 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
558 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
559 self
.assertEquals(receivedResponse
, expectedResponse
)
561 response
= dns
.message
.make_response(query
)
562 rrset
= dns
.rrset
.from_text(name
,
566 'nothing to see here')
567 response
.answer
.append(rrset
)
569 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
570 self
.assertTrue(receivedQuery
)
571 self
.assertTrue(receivedResponse
)
572 receivedQuery
.id = query
.id
573 self
.assertEquals(query
, receivedQuery
)
574 self
.assertEquals(receivedResponse
, response
)
576 class TestAdvancedOr(DNSDistTest
):
578 _config_template
= """
579 addAction(OrRule({QTypeRule("A"), TCPRule(false)}), RCodeAction(dnsdist.NOTIMP))
580 newServer{address="127.0.0.1:%s"}
582 def testAAAAOverUDPReturnsNotImplemented(self
):
584 Advanced: A || UDP: AAAA
586 dnsdist is configured to reply 'not implemented' for query
588 We send an AAAA query over UDP and TCP, and check that the
589 response is 'not implemented' for UDP and OK for TCP.
591 name
= 'aorudp.advanced.tests.powerdns.com.'
592 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
593 response
= dns
.message
.make_response(query
)
594 rrset
= dns
.rrset
.from_text(name
,
599 response
.answer
.append(rrset
)
601 expectedResponse
= dns
.message
.make_response(query
)
602 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
604 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
605 self
.assertEquals(receivedResponse
, expectedResponse
)
607 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
608 self
.assertTrue(receivedQuery
)
609 self
.assertTrue(receivedResponse
)
610 receivedQuery
.id = query
.id
611 self
.assertEquals(query
, receivedQuery
)
612 self
.assertEquals(receivedResponse
, response
)
def testAOverUDPReturnsNotImplemented(self):
    """
    Advanced: A || UDP: A

    dnsdist is configured to reply 'not implemented' for query
    over UDP OR qtype A.
    We send an A query over UDP and TCP, and check that the
    response is 'not implemented' for both.
    """
    name = 'aorudp.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Both transports must yield the same answer: the query echoed back
    # with rcode NOTIMP.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.NOTIMP)

    # assertEqual() replaces the assertEquals alias, which no longer
    # exists on Python 3.12+.
    (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
    self.assertEqual(receivedResponse, expectedResponse)

    (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
    self.assertEqual(receivedResponse, expectedResponse)
636 class TestAdvancedLogAction(DNSDistTest
):
638 _config_template
= """
639 newServer{address="127.0.0.1:%s"}
640 addAction(AllRule(), LogAction("dnsdist.log", false))
642 def testAdvancedLogAction(self
):
644 Advanced: Log all queries
648 name
= 'logaction.advanced.tests.powerdns.com.'
649 query
= dns
.message
.make_query(name
, 'A', 'IN')
650 response
= dns
.message
.make_response(query
)
651 rrset
= dns
.rrset
.from_text(name
,
656 response
.answer
.append(rrset
)
658 for _
in range(count
):
659 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
660 self
.assertTrue(receivedQuery
)
661 self
.assertTrue(receivedResponse
)
662 receivedQuery
.id = query
.id
663 self
.assertEquals(query
, receivedQuery
)
664 self
.assertEquals(response
, receivedResponse
)
666 self
.assertTrue(os
.path
.isfile('dnsdist.log'))
667 self
.assertTrue(os
.stat('dnsdist.log').st_size
> 0)
669 class TestAdvancedDNSSEC(DNSDistTest
):
671 _config_template
= """
672 newServer{address="127.0.0.1:%s"}
673 addAction(DNSSECRule(), DropAction())
675 def testAdvancedDNSSECDrop(self
):
677 Advanced: DNSSEC Rule
680 name
= 'dnssec.advanced.tests.powerdns.com.'
681 query
= dns
.message
.make_query(name
, 'A', 'IN')
682 doquery
= dns
.message
.make_query(name
, 'A', 'IN', want_dnssec
=True)
683 response
= dns
.message
.make_response(query
)
684 rrset
= dns
.rrset
.from_text(name
,
689 response
.answer
.append(rrset
)
691 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
692 self
.assertTrue(receivedQuery
)
693 self
.assertTrue(receivedResponse
)
694 receivedQuery
.id = query
.id
695 self
.assertEquals(query
, receivedQuery
)
696 self
.assertEquals(response
, receivedResponse
)
698 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
699 self
.assertTrue(receivedQuery
)
700 self
.assertTrue(receivedResponse
)
701 receivedQuery
.id = query
.id
702 self
.assertEquals(query
, receivedQuery
)
703 self
.assertEquals(response
, receivedResponse
)
705 (_
, receivedResponse
) = self
.sendUDPQuery(doquery
, response
)
706 self
.assertEquals(receivedResponse
, None)
707 (_
, receivedResponse
) = self
.sendTCPQuery(doquery
, response
)
708 self
.assertEquals(receivedResponse
, None)
710 class TestAdvancedQClass(DNSDistTest
):
712 _config_template
= """
713 newServer{address="127.0.0.1:%s"}
714 addAction(QClassRule(DNSClass.CHAOS), DropAction())
def testAdvancedQClassChaosDrop(self):
    """
    Advanced: Drop QClass CHAOS

    Send a TXT query in class CHAOS and check that no response
    comes back over UDP or TCP.
    """
    name = 'qclasschaos.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'CHAOS')

    # assertIsNone() replaces assertEquals(x, None): the assertEquals alias
    # no longer exists on Python 3.12+.
    (_, receivedResponse) = self.sendUDPQuery(query, response=None)
    self.assertIsNone(receivedResponse)
    (_, receivedResponse) = self.sendTCPQuery(query, response=None)
    self.assertIsNone(receivedResponse)
729 def testAdvancedQClassINAllow(self
):
731 Advanced: Allow QClass IN
734 name
= 'qclassin.advanced.tests.powerdns.com.'
735 query
= dns
.message
.make_query(name
, 'A', 'IN')
736 response
= dns
.message
.make_response(query
)
737 rrset
= dns
.rrset
.from_text(name
,
742 response
.answer
.append(rrset
)
744 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
745 self
.assertTrue(receivedQuery
)
746 self
.assertTrue(receivedResponse
)
747 receivedQuery
.id = query
.id
748 self
.assertEquals(query
, receivedQuery
)
749 self
.assertEquals(response
, receivedResponse
)
751 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
752 self
.assertTrue(receivedQuery
)
753 self
.assertTrue(receivedResponse
)
754 receivedQuery
.id = query
.id
755 self
.assertEquals(query
, receivedQuery
)
756 self
.assertEquals(response
, receivedResponse
)
758 class TestAdvancedOpcode(DNSDistTest
):
760 _config_template
= """
761 newServer{address="127.0.0.1:%s"}
762 addAction(OpcodeRule(DNSOpcode.Notify), DropAction())
def testAdvancedOpcodeNotifyDrop(self):
    """
    Advanced: Drop Opcode NOTIFY

    Send a query whose opcode is NOTIFY and check that no response
    comes back over UDP or TCP.
    """
    name = 'opcodenotify.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # Turn the freshly built standard query into a NOTIFY message.
    query.set_opcode(dns.opcode.NOTIFY)

    # assertIsNone() replaces assertEquals(x, None): the assertEquals alias
    # no longer exists on Python 3.12+.
    (_, receivedResponse) = self.sendUDPQuery(query, response=None)
    self.assertIsNone(receivedResponse)
    (_, receivedResponse) = self.sendTCPQuery(query, response=None)
    self.assertIsNone(receivedResponse)
778 def testAdvancedOpcodeUpdateINAllow(self
):
780 Advanced: Allow Opcode UPDATE
783 name
= 'opcodeupdate.advanced.tests.powerdns.com.'
784 query
= dns
.message
.make_query(name
, 'A', 'IN')
785 query
.set_opcode(dns
.opcode
.UPDATE
)
786 response
= dns
.message
.make_response(query
)
787 rrset
= dns
.rrset
.from_text(name
,
792 response
.answer
.append(rrset
)
794 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
795 self
.assertTrue(receivedQuery
)
796 self
.assertTrue(receivedResponse
)
797 receivedQuery
.id = query
.id
798 self
.assertEquals(query
, receivedQuery
)
799 self
.assertEquals(response
, receivedResponse
)
801 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
802 self
.assertTrue(receivedQuery
)
803 self
.assertTrue(receivedResponse
)
804 receivedQuery
.id = query
.id
805 self
.assertEquals(query
, receivedQuery
)
806 self
.assertEquals(response
, receivedResponse
)
808 class TestAdvancedNonTerminalRule(DNSDistTest
):
810 _config_template
= """
811 newServer{address="127.0.0.1:%s", pool="real"}
812 addAction(AllRule(), DisableValidationAction())
813 addAction(AllRule(), PoolAction("real"))
814 addAction(AllRule(), DropAction())
816 def testAdvancedNonTerminalRules(self
):
818 Advanced: Non terminal rules
820 We check that DisableValidationAction() is applied
821 but does not stop the processing, then that
822 PoolAction() is applied _and_ stop the processing.
824 name
= 'nonterminal.advanced.tests.powerdns.com.'
825 query
= dns
.message
.make_query(name
, 'A', 'IN')
826 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
827 expectedQuery
.flags |
= dns
.flags
.CD
828 response
= dns
.message
.make_response(query
)
829 rrset
= dns
.rrset
.from_text(name
,
834 response
.answer
.append(rrset
)
836 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
837 self
.assertTrue(receivedQuery
)
838 self
.assertTrue(receivedResponse
)
839 receivedQuery
.id = expectedQuery
.id
840 self
.assertEquals(expectedQuery
, receivedQuery
)
841 self
.assertEquals(response
, receivedResponse
)
843 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
844 self
.assertTrue(receivedQuery
)
845 self
.assertTrue(receivedResponse
)
846 receivedQuery
.id = expectedQuery
.id
847 self
.assertEquals(expectedQuery
, receivedQuery
)
848 self
.assertEquals(response
, receivedResponse
)
850 class TestAdvancedStringOnlyServer(DNSDistTest
):
852 _config_template
= """
853 newServer("127.0.0.1:%s")
856 def testAdvancedStringOnlyServer(self
):
858 Advanced: "string-only" server is placed in the default pool
860 name
= 'string-only-server.advanced.tests.powerdns.com.'
861 query
= dns
.message
.make_query(name
, 'A', 'IN')
862 response
= dns
.message
.make_response(query
)
863 rrset
= dns
.rrset
.from_text(name
,
868 response
.answer
.append(rrset
)
870 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
871 self
.assertTrue(receivedQuery
)
872 self
.assertTrue(receivedResponse
)
873 receivedQuery
.id = query
.id
874 self
.assertEquals(query
, receivedQuery
)
875 self
.assertEquals(response
, receivedResponse
)
877 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
878 self
.assertTrue(receivedQuery
)
879 self
.assertTrue(receivedResponse
)
880 receivedQuery
.id = query
.id
881 self
.assertEquals(query
, receivedQuery
)
882 self
.assertEquals(response
, receivedResponse
)
884 class TestAdvancedRestoreFlagsOnSelfResponse(DNSDistTest
):
886 _config_template
= """
887 addAction(AllRule(), DisableValidationAction())
888 addAction(AllRule(), SpoofAction("192.0.2.1"))
889 newServer{address="127.0.0.1:%s"}
892 def testAdvancedRestoreFlagsOnSpoofResponse(self
):
894 Advanced: Restore flags on spoofed response
896 Send a query with CD flag cleared, dnsdist is
897 instructed to set it, then to spoof the response,
898 check that response has the flag cleared.
900 name
= 'spoofed.restoreflags.advanced.tests.powerdns.com.'
901 query
= dns
.message
.make_query(name
, 'A', 'IN')
902 # dnsdist set RA = RD for spoofed responses
903 query
.flags
&= ~dns
.flags
.RD
905 response
= dns
.message
.make_response(query
)
906 rrset
= dns
.rrset
.from_text(name
,
911 response
.answer
.append(rrset
)
913 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
914 self
.assertTrue(receivedResponse
)
915 self
.assertEquals(response
, receivedResponse
)
917 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
918 self
.assertTrue(receivedResponse
)
919 self
.assertEquals(response
, receivedResponse
)
921 class TestAdvancedQPS(DNSDistTest
):
923 _config_template
= """
924 addAction("qps.advanced.tests.powerdns.com", QPSAction(10))
925 newServer{address="127.0.0.1:%s"}
928 def testAdvancedQPSLimit(self
):
932 Send queries to "qps.advanced.tests.powerdns.com."
933 check that dnsdist drops queries when the max QPS has been reached.
936 name
= 'qps.advanced.tests.powerdns.com.'
937 query
= dns
.message
.make_query(name
, 'A', 'IN')
938 response
= dns
.message
.make_response(query
)
939 rrset
= dns
.rrset
.from_text(name
,
944 response
.answer
.append(rrset
)
946 for _
in range(maxQPS
):
947 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
948 receivedQuery
.id = query
.id
949 self
.assertEquals(query
, receivedQuery
)
950 self
.assertEquals(response
, receivedResponse
)
952 # we should now be dropped
953 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
954 self
.assertEquals(receivedResponse
, None)
958 # again, over TCP this time
959 for _
in range(maxQPS
):
960 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
961 receivedQuery
.id = query
.id
962 self
.assertEquals(query
, receivedQuery
)
963 self
.assertEquals(response
, receivedResponse
)
966 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
967 self
.assertEquals(receivedResponse
, None)
class TestAdvancedQPSNone(DNSDistTest):
    """Check that a QPS rule below its limit lets later rules run."""

    _config_template = """
    addAction("qpsnone.advanced.tests.powerdns.com", QPSAction(100))
    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedQPSNone(self):
        """
        Advanced: Not matching QPS returns None, not Allow

        Send queries to "qpsnone.advanced.tests.powerdns.com."
        check that the rule returns None when the QPS has not been
        reached, not Allow: the REFUSED rule that follows it in the
        configuration must still produce the answer.
        """
        name = 'qpsnone.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # assertEqual() replaces the assertEquals alias, which no longer
        # exists on Python 3.12+.
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
996 class TestAdvancedNMGRule(DNSDistTest
):
998 _config_template
= """
1000 allowed:addMask("192.0.2.1/32")
1001 addAction(NotRule(NetmaskGroupRule(allowed)), RCodeAction(dnsdist.REFUSED))
1002 newServer{address="127.0.0.1:%s"}
def testAdvancedNMGRule(self):
    """
    Advanced: NMGRule should refuse our queries

    Send queries to "nmgrule.advanced.tests.powerdns.com.",
    check that we are getting a REFUSED response.
    """
    name = 'nmgrule.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # The expected answer is the query echoed back with rcode REFUSED.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    # assertEqual() replaces the assertEquals alias, which no longer
    # exists on Python 3.12+.
    (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
    self.assertEqual(receivedResponse, expectedResponse)

    (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
    self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedLabelsCountRule(DNSDistTest):
    """Exercise QNameLabelsCountRule(5,6) attached to a REFUSED action."""

    _config_template = """
    addAction(QNameLabelsCountRule(5,6), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def _assertRefused(self, name):
        """Query `name` over UDP and TCP and expect a REFUSED answer each time."""
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAdvancedLabelsCountRule(self):
        """
        Advanced: QNameLabelsCountRule(5,6)
        """
        # 6 labels, we should be fine
        name = 'ok.labelscount.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): TTL and address were not visible in the reviewed
        # excerpt and are reconstructed; confirm against the original.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing the two messages.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # more than 6 labels, the query should be refused
        self._assertRefused('not.ok.labelscount.advanced.tests.powerdns.com.')

        # less than 5 labels, the query should be refused
        self._assertRefused('labelscountadvanced.tests.powerdns.com.')
class TestAdvancedWireLengthRule(DNSDistTest):
    """Exercise QNameWireLengthRule(54,56) attached to a REFUSED action."""

    _config_template = """
    addAction(QNameWireLengthRule(54,56), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def _assertRefused(self, name):
        """Query `name` over UDP and TCP and expect a REFUSED answer each time."""
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAdvancedWireLengthRule(self):
        """
        Advanced: QNameWireLengthRule(54,56)
        """
        # Wire length = sum of (label length + 1) plus the root byte:
        # this name is 56 bytes, inside the accepted 54-56 window.
        name = 'longenough.qnamewirelength.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): TTL and address were not visible in the reviewed
        # excerpt and are reconstructed; confirm against the original.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing the two messages.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # too short (51 bytes), the query should be refused
        self._assertRefused('short.qnamewirelength.advanced.tests.powerdns.com.')

        # too long (62 bytes), the query should be refused
        self._assertRefused('toolongtobevalid.qnamewirelength.advanced.tests.powerdns.com.')
class TestAdvancedIncludeDir(DNSDistTest):
    """Check that rules loaded through includeDirectory() are applied."""

    _config_template = """
    -- this directory contains a file allowing includedir.advanced.tests.powerdns.com.
    includeDirectory('test-include-dir')
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedIncludeDirAllowed(self):
        """
        Advanced: includeDirectory()
        """
        name = 'includedir.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): TTL and address were not visible in the reviewed
        # excerpt and are reconstructed; confirm against the original.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing the two messages.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # this one should be refused
        name = 'notincludedir.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestAdvancedLuaDO(DNSDistTest):
    """Check a Lua action that answers NXDOMAIN to queries carrying the DO bit."""

    # NOTE(review): the `if dq.dnssecOK then` / `end` lines were not visible
    # in the reviewed excerpt and are reconstructed from the test below.
    _config_template = """
    function nxDOLua(dq)
        if dq.dnssecOK then
            return DNSAction.Nxdomain, ""
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(nxDOLua))
    newServer{address="127.0.0.1:%s"}
    """

    def testNxDOViaLua(self):
        """
        Advanced: Nx DO queries via Lua
        """
        name = 'nxdo.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): TTL and address were not visible in the reviewed
        # excerpt and are reconstructed; confirm against the original.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)
        queryWithDO = dns.message.make_query(name, 'A', 'IN', want_dnssec=True)
        doResponse = dns.message.make_response(queryWithDO)
        doResponse.set_rcode(dns.rcode.NXDOMAIN)

        # without DO
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

        # with DO
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(queryWithDO, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # Align the expected ID with the received one before comparing.
            doResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, doResponse)
class TestAdvancedLuaRefused(DNSDistTest):
    """Check a Lua action returning DNSAction.Refused with the two-value syntax."""

    # NOTE(review): the `function refuse(dq)` / `end` lines were not visible
    # in the reviewed excerpt and are reconstructed from `LuaAction(refuse)`.
    _config_template = """
    function refuse(dq)
        return DNSAction.Refused, ""
    end
    addAction(AllRule(), LuaAction(refuse))
    newServer{address="127.0.0.1:%s"}
    """

    def testRefusedViaLua(self):
        """
        Advanced: Refused via Lua
        """
        name = 'refused.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        refusedResponse = dns.message.make_response(query)
        refusedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # Align the expected ID with the received one before comparing.
            refusedResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, refusedResponse)
class TestAdvancedLuaActionReturnSyntax(DNSDistTest):
    """Check a Lua action returning only DNSAction.Refused (no second value)."""

    # NOTE(review): the `function refuse(dq)` / `end` lines were not visible
    # in the reviewed excerpt and are reconstructed from `LuaAction(refuse)`.
    _config_template = """
    function refuse(dq)
        return DNSAction.Refused
    end
    addAction(AllRule(), LuaAction(refuse))
    newServer{address="127.0.0.1:%s"}
    """

    def testRefusedWithEmptyRule(self):
        """
        Advanced: Short syntax for LuaAction return values
        """
        name = 'short.refused.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        refusedResponse = dns.message.make_response(query)
        refusedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # Align the expected ID with the received one before comparing.
            refusedResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, refusedResponse)
class TestAdvancedLuaTruncated(DNSDistTest):
    """Check a Lua action that truncates UDP queries and lets TCP through."""

    # NOTE(review): the `function trunc(dq)`, `if not dq.tcp then` and `end`
    # lines were not visible in the reviewed excerpt; they are reconstructed
    # from `LuaAction(trunc)` and from the UDP/TCP expectations below.
    _config_template = """
    function trunc(dq)
        if not dq.tcp then
            return DNSAction.Truncate, ""
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(trunc))
    newServer{address="127.0.0.1:%s"}
    """

    def testTCViaLua(self):
        """
        Advanced: TC via Lua
        """
        name = 'tc.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): TTL and address were not visible in the reviewed
        # excerpt and are reconstructed; confirm against the original.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        # Over UDP we expect an empty answer with only the TC bit set.
        truncatedResponse = dns.message.make_response(query)
        truncatedResponse.flags |= dns.flags.TC

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        truncatedResponse.id = receivedResponse.id
        self.assertEqual(receivedResponse, truncatedResponse)

        # no truncation over TCP
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
class TestStatNodeRespRingSince(DNSDistTest):
    """Check statNodeRespRing() with and without its optional `since` argument."""

    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
    # NOTE(review): the `setKey`, `s1:setUp()` and `end` lines were not visible
    # in the reviewed excerpt; `setKey("%s")` is inferred from the three
    # entries of _config_params, `s1:setUp()` should be confirmed.
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    s1 = newServer{address="127.0.0.1:%s"}
    s1:setUp()
    function visitor(node, self, childstat)
        table.insert(nodesSeen, node.fullname)
    end
    """

    def _visitNodes(self, command):
        """Reset the Lua `nodesSeen` table, run `command` on the console and
        return the collected node names, one per line."""
        self.sendConsoleCommand("nodesSeen = {}")
        self.sendConsoleCommand(command)
        nodes = self.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
        return nodes.strip("\n")

    def testStatNodeRespRingSince(self):
        """
        Advanced: StatNodeRespRing with optional since parameter

        """
        name = 'statnodesince.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): TTL and address were not visible in the reviewed
        # excerpt and are reconstructed; confirm against the original.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # The queried name followed by each of its parent domains.
        # NOTE(review): the last three entries were not visible in the
        # reviewed excerpt and are reconstructed; confirm.
        allNodes = "\n".join(['statnodesince.advanced.tests.powerdns.com.',
                              'advanced.tests.powerdns.com.',
                              'tests.powerdns.com.',
                              'powerdns.com.',
                              'com.'])

        # Right after the query: visible with no limit and with since=0.
        self.assertEqual(self._visitNodes("statNodeRespRing(visitor)"), allNodes)
        self.assertEqual(self._visitNodes("statNodeRespRing(visitor, 0)"), allNodes)

        # NOTE(review): this pause was not visible in the reviewed excerpt; it
        # is reconstructed from the since=5 / since=10 expectations below.
        time.sleep(5)

        self.assertEqual(self._visitNodes("statNodeRespRing(visitor)"), allNodes)
        # The entry is now older than 5 seconds but younger than 10.
        self.assertEqual(self._visitNodes("statNodeRespRing(visitor, 5)"), """""")
        self.assertEqual(self._visitNodes("statNodeRespRing(visitor, 10)"), allNodes)
class TestAdvancedRD(DNSDistTest):
    """Check RDRule: queries with the RD bit are refused, others pass."""

    _config_template = """
    addAction(RDRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedRDRefused(self):
        """
        Advanced: RD query is refused
        """
        name = 'rd.advanced.tests.powerdns.com.'
        # The query is used as built by make_query(), RD untouched.
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAdvancedNoRDAllowed(self):
        """
        Advanced: No-RD query is allowed
        """
        name = 'no-rd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear RD so that RDRule() does not match.
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Fail with a clear assertion rather than an AttributeError
            # on `receivedQuery.id` when nothing came back.
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, response)
class TestAdvancedGetLocalPort(DNSDistTest):
    """Check that dq.localaddr:getPort() exposes the port the query arrived on."""

    _config_template = """
    function answerBasedOnLocalPort(dq)
      local port = dq.localaddr:getPort()
      return DNSAction.Spoof, "port-was-"..port..".local-port.advanced.tests.powerdns.com."
    end
    addAction("local-port.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedGetLocalPort(self):
        """
        Advanced: Return CNAME containing the local port
        """
        name = 'local-port.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # NOTE(review): the TTL was not visible in the reviewed excerpt
        # and is reconstructed; confirm against the original.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'port-was-{}.local-port.advanced.tests.powerdns.com.'.format(self._dnsDistPort))
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, response)
class TestAdvancedGetLocalPortOnAnyBind(DNSDistTest):
    """Same as the local-port test, with dnsdist listening on 0.0.0.0."""

    _config_template = """
    function answerBasedOnLocalPort(dq)
      local port = dq.localaddr:getPort()
      return DNSAction.Spoof, "port-was-"..port..".local-port-any.advanced.tests.powerdns.com."
    end
    addAction("local-port-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
    newServer{address="127.0.0.1:%s"}
    """
    _dnsDistListeningAddr = "0.0.0.0"

    def testAdvancedGetLocalPortOnAnyBind(self):
        """
        Advanced: Return CNAME containing the local port for an ANY bind
        """
        name = 'local-port-any.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        # NOTE(review): the TTL was not visible in the reviewed excerpt
        # and is reconstructed; confirm against the original.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'port-was-{}.local-port-any.advanced.tests.powerdns.com.'.format(self._dnsDistPort))
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, response)
1556 class TestAdvancedLuaTempFailureTTL(DNSDistTest
):
1558 _config_template
= """
1559 function testAction(dq)
1560 if dq.tempFailureTTL ~= nil then
1561 return DNSAction.Spoof, "initially.not.nil.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1563 dq.tempFailureTTL = 30
1564 if dq.tempFailureTTL ~= 30 then
1565 return DNSAction.Spoof, "after.set.not.expected.value.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1567 dq.tempFailureTTL = nil
1568 if dq.tempFailureTTL ~= nil then
1569 return DNSAction.Spoof, "after.unset.not.nil.but." + dq.tempFailureTTL + ".tests.powerdns.com."
1571 return DNSAction.None, ""
1573 addAction(AllRule(), LuaAction(testAction))
1574 newServer{address="127.0.0.1:%s"}
1577 def testTempFailureTTLBinding(self
):
1579 Exercise dq.tempFailureTTL Lua binding
1581 name
= 'tempfailurettlbinding.advanced.tests.powerdns.com.'
1582 query
= dns
.message
.make_query(name
, 'A', 'IN')
1583 response
= dns
.message
.make_response(query
)
1584 rrset
= dns
.rrset
.from_text(name
,
1589 response
.answer
.append(rrset
)
1591 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
1592 self
.assertTrue(receivedQuery
)
1593 self
.assertTrue(receivedResponse
)
1594 receivedQuery
.id = query
.id
1595 self
.assertEquals(query
, receivedQuery
)
1596 self
.assertEquals(receivedResponse
, response
)