# Source: git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Responses.py
from datetime import datetime, timedelta

import dns
import dns.message
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset

from dnsdisttests import DNSDistTest
class TestResponseRuleNXDelayed(DNSDistTest):
    """Response rule test: NXDOMAIN answers are routed to DelayResponseAction(1000)."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), DelayResponseAction(1000))
    """

    def _timedExchange(self, sender, query, response):
        """
        Run one query/response exchange through *sender* and time it.

        *sender* is one of the send*Query methods; it is called with the
        query and the response and returns (receivedQuery, receivedResponse).
        Both are compared with what was sent, and the wall-clock time spent
        inside the call is returned as a timedelta.
        """
        begin = datetime.now()
        (receivedQuery, receivedResponse) = sender(query, response)
        # Take the end timestamp right after the exchange, before any
        # assertion work, so that only the round trip is measured.
        end = datetime.now()

        self.assertIsNotNone(receivedQuery)
        # Only the ID is overwritten before comparing; presumably the ID seen
        # on the backend side differs from the client's one - TODO confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        return end - begin

    def testNXDelayed(self):
        """
        Responses: Delayed on NXDomain

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        oneSecond = timedelta(0, 1)

        # NXDomain over UDP: must take longer than the configured 1000 ms
        response.set_rcode(dns.rcode.NXDOMAIN)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertGreater(elapsed, oneSecond)

        # NoError over UDP: the rule does not match, so no delay
        response.set_rcode(dns.rcode.NOERROR)
        elapsed = self._timedExchange(self.sendUDPQuery, query, response)
        self.assertLess(elapsed, oneSecond)

        # NXDomain over TCP: the test expects no delay on this transport
        response.set_rcode(dns.rcode.NXDOMAIN)
        elapsed = self._timedExchange(self.sendTCPQuery, query, response)
        self.assertLess(elapsed, oneSecond)
class TestResponseRuleERCode(DNSDistTest):
    """Response rule test: ERCodeRule(BADVERS) answers are routed to DelayResponseAction(1000)."""

    _extraStartupSleep = 1
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(ERCodeRule(DNSRCode.BADVERS), DelayResponseAction(1000))
    """

    def _timedUDPExchange(self, query, response):
        """
        Run one UDP exchange, check both directions and return its duration.

        The returned timedelta covers only the sendUDPQuery() call.
        """
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # Take the end timestamp right after the exchange so that the
        # assertions below are not part of the measurement.
        end = datetime.now()

        self.assertIsNotNone(receivedQuery)
        # Only the ID is overwritten before comparing; presumably the ID seen
        # on the backend side differs from the client's one - TODO confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        return end - begin

    def testBADVERSDelayed(self):
        """
        Responses: Delayed on BADVERS

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a BADVERS response over UDP, shorter for BADKEY and NoError.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.use_edns(edns=True)
        oneSecond = timedelta(0, 1)

        # BADVERS over UDP
        # BADVERS == 16, so rcode==0, ercode==1
        response.set_rcode(dns.rcode.BADVERS)
        elapsed = self._timedUDPExchange(query, response)
        self.assertGreater(elapsed, oneSecond)

        # BADKEY (17, an ERCode) over UDP
        response.set_rcode(17)
        elapsed = self._timedUDPExchange(query, response)
        self.assertLess(elapsed, oneSecond)

        # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
        response.set_rcode(dns.rcode.NOERROR)
        elapsed = self._timedUDPExchange(query, response)
        self.assertLess(elapsed, oneSecond)
class TestResponseRuleQNameDropped(DNSDistTest):
    """Response rule test: DropResponseAction selected by query name."""

    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
    """

    def _exchange(self, method, query, response):
        """
        Send *query* with the send*Query method called *method*.

        Checks that the query came back unchanged from the sender and
        returns whatever response the client side received (possibly None).
        """
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertIsNotNone(receivedQuery)
        # Only the ID is overwritten before comparing; presumably the ID seen
        # on the backend side differs from the client's one - TODO confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        return receivedResponse

    def testDropped(self):
        """
        Responses: Dropped on QName

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            receivedResponse = self._exchange(method, query, response)
            self.assertIsNone(receivedResponse)

    def testNotDropped(self):
        """
        Responses: NOT Dropped on QName

        Send an A query to "dontdrop.responses.tests.powerdns.com.",
        check that the response is not dropped.
        """
        name = 'dontdrop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            receivedResponse = self._exchange(method, query, response)
            self.assertEqual(response, receivedResponse)
class TestResponseRuleQNameAllowed(DNSDistTest):
    """Response rule test: AllowResponseAction for one name, DropResponseAction for all others."""

    # Rule order matters for this test: the allow rule is registered before
    # the catch-all drop rule.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
    addResponseAction(AllRule(), DropResponseAction())
    """

    def _exchange(self, method, query, response):
        """
        Send *query* with the send*Query method called *method*.

        Checks that the query came back unchanged from the sender and
        returns whatever response the client side received (possibly None).
        """
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertIsNotNone(receivedQuery)
        # Only the ID is overwritten before comparing; presumably the ID seen
        # on the backend side differs from the client's one - TODO confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        return receivedResponse

    def testAllowed(self):
        """
        Responses: Allowed on QName

        Send an A query to "allow.responses.tests.powerdns.com.",
        check that the response is allowed.
        """
        name = 'allow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            receivedResponse = self._exchange(method, query, response)
            self.assertEqual(response, receivedResponse)

    def testNotAllowed(self):
        """
        Responses: Not allowed on QName

        Send an A query to "dontallow.responses.tests.powerdns.com.",
        check that the response is dropped.
        """
        name = 'dontallow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            receivedResponse = self._exchange(method, query, response)
            self.assertIsNone(receivedResponse)
class TestResponseRuleEditTTL(DNSDistTest):
    """Response rule test: a Lua response action rewrites record TTLs through dr:editTTLs()."""

    # NOTE(review): the value of _ttl is not visible in the extracted source;
    # 5 is a placeholder that only has to differ from the TTL of the record
    # built in testTTLEdited - confirm against the upstream file.
    _ttl = 5
    _config_params = ['_testServerPort', '_ttl']
    # NOTE(review): the body of editTTLCallback and the Lua 'end' keywords
    # were lost in extraction. 'return %d' is reconstructed from the order of
    # _config_params (port first, then _ttl) and from the final assertion,
    # which expects the received TTL to equal self._ttl - confirm.
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function editTTLCallback(section, class, type, ttl)
      return %d
    end

    function editTTLFunc(dr)
      dr:editTTLs(editTTLCallback)
      return DNSAction.None, ""
    end

    addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
    """

    def testTTLEdited(self):
        """
        Responses: Alter the TTLs
        """
        name = 'editttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): everything after 'name' in this call was lost in
        # extraction; the TTL and address below are placeholders - confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertIsNotNone(receivedQuery)
            # Only the ID is overwritten before comparing; presumably the ID
            # seen on the backend side differs - TODO confirm.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # The message comparison evidently ignores TTLs, since the next
            # assertion requires the TTLs to differ.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, self._ttl)
class TestResponseRuleLimitTTL(DNSDistTest):
    """Response rule test: TTL clamping via SetMinTTL/SetMaxTTL actions and their Lua FFI equivalents."""

    # NOTE(review): the values of _lowttl and _highttl are not visible in the
    # extracted source. The placeholders only have to satisfy
    # _lowttl < (TTL of the test record) < _highttl - confirm against upstream.
    _lowttl = 60
    _highttl = 18000
    _config_params = ['_lowttl', '_highttl', '_testServerPort']
    # NOTE(review): the two 'local ... = %d' lines and the Lua 'end' keywords
    # were lost in extraction. They are reconstructed from the order of
    # _config_params and from the lowttl/highttl variables that the visible
    # Lua code reads - confirm.
    _config_template = """
    local ffi = require("ffi")
    local lowttl = %d
    local highttl = %d

    function luaFFISetMinTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_min_ttl(dr, highttl)
      return DNSResponseAction.None, ""
    end
    function luaFFISetMaxTTL(dr)
      ffi.C.dnsdist_ffi_dnsresponse_set_max_ttl(dr, lowttl)
      return DNSResponseAction.None, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("min.responses.tests.powerdns.com.", SetMinTTLResponseAction(highttl))
    addResponseAction("max.responses.tests.powerdns.com.", SetMaxTTLResponseAction(lowttl))
    addResponseAction("ffi.min.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMinTTL))
    addResponseAction("ffi.max.limitttl.responses.tests.powerdns.com.", LuaFFIResponseAction(luaFFISetMaxTTL))
    """

    def _checkLimitedTTL(self, name, expectedTTL):
        """
        Query *name* over UDP and TCP and check the TTL of the first answer.

        The response handed to the sender carries one A record; the response
        received by the client must carry a different TTL, equal to
        *expectedTTL*.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): everything after 'name' in this call was lost in
        # extraction; the TTL and address below are placeholders - confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertIsNotNone(receivedQuery)
            # Only the ID is overwritten before comparing; presumably the ID
            # seen on the backend side differs - TODO confirm.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # The message comparison evidently ignores TTLs, since the next
            # assertion requires the TTLs to differ.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLimitTTL(self):
        """
        Responses: Alter the TTLs via Limiter
        """
        self._checkLimitedTTL('min.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('max.responses.tests.powerdns.com.', self._lowttl)

    def testLimitTTLFFI(self):
        """
        Responses: Alter the TTLs via Limiter (Lua FFI)
        """
        # The names must be the ones the LuaFFIResponseAction rules are
        # registered for. The former 'ffi.min.responses...' and
        # 'ffi.max.responses...' names do not fall under the
        # 'ffi.*.limitttl...' selectors, so the FFI functions were never
        # the ones being exercised.
        self._checkLimitedTTL('ffi.min.limitttl.responses.tests.powerdns.com.', self._highttl)
        self._checkLimitedTTL('ffi.max.limitttl.responses.tests.powerdns.com.', self._lowttl)
class TestResponseLuaActionReturnSyntax(DNSDistTest):
    """Response rule test: Lua response actions returning an action with and without a second value."""

    # customDelay returns (action, string); customDrop returns the action only.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    function customDelay(dr)
      return DNSResponseAction.Delay, "1000"
    end
    function customDrop(dr)
      return DNSResponseAction.Drop
    end
    addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), LuaResponseAction(customDelay))
    """

    def testResponseActionDelayed(self):
        """
        Responses: Delayed via LuaResponseAction

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NX over UDP
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # Take the end timestamp right after the exchange so that the
        # assertions below are not part of the measurement.
        end = datetime.now()

        self.assertIsNotNone(receivedQuery)
        # Only the ID is overwritten before comparing; presumably the ID seen
        # on the backend side differs from the client's one - TODO confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, timedelta(0, 1))

    def testDropped(self):
        """
        Responses: Dropped via user defined LuaResponseAction

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertIsNotNone(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)
class TestResponseClearRecordsType(DNSDistTest):
    """Response rule test: AAAA records are stripped from responses, via Lua FFI and via the built-in action."""

    _config_params = ['_testServerPort']
    # NOTE(review): the 'function luafct(dr)' header and its 'end' were lost
    # in extraction. They are reconstructed from the 'luafct' name passed to
    # LuaFFIResponseAction and the 'dr' argument used in the body - confirm.
    _config_template = """
    local ffi = require("ffi")

    function luafct(dr)
      ffi.C.dnsdist_ffi_dnsresponse_clear_records_type(dr, DNSQType.AAAA)
      return DNSResponseAction.HeaderModify, ""
    end

    newServer{address="127.0.0.1:%s"}

    addResponseAction("ffi.clear-records-type.responses.tests.powerdns.com.", LuaFFIResponseAction(luafct))
    addResponseAction("clear-records-type.responses.tests.powerdns.com.", ClearRecordTypesResponseAction(DNSQType.AAAA))
    """

    def _checkAAAACleared(self, name):
        """
        Query *name* over UDP and TCP and check the AAAA records are removed.

        The response handed to the sender holds one A rrset and one AAAA
        rrset; the response the client receives must equal a message that
        holds the A rrset only.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        expectedResponse = dns.message.make_response(query)

        # NOTE(review): everything after 'name' in this call was lost in
        # extraction; the TTL and address below are placeholders - confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        # NOTE(review): only the two addresses survived extraction; the TTL,
        # class and type arguments are reconstructed - confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        # The AAAA rrset goes into the sent response only, never into the
        # expected one.
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertIsNotNone(receivedQuery)
            # Only the ID is overwritten before comparing; presumably the ID
            # seen on the backend side differs - TODO confirm.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)

    def testClearedFFI(self):
        """
        Responses: Removes records of a given type (FFI API)
        """
        self._checkAAAACleared('ffi.clear-records-type.responses.tests.powerdns.com.')

    def testCleared(self):
        """
        Responses: Removes records of a given type
        """
        self._checkAAAACleared('clear-records-type.responses.tests.powerdns.com.')