# Source: thirdparty/pdns.git (git.ipfire.org), blob regression-tests.dnsdist/test_Responses.py
from datetime import datetime, timedelta

# The tests below use these dnspython submodules directly; importing them
# explicitly guarantees they are loaded and binds the top-level ``dns`` name.
import dns.message
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset

from dnsdisttests import DNSDistTest
class TestResponseRuleNXDelayed(DNSDistTest):
    """Response rule: DelayResponseAction(1000) applied to NXDOMAIN responses."""

    # The single %s placeholder receives the backend (test responder) port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), DelayResponseAction(1000))
    """

    def testNXDelayed(self):
        """
        Responses: Delayed on NXDomain

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one.
        The NXDomain response over TCP is expected to be shorter as well.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NXDomain over UDP: the rule matches, so the exchange must take > 1s
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # the ID seen by the backend may differ, align it before comparing
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) > timedelta(0, 1))

        # NoError over UDP: the rule does not match, no delay expected
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < timedelta(0, 1))

        # NXDomain over TCP: the test expects no delay on this transport
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < timedelta(0, 1))
class TestResponseRuleERCode(DNSDistTest):
    """Response rule: DelayResponseAction(1000) applied via ERCodeRule(BADVERS)."""

    # The single %s placeholder receives the backend (test responder) port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(ERCodeRule(DNSRCode.BADVERS), DelayResponseAction(1000))
    """

    def testBADVERSDelayed(self):
        """
        Responses: Delayed on BADVERS

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a BADVERS response over UDP, shorter for BADKEY and NoError.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # extended rcodes need EDNS on the response
        response.use_edns(edns=True)

        # BADVERS over UDP
        # BADVERS == 16, so rcode==0, ercode==1
        response.set_rcode(dns.rcode.BADVERS)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # the ID seen by the backend may differ, align it before comparing
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) > timedelta(0, 1))

        # BADKEY (17, an ERCode) over UDP
        response.set_rcode(17)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < timedelta(0, 1))

        # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < timedelta(0, 1))
class TestResponseRuleQNameDropped(DNSDistTest):
    """Response rule: DropResponseAction selected by query name."""

    # The single %s placeholder receives the backend (test responder) port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
    """

    def testDropped(self):
        """
        Responses: Dropped on QName

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the query still reaches the backend...
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # ...but no response comes back to the client
            self.assertEqual(receivedResponse, None)

    def testNotDropped(self):
        """
        Responses: NOT Dropped on QName

        Send an A query to "dontdrop.responses.tests.powerdns.com.",
        check that the response is not dropped.
        """
        name = 'dontdrop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestResponseRuleQNameAllowed(DNSDistTest):
    """Response rule: AllowResponseAction for one name, drop everything else."""

    # The allow rule is listed before the catch-all drop rule.
    # The single %s placeholder receives the backend (test responder) port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
    addResponseAction(AllRule(), DropResponseAction())
    """

    def testAllowed(self):
        """
        Responses: Allowed on QName

        Send an A query to "allow.responses.tests.powerdns.com.",
        check that the response is allowed.
        """
        name = 'allow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testNotAllowed(self):
        """
        Responses: Not allowed on QName

        Send an A query to "dontallow.responses.tests.powerdns.com.",
        check that the response is dropped.
        """
        name = 'dontallow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the query still reaches the backend, only the response is dropped
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, None)
class TestResponseRuleEditTTL(DNSDistTest):
    """Response rule: rewrite record TTLs from Lua via dr:editTTLs()."""

    # NOTE(review): the line defining _ttl was lost from this copy of the file;
    # the value is reconstructed. It only needs to differ from the TTL of the
    # record built in testTTLEdited - confirm against the upstream file.
    _ttl = 5
    # Placeholders are filled in this order: backend port, then the TTL.
    _config_params = ['_testServerPort', '_ttl']
    # NOTE(review): the body of editTTLCallback and the Lua 'end' keywords were
    # missing; returning the second placeholder is inferred from _config_params
    # and from the final assertion of testTTLEdited - confirm.
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function editTTLCallback(section, class, type, ttl)
      return %d
    end

    function editTTLFunc(dr)
      dr:editTTLs(editTTLCallback)
      return DNSAction.None, ""
    end

    addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
    """

    def testTTLEdited(self):
        """
        Responses: Alter the TTLs
        """
        name = 'editttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): every argument after 'name' was lost from this copy;
        # TTL, class, type and address are reconstructed - confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # This equality and the TTL inequality below are both expected to
            # hold, so the message comparison presumably ignores TTLs.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, self._ttl)
class TestResponseRuleLimitTTL(DNSDistTest):
    """Response rules: clamp record TTLs with the min/max TTL actions.

    Covers both the built-in actions (SetMinTTLResponseAction,
    SetMaxTTLResponseAction) and the Lua FFI equivalents.
    """

    # NOTE(review): the lines defining the TTL bounds were lost from this copy
    # of the file; the values are reconstructed. The assertions only require
    # _lowttl < TTL of the test record < _highttl - confirm against upstream.
    _lowttl = 60
    _highttl = 18000
    # Placeholders are filled in this order: low TTL, high TTL, backend port.
    _config_params = ['_lowttl', '_highttl', '_testServerPort']
    # NOTE(review): the two 'local ... = %d' lines and the Lua 'end' keywords
    # were missing; they are inferred from _config_params and from the
    # lowttl/highttl identifiers used below - confirm.
    _config_template = """
    local ffi = require("ffi")
    local lowttl = %d
    local highttl = %d

    function luaFFISetMinTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_min_ttl(dr, highttl)
      return DNSResponseAction.None, ""
    end

    function luaFFISetMaxTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_max_ttl(dr, lowttl)
      return DNSResponseAction.None, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("min.responses.tests.powerdns.com.", SetMinTTLResponseAction(highttl))
    addResponseAction("max.responses.tests.powerdns.com.", SetMaxTTLResponseAction(lowttl))
    addResponseAction("ffi.min.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMinTTL))
    addResponseAction("ffi.max.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMaxTTL))
    """

    def _checkLimitedTTL(self, name, expectedTTL):
        """Query `name` over UDP and TCP and check the answer TTL is `expectedTTL`.

        The backend answers with a single A record; the response relayed to
        the client must carry `expectedTTL` instead of the record's own TTL.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): every argument after 'name' was lost from this copy;
        # TTL, class, type and address are reconstructed - confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # This equality and the TTL inequality below are both expected to
            # hold, so the message comparison presumably ignores TTLs.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLimitTTL(self):
        """
        Responses: Alter the TTLs via Limiter
        """
        # minimum TTL: the record TTL is raised to the high bound
        self._checkLimitedTTL('min.responses.tests.powerdns.com.', self._highttl)
        # maximum TTL: the record TTL is lowered to the low bound
        self._checkLimitedTTL('max.responses.tests.powerdns.com.', self._lowttl)

    def testLimitTTLFFI(self):
        """
        Responses: Alter the TTLs via Limiter (Lua FFI)
        """
        # The queried names must match the names the FFI rules are registered
        # for in _config_template ("ffi.{min,max}.limitttl..."). The previous
        # names ("ffi.{min,max}.responses...") could never select those rules,
        # so the FFI actions were not exercised by this test.
        self._checkLimitedTTL('ffi.min.limitttl.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('ffi.max.limitttl.responses.tests.powerdns.com.', self._lowttl)
class TestResponseLuaActionReturnSyntax(DNSDistTest):
    """LuaResponseAction return values: with and without the second value.

    customDelay returns an action plus a string argument, customDrop returns
    the action alone.
    """

    # The single %s placeholder receives the backend (test responder) port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    function customDelay(dr)
      return DNSResponseAction.Delay, "1000"
    end
    function customDrop(dr)
      return DNSResponseAction.Drop
    end
    addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), LuaResponseAction(customDelay))
    """

    def testResponseActionDelayed(self):
        """
        Responses: Delayed via LuaResponseAction

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NXDomain over UDP
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # the ID seen by the backend may differ, align it before comparing
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) > timedelta(0, 1))

    def testDropped(self):
        """
        Responses: Dropped via user defined LuaResponseAction

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the query still reaches the backend, only the response is dropped
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, None)
class TestResponseClearRecordsType(DNSDistTest):
    """Response rules: strip AAAA records from responses.

    Covers both the Lua FFI call and ClearRecordTypesResponseAction.
    """

    _config_params = ['_testServerPort']
    # NOTE(review): the 'function luafct(dr)' header and its 'end' were lost
    # from this copy; they are inferred from the 'luafct' and 'dr' identifiers
    # used in the surviving lines - confirm against the upstream file.
    _config_template = """
    local ffi = require("ffi")

    function luafct(dr)
      ffi.C.dnsdist_ffi_dnsresponse_clear_records_type(dr, DNSQType.AAAA)
      return DNSResponseAction.HeaderModify, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("ffi.clear-records-type.responses.tests.powerdns.com.", LuaFFIResponseAction(luafct))
    addResponseAction("clear-records-type.responses.tests.powerdns.com.", ClearRecordTypesResponseAction(DNSQType.AAAA))
    """

    def _checkAAAARecordsCleared(self, name):
        """Check that AAAA records are removed from the response for `name`.

        The backend answers with one A rrset and one AAAA rrset; the response
        relayed to the client is expected to contain the A rrset only. The
        exchange is checked over both UDP and TCP.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): every argument after 'name' was lost from this copy;
        # TTL, class, type and address are reconstructed - confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)
        # NOTE(review): TTL, class and type were lost here as well; only the
        # two addresses survived. AAAA follows from the addresses and the
        # DNSQType.AAAA rules - confirm the TTL.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        # the AAAA rrset is only added to what the backend sends
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)

    def testClearedFFI(self):
        """
        Responses: Removes records of a given type (FFI API)
        """
        self._checkAAAARecordsCleared('ffi.clear-records-type.responses.tests.powerdns.com.')

    def testCleared(self):
        """
        Responses: Removes records of a given type
        """
        self._checkAAAARecordsCleared('clear-records-type.responses.tests.powerdns.com.')