# Source: git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Responses.py
# Revision: Merge pull request #11089 from pieterlexis/remote-dnssec-docs
# Path: [thirdparty/pdns.git] / regression-tests.dnsdist / test_Responses.py
1 #!/usr/bin/env python
2 from datetime import datetime, timedelta
3 import time
4 import dns
5 from dnsdisttests import DNSDistTest
6
class TestResponseRuleNXDelayed(DNSDistTest):
    """
    Response rule delaying NXDOMAIN responses by 1000 ms
    (RCodeRule + DelayResponseAction).
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), DelayResponseAction(1000))
    """

    def _timedExchange(self, sender, query, response):
        """
        Send `query` via `sender` (self.sendUDPQuery or self.sendTCPQuery)
        with `response` as the canned answer, check that the query and the
        response we got back match what was sent, and return the elapsed
        time in seconds.
        """
        # use a monotonic clock: unlike datetime.now() it cannot jump
        # backward or forward when the wall clock is adjusted mid-test
        begin = time.monotonic()
        (receivedQuery, receivedResponse) = sender(query, response)
        elapsed = time.monotonic() - begin

        # the ID is overwritten before comparing, so a different ID on the
        # received query is not treated as a failure
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        return elapsed

    def testNXDelayed(self):
        """
        Responses: Delayed on NXDomain

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one
        over UDP and for a NXDomain one over TCP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NX over UDP: has to take more than the configured 1000 ms
        response.set_rcode(dns.rcode.NXDOMAIN)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertGreater(elapsed, 1.0)

        # NoError over UDP: the rule does not match, no delay expected
        response.set_rcode(dns.rcode.NOERROR)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertLess(elapsed, 1.0)

        # NX over TCP: expected back in less than a second
        response.set_rcode(dns.rcode.NXDOMAIN)
        elapsed = self._timedExchange(self.sendTCPQuery, query, response)
        self.assertLess(elapsed, 1.0)
55
class TestResponseRuleERCode(DNSDistTest):
    """
    Response rule delaying responses whose extended RCode is BADVERS
    by 1000 ms (ERCodeRule + DelayResponseAction).
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(ERCodeRule(DNSRCode.BADVERS), DelayResponseAction(1000))
    """

    def _timedExchange(self, sender, query, response):
        """
        Send `query` via `sender` (self.sendUDPQuery or self.sendTCPQuery)
        with `response` as the canned answer, check that the query and the
        response we got back match what was sent, and return the elapsed
        time in seconds.
        """
        # use a monotonic clock: unlike datetime.now() it cannot jump
        # backward or forward when the wall clock is adjusted mid-test
        begin = time.monotonic()
        (receivedQuery, receivedResponse) = sender(query, response)
        elapsed = time.monotonic() - begin

        # the ID is overwritten before comparing, so a different ID on the
        # received query is not treated as a failure
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        return elapsed

    def testBADVERSDelayed(self):
        """
        Responses: Delayed on BADVERS

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a BADVERS response over UDP, shorter for BADKEY and NoError.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # extended RCodes are carried in EDNS, so the response needs it
        response.use_edns(edns=True)

        # BADVERS over UDP
        # BADVERS == 16, so rcode==0, ercode==1
        response.set_rcode(dns.rcode.BADVERS)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertGreater(elapsed, 1.0)

        # BADKEY (17, an ERCode) over UDP
        response.set_rcode(17)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertLess(elapsed, 1.0)

        # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
        response.set_rcode(dns.rcode.NOERROR)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertLess(elapsed, 1.0)
106
class TestResponseRuleQNameDropped(DNSDistTest):
    """
    Response rule dropping the responses for
    "drop.responses.tests.powerdns.com." (DropResponseAction).
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
    """

    def testDropped(self):
        """
        Responses: Dropped on QName

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the query itself must have gone through untouched
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # ... but no response must come back
            self.assertIsNone(receivedResponse)

    def testNotDropped(self):
        """
        Responses: NOT Dropped on QName

        Send an A query to "dontdrop.responses.tests.powerdns.com.",
        check that the response is not dropped.
        """
        name = 'dontdrop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
149
class TestResponseRuleQNameAllowed(DNSDistTest):
    """
    Response rules allowing the responses for
    "allow.responses.tests.powerdns.com." (AllowResponseAction) and
    dropping everything else (AllRule + DropResponseAction).
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
    addResponseAction(AllRule(), DropResponseAction())
    """

    def testAllowed(self):
        """
        Responses: Allowed on QName

        Send an A query to "allow.responses.tests.powerdns.com.",
        check that the response is allowed.
        """
        name = 'allow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testNotAllowed(self):
        """
        Responses: Not allowed on QName

        Send an A query to "dontallow.responses.tests.powerdns.com.",
        check that the response is dropped.
        """
        name = 'dontallow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the query itself must have gone through untouched
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # ... but the catch-all drop rule must have eaten the response
            self.assertIsNone(receivedResponse)
193
class TestResponseRuleEditTTL(DNSDistTest):
    """
    Response rule running a Lua function that rewrites the TTL of the
    records via dr:editTTLs().
    """

    # TTL returned by the Lua callback; presumably substituted into the
    # '%d' of the template by the DNSDistTest base class, via _config_params
    _ttl = 5
    _config_params = ['_testServerPort', '_ttl']
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function editTTLCallback(section, class, type, ttl)
      return %d
    end

    function editTTLFunc(dr)
      dr:editTTLs(editTTLCallback)
      return DNSAction.None, ""
    end

    addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
    """

    def testTTLEdited(self):
        """
        Responses: Alter the TTLs

        Send an A query to "editttl.responses.tests.powerdns.com.",
        answer with a record whose TTL is 3600 and check that the TTL
        of the record we get back is the one set by the Lua callback.
        """
        qname = 'editttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        answerRecord = dns.rrset.from_text(qname,
                                           3600,
                                           dns.rdataclass.IN,
                                           dns.rdatatype.A,
                                           '192.0.2.1')
        response = dns.message.make_response(query)
        response.answer.append(answerRecord)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            gotQuery, gotResponse = sender(query, response)
            gotQuery.id = query.id
            self.assertEqual(query, gotQuery)
            # the messages are asserted equal here while the TTLs are
            # asserted to differ below, so the TTL is checked on its own
            self.assertEqual(response, gotResponse)
            editedTTL = gotResponse.answer[0].ttl
            self.assertNotEqual(response.answer[0].ttl, editedTTL)
            self.assertEqual(editedTTL, self._ttl)
235
class TestResponseRuleLimitTTL(DNSDistTest):
    """
    Response rules capping the TTL of the records, either with the
    SetMinTTLResponseAction / SetMaxTTLResponseAction actions or with the
    equivalent Lua FFI functions.
    """

    _lowttl = 60
    # TTL of the records in the canned responses, between the two limits
    _defaulttl = 3600
    _highttl = 18000
    _config_params = ['_lowttl', '_highttl', '_testServerPort']
    _config_template = """
    local ffi = require("ffi")
    local lowttl = %d
    local highttl = %d

    function luaFFISetMinTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_min_ttl(dr, highttl)
      return DNSResponseAction.None, ""
    end
    function luaFFISetMaxTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_max_ttl(dr, lowttl)
      return DNSResponseAction.None, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("min.responses.tests.powerdns.com.", SetMinTTLResponseAction(highttl))
    addResponseAction("max.responses.tests.powerdns.com.", SetMaxTTLResponseAction(lowttl))
    addResponseAction("ffi.min.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMinTTL))
    addResponseAction("ffi.max.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMaxTTL))
    """

    def _checkLimitedTTL(self, name, expectedTTL):
        """
        Send an A query for `name` over UDP and TCP, answer with a record
        whose TTL is `_defaulttl`, and check that the record we get back
        carries `expectedTTL` instead.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    self._defaulttl,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # the messages are asserted equal here while the TTLs are
            # asserted to differ below, so the TTL is checked on its own
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLimitTTL(self):
        """
        Responses: Alter the TTLs via Limiter

        The TTL is raised to _highttl by the 'min' rule and lowered
        to _lowttl by the 'max' one.
        """
        self._checkLimitedTTL('min.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('max.responses.tests.powerdns.com.', self._lowttl)

    def testLimitTTLFFI(self):
        """
        Responses: Alter the TTLs via Limiter (FFI)

        Same checks as testLimitTTL, against the rules calling the
        Lua FFI functions.
        """
        # the names have to sit below the 'ffi.*.limitttl' suffixes from
        # the configuration: 'ffi.min.responses...' would be caught by the
        # plain 'min.responses...' rule and never reach the FFI function
        self._checkLimitedTTL('ffi.min.limitttl.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('ffi.max.limitttl.responses.tests.powerdns.com.', self._lowttl)
347
class TestResponseLuaActionReturnSyntax(DNSDistTest):
    """
    Lua response actions returning either an action plus a parameter
    (customDelay) or an action alone (customDrop).
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    function customDelay(dr)
      return DNSResponseAction.Delay, "1000"
    end
    function customDrop(dr)
      return DNSResponseAction.Drop
    end
    addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), LuaResponseAction(customDelay))
    """

    def _timedExchange(self, sender, query, response):
        """
        Send `query` via `sender` (self.sendUDPQuery or self.sendTCPQuery)
        with `response` as the canned answer, check that the query and the
        response we got back match what was sent, and return the elapsed
        time in seconds.
        """
        # use a monotonic clock: unlike datetime.now() it cannot jump
        # backward or forward when the wall clock is adjusted mid-test
        begin = time.monotonic()
        (receivedQuery, receivedResponse) = sender(query, response)
        elapsed = time.monotonic() - begin

        # the ID is overwritten before comparing, so a different ID on the
        # received query is not treated as a failure
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        return elapsed

    def testResponseActionDelayed(self):
        """
        Responses: Delayed via LuaResponseAction

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NX over UDP: has to take more than the 1000 ms asked by customDelay
        response.set_rcode(dns.rcode.NXDOMAIN)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertGreater(elapsed, 1.0)

        # NoError over UDP: the rule does not match, no delay expected
        response.set_rcode(dns.rcode.NOERROR)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertLess(elapsed, 1.0)

    def testDropped(self):
        """
        Responses: Dropped via user defined LuaResponseAction

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the query itself must have gone through untouched
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # ... but no response must come back
            self.assertIsNone(receivedResponse)
401
class TestResponseClearRecordsType(DNSDistTest):
    """
    Response rules removing the AAAA records from the responses, either
    with ClearRecordTypesResponseAction or with the Lua FFI equivalent.
    """

    _config_params = ['_testServerPort']
    _config_template = """
    local ffi = require("ffi")

    function luafct(dr)
      ffi.C.dnsdist_ffi_dnsresponse_clear_records_type(dr, DNSQType.AAAA)
      return DNSResponseAction.HeaderModify, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("ffi.clear-records-type.responses.tests.powerdns.com.", LuaFFIResponseAction(luafct))
    addResponseAction("clear-records-type.responses.tests.powerdns.com.", ClearRecordTypesResponseAction(DNSQType.AAAA))
    """

    def testClearedFFI(self):
        """
        Responses: Removes records of a given type (FFI API)

        Answer with an A and an AAAA record set, expect only the A one back.
        """
        qname = 'ffi.clear-records-type.responses.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        fullResponse = dns.message.make_response(query)
        expectedResponse = dns.message.make_response(query)

        aRecords = dns.rrset.from_text(qname,
                                       3600,
                                       dns.rdataclass.IN,
                                       dns.rdatatype.A,
                                       '192.0.2.1')
        aaaaRecords = dns.rrset.from_text(qname,
                                          3660,
                                          dns.rdataclass.IN,
                                          dns.rdatatype.AAAA,
                                          '2001:DB8::1', '2001:DB8::2')
        # what is sent back carries both record sets ...
        fullResponse.answer.append(aRecords)
        fullResponse.answer.append(aaaaRecords)
        # ... what is expected only the A one
        expectedResponse.answer.append(aRecords)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            gotQuery, gotResponse = sender(query, fullResponse)
            gotQuery.id = query.id
            self.assertEqual(query, gotQuery)
            self.assertEqual(expectedResponse, gotResponse)

    def testCleared(self):
        """
        Responses: Removes records of a given type

        Answer with an A and an AAAA record set, expect only the A one back.
        """
        qname = 'clear-records-type.responses.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        fullResponse = dns.message.make_response(query)
        expectedResponse = dns.message.make_response(query)

        aRecords = dns.rrset.from_text(qname,
                                       3600,
                                       dns.rdataclass.IN,
                                       dns.rdatatype.A,
                                       '192.0.2.1')
        aaaaRecords = dns.rrset.from_text(qname,
                                          3660,
                                          dns.rdataclass.IN,
                                          dns.rdatatype.AAAA,
                                          '2001:DB8::1', '2001:DB8::2')
        # what is sent back carries both record sets ...
        fullResponse.answer.append(aRecords)
        fullResponse.answer.append(aaaaRecords)
        # ... what is expected only the A one
        expectedResponse.answer.append(aRecords)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            gotQuery, gotResponse = sender(query, fullResponse)
            gotQuery.id = query.id
            self.assertEqual(query, gotQuery)
            self.assertEqual(expectedResponse, gotResponse)