]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #4635 from rgacogne/rec-stats-ringbuffer-4633
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
class TestSpoofingSpoof(DNSDistTest):
    """
    Regression tests for spoofed answers configured through
    addDomainSpoof(), addDomainCNAMESpoof(), SpoofAction() and
    SpoofCNAMEAction() (see _config_template).
    """

    _config_template = """
    addDomainSpoof("spoof.spoofing.tests.powerdns.com.", "192.0.2.1", "2001:DB8::1")
    addDomainCNAMESpoof("cnamespoof.spoofing.tests.powerdns.com.", "cname.spoofing.tests.powerdns.com.")
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addDomainSpoof("multispoof.spoofing.tests.powerdns.com", {"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"})
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedOverUDPAndTCP(self, query, expectedResponse):
        """
        Send `query` over UDP and then over TCP, and check that each
        transport yields a response equal to `expectedResponse`.

        NOTE(review): response=None / useQueue=False presumably mean that
        no backend answer is queued because dnsdist is expected to answer
        on its own -- confirm against DNSDistTest.
        """
        for sendQuery in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sendQuery(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the deprecated assertEquals alias
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofA(self):
        """
        Spoofing: Spoof A

        Send an A query to "spoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofAAAA(self):
        """
        Spoofing: Spoof AAAA

        Send an AAAA query to "spoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofCNAME(self):
        """
        Spoofing: Spoof CNAME

        Send an A query for "cnamespoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'cname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'cnameaction.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # only the IPv4 addresses of the configured set are expected for A
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # only the IPv6 addresses of the configured set are expected for AAAA
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via AddDomainSpoof

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # for ANY, both the A and the AAAA records are expected
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)
266
class TestSpoofingLuaSpoof(DNSDistTest):
    """
    Regression tests for spoofed answers returned from Lua rules
    (DNSAction.Spoof) registered with addLuaAction(); see _config_template.
    """

    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype==1) -- A
        then
                return DNSAction.Spoof, "192.0.2.1"
        elseif(dq.qtype == 28) -- AAAA
        then
                return DNSAction.Spoof, "2001:DB8::1"
        else
                return DNSAction.None, ""
        end
    end
    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end
    addLuaAction("luaspoof1.spoofing.tests.powerdns.com.", spoof1rule)
    addLuaAction("luaspoof2.spoofing.tests.powerdns.com.", spoof2rule)
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedOverUDPAndTCP(self, query, expectedResponse):
        """
        Send `query` over UDP and then over TCP, and check that each
        transport yields a response equal to `expectedResponse`.

        NOTE(review): response=None / useQueue=False presumably mean that
        no backend answer is queued because dnsdist is expected to answer
        on its own -- confirm against DNSDistTest.
        """
        for sendQuery in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sendQuery(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the deprecated assertEquals alias
            self.assertEqual(expectedResponse, receivedResponse)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # spoof2rule returns a name rather than an address: a CNAME is expected
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # spoof2rule returns a name rather than an address: a CNAME is expected
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        self._checkSpoofedOverUDPAndTCP(query, expectedResponse)
396
class TestSpoofingLuaWithStatistics(DNSDistTest):
    """
    Regression test for a Lua spoofing rule whose answer depends on the
    'queries' entry of getStatisticsCounters(); see _config_template.
    """

    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
                return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
                return DNSAction.Spoof, "192.0.2.2"
        else
                return DNSAction.Spoof, "192.0.2.0"
        end
    end
    addLuaAction("luaspoofwithstats.spoofing.tests.powerdns.com.", spoof1rule)
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters

        Send the same A query four times (three over UDP, then one over
        TCP). The Lua rule spoofs 192.0.2.1 when the 'queries' counter is 1,
        192.0.2.2 when it is 2 and 192.0.2.0 otherwise, so the answers are
        expected in exactly that order, with 192.0.2.0 for the last two.
        """
        name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        def buildExpectedResponse(address):
            # Expected answer to `query` holding a single A record
            expected = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        address)
            expected.answer.append(rrset)
            return expected

        # all expectations are built before the first query is sent
        expectedResponse1 = buildExpectedResponse('192.0.2.1')
        expectedResponse2 = buildExpectedResponse('192.0.2.2')
        expectedResponseAfterwards = buildExpectedResponse('192.0.2.0')

        # order matters: every query sent here advances the counter
        for expectedResponse in (expectedResponse1,
                                 expectedResponse2,
                                 expectedResponseAfterwards):
            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            # assertEqual, not the deprecated assertEquals alias
            self.assertEqual(expectedResponse, receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponseAfterwards, receivedResponse)