]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
CommitLineData
903853f4
RG
1#!/usr/bin/env python
2import dns
3from dnsdisttests import DNSDistTest
4
class TestSpoofingSpoof(DNSDistTest):
    """Regression tests for the SpoofAction() / SpoofCNAMEAction() rules.

    Each test sends a query whose name matches one of the rules declared in
    ``_config_template`` and compares the reply with a response built
    locally from that same query.
    """

    # The trailing "%s" in newServer{} is presumably filled in by
    # DNSDistTest with the backend port -- confirm against dnsdisttests.
    _config_template = """
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
    addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {aa=true}))
    addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ad=true}))
    addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=true}))
    addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=false}))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse):
        """Send ``query`` over UDP then TCP and compare each reply.

        For both transports the reply must be truthy and equal to
        ``expectedResponse``. The queries are sent with ``response=None``
        and ``useQueue=False``.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the assertEquals alias removed in Python 3.12
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'cnameaction.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via SpoofAction

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via SpoofAction

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via SpoofAction

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # An ANY query is expected to yield both the A and the AAAA rrsets.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionSetAA(self):
        """
        Spoofing: Spoof via Action, setting AA=1
        """
        name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AA
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionSetAD(self):
        """
        Spoofing: Spoof via Action, setting AD=1
        """
        name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AD
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionSetRA(self):
        """
        Spoofing: Spoof via Action, setting RA=1
        """
        name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses; RD is cleared here,
        # yet RA is still expected because the rule was given {ra=true}.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionSetNoRA(self):
        """
        Spoofing: Spoof via Action, setting RA=0
        """
        name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
        # Unlike the other tests, the RD flag of the query is left untouched.
        query = dns.message.make_query(name, 'AAAA', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.RA
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)
265
903853f4
RG
class TestSpoofingLuaSpoof(DNSDistTest):
    """Regression tests for spoofing driven by Lua rules (LuaAction).

    ``spoof1rule`` returns addresses for A and AAAA queries and no action
    for other types; ``spoof2rule`` always returns a CNAME target.
    """

    # The trailing "%s" in newServer{} is presumably filled in by
    # DNSDistTest with the backend port -- confirm against dnsdisttests.
    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype==1) -- A
        then
            return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
            return DNSAction.Spoof, "2001:DB8::1"
        else
            return DNSAction.None, ""
        end
    end
    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end
    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse):
        """Send ``query`` over UDP then TCP and compare each reply.

        For both transports the reply must be truthy and equal to
        ``expectedResponse``. The queries are sent with ``response=None``
        and ``useQueue=False``.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the assertEquals alias removed in Python 3.12
            self.assertEqual(expectedResponse, receivedResponse)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedResponse(query, expectedResponse)
86baa8df
RG
387
class TestSpoofingLuaWithStatistics(DNSDistTest):
    """Regression test for a Lua spoofing rule that reads statistics counters.

    ``spoof1rule`` picks the spoofed address from the value of the
    'queries' counter returned by getStatisticsCounters(): 192.0.2.1 when
    it is 1, 192.0.2.2 when it is 2, and 192.0.2.0 otherwise.
    """

    # The trailing "%s" in newServer{} is presumably filled in by
    # DNSDistTest with the backend port -- confirm against dnsdisttests.
    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
            return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
            return DNSAction.Spoof, "192.0.2.2"
        else
            return DNSAction.Spoof, "192.0.2.0"
        end
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters

        """
        name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        expectedResponse1 = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse1.answer.append(rrset)

        expectedResponse2 = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2')
        expectedResponse2.answer.append(rrset)

        expectedResponseAfterwards = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.0')
        expectedResponseAfterwards.answer.append(rrset)

        # The order matters: the first and second queries are expected to
        # get distinct answers, one UDP query each.
        for expectedResponse in (expectedResponse1, expectedResponse2):
            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the assertEquals alias removed in Python 3.12
            self.assertEqual(expectedResponse, receivedResponse)

        # Every later query, over UDP or TCP, is expected to get the
        # fallback answer.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponseAfterwards, receivedResponse)