]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Trailing.py
4 from dnsdisttests
import DNSDistTest
6 class TestTrailingDataToBackend(DNSDistTest
):
8 # this test suite uses a different responder port
9 # because, contrary to the other ones, its
10 # responders allow trailing data and we don't want
12 _testServerPort
= 5360
13 _config_template
= """
14 newServer{address="127.0.0.1:%s"}
16 function replaceTrailingData(dq)
17 local success = dq:setTrailingData("ABC")
19 return DNSAction.ServFail, ""
21 return DNSAction.None, ""
23 addLuaAction("added.trailing.tests.powerdns.com.", replaceTrailingData)
25 function fillBuffer(dq)
26 local available = dq.size - dq.len
27 local tail = string.rep("A", available)
28 local success = dq:setTrailingData(tail)
30 return DNSAction.ServFail, ""
32 return DNSAction.None, ""
34 addLuaAction("max.trailing.tests.powerdns.com.", fillBuffer)
36 function exceedBuffer(dq)
37 local available = dq.size - dq.len
38 local tail = string.rep("A", available + 1)
39 local success = dq:setTrailingData(tail)
41 return DNSAction.ServFail, ""
43 return DNSAction.None, ""
45 addLuaAction("limited.trailing.tests.powerdns.com.", exceedBuffer)
def startResponders(cls):
    """Launch the UDP and TCP backend responder threads as daemons.

    The threads are stored on ``cls`` as ``_UDPResponder`` and
    ``_TCPResponder``. Each one runs the matching ``cls.UDPResponder`` /
    ``cls.TCPResponder`` target with the test server port, the two
    responder queues and ``dns.rcode.REFUSED`` as arguments.
    """
    # NOTE(review): the method takes ``cls``, so it is presumably decorated
    # with @classmethod on the line just above this block -- confirm against
    # the complete file.
    print("Launching responders..")

    # Respond REFUSED to queries with trailing data.
    cls._UDPResponder = threading.Thread(
        name='UDP Responder',
        target=cls.UDPResponder,
        args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, dns.rcode.REFUSED],
    )
    # Thread.setDaemon() is deprecated since Python 3.10; the ``daemon``
    # attribute is the supported equivalent and must be set before start().
    cls._UDPResponder.daemon = True
    cls._UDPResponder.start()

    # Respond REFUSED to queries with trailing data.
    cls._TCPResponder = threading.Thread(
        name='TCP Responder',
        target=cls.TCPResponder,
        args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, dns.rcode.REFUSED],
    )
    cls._TCPResponder.daemon = True
    cls._TCPResponder.start()
61 def testTrailingPassthrough(self
):
63 Trailing data: Pass through
66 name
= 'passthrough.trailing.tests.powerdns.com.'
67 query
= dns
.message
.make_query(name
, 'A', 'IN')
68 response
= dns
.message
.make_response(query
)
69 rrset
= dns
.rrset
.from_text(name
,
74 response
.answer
.append(rrset
)
75 expectedResponse
= dns
.message
.make_response(query
)
76 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
81 for method
in ("sendUDPQuery", "sendTCPQuery"):
82 sender
= getattr(self
, method
)
83 (receivedQuery
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
84 self
.assertTrue(receivedQuery
)
85 self
.assertTrue(receivedResponse
)
86 receivedQuery
.id = query
.id
87 self
.assertEquals(receivedQuery
, query
)
88 self
.assertEquals(receivedResponse
, expectedResponse
)
90 def testTrailingCapacity(self
):
92 Trailing data: Fill buffer
95 name
= 'max.trailing.tests.powerdns.com.'
96 query
= dns
.message
.make_query(name
, 'A', 'IN')
97 response
= dns
.message
.make_response(query
)
98 rrset
= dns
.rrset
.from_text(name
,
103 response
.answer
.append(rrset
)
104 expectedResponse
= dns
.message
.make_response(query
)
105 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
107 for method
in ("sendUDPQuery", "sendTCPQuery"):
108 sender
= getattr(self
, method
)
109 (receivedQuery
, receivedResponse
) = sender(query
, response
)
110 self
.assertTrue(receivedQuery
)
111 self
.assertTrue(receivedResponse
)
112 receivedQuery
.id = query
.id
113 self
.assertEquals(receivedQuery
, query
)
114 self
.assertEquals(receivedResponse
, expectedResponse
)
116 def testTrailingLimited(self
):
118 Trailing data: Reject buffer overflows
121 name
= 'limited.trailing.tests.powerdns.com.'
122 query
= dns
.message
.make_query(name
, 'A', 'IN')
123 response
= dns
.message
.make_response(query
)
124 rrset
= dns
.rrset
.from_text(name
,
129 response
.answer
.append(rrset
)
130 expectedResponse
= dns
.message
.make_response(query
)
131 expectedResponse
.set_rcode(dns
.rcode
.SERVFAIL
)
133 for method
in ("sendUDPQuery", "sendTCPQuery"):
134 sender
= getattr(self
, method
)
135 (_
, receivedResponse
) = sender(query
, response
)
136 self
.assertTrue(receivedResponse
)
137 self
.assertEquals(receivedResponse
, expectedResponse
)
139 def testTrailingAdded(self
):
144 name
= 'added.trailing.tests.powerdns.com.'
145 query
= dns
.message
.make_query(name
, 'A', 'IN')
146 response
= dns
.message
.make_response(query
)
147 rrset
= dns
.rrset
.from_text(name
,
152 response
.answer
.append(rrset
)
153 expectedResponse
= dns
.message
.make_response(query
)
154 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
156 for method
in ("sendUDPQuery", "sendTCPQuery"):
157 sender
= getattr(self
, method
)
158 (receivedQuery
, receivedResponse
) = sender(query
, response
)
159 self
.assertTrue(receivedQuery
)
160 self
.assertTrue(receivedResponse
)
161 receivedQuery
.id = query
.id
162 self
.assertEquals(receivedQuery
, query
)
163 self
.assertEquals(receivedResponse
, expectedResponse
)
165 class TestTrailingDataToDnsdist(DNSDistTest
):
166 _config_template
= """
167 newServer{address="127.0.0.1:%s"}
169 addAction(AndRule({QNameRule("dropped.trailing.tests.powerdns.com."), TrailingDataRule()}), DropAction())
171 function removeTrailingData(dq)
172 local success = dq:setTrailingData("")
174 return DNSAction.ServFail, ""
176 return DNSAction.None, ""
178 addLuaAction("removed.trailing.tests.powerdns.com.", removeTrailingData)
180 function reportTrailingData(dq)
181 local tail = dq:getTrailingData()
182 return DNSAction.Spoof, "-" .. tail .. ".echoed.trailing.tests.powerdns.com."
184 addLuaAction("echoed.trailing.tests.powerdns.com.", reportTrailingData)
186 function replaceTrailingData(dq)
187 local success = dq:setTrailingData("ABC")
189 return DNSAction.ServFail, ""
191 return DNSAction.None, ""
193 addLuaAction("replaced.trailing.tests.powerdns.com.", replaceTrailingData)
194 addLuaAction("replaced.trailing.tests.powerdns.com.", reportTrailingData)
196 function reportTrailingHex(dq)
197 local tail = dq:getTrailingData()
198 local hex = string.gsub(tail, ".", function(ch)
199 return string.sub(string.format("\\x2502X", string.byte(ch)), -2)
201 return DNSAction.Spoof, "-0x" .. hex .. ".echoed-hex.trailing.tests.powerdns.com."
203 addLuaAction("echoed-hex.trailing.tests.powerdns.com.", reportTrailingHex)
205 function replaceTrailingData_unsafe(dq)
206 local success = dq:setTrailingData("\\xB0\\x00\\xDE\\xADB\\xF0\\x9F\\x91\\xBB\\xC3\\xBE")
208 return DNSAction.ServFail, ""
210 return DNSAction.None, ""
212 addLuaAction("replaced-unsafe.trailing.tests.powerdns.com.", replaceTrailingData_unsafe)
213 addLuaAction("replaced-unsafe.trailing.tests.powerdns.com.", reportTrailingHex)
216 def testTrailingDropped(self
):
218 Trailing data: Drop query
221 name
= 'dropped.trailing.tests.powerdns.com.'
222 query
= dns
.message
.make_query(name
, 'A', 'IN')
223 response
= dns
.message
.make_response(query
)
224 rrset
= dns
.rrset
.from_text(name
,
229 response
.answer
.append(rrset
)
231 raw
= query
.to_wire()
234 for method
in ("sendUDPQuery", "sendTCPQuery"):
235 sender
= getattr(self
, method
)
237 # Verify that queries with no trailing data make it through.
238 (receivedQuery
, receivedResponse
) = sender(query
, response
)
239 self
.assertTrue(receivedQuery
)
240 self
.assertTrue(receivedResponse
)
241 receivedQuery
.id = query
.id
242 self
.assertEquals(query
, receivedQuery
)
243 self
.assertEquals(response
, receivedResponse
)
245 # Verify that queries with trailing data don't make it through.
246 (_
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
247 self
.assertEquals(receivedResponse
, None)
249 def testTrailingRemoved(self
):
251 Trailing data: Remove
254 name
= 'removed.trailing.tests.powerdns.com.'
255 query
= dns
.message
.make_query(name
, 'A', 'IN')
256 response
= dns
.message
.make_response(query
)
257 rrset
= dns
.rrset
.from_text(name
,
262 response
.answer
.append(rrset
)
264 raw
= query
.to_wire()
267 for method
in ("sendUDPQuery", "sendTCPQuery"):
268 sender
= getattr(self
, method
)
269 (receivedQuery
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
270 self
.assertTrue(receivedQuery
)
271 self
.assertTrue(receivedResponse
)
272 receivedQuery
.id = query
.id
273 self
.assertEquals(receivedQuery
, query
)
274 self
.assertEquals(receivedResponse
, response
)
276 def testTrailingRead(self
):
281 name
= 'echoed.trailing.tests.powerdns.com.'
282 query
= dns
.message
.make_query(name
, 'A', 'IN')
283 response
= dns
.message
.make_response(query
)
284 response
.set_rcode(dns
.rcode
.SERVFAIL
)
285 expectedResponse
= dns
.message
.make_response(query
)
286 rrset
= dns
.rrset
.from_text(name
,
290 '-TrailingData.echoed.trailing.tests.powerdns.com.')
291 expectedResponse
.answer
.append(rrset
)
293 raw
= query
.to_wire()
294 raw
= raw
+ b
'TrailingData'
296 for method
in ("sendUDPQuery", "sendTCPQuery"):
297 sender
= getattr(self
, method
)
298 (_
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
299 self
.assertTrue(receivedResponse
)
300 expectedResponse
.flags
= receivedResponse
.flags
301 self
.assertEquals(receivedResponse
, expectedResponse
)
303 def testTrailingReplaced(self
):
305 Trailing data: Replace
308 name
= 'replaced.trailing.tests.powerdns.com.'
309 query
= dns
.message
.make_query(name
, 'A', 'IN')
310 response
= dns
.message
.make_response(query
)
311 response
.set_rcode(dns
.rcode
.SERVFAIL
)
312 expectedResponse
= dns
.message
.make_response(query
)
313 rrset
= dns
.rrset
.from_text(name
,
317 '-ABC.echoed.trailing.tests.powerdns.com.')
318 expectedResponse
.answer
.append(rrset
)
320 raw
= query
.to_wire()
321 raw
= raw
+ b
'TrailingData'
323 for method
in ("sendUDPQuery", "sendTCPQuery"):
324 sender
= getattr(self
, method
)
325 (_
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
326 self
.assertTrue(receivedResponse
)
327 expectedResponse
.flags
= receivedResponse
.flags
328 self
.assertEquals(receivedResponse
, expectedResponse
)
330 def testTrailingReadUnsafe(self
):
332 Trailing data: Echo as hex
335 name
= 'echoed-hex.trailing.tests.powerdns.com.'
336 query
= dns
.message
.make_query(name
, 'A', 'IN')
337 response
= dns
.message
.make_response(query
)
338 response
.set_rcode(dns
.rcode
.SERVFAIL
)
339 expectedResponse
= dns
.message
.make_response(query
)
340 rrset
= dns
.rrset
.from_text(name
,
344 '-0x0000DEAD.echoed-hex.trailing.tests.powerdns.com.')
345 expectedResponse
.answer
.append(rrset
)
347 raw
= query
.to_wire()
348 raw
= raw
+ b
'\x00\x00\xDE\xAD'
350 for method
in ("sendUDPQuery", "sendTCPQuery"):
351 sender
= getattr(self
, method
)
352 (_
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
353 self
.assertTrue(receivedResponse
)
354 expectedResponse
.flags
= receivedResponse
.flags
355 self
.assertEquals(receivedResponse
, expectedResponse
)
357 def testTrailingReplacedUnsafe(self
):
359 Trailing data: Replace with null and/or non-ASCII bytes
362 name
= 'replaced-unsafe.trailing.tests.powerdns.com.'
363 query
= dns
.message
.make_query(name
, 'A', 'IN')
364 response
= dns
.message
.make_response(query
)
365 response
.set_rcode(dns
.rcode
.SERVFAIL
)
366 expectedResponse
= dns
.message
.make_response(query
)
367 rrset
= dns
.rrset
.from_text(name
,
371 '-0xB000DEAD42F09F91BBC3BE.echoed-hex.trailing.tests.powerdns.com.')
372 expectedResponse
.answer
.append(rrset
)
374 raw
= query
.to_wire()
375 raw
= raw
+ b
'TrailingData'
377 for method
in ("sendUDPQuery", "sendTCPQuery"):
378 sender
= getattr(self
, method
)
379 (_
, receivedResponse
) = sender(raw
, response
, rawQuery
=True)
380 self
.assertTrue(receivedResponse
)
381 expectedResponse
.flags
= receivedResponse
.flags
382 self
.assertEquals(receivedResponse
, expectedResponse
)