git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Responses.py
Merge pull request #6327 from lowellmower/server-up-metric
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Responses.py
1 #!/usr/bin/env python
2 from datetime import datetime, timedelta
3 import time
4 import dns
5 from dnsdisttests import DNSDistTest
6
class TestResponseRuleNXDelayed(DNSDistTest):
    """Response rule test: DelayResponseAction selected by RCodeRule(NXDOMAIN)."""

    # dnsdist configuration: a single backend plus a response rule delaying
    # NXDOMAIN answers by 1000 ms. The %s placeholder is filled in by the
    # DNSDistTest machinery (presumably with the backend port -- confirm there).
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(RCodeRule(dnsdist.NXDOMAIN), DelayResponseAction(1000))
    """

    def testNXDelayed(self):
        """
        Responses: Delayed on NXDomain

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one
        over UDP and shorter for a NXDomain one over TCP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Matches the 1000 ms configured in DelayResponseAction above.
        delay = timedelta(seconds=1)

        # NX over UDP
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # The ID of the query seen by the backend is overwritten with the
        # original one before the two messages are compared.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, delay)

        # NoError over UDP
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, delay)

        # NX over TCP
        # The test expects no delay here even though the rcode matches the rule.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, delay)
55
class TestResponseRuleERCode(DNSDistTest):
    """Response rule test: DelayResponseAction selected by ERCodeRule(BADVERS)."""

    # dnsdist configuration: a single backend plus a response rule delaying
    # answers whose extended rcode is BADVERS by 1000 ms.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(ERCodeRule(dnsdist.BADVERS), DelayResponseAction(1000))
    """

    def testBADVERSDelayed(self):
        """
        Responses: Delayed on BADVERS

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a BADVERS response over UDP, shorter for BADKEY and NoError.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Extended rcodes are carried in EDNS, so enable it on the response.
        response.use_edns(edns=True)
        # Matches the 1000 ms configured in DelayResponseAction above.
        delay = timedelta(seconds=1)

        # BADVERS over UDP
        # BADVERS == 16, so rcode==0, ercode==1
        response.set_rcode(dns.rcode.BADVERS)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, delay)

        # BADKEY (17, an ERCode) over UDP
        response.set_rcode(17)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, delay)

        # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, delay)
106
class TestResponseRuleQNameDropped(DNSDistTest):
    """Response rule test: DropResponseAction selected by query name."""

    # dnsdist configuration: a single backend plus a response rule dropping
    # answers for "drop.responses.tests.powerdns.com.".
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
    """

    def testDropped(self):
        """
        Responses: Dropped on QName

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # UDP: the backend must still see the query, but no response comes back.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        # TCP: same expectation.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testNotDropped(self):
        """
        Responses: NOT Dropped on QName

        Send an A query to "dontdrop.responses.tests.powerdns.com.",
        check that the response is not dropped.
        """
        name = 'dontdrop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # UDP: the name does not match the rule, the response must come through.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # TCP: same expectation.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
155
class TestResponseRuleQNameAllowed(DNSDistTest):
    """Response rule test: AllowResponseAction ahead of a drop-everything rule."""

    # dnsdist configuration: a single backend, an allow rule for
    # "allow.responses.tests.powerdns.com." and, after it, a rule dropping
    # every response.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
    addResponseAction(AllRule(), DropResponseAction())
    """

    def testAllowed(self):
        """
        Responses: Allowed on QName

        Send an A query to "allow.responses.tests.powerdns.com.",
        check that the response is allowed.
        """
        name = 'allow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # UDP: the allow rule matches, so the response must come through.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # TCP: same expectation.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testNotAllowed(self):
        """
        Responses: Not allowed on QName

        Send an A query to "dontallow.responses.tests.powerdns.com.",
        check that the response is dropped.
        """
        name = 'dontallow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # UDP: only the drop-everything rule matches; the backend still sees
        # the query but no response comes back.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        # TCP: same expectation.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
205
class TestResponseRuleEditTTL(DNSDistTest):
    """Response rule test: rewriting record TTLs from a Lua response action."""

    # TTL returned by the Lua callback for every record.
    _ttl = 5
    # Names of the attributes substituted, in order, into the placeholders
    # of _config_template (%s, then %d).
    _config_params = ['_testServerPort', '_ttl']
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function editTTLCallback(section, class, type, ttl)
      return %d
    end

    function editTTLFunc(dr)
      dr:editTTLs(editTTLCallback)
      return DNSAction.None, ""
    end

    addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
    """

    def testTTLEdited(self):
        """
        Responses: Alter the TTLs

        Send an A query to "editttl.responses.tests.powerdns.com." whose
        backend answer carries a TTL of 3600, and check over UDP and TCP
        that the TTL of the received answer has been replaced by _ttl.
        """
        name = 'editttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        # NOTE(review): this equality is expected to hold although the TTLs
        # differ (see the next assertion), so the message comparison
        # presumably ignores TTLs -- confirm against the dnspython version in use.
        self.assertEqual(response, receivedResponse)
        self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
        self.assertEqual(receivedResponse.answer[0].ttl, self._ttl)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
        self.assertEqual(receivedResponse.answer[0].ttl, self._ttl)
252
class TestResponseLuaActionReturnSyntax(DNSDistTest):
    """Response rule test: return values accepted from a LuaResponseAction."""

    # dnsdist configuration: a single backend and two Lua response actions.
    # customDelay returns an action together with a string argument,
    # customDrop returns the action alone.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    function customDelay(dr)
      return DNSResponseAction.Delay, "1000"
    end
    function customDrop(dr)
      return DNSResponseAction.Drop
    end
    addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
    addResponseAction(RCodeRule(dnsdist.NXDOMAIN), LuaResponseAction(customDelay))
    """

    def testResponseActionDelayed(self):
        """
        Responses: Delayed via LuaResponseAction

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NX over UDP
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        # Matches the "1000" (ms) returned by customDelay.
        self.assertGreater(end - begin, timedelta(seconds=1))

    def testDropped(self):
        """
        Responses: Dropped via user defined LuaResponseAction

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # UDP: the backend must still see the query, but no response comes back.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        # TCP: same expectation.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)