import dns
import dns.flags
import dns.message
import dns.rdataclass
import dns.rdatatype
import dns.rrset
from dnsdisttests import DNSDistTest
class TestSpoofingSpoof(DNSDistTest):
    """Regression tests for answers spoofed by SpoofAction / SpoofCNAMEAction.

    Every test builds a query for one of the names configured in
    ``_config_template``, builds the response dnsdist is expected to forge,
    and checks that the same answer comes back over both UDP and TCP.

    NOTE(review): the expected records use a TTL of 60 on the assumption
    that this is the TTL dnsdist puts on spoofed answers -- confirm.
    """

    # The single %s is the port of the backend server.
    _config_template = """
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
    addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {aa=true}))
    addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ad=true}))
    addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=true}))
    addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=false}))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedOverUDPAndTCP(self, query, expectedResponse):
        """Send *query* over UDP then TCP and require *expectedResponse* each time."""
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # response=None / useQueue=False: no backend answer is supplied,
            # the reply has to come from dnsdist itself.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'cnameaction.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via SpoofAction

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via SpoofAction

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via SpoofAction

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # An ANY query is expected to return both address families.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionSetAA(self):
        """
        Spoofing: Spoof via Action, setting AA=1
        """
        name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AA
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionSetAD(self):
        """
        Spoofing: Spoof via Action, setting AD=1
        """
        name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AD
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionSetRA(self):
        """
        Spoofing: Spoof via Action, setting RA=1
        """
        name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses; RD is cleared here so
        # an RA bit in the answer can only come from the {ra=true} option
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionSetNoRA(self):
        """
        Spoofing: Spoof via Action, setting RA=0
        """
        name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
        # RD is deliberately left as make_query() sets it: the answer must
        # still come back without RA because of the {ra=false} option.
        query = dns.message.make_query(name, 'AAAA', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.RA
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)
class TestSpoofingLuaSpoof(DNSDistTest):
    """Regression tests for answers spoofed from a Lua rule (LuaAction).

    ``spoof1rule`` returns addresses chosen by query type, ``spoof2rule``
    always returns a CNAME target.  Each test checks the forged answer over
    both UDP and TCP.

    NOTE(review): the expected records use a TTL of 60 on the assumption
    that this is the TTL dnsdist puts on spoofed answers -- confirm.
    """

    # The single %s is the port of the backend server.
    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype == 1) -- A
        then
                return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
                return DNSAction.Spoof, "2001:DB8::1"
        else
                return DNSAction.None, ""
        end
    end
    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end
    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedOverUDPAndTCP(self, query, expectedResponse):
        """Send *query* over UDP then TCP and require *expectedResponse* each time."""
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # response=None / useQueue=False: no backend answer is supplied,
            # the reply has to come from dnsdist itself.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)
class TestSpoofingLuaWithStatistics(DNSDistTest):
    """Regression test for a Lua spoofing rule driven by statistics counters.

    ``spoof1rule`` reads the 'queries' counter and spoofs a different
    address for the first query, the second query, and every later one.

    NOTE(review): the expected records use a TTL of 60 on the assumption
    that this is the TTL dnsdist puts on spoofed answers -- confirm.
    """

    # The single %s is the port of the backend server.
    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
            return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
            return DNSAction.Spoof, "192.0.2.2"
        end
        return DNSAction.Spoof, "192.0.2.0"
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters

        The order of the queries below matters: the Lua rule picks the
        spoofed address from the number of queries seen so far.
        """
        name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        # Answer expected while the 'queries' counter is 1.
        expectedResponse1 = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse1.answer.append(rrset)

        # Answer expected while the 'queries' counter is 2.
        expectedResponse2 = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2')
        expectedResponse2.answer.append(rrset)

        # Answer expected for every query after the second.
        expectedResponseAfterwards = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.0')
        expectedResponseAfterwards.answer.append(rrset)

        # First query
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse1, receivedResponse)

        # Second query
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse2, receivedResponse)

        # Any later query, whatever the transport
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponseAfterwards, receivedResponse)