]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Responses.py
Some tests need a bit of extra delay.
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Responses.py
1 #!/usr/bin/env python
2 from datetime import datetime, timedelta
3 import time
4 import dns
5 from dnsdisttests import DNSDistTest
6
class TestResponseRuleNXDelayed(DNSDistTest):
    """
    Check that a DelayResponseAction(1000) bound to an NXDomain RCodeRule
    delays the matching responses over UDP, and only those.
    """

    # The %s placeholder is filled in by the DNSDistTest harness
    # (presumably with the backend responder port -- confirm in dnsdisttests).
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), DelayResponseAction(1000))
    """

    def _exchangeTimed(self, sender, query, response):
        """
        Send `query` via `sender` with `response` as the backend answer,
        check that both made it through unchanged, and return the elapsed
        time in seconds.
        """
        # A monotonic clock is not affected by wall-clock adjustments
        # happening while the test is running.
        begin = time.monotonic()
        (receivedQuery, receivedResponse) = sender(query, response)
        elapsed = time.monotonic() - begin
        # The query seen by the backend may carry a different ID,
        # align it before comparing.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        return elapsed

    def testNXDelayed(self):
        """
        Responses: Delayed on NXDomain

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one,
        and shorter as well for a NXDomain response over TCP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NX over UDP
        response.set_rcode(dns.rcode.NXDOMAIN)
        self.assertGreater(self._exchangeTimed(self.sendUDPQuery, query, response), 1.0)

        # NoError over UDP
        response.set_rcode(dns.rcode.NOERROR)
        self.assertLess(self._exchangeTimed(self.sendUDPQuery, query, response), 1.0)

        # NX over TCP
        response.set_rcode(dns.rcode.NXDOMAIN)
        self.assertLess(self._exchangeTimed(self.sendTCPQuery, query, response), 1.0)
55
class TestResponseRuleERCode(DNSDistTest):
    """
    Check that ERCodeRule matches on the extended response code (BADVERS)
    and not on other extended codes or on plain NoError.
    """

    _extraStartupSleep = 1
    # The %s placeholder is filled in by the DNSDistTest harness
    # (presumably with the backend responder port -- confirm in dnsdisttests).
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(ERCodeRule(DNSRCode.BADVERS), DelayResponseAction(1000))
    """

    def _exchangeTimedUDP(self, query, response):
        """
        Send `query` over UDP with `response` as the backend answer,
        check that both made it through unchanged, and return the elapsed
        time in seconds.
        """
        # A monotonic clock is not affected by wall-clock adjustments
        # happening while the test is running.
        begin = time.monotonic()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        elapsed = time.monotonic() - begin
        # The query seen by the backend may carry a different ID,
        # align it before comparing.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        return elapsed

    def testBADVERSDelayed(self):
        """
        Responses: Delayed on BADVERS

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a BADVERS response over UDP, shorter for BADKEY and NoError.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.use_edns(edns=True)

        # BADVERS over UDP
        # BADVERS == 16, so rcode==0, ercode==1
        response.set_rcode(dns.rcode.BADVERS)
        self.assertGreater(self._exchangeTimedUDP(query, response), 1.0)

        # BADKEY (17, an ERCode) over UDP
        response.set_rcode(dns.rcode.BADKEY)
        self.assertLess(self._exchangeTimedUDP(query, response), 1.0)

        # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
        response.set_rcode(dns.rcode.NOERROR)
        self.assertLess(self._exchangeTimedUDP(query, response), 1.0)
107
class TestResponseRuleQNameDropped(DNSDistTest):
    """Response rule dropping the answers to one specific qname."""

    # The %s placeholder is filled in by the DNSDistTest harness
    # (presumably with the backend responder port -- confirm in dnsdisttests).
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
    """

    def testDropped(self):
        """
        Responses: Dropped on QName

        Query "drop.responses.tests.powerdns.com." (type A) over UDP and TCP:
        the backend must still see the query, but no response comes back.
        """
        qname = 'drop.responses.tests.powerdns.com.'
        question = dns.message.make_query(qname, 'A', 'IN')
        answer = dns.message.make_response(question)

        for transport in (self.sendUDPQuery, self.sendTCPQuery):
            backendQuery, clientResponse = transport(question, answer)
            # the backend may see a different ID, align it before comparing
            backendQuery.id = question.id
            self.assertEqual(question, backendQuery)
            self.assertEqual(clientResponse, None)

    def testNotDropped(self):
        """
        Responses: NOT Dropped on QName

        Query "dontdrop.responses.tests.powerdns.com." (type A) over UDP and
        TCP: the response must come back untouched.
        """
        qname = 'dontdrop.responses.tests.powerdns.com.'
        question = dns.message.make_query(qname, 'A', 'IN')
        answer = dns.message.make_response(question)

        for transport in (self.sendUDPQuery, self.sendTCPQuery):
            backendQuery, clientResponse = transport(question, answer)
            # the backend may see a different ID, align it before comparing
            backendQuery.id = question.id
            self.assertEqual(question, backendQuery)
            self.assertEqual(answer, clientResponse)
150
class TestResponseRuleQNameAllowed(DNSDistTest):
    """Response rules allowing one specific qname and dropping everything else."""

    # The %s placeholder is filled in by the DNSDistTest harness
    # (presumably with the backend responder port -- confirm in dnsdisttests).
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
    addResponseAction(AllRule(), DropResponseAction())
    """

    def testAllowed(self):
        """
        Responses: Allowed on QName

        Query "allow.responses.tests.powerdns.com." (type A) over UDP and TCP:
        the response must come back untouched.
        """
        qname = 'allow.responses.tests.powerdns.com.'
        question = dns.message.make_query(qname, 'A', 'IN')
        answer = dns.message.make_response(question)

        for transport in (self.sendUDPQuery, self.sendTCPQuery):
            backendQuery, clientResponse = transport(question, answer)
            # the backend may see a different ID, align it before comparing
            backendQuery.id = question.id
            self.assertEqual(question, backendQuery)
            self.assertEqual(answer, clientResponse)

    def testNotAllowed(self):
        """
        Responses: Not allowed on QName

        Query "dontallow.responses.tests.powerdns.com." (type A) over UDP and
        TCP: the backend must still see the query, but no response comes back.
        """
        qname = 'dontallow.responses.tests.powerdns.com.'
        question = dns.message.make_query(qname, 'A', 'IN')
        answer = dns.message.make_response(question)

        for transport in (self.sendUDPQuery, self.sendTCPQuery):
            backendQuery, clientResponse = transport(question, answer)
            # the backend may see a different ID, align it before comparing
            backendQuery.id = question.id
            self.assertEqual(question, backendQuery)
            self.assertEqual(clientResponse, None)
194
class TestResponseRuleEditTTL(DNSDistTest):
    """Check that a Lua response action can rewrite record TTLs via editTTLs()."""

    # TTL returned by the Lua callback for every record.
    _ttl = 5
    # Attributes substituted, in order, into the placeholders of the template
    # (done by the DNSDistTest harness -- confirm in dnsdisttests).
    _config_params = ['_testServerPort', '_ttl']
    # The callback returns DNSResponseAction.None: this is a response action,
    # so the response-side enum is the right one (the query-side DNSAction
    # enum was used here before).
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function editTTLCallback(section, class, type, ttl)
      return %d
    end

    function editTTLFunc(dr)
      dr:editTTLs(editTTLCallback)
      return DNSResponseAction.None, ""
    end

    addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
    """

    def testTTLEdited(self):
        """
        Responses: Alter the TTLs

        Send an A query to "editttl.responses.tests.powerdns.com." over UDP
        and TCP, answer with a TTL of 3600 and check that the TTL received
        by the client has been replaced with the configured one.
        """
        name = 'editttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the backend may see a different ID, align it before comparing
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # The message comparison evidently does not take the TTLs into
            # account, since the TTLs are required to differ right below.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, self._ttl)
236
class TestResponseRuleLimitTTL(DNSDistTest):
    """
    Check that response TTLs can be clamped, both with the built-in
    SetMinTTLResponseAction / SetMaxTTLResponseAction and with the
    equivalent Lua FFI functions.
    """

    # Upper bound enforced by the "max" rules.
    _lowttl = 60
    # TTL of the records sent by the backend, in between the two bounds.
    _defaulttl = 3600
    # Lower bound enforced by the "min" rules.
    _highttl = 18000
    # Attributes substituted, in order, into the placeholders of the template
    # (done by the DNSDistTest harness -- confirm in dnsdisttests).
    _config_params = ['_lowttl', '_highttl', '_testServerPort']
    _config_template = """
    local ffi = require("ffi")
    local lowttl = %d
    local highttl = %d

    function luaFFISetMinTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_min_ttl(dr, highttl)
      return DNSResponseAction.None, ""
    end
    function luaFFISetMaxTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_max_ttl(dr, lowttl)
      return DNSResponseAction.None, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("min.responses.tests.powerdns.com.", SetMinTTLResponseAction(highttl))
    addResponseAction("max.responses.tests.powerdns.com.", SetMaxTTLResponseAction(lowttl))
    addResponseAction("ffi.min.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMinTTL))
    addResponseAction("ffi.max.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMaxTTL))
    """

    def _checkLimitedTTL(self, name, expectedTTL):
        """
        Send an A query for `name` over UDP and TCP, answer with a record
        whose TTL is `_defaulttl`, and check that the TTL received by the
        client is `expectedTTL` instead.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    self._defaulttl,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the backend may see a different ID, align it before comparing
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # The message comparison evidently does not take the TTLs into
            # account, since the TTLs are required to differ right below.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLimitTTL(self):
        """
        Responses: Alter the TTLs via Limiter
        """
        self._checkLimitedTTL('min.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('max.responses.tests.powerdns.com.', self._lowttl)

    def testLimitTTLFFI(self):
        """
        Responses: Alter the TTLs via Limiter (FFI API)
        """
        # These names have to be the ones the LuaFFIResponseAction rules are
        # registered for. The previously used "ffi.min.responses..." and
        # "ffi.max.responses..." names do not fall under those rules; they
        # are sub-domains of the names used by the non-FFI rules instead.
        self._checkLimitedTTL('ffi.min.limitttl.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('ffi.max.limitttl.responses.tests.powerdns.com.', self._lowttl)
348
class TestResponseLuaActionReturnSyntax(DNSDistTest):
    """
    Check the two return forms of a LuaResponseAction callback:
    an action followed by a string (customDelay) and an action alone
    (customDrop).
    """

    # The %s placeholder is filled in by the DNSDistTest harness
    # (presumably with the backend responder port -- confirm in dnsdisttests).
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    function customDelay(dr)
      return DNSResponseAction.Delay, "1000"
    end
    function customDrop(dr)
      return DNSResponseAction.Drop
    end
    addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), LuaResponseAction(customDelay))
    """

    def testResponseActionDelayed(self):
        """
        Responses: Delayed via LuaResponseAction

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NX over UDP
        response.set_rcode(dns.rcode.NXDOMAIN)
        # A monotonic clock is not affected by wall-clock adjustments
        # happening while the test is running.
        begin = time.monotonic()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        elapsed = time.monotonic() - begin
        # the backend may see a different ID, align it before comparing
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(elapsed, 1.0)

    def testDropped(self):
        """
        Responses: Dropped via user defined LuaResponseAction

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the backend may see a different ID, align it before comparing
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, None)
402
class TestResponseClearRecordsType(DNSDistTest):
    """
    Removal of all the records of a given type (AAAA) from a response, via
    ClearRecordTypesResponseAction and via the equivalent Lua FFI function.
    """

    # Attributes substituted, in order, into the placeholders of the template
    # (done by the DNSDistTest harness -- confirm in dnsdisttests).
    _config_params = ['_testServerPort']
    _config_template = """
    local ffi = require("ffi")

    function luafct(dr)
      ffi.C.dnsdist_ffi_dnsresponse_clear_records_type(dr, DNSQType.AAAA)
      return DNSResponseAction.HeaderModify, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("ffi.clear-records-type.responses.tests.powerdns.com.", LuaFFIResponseAction(luafct))
    addResponseAction("clear-records-type.responses.tests.powerdns.com.", ClearRecordTypesResponseAction(DNSQType.AAAA))
    """

    def _checkAAAACleared(self, qname):
        """
        Query `qname` (type A) over UDP and TCP, have the backend answer with
        one A and two AAAA records, and check that the client only gets the
        A record back.
        """
        question = dns.message.make_query(qname, 'A', 'IN')
        backendAnswer = dns.message.make_response(question)
        strippedAnswer = dns.message.make_response(question)

        aRecords = dns.rrset.from_text(qname,
                                       3600,
                                       dns.rdataclass.IN,
                                       dns.rdatatype.A,
                                       '192.0.2.1')
        aaaaRecords = dns.rrset.from_text(qname,
                                          3660,
                                          dns.rdataclass.IN,
                                          dns.rdatatype.AAAA,
                                          '2001:DB8::1', '2001:DB8::2')
        # the backend sends both sets, the client is expected to see only the A one
        backendAnswer.answer.append(aRecords)
        strippedAnswer.answer.append(aRecords)
        backendAnswer.answer.append(aaaaRecords)

        for transport in (self.sendUDPQuery, self.sendTCPQuery):
            backendQuery, clientResponse = transport(question, backendAnswer)
            # the backend may see a different ID, align it before comparing
            backendQuery.id = question.id
            self.assertEqual(question, backendQuery)
            self.assertEqual(strippedAnswer, clientResponse)

    def testClearedFFI(self):
        """
        Responses: Removes records of a given type (FFI API)
        """
        self._checkAAAACleared('ffi.clear-records-type.responses.tests.powerdns.com.')

    def testCleared(self):
        """
        Responses: Removes records of a given type
        """
        self._checkAAAACleared('clear-records-type.responses.tests.powerdns.com.')