git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_SelfAnsweredResponses.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_SelfAnsweredResponses.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
class TestSelfAnsweredResponses(DNSDistTest):
    """
    Regression tests for addSelfAnsweredResponseAction().

    For each of two names (one exercised over UDP, one over TCP) the
    configuration below makes dnsdist answer the query itself with
    SpoofAction("192.0.2.1"), and registers a self-answered response rule
    that applies DropResponseAction() when the name matches AND
    NotRule(MaxQPSRule(1)) matches. The tests send the same query twice and
    expect the first answer to get through and the second one to be dropped.
    """

    # The single %s is the placeholder for the backend port in newServer{};
    # presumably filled in by the DNSDistTest base class -- it is not
    # formatted anywhere in this class.
    _config_template = """
    -- this is a silly test config, please do not do this in production.
    addAction(makeRule("udp.selfanswered.tests.powerdns.com."), SpoofAction("192.0.2.1"))
    addSelfAnsweredResponseAction(AndRule({makeRule("udp.selfanswered.tests.powerdns.com."), NotRule(MaxQPSRule(1))}), DropResponseAction())
    addAction(makeRule("tcp.selfanswered.tests.powerdns.com."), SpoofAction("192.0.2.1"))
    addSelfAnsweredResponseAction(AndRule({makeRule("tcp.selfanswered.tests.powerdns.com."), NotRule(MaxQPSRule(1))}), DropResponseAction())
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSecondAnswerIsDropped(self, name, sendQuery):
        """
        Send the same A query for `name` twice and check that only the first
        one is answered.

        name -- fully qualified query name; must be one of the names the
                configuration spoofs.
        sendQuery -- bound sender method (self.sendUDPQuery or
                     self.sendTCPQuery). It is called as
                     sendQuery(query, response=None, useQueue=False) and
                     must return a pair whose second item is the response
                     received from dnsdist, or None when nothing came back.
        """
        ttl = 60
        query = dns.message.make_query(name, 'A', 'IN')

        # What the spoofed answer is expected to look like: a response to
        # `query` carrying a single A record for 192.0.2.1, with RA set.
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    ttl,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)
        expectedResponse.flags |= dns.flags.RA

        # self-answered, but no SelfAnswered rule matches.
        (_, receivedResponse) = sendQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(receivedResponse, expectedResponse)

        # self-answered, AND SelfAnswered rule matches. Should not see a reply.
        (_, receivedResponse) = sendQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testSelfAnsweredUDP(self):
        """
        SelfAnsweredResponses: UDP: Drop after exceeding QPS
        """
        self._checkSecondAnswerIsDropped('udp.selfanswered.tests.powerdns.com.',
                                         self.sendUDPQuery)

    def testSelfAnsweredTCP(self):
        """
        SelfAnsweredResponses: TCP: Drop after exceeding QPS
        """
        self._checkSecondAnswerIsDropped('tcp.selfanswered.tests.powerdns.com.',
                                         self.sendTCPQuery)