]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Responses.py
Merge pull request #4752 from mind04/norec
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Responses.py
1 #!/usr/bin/env python
2 from datetime import datetime, timedelta
3 import time
4 import dns
5 from dnsdisttests import DNSDistTest
6
7 class TestResponseRuleNXDelayed(DNSDistTest):
8
9 _config_template = """
10 newServer{address="127.0.0.1:%s"}
11 addResponseAction(RCodeRule(dnsdist.NXDOMAIN), DelayResponseAction(1000))
12 """
13
14 def testNXDelayed(self):
15 """
16 Responses: Delayed on NXDomain
17
18 Send an A query to "delayed.responses.tests.powerdns.com.",
19 check that the response delay is longer than 1000 ms
20 for a NXDomain response over UDP, shorter for a NoError one.
21 """
22 name = 'delayed.responses.tests.powerdns.com.'
23 query = dns.message.make_query(name, 'A', 'IN')
24 response = dns.message.make_response(query)
25
26 # NX over UDP
27 response.set_rcode(dns.rcode.NXDOMAIN)
28 begin = datetime.now()
29 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
30 end = datetime.now()
31 receivedQuery.id = query.id
32 self.assertEquals(query, receivedQuery)
33 self.assertEquals(response, receivedResponse)
34 self.assertTrue((end - begin) > timedelta(0, 1))
35
36 # NoError over UDP
37 response.set_rcode(dns.rcode.NOERROR)
38 begin = datetime.now()
39 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
40 end = datetime.now()
41 receivedQuery.id = query.id
42 self.assertEquals(query, receivedQuery)
43 self.assertEquals(response, receivedResponse)
44 self.assertTrue((end - begin) < timedelta(0, 1))
45
46 # NX over TCP
47 response.set_rcode(dns.rcode.NXDOMAIN)
48 begin = datetime.now()
49 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
50 end = datetime.now()
51 receivedQuery.id = query.id
52 self.assertEquals(query, receivedQuery)
53 self.assertEquals(response, receivedResponse)
54 self.assertTrue((end - begin) < timedelta(0, 1))
55
56 class TestResponseRuleQNameDropped(DNSDistTest):
57
58 _config_template = """
59 newServer{address="127.0.0.1:%s"}
60 addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
61 """
62
63 def testDropped(self):
64 """
65 Responses: Dropped on QName
66
67 Send an A query to "drop.responses.tests.powerdns.com.",
68 check that the response (not the query) is dropped.
69 """
70 name = 'drop.responses.tests.powerdns.com.'
71 query = dns.message.make_query(name, 'A', 'IN')
72 response = dns.message.make_response(query)
73
74 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
75 receivedQuery.id = query.id
76 self.assertEquals(query, receivedQuery)
77 self.assertEquals(receivedResponse, None)
78
79 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
80 receivedQuery.id = query.id
81 self.assertEquals(query, receivedQuery)
82 self.assertEquals(receivedResponse, None)
83
84 def testNotDropped(self):
85 """
86 Responses: NOT Dropped on QName
87
88 Send an A query to "dontdrop.responses.tests.powerdns.com.",
89 check that the response is not dropped.
90 """
91 name = 'dontdrop.responses.tests.powerdns.com.'
92 query = dns.message.make_query(name, 'A', 'IN')
93 response = dns.message.make_response(query)
94
95 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
96 receivedQuery.id = query.id
97 self.assertEquals(query, receivedQuery)
98 self.assertEquals(response, receivedResponse)
99
100 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
101 receivedQuery.id = query.id
102 self.assertEquals(query, receivedQuery)
103 self.assertEquals(response, receivedResponse)
104
105 class TestResponseRuleQNameAllowed(DNSDistTest):
106
107 _config_template = """
108 newServer{address="127.0.0.1:%s"}
109 addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
110 addResponseAction(AllRule(), DropResponseAction())
111 """
112
113 def testAllowed(self):
114 """
115 Responses: Allowed on QName
116
117 Send an A query to "allow.responses.tests.powerdns.com.",
118 check that the response is allowed.
119 """
120 name = 'allow.responses.tests.powerdns.com.'
121 query = dns.message.make_query(name, 'A', 'IN')
122 response = dns.message.make_response(query)
123
124 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
125 receivedQuery.id = query.id
126 self.assertEquals(query, receivedQuery)
127 self.assertEquals(response, receivedResponse)
128
129 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
130 receivedQuery.id = query.id
131 self.assertEquals(query, receivedQuery)
132 self.assertEquals(response, receivedResponse)
133
134 def testNotAllowed(self):
135 """
136 Responses: Not allowed on QName
137
138 Send an A query to "dontallow.responses.tests.powerdns.com.",
139 check that the response is dropped.
140 """
141 name = 'dontallow.responses.tests.powerdns.com.'
142 query = dns.message.make_query(name, 'A', 'IN')
143 response = dns.message.make_response(query)
144
145 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
146 receivedQuery.id = query.id
147 self.assertEquals(query, receivedQuery)
148 self.assertEquals(receivedResponse, None)
149
150 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
151 receivedQuery.id = query.id
152 self.assertEquals(query, receivedQuery)
153 self.assertEquals(receivedResponse, None)
154
155 class TestResponseRuleEditTTL(DNSDistTest):
156
157 _ttl = 5
158 _config_params = ['_testServerPort', '_ttl']
159 _config_template = """
160 newServer{address="127.0.0.1:%s"}
161
162 function editTTLCallback(section, class, type, ttl)
163 return %d
164 end
165
166 function editTTLFunc(dr)
167 dr:editTTLs(editTTLCallback)
168 return DNSAction.None, ""
169 end
170
171 addLuaResponseAction(AllRule(), editTTLFunc)
172 """
173
174 def testTTLEdited(self):
175 """
176 Responses: Alter the TTLs
177 """
178 name = 'editttl.responses.tests.powerdns.com.'
179 query = dns.message.make_query(name, 'A', 'IN')
180 response = dns.message.make_response(query)
181 rrset = dns.rrset.from_text(name,
182 3600,
183 dns.rdataclass.IN,
184 dns.rdatatype.A,
185 '192.0.2.1')
186 response.answer.append(rrset)
187
188 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
189 receivedQuery.id = query.id
190 self.assertEquals(query, receivedQuery)
191 self.assertEquals(response, receivedResponse)
192 self.assertNotEquals(response.answer[0].ttl, receivedResponse.answer[0].ttl)
193 self.assertEquals(receivedResponse.answer[0].ttl, self._ttl)
194
195 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
196 receivedQuery.id = query.id
197 self.assertEquals(query, receivedQuery)
198 self.assertEquals(response, receivedResponse)
199 self.assertNotEquals(response.answer[0].ttl, receivedResponse.answer[0].ttl)
200 self.assertEquals(receivedResponse.answer[0].ttl, self._ttl)