]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Responses.py
2 from datetime
import datetime
, timedelta
5 from dnsdisttests
import DNSDistTest
7 class TestResponseRuleNXDelayed(DNSDistTest
):
10 newServer{address="127.0.0.1:%s"}
11 addResponseAction(RCodeRule(dnsdist.NXDOMAIN), DelayResponseAction(1000))
14 def testNXDelayed(self
):
16 Responses: Delayed on NXDomain
18 Send an A query to "delayed.responses.tests.powerdns.com.",
19 check that the response delay is longer than 1000 ms
20 for a NXDomain response over UDP, shorter for a NoError one.
22 name
= 'delayed.responses.tests.powerdns.com.'
23 query
= dns
.message
.make_query(name
, 'A', 'IN')
24 response
= dns
.message
.make_response(query
)
27 response
.set_rcode(dns
.rcode
.NXDOMAIN
)
28 begin
= datetime
.now()
29 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
31 receivedQuery
.id = query
.id
32 self
.assertEquals(query
, receivedQuery
)
33 self
.assertEquals(response
, receivedResponse
)
34 self
.assertTrue((end
- begin
) > timedelta(0, 1))
37 response
.set_rcode(dns
.rcode
.NOERROR
)
38 begin
= datetime
.now()
39 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
41 receivedQuery
.id = query
.id
42 self
.assertEquals(query
, receivedQuery
)
43 self
.assertEquals(response
, receivedResponse
)
44 self
.assertTrue((end
- begin
) < timedelta(0, 1))
47 response
.set_rcode(dns
.rcode
.NXDOMAIN
)
48 begin
= datetime
.now()
49 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
51 receivedQuery
.id = query
.id
52 self
.assertEquals(query
, receivedQuery
)
53 self
.assertEquals(response
, receivedResponse
)
54 self
.assertTrue((end
- begin
) < timedelta(0, 1))
56 class TestResponseRuleERCode(DNSDistTest
):
58 _config_template
= """
59 newServer{address="127.0.0.1:%s"}
60 addResponseAction(ERCodeRule(dnsdist.BADVERS), DelayResponseAction(1000))
63 def testBADVERSDelayed(self
):
65 Responses: Delayed on BADVERS
67 Send an A query to "delayed.responses.tests.powerdns.com.",
68 check that the response delay is longer than 1000 ms
69 for a BADVERS response over UDP, shorter for BADKEY and NoError.
71 name
= 'delayed.responses.tests.powerdns.com.'
72 query
= dns
.message
.make_query(name
, 'A', 'IN')
73 response
= dns
.message
.make_response(query
)
74 response
.use_edns(edns
=True)
77 # BADVERS == 16, so rcode==0, ercode==1
78 response
.set_rcode(dns
.rcode
.BADVERS
)
79 begin
= datetime
.now()
80 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
82 receivedQuery
.id = query
.id
83 self
.assertEquals(query
, receivedQuery
)
84 self
.assertEquals(response
, receivedResponse
)
85 self
.assertTrue((end
- begin
) > timedelta(0, 1))
87 # BADKEY (17, an ERCode) over UDP
88 response
.set_rcode(17)
89 begin
= datetime
.now()
90 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
92 receivedQuery
.id = query
.id
93 self
.assertEquals(query
, receivedQuery
)
94 self
.assertEquals(response
, receivedResponse
)
95 self
.assertTrue((end
- begin
) < timedelta(0, 1))
97 # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
98 response
.set_rcode(dns
.rcode
.NOERROR
)
99 begin
= datetime
.now()
100 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
102 receivedQuery
.id = query
.id
103 self
.assertEquals(query
, receivedQuery
)
104 self
.assertEquals(response
, receivedResponse
)
105 self
.assertTrue((end
- begin
) < timedelta(0, 1))
107 class TestResponseRuleQNameDropped(DNSDistTest
):
109 _config_template
= """
110 newServer{address="127.0.0.1:%s"}
111 addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
114 def testDropped(self
):
116 Responses: Dropped on QName
118 Send an A query to "drop.responses.tests.powerdns.com.",
119 check that the response (not the query) is dropped.
121 name
= 'drop.responses.tests.powerdns.com.'
122 query
= dns
.message
.make_query(name
, 'A', 'IN')
123 response
= dns
.message
.make_response(query
)
125 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
126 receivedQuery
.id = query
.id
127 self
.assertEquals(query
, receivedQuery
)
128 self
.assertEquals(receivedResponse
, None)
130 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
131 receivedQuery
.id = query
.id
132 self
.assertEquals(query
, receivedQuery
)
133 self
.assertEquals(receivedResponse
, None)
135 def testNotDropped(self
):
137 Responses: NOT Dropped on QName
139 Send an A query to "dontdrop.responses.tests.powerdns.com.",
140 check that the response is not dropped.
142 name
= 'dontdrop.responses.tests.powerdns.com.'
143 query
= dns
.message
.make_query(name
, 'A', 'IN')
144 response
= dns
.message
.make_response(query
)
146 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
147 receivedQuery
.id = query
.id
148 self
.assertEquals(query
, receivedQuery
)
149 self
.assertEquals(response
, receivedResponse
)
151 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
152 receivedQuery
.id = query
.id
153 self
.assertEquals(query
, receivedQuery
)
154 self
.assertEquals(response
, receivedResponse
)
156 class TestResponseRuleQNameAllowed(DNSDistTest
):
158 _config_template
= """
159 newServer{address="127.0.0.1:%s"}
160 addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
161 addResponseAction(AllRule(), DropResponseAction())
164 def testAllowed(self
):
166 Responses: Allowed on QName
168 Send an A query to "allow.responses.tests.powerdns.com.",
169 check that the response is allowed.
171 name
= 'allow.responses.tests.powerdns.com.'
172 query
= dns
.message
.make_query(name
, 'A', 'IN')
173 response
= dns
.message
.make_response(query
)
175 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
176 receivedQuery
.id = query
.id
177 self
.assertEquals(query
, receivedQuery
)
178 self
.assertEquals(response
, receivedResponse
)
180 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
181 receivedQuery
.id = query
.id
182 self
.assertEquals(query
, receivedQuery
)
183 self
.assertEquals(response
, receivedResponse
)
185 def testNotAllowed(self
):
187 Responses: Not allowed on QName
189 Send an A query to "dontallow.responses.tests.powerdns.com.",
190 check that the response is dropped.
192 name
= 'dontallow.responses.tests.powerdns.com.'
193 query
= dns
.message
.make_query(name
, 'A', 'IN')
194 response
= dns
.message
.make_response(query
)
196 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
197 receivedQuery
.id = query
.id
198 self
.assertEquals(query
, receivedQuery
)
199 self
.assertEquals(receivedResponse
, None)
201 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
202 receivedQuery
.id = query
.id
203 self
.assertEquals(query
, receivedQuery
)
204 self
.assertEquals(receivedResponse
, None)
206 class TestResponseRuleEditTTL(DNSDistTest
):
209 _config_params
= ['_testServerPort', '_ttl']
210 _config_template
= """
211 newServer{address="127.0.0.1:%s"}
213 function editTTLCallback(section, class, type, ttl)
217 function editTTLFunc(dr)
218 dr:editTTLs(editTTLCallback)
219 return DNSAction.None, ""
222 addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
225 def testTTLEdited(self
):
227 Responses: Alter the TTLs
229 name
= 'editttl.responses.tests.powerdns.com.'
230 query
= dns
.message
.make_query(name
, 'A', 'IN')
231 response
= dns
.message
.make_response(query
)
232 rrset
= dns
.rrset
.from_text(name
,
237 response
.answer
.append(rrset
)
239 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
240 receivedQuery
.id = query
.id
241 self
.assertEquals(query
, receivedQuery
)
242 self
.assertEquals(response
, receivedResponse
)
243 self
.assertNotEquals(response
.answer
[0].ttl
, receivedResponse
.answer
[0].ttl
)
244 self
.assertEquals(receivedResponse
.answer
[0].ttl
, self
._ttl
)
246 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
247 receivedQuery
.id = query
.id
248 self
.assertEquals(query
, receivedQuery
)
249 self
.assertEquals(response
, receivedResponse
)
250 self
.assertNotEquals(response
.answer
[0].ttl
, receivedResponse
.answer
[0].ttl
)
251 self
.assertEquals(receivedResponse
.answer
[0].ttl
, self
._ttl
)
253 class TestResponseLuaActionReturnSyntax(DNSDistTest
):
255 _config_template
= """
256 newServer{address="127.0.0.1:%s"}
257 function customDelay(dr)
258 return DNSResponseAction.Delay, "1000"
260 function customDrop(dr)
261 return DNSResponseAction.Drop
263 addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
264 addResponseAction(RCodeRule(dnsdist.NXDOMAIN), LuaResponseAction(customDelay))
267 def testResponseActionDelayed(self
):
269 Responses: Delayed via LuaResponseAction
271 Send an A query to "delayed.responses.tests.powerdns.com.",
272 check that the response delay is longer than 1000 ms
273 for a NXDomain response over UDP, shorter for a NoError one.
275 name
= 'delayed.responses.tests.powerdns.com.'
276 query
= dns
.message
.make_query(name
, 'A', 'IN')
277 response
= dns
.message
.make_response(query
)
280 response
.set_rcode(dns
.rcode
.NXDOMAIN
)
281 begin
= datetime
.now()
282 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
284 receivedQuery
.id = query
.id
285 self
.assertEquals(query
, receivedQuery
)
286 self
.assertEquals(response
, receivedResponse
)
287 self
.assertTrue((end
- begin
) > timedelta(0, 1))
289 def testDropped(self
):
291 Responses: Dropped via user defined LuaResponseAction
293 Send an A query to "drop.responses.tests.powerdns.com.",
294 check that the response (not the query) is dropped.
296 name
= 'drop.responses.tests.powerdns.com.'
297 query
= dns
.message
.make_query(name
, 'A', 'IN')
298 response
= dns
.message
.make_response(query
)
300 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
301 receivedQuery
.id = query
.id
302 self
.assertEquals(query
, receivedQuery
)
303 self
.assertEquals(receivedResponse
, None)
305 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
306 receivedQuery
.id = query
.id
307 self
.assertEquals(query
, receivedQuery
)
308 self
.assertEquals(receivedResponse
, None)