# Origin: git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Responses.py
from datetime import datetime, timedelta

import dns

from dnsdisttests import DNSDistTest
class TestResponseRuleNXDelayed(DNSDistTest):
    """Exercise DelayResponseAction bound to an RCodeRule on NXDOMAIN."""

    # A single backend plus one response rule: NXDOMAIN answers are delayed
    # by 1000 ms. The %s placeholder receives the test backend port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), DelayResponseAction(1000))
    """

    def testNXDelayed(self):
        """
        Responses: Delayed on NXDomain

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        oneSecond = timedelta(seconds=1)

        # NXDOMAIN over UDP: the rule matches, so the round trip must
        # take longer than the configured delay.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, oneSecond)

        # NOERROR over UDP: the rule does not match, no delay expected.
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, oneSecond)

        # NXDOMAIN over TCP: the test expects the answer in under a second,
        # i.e. the delay is not applied on this transport.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, oneSecond)
class TestResponseRuleERCode(DNSDistTest):
    """Exercise DelayResponseAction bound to an ERCodeRule on BADVERS."""

    _extraStartupSleep = 1
    # Responses whose extended rcode is BADVERS are delayed by 1000 ms.
    # The %s placeholder receives the test backend port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(ERCodeRule(DNSRCode.BADVERS), DelayResponseAction(1000))
    """

    def testBADVERSDelayed(self):
        """
        Responses: Delayed on BADVERS

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a BADVERS response over UDP, shorter for BADKEY and NoError.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Extended rcodes are carried in the EDNS OPT record.
        response.use_edns(edns=True)
        oneSecond = timedelta(seconds=1)

        # BADVERS over UDP
        # BADVERS == 16, so rcode==0, ercode==1
        response.set_rcode(dns.rcode.BADVERS)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, oneSecond)

        # BADKEY (17, an ERCode) over UDP
        response.set_rcode(17)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, oneSecond)

        # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, oneSecond)
class TestResponseRuleQNameDropped(DNSDistTest):
    """Exercise DropResponseAction selected by query name."""

    # Responses for "drop.responses.tests.powerdns.com." are dropped.
    # The %s placeholder receives the test backend port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
    """

    def testDropped(self):
        """
        Responses: Dropped on QName

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # The query must still reach the backend; only the answer
            # on its way back is discarded.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)

    def testNotDropped(self):
        """
        Responses: NOT Dropped on QName

        Send an A query to "dontdrop.responses.tests.powerdns.com.",
        check that the response is not dropped.
        """
        name = 'dontdrop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestResponseRuleQNameAllowed(DNSDistTest):
    """Exercise AllowResponseAction placed ahead of a catch-all drop rule."""

    # The allow rule is registered first; every other response hits the
    # AllRule() drop. The %s placeholder receives the test backend port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
    addResponseAction(AllRule(), DropResponseAction())
    """

    def testAllowed(self):
        """
        Responses: Allowed on QName

        Send an A query to "allow.responses.tests.powerdns.com.",
        check that the response is allowed.
        """
        name = 'allow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testNotAllowed(self):
        """
        Responses: Not allowed on QName

        Send an A query to "dontallow.responses.tests.powerdns.com.",
        check that the response is dropped.
        """
        name = 'dontallow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # The backend still sees the query; the answer is dropped.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)
class TestResponseRuleEditTTL(DNSDistTest):
    """Exercise dr:editTTLs() called from a LuaResponseAction."""

    # NOTE(review): the `_ttl` assignment is not visible in the extracted
    # source; 5 is restored here. Any value different from the 3600 s TTL
    # sent by the test backend satisfies the assertions - confirm upstream.
    _ttl = 5
    _config_params = ['_testServerPort', '_ttl']
    # Placeholders are filled in `_config_params` order: backend port (%s),
    # then the TTL returned by the Lua callback (%d).
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function editTTLCallback(section, class, type, ttl)
      return %d
    end

    function editTTLFunc(dr)
      dr:editTTLs(editTTLCallback)
      return DNSAction.None, ""
    end

    addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
    """

    def testTTLEdited(self):
        """
        Responses: Alter the TTLs
        """
        name = 'editttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the record arguments are truncated in the extracted
        # source; a 3600 s A record is restored - confirm against upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # The messages compare equal here, so the TTL is checked
            # explicitly on the first answer RRset.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, self._ttl)
class TestResponseRuleLimitTTL(DNSDistTest):
    """Exercise the min/max TTL response actions, native and via Lua FFI."""

    # NOTE(review): the three TTL assignments are not visible in the
    # extracted source; the values below are restored. They only need to
    # satisfy _lowttl < _defaulttl < _highttl - confirm against upstream.
    _lowttl = 60
    _defaulttl = 3600
    _highttl = 18000
    _config_params = ['_lowttl', '_highttl', '_testServerPort']
    # Placeholders are filled in `_config_params` order: lowttl (%d),
    # highttl (%d), then the backend port (%s).
    _config_template = """
    local ffi = require("ffi")
    local lowttl = %d
    local highttl = %d

    function luaFFISetMinTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_min_ttl(dr, highttl)
      return DNSResponseAction.None, ""
    end
    function luaFFISetMaxTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_max_ttl(dr, lowttl)
      return DNSResponseAction.None, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("min.responses.tests.powerdns.com.", SetMinTTLResponseAction(highttl))
    addResponseAction("max.responses.tests.powerdns.com.", SetMaxTTLResponseAction(lowttl))
    addResponseAction("ffi.min.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMinTTL))
    addResponseAction("ffi.max.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMaxTTL))
    """

    def _checkLimitedTTL(self, name, expectedTTL):
        """Send an A query for `name` over UDP and TCP and check the TTL.

        The backend answers with a single A record carrying `_defaulttl`;
        the answer relayed by dnsdist must carry `expectedTTL` instead.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    self._defaulttl,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # The messages compare equal here, so the TTL is checked
            # explicitly on the first answer RRset.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLimitTTL(self):
        """
        Responses: Alter the TTLs via Limiter
        """
        # Minimum raised to _highttl, then maximum capped at _lowttl.
        self._checkLimitedTTL('min.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('max.responses.tests.powerdns.com.', self._lowttl)

    def testLimitTTLFFI(self):
        """
        Responses: Alter the TTLs via Limiter
        """
        # The query names must match the names the FFI rules are registered
        # for. The previous "ffi.min.responses..." / "ffi.max.responses..."
        # names only suffix-matched the non-FFI "min."/"max." rules, so the
        # Lua FFI actions were never exercised.
        self._checkLimitedTTL('ffi.min.limitttl.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('ffi.max.limitttl.responses.tests.powerdns.com.', self._lowttl)
class TestSetReducedTTL(DNSDistTest):
    """Exercise SetReducedTTLResponseAction applied to every response."""

    # NOTE(review): these two assignments are not visible in the extracted
    # source. 42 comes from the test docstring; an initial TTL of 100 is
    # what makes "reduced TTL == percentage" hold - confirm upstream.
    _percentage = 42
    _initialTTL = 100
    _config_params = ['_percentage', '_testServerPort']
    # Placeholders are filled in `_config_params` order: percentage (%d),
    # then the backend port (%s).
    _config_template = """
    addResponseAction(AllRule(), SetReducedTTLResponseAction(%d))
    newServer{address="127.0.0.1:%s"}
    """

    def testLimitTTL(self):
        """
        Responses: Reduce TTL to 42%
        """
        name = 'reduced-ttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    self._initialTTL,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # The messages compare equal here, so the TTL is checked
            # explicitly on the first answer RRset.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, self._percentage)
class TestResponseLuaActionReturnSyntax(DNSDistTest):
    """Exercise both return forms accepted from a LuaResponseAction."""

    # customDelay returns (action, parameter); customDrop returns the
    # action alone. The %s placeholder receives the test backend port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    function customDelay(dr)
      return DNSResponseAction.Delay, "1000"
    end
    function customDrop(dr)
      return DNSResponseAction.Drop
    end
    addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), LuaResponseAction(customDelay))
    """

    def testResponseActionDelayed(self):
        """
        Responses: Delayed via LuaResponseAction

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NXDOMAIN over UDP: customDelay applies, so the round trip must
        # exceed the one-second delay it requests.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, timedelta(seconds=1))

    def testDropped(self):
        """
        Responses: Dropped via user defined LuaResponseAction

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # The backend still sees the query; the answer is dropped.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)
class TestResponseClearRecordsType(DNSDistTest):
    """Exercise removal of AAAA records from responses, native and via FFI."""

    _config_params = ['_testServerPort']
    # The %s placeholder receives the test backend port.
    _config_template = """
    local ffi = require("ffi")

    function luafct(dr)
      ffi.C.dnsdist_ffi_dnsresponse_clear_records_type(dr, DNSQType.AAAA)
      return DNSResponseAction.HeaderModify, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("ffi.clear-records-type.responses.tests.powerdns.com.", LuaFFIResponseAction(luafct))
    addResponseAction("clear-records-type.responses.tests.powerdns.com.", ClearRecordTypesResponseAction(DNSQType.AAAA))
    """

    def _checkAAAACleared(self, name):
        """Check that AAAA records are stripped from the answer for `name`.

        The backend answers with one A RRset and one AAAA RRset; the
        response relayed by dnsdist must contain the A RRset only.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): TTL/class/type arguments are truncated in the
        # extracted source; 3600 s IN records are restored - confirm upstream.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)
        # The AAAA RRset is only added to what the backend sends.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)

    def testClearedFFI(self):
        """
        Responses: Removes records of a given type (FFI API)
        """
        self._checkAAAACleared('ffi.clear-records-type.responses.tests.powerdns.com.')

    def testCleared(self):
        """
        Responses: Removes records of a given type
        """
        self._checkAAAACleared('clear-records-type.responses.tests.powerdns.com.')