# Source: thirdparty/pdns.git (git.ipfire.org), regression-tests.dnsdist/test_Responses.py
from datetime import datetime, timedelta

from dnsdisttests import DNSDistTest
class TestResponseRuleNXDelayed(DNSDistTest):
    """Check that NXDomain responses are delayed over UDP, and only those."""

    # The %s placeholder is presumably filled in by DNSDistTest with the
    # port of the test backend -- not visible here, confirm in the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(RCodeRule(dnsdist.NXDOMAIN), DelayResponseAction(1000))
    """

    def testNXDelayed(self):
        """
        Responses: Delayed on NXDomain

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one
        and for a NXDomain one over TCP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NXDomain over UDP: the response must take more than one second.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # Align the IDs before comparing; presumably the ID seen by the
        # backend differs from the one the client sent.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) > timedelta(0, 1))

        # NoError over UDP: the rule does not match, so no delay.
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < timedelta(0, 1))

        # NXDomain over TCP: the test expects no delay on this transport.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < timedelta(0, 1))
class TestResponseRuleERCode(DNSDistTest):
    """Check that ERCodeRule delays BADVERS responses but not BADKEY or NoError."""

    # The %s placeholder is presumably filled in by DNSDistTest with the
    # port of the test backend -- not visible here, confirm in the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(ERCodeRule(dnsdist.BADVERS), DelayResponseAction(1000))
    """

    def testBADVERSDelayed(self):
        """
        Responses: Delayed on BADVERS

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a BADVERS response over UDP, shorter for BADKEY and NoError.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.use_edns(edns=True)

        # BADVERS over UDP: the response must take more than one second.
        # BADVERS == 16, so rcode==0, ercode==1
        response.set_rcode(dns.rcode.BADVERS)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # Align the IDs before comparing; presumably the ID seen by the
        # backend differs from the one the client sent.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) > timedelta(0, 1))

        # BADKEY (17, an ERCode) over UDP
        response.set_rcode(17)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < timedelta(0, 1))

        # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) < timedelta(0, 1))
class TestResponseRuleQNameDropped(DNSDistTest):
    """Check that responses are dropped for one specific query name only."""

    # The %s placeholder is presumably filled in by DNSDistTest with the
    # port of the test backend -- not visible here, confirm in the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
    """

    def testDropped(self):
        """
        Responses: Dropped on QName

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Align the IDs before comparing; presumably the ID seen by the
            # backend differs from the one the client sent.
            receivedQuery.id = query.id
            # The query must still reach the backend ...
            self.assertEqual(query, receivedQuery)
            # ... but no response may come back to the client.
            self.assertIsNone(receivedResponse)

    def testNotDropped(self):
        """
        Responses: NOT Dropped on QName

        Send an A query to "dontdrop.responses.tests.powerdns.com.",
        check that the response is not dropped.
        """
        name = 'dontdrop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestResponseRuleQNameAllowed(DNSDistTest):
    """Check that one query name is allowed while every other response is dropped."""

    # The allow rule is registered before the catch-all drop rule; the tests
    # below expect the allowed name to get through and all others to be dropped.
    # The %s placeholder is presumably filled in by DNSDistTest with the
    # port of the test backend -- not visible here, confirm in the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
    addResponseAction(AllRule(), DropResponseAction())
    """

    def testAllowed(self):
        """
        Responses: Allowed on QName

        Send an A query to "allow.responses.tests.powerdns.com.",
        check that the response is allowed.
        """
        name = 'allow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Align the IDs before comparing; presumably the ID seen by the
            # backend differs from the one the client sent.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testNotAllowed(self):
        """
        Responses: Not allowed on QName

        Send an A query to "dontallow.responses.tests.powerdns.com.",
        check that the response is dropped.
        """
        name = 'dontallow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            # The query must still reach the backend ...
            self.assertEqual(query, receivedQuery)
            # ... but no response may come back to the client.
            self.assertIsNone(receivedResponse)
class TestResponseRuleEditTTL(DNSDistTest):
    """Check that a Lua response action can rewrite the TTLs of a response."""

    # NOTE(review): the definition of _ttl is not visible in this extract.
    # It must be an integer that differs from the TTL of the record built in
    # testTTLEdited -- confirm the actual value.
    _ttl = 5
    # Two parameters, so the template below needs two placeholders.
    _config_params = ['_testServerPort', '_ttl']
    # NOTE(review): the body of editTTLCallback and the Lua `end` keywords are
    # not visible in this extract. `return %d` is reconstructed from the fact
    # that _ttl is a config parameter and the test expects it as the new TTL.
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function editTTLCallback(section, class, type, ttl)
      return %d
    end

    function editTTLFunc(dr)
      dr:editTTLs(editTTLCallback)
      return DNSAction.None, ""
    end

    addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
    """

    def testTTLEdited(self):
        """
        Responses: Alter the TTLs
        """
        name = 'editttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): every argument after `name` is missing from this
        # extract; an A record with a TTL different from _ttl is assumed.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Align the IDs before comparing; presumably the ID seen by the
            # backend differs from the one the client sent.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # Presumably message equality ignores TTLs, since the next line
            # expects the TTLs to differ -- confirm against dnspython.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, self._ttl)
class TestResponseLuaActionReturnSyntax(DNSDistTest):
    """Check both return forms of a Lua response action: with and without a value."""

    # customDelay returns an action plus a string argument; customDrop returns
    # the action alone.
    # The %s placeholder is presumably filled in by DNSDistTest with the
    # port of the test backend -- not visible here, confirm in the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    function customDelay(dr)
      return DNSResponseAction.Delay, "1000"
    end
    function customDrop(dr)
      return DNSResponseAction.Drop
    end
    addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
    addResponseAction(RCodeRule(dnsdist.NXDOMAIN), LuaResponseAction(customDelay))
    """

    def testResponseActionDelayed(self):
        """
        Responses: Delayed via LuaResponseAction

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NXDomain over UDP: the response must take more than one second.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # Align the IDs before comparing; presumably the ID seen by the
        # backend differs from the one the client sent.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertTrue((end - begin) > timedelta(0, 1))

    def testDropped(self):
        """
        Responses: Dropped via user defined LuaResponseAction

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            # The query must still reach the backend ...
            self.assertEqual(query, receivedQuery)
            # ... but no response may come back to the client.
            self.assertIsNone(receivedResponse)