import dns
from dnsdisttests import DNSDistTest
class TestSpoofingSpoof(DNSDistTest):
    """Regression tests for the SpoofAction / SpoofCNAMEAction rules.

    Each test builds a query for a name matched by one of the rules in
    ``_config_template``, builds the answer dnsdist is expected to spoof, and
    checks that the same answer comes back over both UDP and TCP.
    """

    # NOTE(review): this block was reconstructed from a damaged copy of the
    # file. The TTL / class / type arguments of the dns.rrset.from_text()
    # calls were not visible; class and type are derived from the query being
    # sent, and the TTL of 60 is an assumption -- confirm against the TTL
    # dnsdist uses for spoofed records.

    # The "%s" placeholder is presumably filled in with the backend port by
    # DNSDistTest -- TODO confirm.
    _config_template = """
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedOverUDPAndTCP(self, query, expectedResponse):
        """Send ``query`` over UDP, then TCP, and compare each answer.

        The queries are sent with ``response=None`` and ``useQueue=False``,
        exactly as the individual tests did before this helper existed.
        """
        for sendQuery in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sendQuery(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # IPv4 address taken from the SpoofAction rule in _config_template.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # IPv6 address taken from the SpoofAction rule in _config_template.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'cnameaction.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via SpoofAction

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via SpoofAction

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via SpoofAction

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # An ANY query is expected to get both address families back:
        # the A rrset first, then the AAAA rrset.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

class TestSpoofingLuaSpoof(DNSDistTest):
    """Regression tests for spoofing driven by Lua rules (LuaAction).

    ``spoof1rule`` spoofs addresses depending on the query type and
    ``spoof2rule`` always spoofs a CNAME. Each test checks the spoofed answer
    over both UDP and TCP.
    """

    # NOTE(review): this block was reconstructed from a damaged copy of the
    # file. In the Lua config the opening "if ... -- A" condition and the
    # "then" / "else" / "end" lines were not visible and have been rebuilt
    # from the visible "elseif(dq.qtype == 28) -- AAAA" branch and from what
    # testLuaSpoofA expects -- confirm against the upstream file.
    # The TTL of 60 passed to dns.rrset.from_text() is likewise an assumption.

    # The "%s" placeholder is presumably filled in with the backend port by
    # DNSDistTest -- TODO confirm.
    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype == 1) -- A
        then
            return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
            return DNSAction.Spoof, "2001:DB8::1"
        else
            return DNSAction.None, ""
        end
    end
    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end
    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedOverUDPAndTCP(self, query, expectedResponse):
        """Send ``query`` over UDP, then TCP, and compare each answer.

        The queries are sent with ``response=None`` and ``useQueue=False``,
        exactly as the individual tests did before this helper existed.
        """
        for sendQuery in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sendQuery(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # Address taken from the AAAA branch of spoof1rule in _config_template.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

class TestSpoofingLuaWithStatistics(DNSDistTest):
    """Regression test for a Lua spoofing rule that reads statistics counters.

    ``spoof1rule`` looks at the ``queries`` counter returned by
    ``getStatisticsCounters()`` and spoofs a different address for the first
    query, the second query, and every query after that.
    """

    # NOTE(review): this block was reconstructed from a damaged copy of the
    # file. The "else" / "end" lines of the Lua config and the TTL / class /
    # type / address arguments of the dns.rrset.from_text() calls were not
    # visible. The addresses are taken from the Lua rule below; the TTL of 60
    # is an assumption -- confirm against the upstream file.

    # The "%s" placeholder is presumably filled in with the backend port by
    # DNSDistTest -- TODO confirm.
    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
            return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
            return DNSAction.Spoof, "192.0.2.2"
        else
            return DNSAction.Spoof, "192.0.2.0"
        end
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters

        Send the same A query for
        "luaspoofwithstats.spoofing.tests.powerdns.com." three times over UDP
        and once over TCP, and check that the spoofed address follows the
        branches of the Lua rule.
        """
        name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        def expectedWith(address):
            # Build the response to `query` carrying one spoofed A record.
            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        address)
            response.answer.append(rrset)
            return response

        expectedResponse1 = expectedWith('192.0.2.1')
        expectedResponse2 = expectedWith('192.0.2.2')
        expectedResponseAfterwards = expectedWith('192.0.2.0')

        # The order matters: the Lua rule depends on how many queries have
        # been counted so far, so the sequence UDP, UDP, UDP, TCP is kept.
        exchanges = (
            (self.sendUDPQuery, expectedResponse1),
            (self.sendUDPQuery, expectedResponse2),
            (self.sendUDPQuery, expectedResponseAfterwards),
            (self.sendTCPQuery, expectedResponseAfterwards),
        )
        for sendQuery, expectedResponse in exchanges:
            (_, receivedResponse) = sendQuery(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)