3 from dnsdisttests
import DNSDistTest
5 class TestSpoofingSpoof(DNSDistTest
):
8 addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
9 addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
10 addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
11 newServer{address="127.0.0.1:%s"}
14 def testSpoofActionA(self
):
16 Spoofing: Spoof A via Action
18 Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
19 check that dnsdist sends a spoofed result.
21 name
= 'spoofaction.spoofing.tests.powerdns.com.'
22 query
= dns
.message
.make_query(name
, 'A', 'IN')
23 # dnsdist set RA = RD for spoofed responses
24 query
.flags
&= ~dns
.flags
.RD
25 expectedResponse
= dns
.message
.make_response(query
)
26 rrset
= dns
.rrset
.from_text(name
,
31 expectedResponse
.answer
.append(rrset
)
33 for method
in ("sendUDPQuery", "sendTCPQuery"):
34 sender
= getattr(self
, method
)
35 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
36 self
.assertTrue(receivedResponse
)
37 self
.assertEquals(expectedResponse
, receivedResponse
)
39 def testSpoofActionAAAA(self
):
41 Spoofing: Spoof AAAA via Action
43 Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
44 check that dnsdist sends a spoofed result.
46 name
= 'spoofaction.spoofing.tests.powerdns.com.'
47 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
48 # dnsdist set RA = RD for spoofed responses
49 query
.flags
&= ~dns
.flags
.RD
50 expectedResponse
= dns
.message
.make_response(query
)
51 rrset
= dns
.rrset
.from_text(name
,
56 expectedResponse
.answer
.append(rrset
)
58 for method
in ("sendUDPQuery", "sendTCPQuery"):
59 sender
= getattr(self
, method
)
60 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
61 self
.assertTrue(receivedResponse
)
62 self
.assertEquals(expectedResponse
, receivedResponse
)
64 def testSpoofActionCNAME(self
):
66 Spoofing: Spoof CNAME via Action
68 Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
69 check that dnsdist sends a spoofed result.
71 name
= 'cnamespoofaction.spoofing.tests.powerdns.com.'
72 query
= dns
.message
.make_query(name
, 'A', 'IN')
73 # dnsdist set RA = RD for spoofed responses
74 query
.flags
&= ~dns
.flags
.RD
75 expectedResponse
= dns
.message
.make_response(query
)
76 rrset
= dns
.rrset
.from_text(name
,
80 'cnameaction.spoofing.tests.powerdns.com.')
81 expectedResponse
.answer
.append(rrset
)
83 for method
in ("sendUDPQuery", "sendTCPQuery"):
84 sender
= getattr(self
, method
)
85 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
86 self
.assertTrue(receivedResponse
)
87 self
.assertEquals(expectedResponse
, receivedResponse
)
89 def testSpoofActionMultiA(self
):
91 Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
93 Send an A query for "multispoof.spoofing.tests.powerdns.com.",
94 check that dnsdist sends a spoofed result.
96 name
= 'multispoof.spoofing.tests.powerdns.com.'
97 query
= dns
.message
.make_query(name
, 'A', 'IN')
98 # dnsdist set RA = RD for spoofed responses
99 query
.flags
&= ~dns
.flags
.RD
100 expectedResponse
= dns
.message
.make_response(query
)
101 rrset
= dns
.rrset
.from_text(name
,
105 '192.0.2.2', '192.0.2.1')
106 expectedResponse
.answer
.append(rrset
)
108 for method
in ("sendUDPQuery", "sendTCPQuery"):
109 sender
= getattr(self
, method
)
110 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
111 self
.assertTrue(receivedResponse
)
112 self
.assertEquals(expectedResponse
, receivedResponse
)
114 def testSpoofActionMultiAAAA(self
):
116 Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
118 Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
119 check that dnsdist sends a spoofed result.
121 name
= 'multispoof.spoofing.tests.powerdns.com.'
122 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
123 # dnsdist set RA = RD for spoofed responses
124 query
.flags
&= ~dns
.flags
.RD
125 expectedResponse
= dns
.message
.make_response(query
)
126 rrset
= dns
.rrset
.from_text(name
,
130 '2001:DB8::1', '2001:DB8::2')
131 expectedResponse
.answer
.append(rrset
)
133 for method
in ("sendUDPQuery", "sendTCPQuery"):
134 sender
= getattr(self
, method
)
135 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
136 self
.assertTrue(receivedResponse
)
137 self
.assertEquals(expectedResponse
, receivedResponse
)
139 def testSpoofActionMultiANY(self
):
141 Spoofing: Spoof multiple addresses via AddDomainSpoof
143 Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
144 check that dnsdist sends a spoofed result.
146 name
= 'multispoof.spoofing.tests.powerdns.com.'
147 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
148 # dnsdist set RA = RD for spoofed responses
149 query
.flags
&= ~dns
.flags
.RD
150 expectedResponse
= dns
.message
.make_response(query
)
152 rrset
= dns
.rrset
.from_text(name
,
156 '192.0.2.2', '192.0.2.1')
157 expectedResponse
.answer
.append(rrset
)
159 rrset
= dns
.rrset
.from_text(name
,
163 '2001:DB8::1', '2001:DB8::2')
164 expectedResponse
.answer
.append(rrset
)
166 for method
in ("sendUDPQuery", "sendTCPQuery"):
167 sender
= getattr(self
, method
)
168 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
169 self
.assertTrue(receivedResponse
)
170 self
.assertEquals(expectedResponse
, receivedResponse
)
172 class TestSpoofingLuaSpoof(DNSDistTest
):
174 _config_template
= """
175 function spoof1rule(dq)
178 return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
179 elseif(dq.qtype == 28) -- AAAA
181 return DNSAction.Spoof, "2001:DB8::1"
183 return DNSAction.None, ""
186 function spoof2rule(dq)
187 return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
189 addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
190 addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
191 newServer{address="127.0.0.1:%s"}
194 def testLuaSpoofA(self
):
196 Spoofing: Spoofing an A via Lua
198 Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
199 check that dnsdist sends a spoofed result.
201 name
= 'luaspoof1.spoofing.tests.powerdns.com.'
202 query
= dns
.message
.make_query(name
, 'A', 'IN')
203 # dnsdist set RA = RD for spoofed responses
204 query
.flags
&= ~dns
.flags
.RD
205 expectedResponse
= dns
.message
.make_response(query
)
206 rrset
= dns
.rrset
.from_text(name
,
210 '192.0.2.1', '192.0.2.2')
211 expectedResponse
.answer
.append(rrset
)
213 for method
in ("sendUDPQuery", "sendTCPQuery"):
214 sender
= getattr(self
, method
)
215 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
216 self
.assertTrue(receivedResponse
)
217 self
.assertEquals(expectedResponse
, receivedResponse
)
219 def testLuaSpoofAAAA(self
):
221 Spoofing: Spoofing an AAAA via Lua
223 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
224 check that dnsdist sends a spoofed result.
226 name
= 'luaspoof1.spoofing.tests.powerdns.com.'
227 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
228 # dnsdist set RA = RD for spoofed responses
229 query
.flags
&= ~dns
.flags
.RD
230 expectedResponse
= dns
.message
.make_response(query
)
231 rrset
= dns
.rrset
.from_text(name
,
236 expectedResponse
.answer
.append(rrset
)
238 for method
in ("sendUDPQuery", "sendTCPQuery"):
239 sender
= getattr(self
, method
)
240 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
241 self
.assertTrue(receivedResponse
)
242 self
.assertEquals(expectedResponse
, receivedResponse
)
244 def testLuaSpoofAWithCNAME(self
):
246 Spoofing: Spoofing an A with a CNAME via Lua
248 Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
249 check that dnsdist sends a spoofed result.
251 name
= 'luaspoof2.spoofing.tests.powerdns.com.'
252 query
= dns
.message
.make_query(name
, 'A', 'IN')
253 # dnsdist set RA = RD for spoofed responses
254 query
.flags
&= ~dns
.flags
.RD
255 expectedResponse
= dns
.message
.make_response(query
)
256 rrset
= dns
.rrset
.from_text(name
,
260 'spoofedcname.spoofing.tests.powerdns.com.')
261 expectedResponse
.answer
.append(rrset
)
263 for method
in ("sendUDPQuery", "sendTCPQuery"):
264 sender
= getattr(self
, method
)
265 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
266 self
.assertTrue(receivedResponse
)
267 self
.assertEquals(expectedResponse
, receivedResponse
)
269 def testLuaSpoofAAAAWithCNAME(self
):
271 Spoofing: Spoofing an AAAA with a CNAME via Lua
273 Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
274 check that dnsdist sends a spoofed result.
276 name
= 'luaspoof2.spoofing.tests.powerdns.com.'
277 query
= dns
.message
.make_query(name
, 'AAAA', 'IN')
278 # dnsdist set RA = RD for spoofed responses
279 query
.flags
&= ~dns
.flags
.RD
280 expectedResponse
= dns
.message
.make_response(query
)
281 rrset
= dns
.rrset
.from_text(name
,
285 'spoofedcname.spoofing.tests.powerdns.com.')
286 expectedResponse
.answer
.append(rrset
)
288 for method
in ("sendUDPQuery", "sendTCPQuery"):
289 sender
= getattr(self
, method
)
290 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
291 self
.assertTrue(receivedResponse
)
292 self
.assertEquals(expectedResponse
, receivedResponse
)
294 class TestSpoofingLuaWithStatistics(DNSDistTest
):
296 _config_template
= """
297 function spoof1rule(dq)
298 queriesCount = getStatisticsCounters()['queries']
299 if(queriesCount == 1) then
300 return DNSAction.Spoof, "192.0.2.1"
301 elseif(queriesCount == 2) then
302 return DNSAction.Spoof, "192.0.2.2"
304 return DNSAction.Spoof, "192.0.2.0"
307 addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
308 newServer{address="127.0.0.1:%s"}
311 def testLuaSpoofBasedOnStatistics(self
):
313 Spoofing: Spoofing an A via Lua based on statistics counters
316 name
= 'luaspoofwithstats.spoofing.tests.powerdns.com.'
317 query
= dns
.message
.make_query(name
, 'A', 'IN')
318 # dnsdist set RA = RD for spoofed responses
319 query
.flags
&= ~dns
.flags
.RD
320 expectedResponse1
= dns
.message
.make_response(query
)
321 rrset
= dns
.rrset
.from_text(name
,
326 expectedResponse1
.answer
.append(rrset
)
327 expectedResponse2
= dns
.message
.make_response(query
)
328 rrset
= dns
.rrset
.from_text(name
,
333 expectedResponse2
.answer
.append(rrset
)
334 expectedResponseAfterwards
= dns
.message
.make_response(query
)
335 rrset
= dns
.rrset
.from_text(name
,
340 expectedResponseAfterwards
.answer
.append(rrset
)
342 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
343 self
.assertTrue(receivedResponse
)
344 self
.assertEquals(expectedResponse1
, receivedResponse
)
346 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
347 self
.assertTrue(receivedResponse
)
348 self
.assertEquals(expectedResponse2
, receivedResponse
)
350 for method
in ("sendUDPQuery", "sendTCPQuery"):
351 sender
= getattr(self
, method
)
352 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
353 self
.assertTrue(receivedResponse
)
354 self
.assertEquals(expectedResponseAfterwards
, receivedResponse
)