from dnsdisttests import DNSDistTest
class TestSpoofingSpoof(DNSDistTest):
    """Regression tests for the statically configured spoofing rules.

    The rules under test are the addDomainSpoof(), addDomainCNAMESpoof(),
    SpoofAction() and SpoofCNAMEAction() entries of ``_config_template``.
    Each test builds the expected answer locally, then sends the query over
    UDP and over TCP (``useQueue=False``) and compares what comes back.
    """

    # NOTE(review): the TTL (60), class and type arguments given to
    # dns.rrset.from_text() below, and the name of the first test method,
    # were reconstructed from the queries and from _config_template --
    # confirm them against the reference copy of this file.

    _config_template = """
    addDomainSpoof("spoof.spoofing.tests.powerdns.com.", "192.0.2.1", "2001:DB8::1")
    addDomainCNAMESpoof("cnamespoof.spoofing.tests.powerdns.com.", "cname.spoofing.tests.powerdns.com.")
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addDomainSpoof("multispoof.spoofing.tests.powerdns.com", {"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"})
    newServer{address="127.0.0.1:%s"}
    """

    def _assertSpoofedOverUDPAndTCP(self, query, expectedResponse):
        """Send *query* over UDP, then TCP, and require *expectedResponse* both times."""
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the deprecated assertEquals alias
            # (the alias was removed in Python 3.12).
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofA(self):
        """
        Spoofing: Spoof A

        Send an A query to "spoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofAAAA(self):
        """
        Spoofing: Spoof AAAA

        Send an AAAA query to "spoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofCNAME(self):
        """
        Spoofing: Spoof CNAME

        Send an A query for "cnamespoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'cname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'cnameaction.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via AddDomainSpoof

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # The expected answer carries the IPv4 set first, then the IPv6 set.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)
class TestSpoofingLuaSpoof(DNSDistTest):
    """Regression tests for spoofing decided by Lua rules.

    ``spoof1rule`` returns addresses that depend on the query type and
    ``spoof2rule`` always returns a name, which the tests expect to come back
    as a CNAME. Each test sends the query over UDP and over TCP
    (``useQueue=False``) and compares the answer with a locally built one.
    """

    # NOTE(review): the first branch of spoof1rule (qtype == 1), the
    # then/else/end lines of the Lua code, and the TTL (60), class and type
    # arguments given to dns.rrset.from_text() were reconstructed from the
    # surrounding code -- confirm them against the reference copy of this file.

    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype == 1) -- A
        then
            return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
            return DNSAction.Spoof, "2001:DB8::1"
        else
            return DNSAction.None, ""
        end
    end
    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end
    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    newServer{address="127.0.0.1:%s"}
    """

    def _assertSpoofedOverUDPAndTCP(self, query, expectedResponse):
        """Send *query* over UDP, then TCP, and require *expectedResponse* both times."""
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the deprecated assertEquals alias
            # (the alias was removed in Python 3.12).
            self.assertEqual(expectedResponse, receivedResponse)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected message is derived from the query
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._assertSpoofedOverUDPAndTCP(query, expectedResponse)
class TestSpoofingLuaWithStatistics(DNSDistTest):
    """Regression test for a Lua spoofing rule driven by statistics counters.

    ``spoof1rule`` reads the 'queries' counter and answers 192.0.2.1 when it
    is 1, 192.0.2.2 when it is 2 and 192.0.2.0 otherwise, so the order in
    which the test sends its queries is significant.
    """

    # NOTE(review): the else/end lines of the Lua code and the TTL (60), class
    # and type arguments given to dns.rrset.from_text() were reconstructed
    # from the surrounding code -- confirm them against the reference copy.

    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
            return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
            return DNSAction.Spoof, "192.0.2.2"
        else
            return DNSAction.Spoof, "192.0.2.0"
        end
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters

        Send the same A query three times over UDP and once over TCP and
        check that the spoofed address follows the 'queries' counter.
        """
        name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so RD is cleared
        # before the expected messages are derived from the query
        query.flags &= ~dns.flags.RD

        expectedResponse1 = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse1.answer.append(rrset)

        expectedResponse2 = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2')
        expectedResponse2.answer.append(rrset)

        expectedResponseAfterwards = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.0')
        expectedResponseAfterwards.answer.append(rrset)

        # The sequence must stay UDP, UDP, UDP, TCP: each expected answer is
        # tied to the position of the query in that sequence.
        exchanges = (
            (self.sendUDPQuery, expectedResponse1),
            (self.sendUDPQuery, expectedResponse2),
            (self.sendUDPQuery, expectedResponseAfterwards),
            (self.sendTCPQuery, expectedResponseAfterwards),
        )
        for sender, expectedResponse in exchanges:
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the deprecated assertEquals alias
            # (the alias was removed in Python 3.12).
            self.assertEqual(expectedResponse, receivedResponse)