]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Trailing.py
4 from dnsdisttests
import DNSDistTest
6 class TestTrailingDataToBackend(DNSDistTest
):
8 # this test suite uses a different responder port
9 # because, contrary to the other ones, its
10 # responders allow trailing data and we don't want
12 _testServerPort
= 5360
14 _config_template
= """
15 newServer{address="127.0.0.1:%s"}
17 function replaceTrailingData(dq)
18 local success = dq:setTrailingData("ABC")
20 return DNSAction.ServFail, ""
22 return DNSAction.None, ""
24 addAction("added.trailing.tests.powerdns.com.", LuaAction(replaceTrailingData))
26 function fillBuffer(dq)
27 local available = dq.size - dq.len
28 local tail = string.rep("A", available)
29 local success = dq:setTrailingData(tail)
31 return DNSAction.ServFail, ""
33 return DNSAction.None, ""
35 addAction("max.trailing.tests.powerdns.com.", LuaAction(fillBuffer))
37 function exceedBuffer(dq)
38 local available = dq.size - dq.len
39 local tail = string.rep("A", available + 1)
40 local success = dq:setTrailingData(tail)
42 return DNSAction.ServFail, ""
44 return DNSAction.None, ""
46 addAction("limited.trailing.tests.powerdns.com.", LuaAction(exceedBuffer))
49 def startResponders(cls
):
50 print("Launching responders..")
52 # Respond REFUSED to queries with trailing data.
53 cls
._UDPResponder
= threading
.Thread(name
='UDP Responder', target
=cls
.UDPResponder
, args
=[cls
._testServerPort
, cls
._toResponderQueue
, cls
._fromResponderQueue
, dns
.rcode
.REFUSED
])
54 cls
._UDPResponder
.setDaemon(True)
55 cls
._UDPResponder
.start()
57 # Respond REFUSED to queries with trailing data.
58 cls
._TCPResponder
= threading
.Thread(name
='TCP Responder', target
=cls
.TCPResponder
, args
=[cls
._testServerPort
, cls
._toResponderQueue
, cls
._fromResponderQueue
, dns
.rcode
.REFUSED
])
59 cls
._TCPResponder
.setDaemon(True)
60 cls
._TCPResponder
.start()
62 def testTrailingPassthrough(self
):
64 Trailing data: Pass through
67 name
= 'passthrough.trailing.tests.powerdns.com.'
68 query
= dns
.message
.make_query(name
, 'A', 'IN')
69 response
= dns
.message
.make_response(query
)
70 rrset
= dns
.rrset
.from_text(name
,
75 response
.answer
.append(rrset
)
76 expectedResponse
= dns
.message
.make_response(query
)
77 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
82 for method
in ("sendUDPQuery", "sendTCPQuery"):
83 sender
= getattr(self
, method
)
84 (receivedQuery
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
85 self
.assertTrue(receivedQuery
)
86 self
.assertTrue(receivedResponse
)
87 receivedQuery
.id = query
.id
88 self
.assertEquals(receivedQuery
, query
)
89 self
.assertEquals(receivedResponse
, expectedResponse
)
91 def testTrailingCapacity(self
):
93 Trailing data: Fill buffer
96 name
= 'max.trailing.tests.powerdns.com.'
97 query
= dns
.message
.make_query(name
, 'A', 'IN')
98 response
= dns
.message
.make_response(query
)
99 rrset
= dns
.rrset
.from_text(name
,
104 response
.answer
.append(rrset
)
105 expectedResponse
= dns
.message
.make_response(query
)
106 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
108 for method
in ("sendUDPQuery", "sendTCPQuery"):
109 sender
= getattr(self
, method
)
110 (receivedQuery
, receivedResponse
) = sender(query
, response
)
111 self
.assertTrue(receivedQuery
)
112 self
.assertTrue(receivedResponse
)
113 receivedQuery
.id = query
.id
114 self
.assertEquals(receivedQuery
, query
)
115 self
.assertEquals(receivedResponse
, expectedResponse
)
117 def testTrailingLimited(self
):
119 Trailing data: Reject buffer overflows
122 name
= 'limited.trailing.tests.powerdns.com.'
123 query
= dns
.message
.make_query(name
, 'A', 'IN')
124 response
= dns
.message
.make_response(query
)
125 rrset
= dns
.rrset
.from_text(name
,
130 response
.answer
.append(rrset
)
131 expectedResponse
= dns
.message
.make_response(query
)
132 expectedResponse
.set_rcode(dns
.rcode
.SERVFAIL
)
134 for method
in ("sendUDPQuery", "sendTCPQuery"):
135 sender
= getattr(self
, method
)
136 (_
, receivedResponse
) = sender(query
, response
, useQueue
=False)
137 self
.assertTrue(receivedResponse
)
138 self
.assertEquals(receivedResponse
, expectedResponse
)
140 def testTrailingAdded(self
):
145 name
= 'added.trailing.tests.powerdns.com.'
146 query
= dns
.message
.make_query(name
, 'A', 'IN')
147 response
= dns
.message
.make_response(query
)
148 rrset
= dns
.rrset
.from_text(name
,
153 response
.answer
.append(rrset
)
154 expectedResponse
= dns
.message
.make_response(query
)
155 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
157 for method
in ("sendUDPQuery", "sendTCPQuery"):
158 sender
= getattr(self
, method
)
159 (receivedQuery
, receivedResponse
) = sender(query
, response
)
160 self
.assertTrue(receivedQuery
)
161 self
.assertTrue(receivedResponse
)
162 receivedQuery
.id = query
.id
163 self
.assertEquals(receivedQuery
, query
)
164 self
.assertEquals(receivedResponse
, expectedResponse
)
166 class TestTrailingDataToDnsdist(DNSDistTest
):
168 _config_template
= """
169 newServer{address="127.0.0.1:%s"}
171 addAction(AndRule({QNameRule("dropped.trailing.tests.powerdns.com."), TrailingDataRule()}), DropAction())
173 function removeTrailingData(dq)
174 local success = dq:setTrailingData("")
176 print("Trailing removal failed")
177 return DNSAction.ServFail, ""
179 return DNSAction.None, ""
181 addAction("removed.trailing.tests.powerdns.com.", LuaAction(removeTrailingData))
183 function reportTrailingData(dq)
184 local tail = dq:getTrailingData()
185 return DNSAction.Spoof, "-" .. tail .. ".echoed.trailing.tests.powerdns.com."
187 addAction("echoed.trailing.tests.powerdns.com.", LuaAction(reportTrailingData))
189 function replaceTrailingData(dq)
190 local success = dq:setTrailingData("ABC")
192 return DNSAction.ServFail, ""
194 return DNSAction.None, ""
196 addAction("replaced.trailing.tests.powerdns.com.", LuaAction(replaceTrailingData))
197 addAction("replaced.trailing.tests.powerdns.com.", LuaAction(reportTrailingData))
199 function reportTrailingHex(dq)
200 local tail = dq:getTrailingData()
201 local hex = string.gsub(tail, ".", function(ch)
202 return string.sub(string.format("\\x2502X", string.byte(ch)), -2)
204 return DNSAction.Spoof, "-0x" .. hex .. ".echoed-hex.trailing.tests.powerdns.com."
206 addAction("echoed-hex.trailing.tests.powerdns.com.", LuaAction(reportTrailingHex))
208 function replaceTrailingData_unsafe(dq)
209 local success = dq:setTrailingData("\\xB0\\x00\\xDE\\xADB\\xF0\\x9F\\x91\\xBB\\xC3\\xBE")
211 return DNSAction.ServFail, ""
213 return DNSAction.None, ""
215 addAction("replaced-unsafe.trailing.tests.powerdns.com.", LuaAction(replaceTrailingData_unsafe))
216 addAction("replaced-unsafe.trailing.tests.powerdns.com.", LuaAction(reportTrailingHex))
219 def testTrailingDropped(self
):
221 Trailing data: Drop query
224 name
= 'dropped.trailing.tests.powerdns.com.'
225 query
= dns
.message
.make_query(name
, 'A', 'IN')
226 response
= dns
.message
.make_response(query
)
227 rrset
= dns
.rrset
.from_text(name
,
232 response
.answer
.append(rrset
)
234 raw
= query
.to_wire()
237 for method
in ("sendUDPQuery", "sendTCPQuery"):
238 sender
= getattr(self
, method
)
240 # Verify that queries with no trailing data make it through.
241 (receivedQuery
, receivedResponse
) = sender(query
, response
)
242 self
.assertTrue(receivedQuery
)
243 self
.assertTrue(receivedResponse
)
244 receivedQuery
.id = query
.id
245 self
.assertEquals(query
, receivedQuery
)
246 self
.assertEquals(response
, receivedResponse
)
248 # Verify that queries with trailing data don't make it through.
249 (_
, receivedResponse
) = sender(raw
, response
, rawQuery
=True, useQueue
=False)
250 self
.assertEquals(receivedResponse
, None)
252 def testTrailingRemoved(self
):
254 Trailing data: Remove
257 name
= 'removed.trailing.tests.powerdns.com.'
258 query
= dns
.message
.make_query(name
, 'A', 'IN')
259 response
= dns
.message
.make_response(query
)
260 rrset
= dns
.rrset
.from_text(name
,
265 response
.answer
.append(rrset
)
267 raw
= query
.to_wire()
270 for method
in ("sendUDPQuery", "sendTCPQuery"):
271 sender
= getattr(self
, method
)
272 (receivedQuery
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
273 self
.assertTrue(receivedQuery
)
274 self
.assertTrue(receivedResponse
)
275 receivedQuery
.id = query
.id
276 self
.assertEquals(receivedQuery
, query
)
277 self
.assertEquals(receivedResponse
, response
)
279 def testTrailingRead(self
):
284 name
= 'echoed.trailing.tests.powerdns.com.'
285 query
= dns
.message
.make_query(name
, 'A', 'IN')
286 response
= dns
.message
.make_response(query
)
287 response
.set_rcode(dns
.rcode
.SERVFAIL
)
288 expectedResponse
= dns
.message
.make_response(query
)
289 rrset
= dns
.rrset
.from_text(name
,
293 '-TrailingData.echoed.trailing.tests.powerdns.com.')
294 expectedResponse
.answer
.append(rrset
)
296 raw
= query
.to_wire()
297 raw
= raw
+ b
'TrailingData'
299 for method
in ("sendUDPQuery", "sendTCPQuery"):
300 sender
= getattr(self
, method
)
301 (_
, receivedResponse
) = sender(raw
, response
=None, rawQuery
=True, useQueue
=False)
302 self
.assertTrue(receivedResponse
)
303 expectedResponse
.flags
= receivedResponse
.flags
304 self
.assertEquals(receivedResponse
, expectedResponse
)
306 def testTrailingReplaced(self
):
308 Trailing data: Replace
311 name
= 'replaced.trailing.tests.powerdns.com.'
312 query
= dns
.message
.make_query(name
, 'A', 'IN')
313 response
= dns
.message
.make_response(query
)
314 response
.set_rcode(dns
.rcode
.SERVFAIL
)
315 expectedResponse
= dns
.message
.make_response(query
)
316 rrset
= dns
.rrset
.from_text(name
,
320 '-ABC.echoed.trailing.tests.powerdns.com.')
321 expectedResponse
.answer
.append(rrset
)
323 raw
= query
.to_wire()
324 raw
= raw
+ b
'TrailingData'
326 for method
in ("sendUDPQuery", "sendTCPQuery"):
327 sender
= getattr(self
, method
)
328 (_
, receivedResponse
) = sender(raw
, response
=None, rawQuery
=True, useQueue
=False)
329 self
.assertTrue(receivedResponse
)
330 expectedResponse
.flags
= receivedResponse
.flags
331 self
.assertEquals(receivedResponse
, expectedResponse
)
333 def testTrailingReadUnsafe(self
):
335 Trailing data: Echo as hex
338 name
= 'echoed-hex.trailing.tests.powerdns.com.'
339 query
= dns
.message
.make_query(name
, 'A', 'IN')
340 response
= dns
.message
.make_response(query
)
341 response
.set_rcode(dns
.rcode
.SERVFAIL
)
342 expectedResponse
= dns
.message
.make_response(query
)
343 rrset
= dns
.rrset
.from_text(name
,
347 '-0x0000DEAD.echoed-hex.trailing.tests.powerdns.com.')
348 expectedResponse
.answer
.append(rrset
)
350 raw
= query
.to_wire()
351 raw
= raw
+ b
'\x00\x00\xDE\xAD'
353 for method
in ("sendUDPQuery", "sendTCPQuery"):
354 sender
= getattr(self
, method
)
355 (_
, receivedResponse
) = sender(raw
, response
=None, rawQuery
=True, useQueue
=False)
356 self
.assertTrue(receivedResponse
)
357 expectedResponse
.flags
= receivedResponse
.flags
358 self
.assertEquals(receivedResponse
, expectedResponse
)
360 def testTrailingReplacedUnsafe(self
):
362 Trailing data: Replace with null and/or non-ASCII bytes
365 name
= 'replaced-unsafe.trailing.tests.powerdns.com.'
366 query
= dns
.message
.make_query(name
, 'A', 'IN')
367 response
= dns
.message
.make_response(query
)
368 response
.set_rcode(dns
.rcode
.SERVFAIL
)
369 expectedResponse
= dns
.message
.make_response(query
)
370 rrset
= dns
.rrset
.from_text(name
,
374 '-0xB000DEAD42F09F91BBC3BE.echoed-hex.trailing.tests.powerdns.com.')
375 expectedResponse
.answer
.append(rrset
)
377 raw
= query
.to_wire()
378 raw
= raw
+ b
'TrailingData'
380 for method
in ("sendUDPQuery", "sendTCPQuery"):
381 sender
= getattr(self
, method
)
382 (_
, receivedResponse
) = sender(raw
, response
=None, rawQuery
=True, useQueue
=False)
383 self
.assertTrue(receivedResponse
)
384 expectedResponse
.flags
= receivedResponse
.flags
385 self
.assertEquals(receivedResponse
, expectedResponse
)