]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Advanced.py
dnsdist: Add SetNegativeAndSOAAction() and its Lua binding
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Advanced.py
1 #!/usr/bin/env python
2 import base64
3 from datetime import datetime, timedelta
4 import os
5 import string
6 import time
7 import dns
8 import clientsubnetoption
9 from dnsdisttests import DNSDistTest
10
class TestAdvancedAllow(DNSDistTest):
    """Check AllowAction(): one qname is let through, everything else dropped.

    The configured rules are, in order: NoneAction() for every query,
    AllowAction() for the allowed qname, then DropAction() for every query.
    """

    _config_template = """
    addAction(AllRule(), NoneAction())
    addAction(makeRule("allowed.advanced.tests.powerdns.com."), AllowAction())
    addAction(AllRule(), DropAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedAllow(self):
        """
        Advanced: Allowed qname is not dropped

        A query for allowed.advanced.tests.powerdns.com. should be allowed
        while others should be dropped.
        """
        name = 'allowed.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Only the message content is compared, so align the ID first.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedAllowDropped(self):
        """
        Advanced: Not allowed qname is dropped

        A query for notallowed.advanced.tests.powerdns.com. should be dropped.
        """
        name = 'notallowed.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            # Without this check the test could never fail: a dropped query
            # must not produce any response at all.
            self.assertIsNone(receivedResponse)
58
class TestAdvancedFixupCase(DNSDistTest):
    """Check fixupCase(true): the response qname gets the query's casing back."""

    _config_template = """
    truncateTC(true)
    fixupCase(true)
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedFixupCase(self):
        """
        Advanced: Fixup Case

        Send a query with lower and upper chars,
        make the backend return a lowercase version,
        check that dnsdist fixes the response.
        """
        name = 'fiXuPCasE.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        lowercasequery = dns.message.make_query(name.lower(), 'A', 'IN')
        # The backend answers the lowercase question; the client is expected
        # to receive the question with its original mixed casing.
        response = dns.message.make_response(lowercasequery)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)
96
class TestAdvancedRemoveRD(DNSDistTest):
    """Check NoRecurseAction() bound to a single qname given as a string."""

    _config_template = """
    addAction("norecurse.advanced.tests.powerdns.com.", NoRecurseAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedNoRD(self):
        """
        Advanced: No RD

        Send a query with RD,
        check that dnsdist clears the RD flag.
        """
        name = 'norecurse.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # What the backend should see: the same query with RD cleared.
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedKeepRD(self):
        """
        Advanced: No RD canary

        Send a query with RD for a canary domain,
        check that dnsdist does not clear the RD flag.
        """
        name = 'keeprecurse.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            # The qname matches no rule, so the query must arrive unmodified.
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
159
class TestAdvancedAddCD(DNSDistTest):
    """Check DisableValidationAction(), bound by string and by makeRule()."""

    _config_template = """
    addAction("setcd.advanced.tests.powerdns.com.", DisableValidationAction())
    addAction(makeRule("setcdviaaction.advanced.tests.powerdns.com."), DisableValidationAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedSetCD(self):
        """
        Advanced: Set CD

        Send a query with CD cleared,
        check that dnsdist set the CD flag.
        """
        name = 'setcd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # What the backend should see: the same query with CD set.
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags |= dns.flags.CD

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedSetCDViaAction(self):
        """
        Advanced: Set CD via Action

        Send a query with CD cleared,
        check that dnsdist set the CD flag.
        """
        name = 'setcdviaaction.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags |= dns.flags.CD

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedKeepNoCD(self):
        """
        Advanced: Preserve CD canary

        Send a query without CD for a canary domain,
        check that dnsdist does not set the CD flag.
        """
        name = 'keepnocd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            # The qname matches no rule, so the query must arrive unmodified.
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
252
class TestAdvancedClearRD(DNSDistTest):
    """Check NoRecurseAction(), bound by string and by makeRule()."""

    _config_template = """
    addAction("clearrd.advanced.tests.powerdns.com.", NoRecurseAction())
    addAction(makeRule("clearrdviaaction.advanced.tests.powerdns.com."), NoRecurseAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedClearRD(self):
        """
        Advanced: Clear RD

        Send a query with RD set,
        check that dnsdist clears the RD flag.
        """
        name = 'clearrd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # What the backend should see: the same query with RD cleared.
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedClearRDViaAction(self):
        """
        Advanced: Clear RD via Action

        Send a query with RD set,
        check that dnsdist clears the RD flag.
        """
        name = 'clearrdviaaction.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAdvancedKeepRD(self):
        """
        Advanced: Preserve RD canary

        Send a query with RD for a canary domain,
        check that dnsdist does not clear the RD flag.
        """
        name = 'keeprd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            # The qname matches no rule, so the query must arrive unmodified.
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
345
346
class TestAdvancedACL(DNSDistTest):
    """Check that clients outside the ACL get no answer.

    The ACL only contains 192.0.2.1/32 while, per the test docstring, the
    queries come from 127.0.0.1.
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    """
    _acl = ['192.0.2.1/32']

    def testACLBlocked(self):
        """
        Advanced: ACL blocked

        Send an A query to "tests.powerdns.com.",
        we expect no response since 127.0.0.1 is not on the
        ACL.
        """
        name = 'tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertIsNone(receivedResponse)
369
class TestAdvancedDelay(DNSDistTest):
    """Check DelayAction(1000): UDP answers are delayed, TCP ones are not."""

    _config_template = """
    addAction(AllRule(), DelayAction(1000))
    newServer{address="127.0.0.1:%s"}
    """

    def testDelayed(self):
        """
        Advanced: Delayed

        Send an A query to "tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        over UDP, less than that over TCP.
        """
        name = 'tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        # Matches the 1000 ms given to DelayAction() in the configuration.
        delay = timedelta(seconds=1)

        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        # assertGreater/assertLess report both values on failure.
        self.assertGreater(end - begin, delay)

        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, delay)
410
411
class TestAdvancedTruncateAnyAndTCP(DNSDistTest):
    """Check AndRule(): TCAction() applies only to ANY queries sent over TCP."""

    _config_template = """
    truncateTC(false)
    addAction(AndRule({QTypeRule("ANY"), TCPRule(true)}), TCAction())
    newServer{address="127.0.0.1:%s"}
    """

    def testTruncateAnyOverTCP(self):
        """
        Advanced: Truncate ANY over TCP

        Send an ANY query to "anytruncatetcp.advanced.tests.powerdns.com.",
        should be truncated over TCP, not over UDP (yes, it makes no sense,
        deal with it).
        """
        name = 'anytruncatetcp.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)

        # Over UDP only one half of the AndRule matches: normal forwarding.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

        # Over TCP both halves match: an empty response with TC set.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
453
class TestAdvancedAndNot(DNSDistTest):
    """Check AndRule() with NotRule(): NOTIMP for non-A queries over UDP."""

    _config_template = """
    addAction(AndRule({NotRule(QTypeRule("A")), TCPRule(false)}), RCodeAction(DNSRCode.NOTIMP))
    newServer{address="127.0.0.1:%s"}
    """

    def testAOverUDPReturnsNotImplementedCanary(self):
        """
        Advanced: !A && UDP canary

        dnsdist is configured to reply 'not implemented' for query
        over UDP AND !qtype A.
        We send an A query over UDP and TCP, and check that the
        response is OK.
        """
        name = 'andnot.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

    def testAOverUDPReturnsNotImplemented(self):
        """
        Advanced: !A && UDP

        dnsdist is configured to reply 'not implemented' for query
        over UDP AND !qtype A.
        We send a TXT query over UDP and TCP, and check that the
        response is OK for TCP and 'not implemented' for UDP.
        """
        name = 'andnot.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        # RD is cleared so the NOTIMP answer built below (no RA) matches what
        # dnsdist sends; presumably it sets RA = RD on the answers it builds.
        query.flags &= ~dns.flags.RD

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
522
class TestAdvancedOr(DNSDistTest):
    """Check OrRule(): NOTIMP for A queries or for anything sent over UDP."""

    _config_template = """
    addAction(OrRule({QTypeRule("A"), TCPRule(false)}), RCodeAction(DNSRCode.NOTIMP))
    newServer{address="127.0.0.1:%s"}
    """

    def testAAAAOverUDPReturnsNotImplemented(self):
        """
        Advanced: A || UDP: AAAA

        dnsdist is configured to reply 'not implemented' for query
        over UDP OR qtype A.
        We send an AAAA query over UDP and TCP, and check that the
        response is 'not implemented' for UDP and OK for TCP.
        """
        name = 'aorudp.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # RD is cleared so the expected NOTIMP answer (no RA) matches.
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '::1')
        response.answer.append(rrset)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testAOverUDPReturnsNotImplemented(self):
        """
        Advanced: A || UDP: A

        dnsdist is configured to reply 'not implemented' for query
        over UDP OR qtype A.
        We send an A query over UDP and TCP, and check that the
        response is 'not implemented' for both.
        """
        name = 'aorudp.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
582
583
class TestAdvancedLogAction(DNSDistTest):
    """Check LogAction(): queries are forwarded and a log file is written."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(AllRule(), LogAction("dnsdist.log", false))
    """

    def testAdvancedLogAction(self):
        """
        Advanced: Log all queries

        Send a batch of queries, check that each one is answered normally
        and that 'dnsdist.log' exists and is not empty afterwards.
        """
        count = 50
        name = 'logaction.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for _ in range(count):
            (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # The path is relative, as in the LogAction() call above.
        self.assertTrue(os.path.isfile('dnsdist.log'))
        self.assertGreater(os.stat('dnsdist.log').st_size, 0)
616
class TestAdvancedDNSSEC(DNSDistTest):
    """Check DNSSECRule(): queries asking for DNSSEC records are dropped."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(DNSSECRule(), DropAction())
    """

    def testAdvancedDNSSECDrop(self):
        """
        Advanced: DNSSEC Rule

        A plain query is answered normally, the same query built with
        want_dnssec=True gets no response.
        """
        name = 'dnssec.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        doquery = dns.message.make_query(name, 'A', 'IN', want_dnssec=True)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # A dropped query never reaches the backend, so no response is
            # queued for it, as in the other dropped-query tests of this file.
            (_, receivedResponse) = sender(doquery, response=None, useQueue=False)
            self.assertIsNone(receivedResponse)
652
class TestAdvancedQClass(DNSDistTest):
    """Check QClassRule(): CHAOS-class queries are dropped, IN ones pass."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(QClassRule(DNSClass.CHAOS), DropAction())
    """

    def testAdvancedQClassChaosDrop(self):
        """
        Advanced: Drop QClass CHAOS

        A TXT query in class CHAOS gets no response.
        """
        name = 'qclasschaos.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'CHAOS')

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # A dropped query never reaches the backend, so nothing is queued
            # for it, as in the other dropped-query tests of this file.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertIsNone(receivedResponse)

    def testAdvancedQClassINAllow(self):
        """
        Advanced: Allow QClass IN

        An A query in class IN is forwarded and answered normally.
        """
        name = 'qclassin.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
695
class TestAdvancedOpcode(DNSDistTest):
    """Check OpcodeRule(): NOTIFY messages are dropped, UPDATE ones pass."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(OpcodeRule(DNSOpcode.Notify), DropAction())
    """

    def testAdvancedOpcodeNotifyDrop(self):
        """
        Advanced: Drop Opcode NOTIFY

        A message with the NOTIFY opcode gets no response.
        """
        name = 'opcodenotify.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.set_opcode(dns.opcode.NOTIFY)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # A dropped query never reaches the backend, so nothing is queued
            # for it, as in the other dropped-query tests of this file.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertIsNone(receivedResponse)

    def testAdvancedOpcodeUpdateINAllow(self):
        """
        Advanced: Allow Opcode UPDATE

        A message with the UPDATE opcode is forwarded and answered normally.
        """
        name = 'opcodeupdate.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.set_opcode(dns.opcode.UPDATE)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
740
class TestAdvancedNonTerminalRule(DNSDistTest):
    """Check which actions stop rule processing and which do not.

    Three AllRule() actions are chained: DisableValidationAction(),
    PoolAction("real") and DropAction(). The only server is in pool "real".
    """

    _config_template = """
    newServer{address="127.0.0.1:%s", pool="real"}
    addAction(AllRule(), DisableValidationAction())
    addAction(AllRule(), PoolAction("real"))
    addAction(AllRule(), DropAction())
    """

    def testAdvancedNonTerminalRules(self):
        """
        Advanced: Non terminal rules

        We check that DisableValidationAction() is applied
        but does not stop the processing, then that
        PoolAction() is applied _and_ stop the processing.
        """
        name = 'nonterminal.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # What the backend should see: the same query with CD set.
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags |= dns.flags.CD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Getting an answer at all shows the final DropAction() was
            # never reached.
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.assertEqual(response, receivedResponse)
777
class TestAdvancedStringOnlyServer(DNSDistTest):
    """Check newServer() called with a bare address string, not a table."""

    _config_template = """
    newServer("127.0.0.1:%s")
    """

    def testAdvancedStringOnlyServer(self):
        """
        Advanced: "string-only" server is placed in the default pool
        """
        name = 'string-only-server.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # No rule selects a pool, so a forwarded query shows the server
            # is reachable without one.
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
806
class TestAdvancedRestoreFlagsOnSelfResponse(DNSDistTest):
    """Check that a spoofed answer carries the client's original flags.

    DisableValidationAction() runs before SpoofAction("192.0.2.1"), both for
    every query.
    """

    _config_template = """
    addAction(AllRule(), DisableValidationAction())
    addAction(AllRule(), SpoofAction("192.0.2.1"))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedRestoreFlagsOnSpoofResponse(self):
        """
        Advanced: Restore flags on spoofed response

        Send a query with CD flag cleared, dnsdist is
        instructed to set it, then to spoof the response,
        check that response has the flag cleared.
        """
        name = 'spoofed.restoreflags.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        # Built from the unmodified query, so CD is not set in it.
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(response, receivedResponse)
841
class TestAdvancedQPS(DNSDistTest):
    """Check QPSAction(10): queries above the limit get no response."""

    _config_template = """
    addAction("qps.advanced.tests.powerdns.com", QPSAction(10))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedQPSLimit(self):
        """
        Advanced: QPS Limit

        Send queries to "qps.advanced.tests.powerdns.com."
        check that dnsdist drops queries when the max QPS has been reached.
        """
        # Must match the value given to QPSAction() in the configuration.
        maxQPS = 10
        name = 'qps.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for _ in range(maxQPS):
            (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # we should now be dropped
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        # Wait one second so a fresh budget of maxQPS queries is available
        # for the TCP round below.
        time.sleep(1)

        # again, over TCP this time
        for _ in range(maxQPS):
            (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)
889
class TestAdvancedQPSNone(DNSDistTest):
    """Check that QPSAction() below its limit does not stop rule processing.

    QPSAction(100) for the test qname is followed by an AllRule() answering
    REFUSED.
    """

    _config_template = """
    addAction("qpsnone.advanced.tests.powerdns.com", QPSAction(100))
    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedQPSNone(self):
        """
        Advanced: Not matching QPS returns None, not Allow

        Send queries to "qpsnone.advanced.tests.powerdns.com."
        check that the rule returns None when the QPS has not been
        reached, not Allow.
        """
        name = 'qpsnone.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared so the expected REFUSED answer (no RA) matches.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            # REFUSED can only come from the second rule, so the first one
            # let processing continue.
            self.assertEqual(receivedResponse, expectedResponse)
916
class TestAdvancedNMGRule(DNSDistTest):
    """Check NetmaskGroupRule(): clients outside the group get REFUSED.

    The netmask group only holds 192.0.2.1/32 and the rule is wrapped in
    NotRule(), so every other client is refused.
    """

    _config_template = """
    allowed = newNMG()
    allowed:addMask("192.0.2.1/32")
    addAction(NotRule(NetmaskGroupRule(allowed)), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedNMGRule(self):
        """
        Advanced: NMGRule should refuse our queries

        Send queries to "nmgrule.advanced.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'nmgrule.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared so the expected REFUSED answer (no RA) matches.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
943
class TestDSTPortRule(DNSDistTest):
    """Check DSTPortRule(): queries to the configured port get REFUSED."""

    # Names of the attributes substituted, in order, into the template below:
    # the first fills DSTPortRule(%d), the second the backend address.
    _config_params = ['_dnsDistPort', '_testServerPort']
    _config_template = """
    addAction(DSTPortRule(%d), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testDSTPortRule(self):
        """
        Advanced: DSTPortRule should capture our queries

        Send queries to "dstportrule.advanced.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """

        name = 'dstportrule.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared so the expected REFUSED answer (no RA) matches.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
970
class TestAdvancedLabelsCountRule(DNSDistTest):
    """Check QNameLabelsCountRule(5,6): other label counts get REFUSED."""

    _config_template = """
    addAction(QNameLabelsCountRule(5,6), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedLabelsCountRule(self):
        """
        Advanced: QNameLabelsCountRule(5,6)

        A qname with 6 labels is forwarded, qnames with 7 and with 4 labels
        are answered with REFUSED.
        """
        # 6 labels, we should be fine
        name = 'ok.labelscount.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # more than 6 labels, the query should be refused
        name = 'not.ok.labelscount.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared so the expected REFUSED answer (no RA) matches.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

        # less than 5 labels, the query should be refused
        name = 'labelscountadvanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
1025
class TestAdvancedWireLengthRule(DNSDistTest):
    """
    Exercise QNameWireLengthRule(54,56) combined with a REFUSED RCodeAction:
    the test expects one name to be forwarded and both a shorter and a
    longer one to be refused.
    """

    _config_template = """
    addAction(QNameWireLengthRule(54,56), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkWireLengthForwarded(self, name):
        """
        Send an A query for `name` over UDP and TCP and check that the
        backend received it and that its answer is relayed unchanged.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '192.0.2.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the ID before comparing the query seen by the backend
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def _checkWireLengthRefused(self, name):
        """
        Send an A query without RD for `name` over UDP and TCP, without
        providing any backend response, and check that REFUSED is received.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAdvancedWireLengthRule(self):
        """
        Advanced: QNameWireLengthRule(54,56)
        """
        # within the accepted range, the query should reach the backend
        self._checkWireLengthForwarded('longenough.qnamewirelength.advanced.tests.powerdns.com.')

        # too short, the query should be refused
        self._checkWireLengthRefused('short.qnamewirelength.advanced.tests.powerdns.com.')

        # too long, the query should be refused
        self._checkWireLengthRefused('toolongtobevalid.qnamewirelength.advanced.tests.powerdns.com.')
1079
class TestAdvancedIncludeDir(DNSDistTest):
    """
    Exercise includeDirectory(): the rules deciding which name is allowed
    come from the files of the included directory, not from this template.
    """

    _config_template = """
    -- this directory contains a file allowing includedir.advanced.tests.powerdns.com.
    includeDirectory('test-include-dir')
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedIncludeDirAllowed(self):
        """
        Advanced: includeDirectory()
        """
        # the name allowed by the included configuration reaches the backend
        name = 'includedir.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '192.0.2.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

        # this one should be refused
        name = 'notincludedir.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
1122
class TestAdvancedLuaDO(DNSDistTest):
    """
    Exercise dq:getDO() from a LuaAction: the Lua function returns
    DNSAction.Nxdomain when the DO bit is set and DNSAction.None otherwise.
    """

    _config_template = """
    function nxDOLua(dq)
        if dq:getDO() then
            return DNSAction.Nxdomain, ""
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(nxDOLua))
    newServer{address="127.0.0.1:%s"}
    """

    def testNxDOViaLua(self):
        """
        Advanced: Nx DO queries via Lua
        """
        name = 'nxdo.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.AAAA,
                                                   '::1'))
        # want_dnssec=True is what sets the DO bit on the second query
        queryWithDO = dns.message.make_query(name, 'A', 'IN', want_dnssec=True)
        doResponse = dns.message.make_response(queryWithDO)
        doResponse.set_rcode(dns.rcode.NXDOMAIN)

        # without DO
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

        # with DO
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(queryWithDO, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            doResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, doResponse)
1170
class TestAdvancedLuaRefused(DNSDistTest):
    """
    Exercise a LuaAction returning DNSAction.Refused for every query.
    """

    _config_template = """
    function refuse(dq)
        return DNSAction.Refused, ""
    end
    addAction(AllRule(), LuaAction(refuse))
    newServer{address="127.0.0.1:%s"}
    """

    def testRefusedViaLua(self):
        """
        Advanced: Refused via Lua
        """
        name = 'refused.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # no backend response is prepared: the test never hands one to the
        # senders, only the REFUSED answer is expected
        refusedResponse = dns.message.make_response(query)
        refusedResponse.set_rcode(dns.rcode.REFUSED)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            refusedResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, refusedResponse)
1203
class TestAdvancedLuaActionReturnSyntax(DNSDistTest):
    """
    Exercise the short return syntax of a LuaAction: the Lua function
    returns only DNSAction.Refused, without the second (string) value.
    """

    _config_template = """
    function refuse(dq)
        return DNSAction.Refused
    end
    addAction(AllRule(), LuaAction(refuse))
    newServer{address="127.0.0.1:%s"}
    """

    def testRefusedWithEmptyRule(self):
        """
        Advanced: Short syntax for LuaAction return values
        """
        name = 'short.refused.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # no backend response is prepared: the test never hands one to the
        # senders, only the REFUSED answer is expected
        refusedResponse = dns.message.make_response(query)
        refusedResponse.set_rcode(dns.rcode.REFUSED)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            refusedResponse.id = receivedResponse.id
            self.assertEqual(receivedResponse, refusedResponse)
1236
class TestAdvancedLuaTruncated(DNSDistTest):
    """
    Exercise a LuaAction returning DNSAction.Truncate for queries that
    did not arrive over TCP (dq.tcp is false), and DNSAction.None otherwise.
    """

    _config_template = """
    function trunc(dq)
        if not dq.tcp then
            return DNSAction.Truncate, ""
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(trunc))
    newServer{address="127.0.0.1:%s"}
    """

    def testTCViaLua(self):
        """
        Advanced: TC via Lua
        """
        name = 'tc.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.AAAA,
                                                   '::1'))

        truncatedResponse = dns.message.make_response(query)
        truncatedResponse.flags |= dns.flags.TC

        # over UDP an empty answer with the TC bit set is expected
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        truncatedResponse.id = receivedResponse.id
        self.assertEqual(receivedResponse, truncatedResponse)

        # no truncation over TCP
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
1281
class TestStatNodeRespRingSince(DNSDistTest):
    """
    Exercise statNodeRespRing() through the console, with and without its
    optional second ("since") argument.
    """

    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    s1 = newServer{address="127.0.0.1:%s"}
    s1:setUp()
    function visitor(node, self, childstat)
        table.insert(nodesSeen, node.fullname)
    end
    """

    def _nodesVisitedBy(self, visitCommand):
        """
        Reset the Lua-side `nodesSeen` table, run `visitCommand` on the
        console, and return the collected node names, one per line, with
        the leading and trailing newlines stripped.
        """
        self.sendConsoleCommand("nodesSeen = {}")
        self.sendConsoleCommand(visitCommand)
        nodes = self.sendConsoleCommand("str = '' for key,value in pairs(nodesSeen) do str = str..value..\"\\n\" end return str")
        return nodes.strip("\n")

    def testStatNodeRespRingSince(self):
        """
        Advanced: StatNodeRespRing with optional since parameter

        """
        name = 'statnodesince.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   1,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        # a single answered query is enough to populate the response ring
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # the queried name followed by each of its parent domains, built
        # explicitly so that the expected value does not depend on how this
        # file is indented
        allNodes = "\n".join(['statnodesince.advanced.tests.powerdns.com.',
                              'advanced.tests.powerdns.com.',
                              'tests.powerdns.com.',
                              'powerdns.com.',
                              'com.'])

        self.assertEqual(self._nodesVisitedBy("statNodeRespRing(visitor)"), allNodes)
        self.assertEqual(self._nodesVisitedBy("statNodeRespRing(visitor, 0)"), allNodes)

        # let the recorded response age before using the "since" argument
        time.sleep(5)

        self.assertEqual(self._nodesVisitedBy("statNodeRespRing(visitor)"), allNodes)
        # after the sleep, the response is expected to fall outside a
        # 5 second window but inside a 10 second one
        self.assertEqual(self._nodesVisitedBy("statNodeRespRing(visitor, 5)"), """""")
        self.assertEqual(self._nodesVisitedBy("statNodeRespRing(visitor, 10)"), allNodes)
1366
class TestAdvancedRD(DNSDistTest):
    """
    Exercise RDRule combined with a REFUSED RCodeAction: queries with the
    RD bit set are refused, queries without it reach the backend.
    """

    _config_template = """
    addAction(RDRule(), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedRDRefused(self):
        """
        Advanced: RD query is refused
        """
        name = 'rd.advanced.tests.powerdns.com.'
        # the query keeps the RD flag set by make_query
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        # the refused answer is expected to carry RA in addition to RD
        expectedResponse.flags |= dns.flags.RA

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAdvancedNoRDAllowed(self):
        """
        Advanced: No-RD query is allowed
        """
        name = 'no-rd.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, response)
1404
class TestAdvancedGetLocalPort(DNSDistTest):
    """
    Exercise dq.localaddr:getPort() from a LuaAction: the port is embedded
    in the target of a spoofed CNAME.
    """

    _config_template = """
    function answerBasedOnLocalPort(dq)
        local port = dq.localaddr:getPort()
        return DNSAction.Spoof, "port-was-"..port..".local-port.advanced.tests.powerdns.com."
    end
    addAction("local-port.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
    newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedGetLocalPort(self):
        """
        Advanced: Return CNAME containing the local port
        """
        name = 'local-port.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        # the expected CNAME target is built from the port the test uses to
        # reach dnsdist
        target = 'port-was-{}.local-port.advanced.tests.powerdns.com.'.format(self._dnsDistPort)
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   60,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.CNAME,
                                                   target))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, response)
1437
class TestAdvancedGetLocalPortOnAnyBind(DNSDistTest):
    """
    Exercise dq.localaddr:getPort() from a LuaAction with the listening
    address set to 0.0.0.0: the port is embedded in the target of a
    spoofed CNAME.
    """

    _config_template = """
    function answerBasedOnLocalPort(dq)
        local port = dq.localaddr:getPort()
        return DNSAction.Spoof, "port-was-"..port..".local-port-any.advanced.tests.powerdns.com."
    end
    addAction("local-port-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalPort))
    newServer{address="127.0.0.1:%s"}
    """
    _dnsDistListeningAddr = "0.0.0.0"

    def testAdvancedGetLocalPortOnAnyBind(self):
        """
        Advanced: Return CNAME containing the local port for an ANY bind
        """
        name = 'local-port-any.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        # the expected CNAME target is built from the port the test uses to
        # reach dnsdist
        target = 'port-was-{}.local-port-any.advanced.tests.powerdns.com.'.format(self._dnsDistPort)
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   60,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.CNAME,
                                                   target))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, response)
1471
class TestAdvancedGetLocalAddressOnAnyBind(DNSDistTest):
    """
    Exercise dq.localaddr:toString() from a LuaAction with the listening
    address set to 0.0.0.0: the Lua code extracts the first run of digits
    and dots from the string, replaces the dots with dashes and embeds the
    result in the target of a spoofed CNAME.
    """

    _config_template = """
    function answerBasedOnLocalAddress(dq)
        local dest = dq.localaddr:toString()
        local i, j = string.find(dest, "[0-9.]+")
        local addr = string.sub(dest, i, j)
        local dashAddr = string.gsub(addr, "[.]", "-")
        return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
    end
    addAction("local-address-any.advanced.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
    newServer{address="127.0.0.1:%s"}
    """
    _dnsDistListeningAddr = "0.0.0.0"

    def testAdvancedGetLocalAddressOnAnyBind(self):
        """
        Advanced: Return CNAME containing the local address for an ANY bind
        """
        name = 'local-address-any.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        # the reported local address is expected to be 127.0.0.1, not the
        # 0.0.0.0 wildcard used for listening
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   60,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.CNAME,
                                                   'address-was-127-0-0-1.local-address-any.advanced.tests.powerdns.com.'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, response)
1508
class TestAdvancedLuaTempFailureTTL(DNSDistTest):
    """
    Exercise the dq.tempFailureTTL Lua binding: the Lua function checks
    that the value is initially nil, can be set to 30 and can be reset to
    nil. Any unexpected value is reported by spoofing a name describing
    the failure, otherwise the query is left untouched.
    """

    # Lua concatenates strings with '..'; '+' is an arithmetic operator and
    # would raise an error instead of building the diagnostic name
    _config_template = """
    function testAction(dq)
        if dq.tempFailureTTL ~= nil then
            return DNSAction.Spoof, "initially.not.nil.but." .. dq.tempFailureTTL .. ".tests.powerdns.com."
        end
        dq.tempFailureTTL = 30
        if dq.tempFailureTTL ~= 30 then
            return DNSAction.Spoof, "after.set.not.expected.value.but." .. dq.tempFailureTTL .. ".tests.powerdns.com."
        end
        dq.tempFailureTTL = nil
        if dq.tempFailureTTL ~= nil then
            return DNSAction.Spoof, "after.unset.not.nil.but." .. dq.tempFailureTTL .. ".tests.powerdns.com."
        end
        return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(testAction))
    newServer{address="127.0.0.1:%s"}
    """

    def testTempFailureTTLBinding(self):
        """
        Advanced: Exercise dq.tempFailureTTL Lua binding
        """
        name = 'tempfailurettlbinding.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.AAAA,
                                                   '::1'))

        # when every check of the Lua function passes, the query reaches
        # the backend and its answer is relayed unchanged
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)
1552
class TestAdvancedEDNSOptionRule(DNSDistTest):
    """
    Exercise EDNSOptionRule(EDNSOptionCode.ECS) combined with DropAction:
    queries carrying an ECS option are dropped, the others are answered.
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(EDNSOptionRule(EDNSOptionCode.ECS), DropAction())
    """

    def testDropped(self):
        """
        Advanced: A question with ECS is dropped
        """
        name = 'ednsoptionrule.advanced.tests.powerdns.com.'

        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None)
            # a dropped query yields no response at all
            self.assertEqual(receivedResponse, None)

    def testReplied(self):
        """
        Advanced: A question without ECS is answered
        """
        name = 'ednsoptionrule.advanced.tests.powerdns.com.'

        # both with EDNS
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[], payload=512)
        response = dns.message.make_response(query)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)

            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

        # and with no EDNS at all
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)

            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)
1609
class TestAdvancedAllowHeaderOnly(DNSDistTest):
    """
    Exercise setAllowEmptyResponse(true): backend responses without a
    question section are expected to be relayed to the client.
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    setAllowEmptyResponse(true)
    """

    def _checkHeaderOnlyResponse(self, name, rcode=None):
        """
        Send an A query for `name` over UDP and TCP, have the backend answer
        with a response whose question section has been emptied, and check
        that both the query and that response go through unchanged.

        When `rcode` is not None it is set on the backend response,
        otherwise the rcode produced by make_response is kept.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        if rcode is not None:
            response.set_rcode(rcode)
        # header-only: drop the question copied from the query
        response.question = []

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

    def testHeaderOnlyRefused(self):
        """
        Advanced: Header-only refused response
        """
        self._checkHeaderOnlyResponse('header-only-refused-response.advanced.tests.powerdns.com.',
                                      dns.rcode.REFUSED)

    def testHeaderOnlyNoErrorResponse(self):
        """
        Advanced: Header-only NoError response should be allowed
        """
        self._checkHeaderOnlyResponse('header-only-noerror-response.advanced.tests.powerdns.com.')

    def testHeaderOnlyNXDResponse(self):
        """
        Advanced: Header-only NXD response should be allowed
        """
        self._checkHeaderOnlyResponse('header-only-nxd-response.advanced.tests.powerdns.com.',
                                      dns.rcode.NXDOMAIN)
1669
class TestAdvancedEDNSVersionRule(DNSDistTest):
    """
    Exercise EDNSVersionRule(0) combined with ERCodeAction(BADVERS): a
    query using EDNS version 1 gets BADVERS, while queries using EDNS
    version 0 or no EDNS at all reach the backend.
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addAction(EDNSVersionRule(0), ERCodeAction(DNSRCode.BADVERS))
    """

    def testBadVers(self):
        """
        Advanced: A question with EDNS version larger than 0 yields BADVERS
        """
        name = 'ednsversionrule.advanced.tests.powerdns.com.'

        # use_edns=1 requests EDNS version 1
        query = dns.message.make_query(name, 'A', 'IN', use_edns=1)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.BADVERS)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None)
            self.assertEqual(receivedResponse, expectedResponse)

    def testNoEDNS0Pass(self):
        """
        Advanced: A question with EDNS version 0 goes through
        """
        name = 'ednsversionrule.advanced.tests.powerdns.com.'

        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        response = dns.message.make_response(query)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

    def testReplied(self):
        """
        Advanced: A question without EDNS goes through
        """
        name = 'ednsoptionrule.advanced.tests.powerdns.com.'

        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)
1727
class TestSetRules(DNSDistTest):
    """
    Exercise clearRules() and setRules() through the console, starting
    from a configuration that spoofs every query with 192.0.2.1.
    """

    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    newServer{address="127.0.0.1:%s"}
    addAction(AllRule(), SpoofAction("192.0.2.1"))
    """

    def _checkAnswerWithoutBackend(self, query, expectedResponse):
        """
        Send `query` over UDP and TCP without providing any backend
        response and check that `expectedResponse` is received.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testClearThenSetRules(self):
        """
        Advanced: Clear rules, set rules

        """
        name = 'clearthensetrules.advanced.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        # initial configuration: spoofed with 192.0.2.1
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1'))
        self._checkAnswerWithoutBackend(query, expectedResponse)

        # clear all the rules, we should not be spoofing and get a SERVFAIL from the responder instead
        self.sendConsoleCommand("clearRules()")

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.SERVFAIL)
        self._checkAnswerWithoutBackend(query, expectedResponse)

        # insert a new spoofing rule
        self.sendConsoleCommand("setRules({ newRuleAction(AllRule(), SpoofAction(\"192.0.2.2\")) })")

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.2'))
        self._checkAnswerWithoutBackend(query, expectedResponse)
1794
class TestAdvancedContinueAction(DNSDistTest):
    """
    Exercise ContinueAction: a plain PoolAction stops the processing of
    the remaining rules, while the same action wrapped in ContinueAction
    lets the following DisableValidationAction rule apply as well.
    """

    _config_template = """
    newServer{address="127.0.0.1:%s", pool="mypool"}
    addAction("nocontinue.continue-action.advanced.tests.powerdns.com.", PoolAction("mypool"))
    addAction("continue.continue-action.advanced.tests.powerdns.com.", ContinueAction(PoolAction("mypool")))
    addAction(AllRule(), DisableValidationAction())
    """

    def testNoContinue(self):
        """
        Advanced: Query routed to pool, PoolAction should be terminal
        """
        name = 'nocontinue.continue-action.advanced.tests.powerdns.com.'

        query = dns.message.make_query(name, 'A', 'IN')
        # the backend is expected to see the query unmodified (no CD flag)
        expectedQuery = dns.message.make_query(name, 'A', 'IN')

        response = dns.message.make_response(query)
        expectedResponse = dns.message.make_response(query)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            # expectedQuery was built separately and has its own random ID,
            # align it before comparing
            expectedQuery.id = receivedQuery.id
            self.assertEqual(receivedQuery, expectedQuery)
            self.assertEqual(receivedResponse, expectedResponse)

    def testContinue(self):
        """
        Advanced: Query routed to pool, ContinueAction() should not stop the processing
        """
        name = 'continue.continue-action.advanced.tests.powerdns.com.'

        query = dns.message.make_query(name, 'A', 'IN')
        # the rule following the ContinueAction is expected to have set the
        # CD flag on the query seen by the backend
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        expectedQuery.flags |= dns.flags.CD

        response = dns.message.make_response(query)
        expectedResponse = dns.message.make_response(query)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            # expectedQuery was built separately and has its own random ID,
            # align it before comparing
            expectedQuery.id = receivedQuery.id
            self.assertEqual(receivedQuery, expectedQuery)
            self.assertEqual(receivedResponse, expectedResponse)
1845
class TestAdvancedSetNegativeAndSOA(DNSDistTest):
    """
    Exercise SetNegativeAndSOAAction: the answer is expected to carry the
    configured SOA record in its additional section, with a NXDOMAIN rcode
    for the rule created with `true` and NOERROR for the one created with
    `false`.
    """

    _config_template = """
    addAction("nxd.setnegativeandsoa.advanced.tests.powerdns.com.", SetNegativeAndSOAAction(true, "auth.", 42, "mname", "rname", 5, 4, 3, 2, 1))
    addAction("nodata.setnegativeandsoa.advanced.tests.powerdns.com.", SetNegativeAndSOAAction(false, "another-auth.", 42, "mname", "rname", 1, 2, 3, 4, 5))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkNegativeAnswer(self, name, rcode, zone, soaContent):
        """
        Send an A query for `name` over UDP and TCP, without providing any
        backend response, and check that the answer has the given `rcode`
        and a single SOA record in the additional section, owned by `zone`,
        with a TTL of 42 and `soaContent` as its rdata.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(rcode)
        expectedResponse.additional.append(dns.rrset.from_text(zone,
                                                               42,
                                                               dns.rdataclass.IN,
                                                               dns.rdatatype.SOA,
                                                               soaContent))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAdvancedNegativeAndSOANXD(self):
        """
        Advanced: SetNegativeAndSOAAction NXD
        """
        self._checkNegativeAnswer('nxd.setnegativeandsoa.advanced.tests.powerdns.com.',
                                  dns.rcode.NXDOMAIN,
                                  "auth",
                                  'mname. rname. 5 4 3 2 1')

    def testAdvancedNegativeAndSOANoData(self):
        """
        Advanced: SetNegativeAndSOAAction NoData
        """
        self._checkNegativeAnswer('nodata.setnegativeandsoa.advanced.tests.powerdns.com.',
                                  dns.rcode.NOERROR,
                                  "another-auth",
                                  'mname. rname. 1 2 3 4 5')