git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Trailing.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Trailing.py
1 #!/usr/bin/env python
2 import threading
3 import dns
4 from dnsdisttests import DNSDistTest
5
class TestTrailingDataToBackend(DNSDistTest):
    """Trailing-data tests where the query is forwarded to the backend.

    The Lua rules in ``_config_template`` attach trailing bytes to queries
    for selected names before they are forwarded.  The responders started by
    ``startResponders`` are handed ``dns.rcode.REFUSED`` which, according to
    the comments there, is what they answer to queries carrying trailing
    data.  A REFUSED answer in these tests therefore indicates that trailing
    bytes reached the backend.
    """

    # this test suite uses a different responder port
    # because, contrary to the other ones, its
    # responders allow trailing data and we don't want
    # to mix things up.
    _testServerPort = 5360
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function replaceTrailingData(dq)
        local success = dq:setTrailingData("ABC")
        if not success then
            return DNSAction.ServFail, ""
        end
        return DNSAction.None, ""
    end
    addAction("added.trailing.tests.powerdns.com.", LuaAction(replaceTrailingData))

    function fillBuffer(dq)
        local available = dq.size - dq.len
        local tail = string.rep("A", available)
        local success = dq:setTrailingData(tail)
        if not success then
            return DNSAction.ServFail, ""
        end
        return DNSAction.None, ""
    end
    addAction("max.trailing.tests.powerdns.com.", LuaAction(fillBuffer))

    function exceedBuffer(dq)
        local available = dq.size - dq.len
        local tail = string.rep("A", available + 1)
        local success = dq:setTrailingData(tail)
        if not success then
            return DNSAction.ServFail, ""
        end
        return DNSAction.None, ""
    end
    addAction("limited.trailing.tests.powerdns.com.", LuaAction(exceedBuffer))
    """

    @classmethod
    def startResponders(cls):
        """Start the UDP and TCP responder threads on ``_testServerPort``.

        Both threads are daemon threads and receive ``dns.rcode.REFUSED`` as
        an extra responder argument.
        """
        print("Launching responders..")

        # Respond REFUSED to queries with trailing data.
        cls._UDPResponder = threading.Thread(
            name='UDP Responder',
            target=cls.UDPResponder,
            args=[cls._testServerPort, cls._toResponderQueue,
                  cls._fromResponderQueue, dns.rcode.REFUSED])
        # Thread.setDaemon() is deprecated; set the attribute directly.
        cls._UDPResponder.daemon = True
        cls._UDPResponder.start()

        # Respond REFUSED to queries with trailing data.
        cls._TCPResponder = threading.Thread(
            name='TCP Responder',
            target=cls.TCPResponder,
            args=[cls._testServerPort, cls._toResponderQueue,
                  cls._fromResponderQueue, dns.rcode.REFUSED])
        cls._TCPResponder.daemon = True
        cls._TCPResponder.start()

    def testTrailingPassthrough(self):
        """
        Trailing data: Pass through

        A raw query with 20 extra bytes appended is sent over UDP and TCP;
        the backend must receive the query and the client must get REFUSED.
        """
        name = 'passthrough.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # Wire-format query followed by 20 bytes of trailing data.
        raw = query.to_wire() + b'A' * 20

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(raw, response, rawQuery=True)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so that only the rest of the message is compared.
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, expectedResponse)

    def testTrailingCapacity(self):
        """
        Trailing data: Fill buffer

        The ``fillBuffer`` Lua action pads the query with ``dq.size - dq.len``
        bytes; the query must still reach the backend, which answers REFUSED.
        """
        name = 'max.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so that only the rest of the message is compared.
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, expectedResponse)

    def testTrailingLimited(self):
        """
        Trailing data: Reject buffer overflows

        The ``exceedBuffer`` Lua action asks for one byte more than
        ``dq.size - dq.len``; the client is expected to get SERVFAIL, the
        action returned by the Lua code when ``setTrailingData`` fails.
        """
        name = 'limited.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.SERVFAIL)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            # Only the response seen by the client matters here.
            (_, receivedResponse) = sender(query, response)
            self.assertTrue(receivedResponse)
            self.assertEqual(receivedResponse, expectedResponse)

    def testTrailingAdded(self):
        """
        Trailing data: Add

        The ``replaceTrailingData`` Lua action sets the trailing data to
        "ABC"; the backend must receive the query and answer REFUSED.
        """
        name = 'added.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so that only the rest of the message is compared.
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, expectedResponse)
164
class TestTrailingDataToDnsdist(DNSDistTest):
    """Trailing-data tests where dnsdist itself inspects or edits the data.

    The rules in ``_config_template`` drop queries with trailing data, strip
    or replace the trailing bytes, or spoof a CNAME answer whose target
    embeds the trailing bytes (as text or as hex) so the tests can observe
    what dnsdist saw.
    """

    # NOTE(review): "\\x25" reaches Lua as \x25, i.e. a '%' character;
    # presumably written this way because the template (see the "%s" in
    # newServer) is %-formatted before being handed to dnsdist - confirm.
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addAction(AndRule({QNameRule("dropped.trailing.tests.powerdns.com."), TrailingDataRule()}), DropAction())

    function removeTrailingData(dq)
        local success = dq:setTrailingData("")
        if not success then
            return DNSAction.ServFail, ""
        end
        return DNSAction.None, ""
    end
    addAction("removed.trailing.tests.powerdns.com.", LuaAction(removeTrailingData))

    function reportTrailingData(dq)
        local tail = dq:getTrailingData()
        return DNSAction.Spoof, "-" .. tail .. ".echoed.trailing.tests.powerdns.com."
    end
    addAction("echoed.trailing.tests.powerdns.com.", LuaAction(reportTrailingData))

    function replaceTrailingData(dq)
        local success = dq:setTrailingData("ABC")
        if not success then
            return DNSAction.ServFail, ""
        end
        return DNSAction.None, ""
    end
    addAction("replaced.trailing.tests.powerdns.com.", LuaAction(replaceTrailingData))
    addAction("replaced.trailing.tests.powerdns.com.", LuaAction(reportTrailingData))

    function reportTrailingHex(dq)
        local tail = dq:getTrailingData()
        local hex = string.gsub(tail, ".", function(ch)
            return string.sub(string.format("\\x2502X", string.byte(ch)), -2)
        end)
        return DNSAction.Spoof, "-0x" .. hex .. ".echoed-hex.trailing.tests.powerdns.com."
    end
    addAction("echoed-hex.trailing.tests.powerdns.com.", LuaAction(reportTrailingHex))

    function replaceTrailingData_unsafe(dq)
        local success = dq:setTrailingData("\\xB0\\x00\\xDE\\xADB\\xF0\\x9F\\x91\\xBB\\xC3\\xBE")
        if not success then
            return DNSAction.ServFail, ""
        end
        return DNSAction.None, ""
    end
    addAction("replaced-unsafe.trailing.tests.powerdns.com.", LuaAction(replaceTrailingData_unsafe))
    addAction("replaced-unsafe.trailing.tests.powerdns.com.", LuaAction(reportTrailingHex))
    """

    def testTrailingDropped(self):
        """
        Trailing data: Drop query

        A clean query must be answered normally, while the same query with
        20 trailing bytes must get no response at all.
        """
        name = 'dropped.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # Wire-format query followed by 20 bytes of trailing data.
        raw = query.to_wire() + b'A' * 20

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            # Verify that queries with no trailing data make it through.
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so that only the rest of the message is compared.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

            # Verify that queries with trailing data don't make it through.
            (_, receivedResponse) = sender(raw, response, rawQuery=True)
            self.assertIsNone(receivedResponse)

    def testTrailingRemoved(self):
        """
        Trailing data: Remove

        A query sent with 20 trailing bytes goes through the
        ``removeTrailingData`` Lua action; the backend must receive the
        query and its regular response must be relayed to the client.
        """
        name = 'removed.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # Wire-format query followed by 20 bytes of trailing data.
        raw = query.to_wire() + b'A' * 20

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(raw, response, rawQuery=True)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so that only the rest of the message is compared.
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, response)

    def testTrailingRead(self):
        """
        Trailing data: Echo

        The trailing bytes "TrailingData" must come back embedded in the
        target of the spoofed CNAME built by ``reportTrailingData``.
        """
        name = 'echoed.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # The backend response is SERVFAIL, so it differs from the spoofed
        # answer the test expects to receive.
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.SERVFAIL)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    '-TrailingData.echoed.trailing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        raw = query.to_wire() + b'TrailingData'

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(raw, response, rawQuery=True)
            self.assertTrue(receivedResponse)
            # Copy the flags so the comparison does not depend on them.
            expectedResponse.flags = receivedResponse.flags
            self.assertEqual(receivedResponse, expectedResponse)

    def testTrailingReplaced(self):
        """
        Trailing data: Replace

        The trailing bytes sent by the client are replaced with "ABC" by
        ``replaceTrailingData`` and then echoed by ``reportTrailingData``.
        """
        name = 'replaced.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # The backend response is SERVFAIL, so it differs from the spoofed
        # answer the test expects to receive.
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.SERVFAIL)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    '-ABC.echoed.trailing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        raw = query.to_wire() + b'TrailingData'

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(raw, response, rawQuery=True)
            self.assertTrue(receivedResponse)
            # Copy the flags so the comparison does not depend on them.
            expectedResponse.flags = receivedResponse.flags
            self.assertEqual(receivedResponse, expectedResponse)

    def testTrailingReadUnsafe(self):
        """
        Trailing data: Echo as hex

        Trailing bytes that include NUL and non-ASCII values must come back
        hex-encoded in the CNAME target built by ``reportTrailingHex``.
        """
        name = 'echoed-hex.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # The backend response is SERVFAIL, so it differs from the spoofed
        # answer the test expects to receive.
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.SERVFAIL)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    '-0x0000DEAD.echoed-hex.trailing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        raw = query.to_wire() + b'\x00\x00\xDE\xAD'

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(raw, response, rawQuery=True)
            self.assertTrue(receivedResponse)
            # Copy the flags so the comparison does not depend on them.
            expectedResponse.flags = receivedResponse.flags
            self.assertEqual(receivedResponse, expectedResponse)

    def testTrailingReplacedUnsafe(self):
        """
        Trailing data: Replace with null and/or non-ASCII bytes

        ``replaceTrailingData_unsafe`` overwrites the client's trailing
        bytes with a binary string, which ``reportTrailingHex`` then echoes
        hex-encoded in the CNAME target.
        """
        name = 'replaced-unsafe.trailing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # The backend response is SERVFAIL, so it differs from the spoofed
        # answer the test expects to receive.
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.SERVFAIL)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    '-0xB000DEAD42F09F91BBC3BE.echoed-hex.trailing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        raw = query.to_wire() + b'TrailingData'

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(raw, response, rawQuery=True)
            self.assertTrue(receivedResponse)
            # Copy the flags so the comparison does not depend on them.
            expectedResponse.flags = receivedResponse.flags
            self.assertEqual(receivedResponse, expectedResponse)