]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Responses.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Responses.py
CommitLineData
788c3243
RG
1#!/usr/bin/env python
2from datetime import datetime, timedelta
3import time
4import dns
5from dnsdisttests import DNSDistTest
6
class TestResponseRuleNXDelayed(DNSDistTest):
    """
    Response rule test: NXDOMAIN responses go through DelayResponseAction(1000).

    The configuration declares a single backend and one response rule
    matching on the NXDOMAIN rcode.
    """

    # The '%s' placeholder receives the backend port.
    # NOTE(review): presumably filled in by DNSDistTest from its default
    # config parameters - confirm against the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), DelayResponseAction(1000))
    """

    def testNXDelayed(self):
        """
        Responses: Delayed on NXDomain

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP, shorter for a NoError one.
        The NXDomain response over TCP is also expected to arrive
        in less than 1000 ms.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        oneSecond = timedelta(seconds=1)

        # NX over UDP: the rule matches, so the exchange must take more
        # than one second.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # Align the IDs before comparing the messages.
        # NOTE(review): presumably the ID seen by the backend differs from
        # the one we sent - confirm against DNSDistTest.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, oneSecond)

        # NoError over UDP: the rule does not match, no delay expected.
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, oneSecond)

        # NX over TCP: the rcode matches but the test expects the answer
        # in under one second, i.e. no delay over TCP.
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, oneSecond)
55
d83feb68
CH
class TestResponseRuleERCode(DNSDistTest):
    """
    Response rule test: ERCodeRule(BADVERS) combined with DelayResponseAction(1000).

    Checks that only the BADVERS extended rcode triggers the delay; another
    extended rcode (BADKEY) and a plain NoError must not.
    """

    # The '%s' placeholder receives the backend port.
    # NOTE(review): presumably filled in by DNSDistTest from its default
    # config parameters - confirm against the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction(ERCodeRule(DNSRCode.BADVERS), DelayResponseAction(1000))
    """

    def testBADVERSDelayed(self):
        """
        Responses: Delayed on BADVERS

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a BADVERS response over UDP, shorter for BADKEY and NoError.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Enable EDNS on the response before setting extended rcodes.
        response.use_edns(edns=True)
        oneSecond = timedelta(seconds=1)

        # BADVERS over UDP
        # BADVERS == 16, so rcode==0, ercode==1
        response.set_rcode(dns.rcode.BADVERS)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # Align the IDs before comparing the messages.
        # NOTE(review): presumably the ID seen by the backend differs from
        # the one we sent - confirm against DNSDistTest.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, oneSecond)

        # BADKEY (17, an ERCode) over UDP: a different extended rcode,
        # so no delay is expected.
        response.set_rcode(17)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, oneSecond)

        # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP
        response.set_rcode(dns.rcode.NOERROR)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertLess(end - begin, oneSecond)
106
788c3243
RG
class TestResponseRuleQNameDropped(DNSDistTest):
    """
    Response rule test: DropResponseAction selected by query name.

    The configuration drops responses for
    "drop.responses.tests.powerdns.com." only.
    """

    # The '%s' placeholder receives the backend port.
    # NOTE(review): presumably filled in by DNSDistTest from its default
    # config parameters - confirm against the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction())
    """

    def testDropped(self):
        """
        Responses: Dropped on QName

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # Same expectations over both transports.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Align the IDs before comparing the messages.
            # NOTE(review): presumably the ID seen by the backend differs
            # from the one we sent - confirm against DNSDistTest.
            receivedQuery.id = query.id
            # The query itself must have gone through ...
            self.assertEqual(query, receivedQuery)
            # ... but no response must come back.
            self.assertIsNone(receivedResponse)

    def testNotDropped(self):
        """
        Responses: NOT Dropped on QName

        Send an A query to "dontdrop.responses.tests.powerdns.com.",
        check that the response is not dropped.
        """
        name = 'dontdrop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # Same expectations over both transports.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
788c3243
RG
149
class TestResponseRuleQNameAllowed(DNSDistTest):
    """
    Response rule test: AllowResponseAction for one name, drop everything else.

    The configuration first allows responses for
    "allow.responses.tests.powerdns.com.", then drops all responses via
    an AllRule() catch-all.
    """

    # The '%s' placeholder receives the backend port.
    # NOTE(review): presumably filled in by DNSDistTest from its default
    # config parameters - confirm against the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction())
    addResponseAction(AllRule(), DropResponseAction())
    """

    def testAllowed(self):
        """
        Responses: Allowed on QName

        Send an A query to "allow.responses.tests.powerdns.com.",
        check that the response is allowed.
        """
        name = 'allow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # Same expectations over both transports.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Align the IDs before comparing the messages.
            # NOTE(review): presumably the ID seen by the backend differs
            # from the one we sent - confirm against DNSDistTest.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testNotAllowed(self):
        """
        Responses: Not allowed on QName

        Send an A query to "dontallow.responses.tests.powerdns.com.",
        check that the response is dropped.
        """
        name = 'dontallow.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # Same expectations over both transports.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            # The query must have gone through, but the response is
            # expected to be dropped by the catch-all rule.
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)
153d5065
RG
193
class TestResponseRuleEditTTL(DNSDistTest):
    """
    Response rule test: rewrite record TTLs from a Lua response action.

    The Lua callback passed to dr:editTTLs() returns the fixed value
    configured in _ttl for every record.
    """

    # TTL that the Lua callback returns; substituted for '%d' below.
    _ttl = 5
    # Attribute names matching the '%s' and '%d' placeholders, in order.
    # NOTE(review): presumably consumed by DNSDistTest when rendering
    # the template - confirm against the base class.
    _config_params = ['_testServerPort', '_ttl']
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function editTTLCallback(section, class, type, ttl)
      return %d
    end

    function editTTLFunc(dr)
      dr:editTTLs(editTTLCallback)
      return DNSAction.None, ""
    end

    addResponseAction(AllRule(), LuaResponseAction(editTTLFunc))
    """

    def testTTLEdited(self):
        """
        Responses: Alter the TTLs

        Send an A query whose backend answer carries a TTL of 3600 and
        check that the answer received by the client has its TTL set
        to the configured value instead.
        """
        name = 'editttl.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        # Same expectations over both transports.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Align the IDs before comparing the messages.
            # NOTE(review): presumably the ID seen by the backend differs
            # from the one we sent - confirm against DNSDistTest.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            # The messages are expected to compare equal even though the
            # TTLs differ, so the TTLs are checked explicitly afterwards.
            self.assertEqual(response, receivedResponse)
            self.assertNotEqual(response.answer[0].ttl, receivedResponse.answer[0].ttl)
            self.assertEqual(receivedResponse.answer[0].ttl, self._ttl)
63cdc73d
CHB
235
class TestResponseLuaActionReturnSyntax(DNSDistTest):
    """
    Response rule test: return values accepted from a LuaResponseAction.

    customDelay returns an action together with a string argument,
    customDrop returns the action alone; both forms are exercised.
    """

    # The '%s' placeholder receives the backend port.
    # NOTE(review): presumably filled in by DNSDistTest from its default
    # config parameters - confirm against the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    function customDelay(dr)
      return DNSResponseAction.Delay, "1000"
    end
    function customDrop(dr)
      return DNSResponseAction.Drop
    end
    addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop))
    addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), LuaResponseAction(customDelay))
    """

    def testResponseActionDelayed(self):
        """
        Responses: Delayed via LuaResponseAction

        Send an A query to "delayed.responses.tests.powerdns.com.",
        check that the response delay is longer than 1000 ms
        for a NXDomain response over UDP.
        """
        name = 'delayed.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # NX over UDP
        response.set_rcode(dns.rcode.NXDOMAIN)
        begin = datetime.now()
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        end = datetime.now()
        # Align the IDs before comparing the messages.
        # NOTE(review): presumably the ID seen by the backend differs from
        # the one we sent - confirm against DNSDistTest.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.assertGreater(end - begin, timedelta(seconds=1))

    def testDropped(self):
        """
        Responses: Dropped via user defined LuaResponseAction

        Send an A query to "drop.responses.tests.powerdns.com.",
        check that the response (not the query) is dropped.
        """
        name = 'drop.responses.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # Same expectations over both transports.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            receivedQuery.id = query.id
            # The query must have gone through, but no response must
            # come back.
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)