]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #6069 from Habbie/no-more-bootstrap
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
5 class TestSpoofingSpoof(DNSDistTest):
6
7 _config_template = """
8 addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
9 addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
10 addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
11 newServer{address="127.0.0.1:%s"}
12 """
13
14 def testSpoofActionA(self):
15 """
16 Spoofing: Spoof A via Action
17
18 Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
19 check that dnsdist sends a spoofed result.
20 """
21 name = 'spoofaction.spoofing.tests.powerdns.com.'
22 query = dns.message.make_query(name, 'A', 'IN')
23 # dnsdist set RA = RD for spoofed responses
24 query.flags &= ~dns.flags.RD
25 expectedResponse = dns.message.make_response(query)
26 rrset = dns.rrset.from_text(name,
27 60,
28 dns.rdataclass.IN,
29 dns.rdatatype.A,
30 '192.0.2.1')
31 expectedResponse.answer.append(rrset)
32
33 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
34 self.assertTrue(receivedResponse)
35 self.assertEquals(expectedResponse, receivedResponse)
36
37 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
38 self.assertTrue(receivedResponse)
39 self.assertEquals(expectedResponse, receivedResponse)
40
41 def testSpoofActionAAAA(self):
42 """
43 Spoofing: Spoof AAAA via Action
44
45 Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
46 check that dnsdist sends a spoofed result.
47 """
48 name = 'spoofaction.spoofing.tests.powerdns.com.'
49 query = dns.message.make_query(name, 'AAAA', 'IN')
50 # dnsdist set RA = RD for spoofed responses
51 query.flags &= ~dns.flags.RD
52 expectedResponse = dns.message.make_response(query)
53 rrset = dns.rrset.from_text(name,
54 60,
55 dns.rdataclass.IN,
56 dns.rdatatype.AAAA,
57 '2001:DB8::1')
58 expectedResponse.answer.append(rrset)
59
60 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
61 self.assertTrue(receivedResponse)
62 self.assertEquals(expectedResponse, receivedResponse)
63
64 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
65 self.assertTrue(receivedResponse)
66 self.assertEquals(expectedResponse, receivedResponse)
67
68 def testSpoofActionCNAME(self):
69 """
70 Spoofing: Spoof CNAME via Action
71
72 Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
73 check that dnsdist sends a spoofed result.
74 """
75 name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
76 query = dns.message.make_query(name, 'A', 'IN')
77 # dnsdist set RA = RD for spoofed responses
78 query.flags &= ~dns.flags.RD
79 expectedResponse = dns.message.make_response(query)
80 rrset = dns.rrset.from_text(name,
81 60,
82 dns.rdataclass.IN,
83 dns.rdatatype.CNAME,
84 'cnameaction.spoofing.tests.powerdns.com.')
85 expectedResponse.answer.append(rrset)
86
87 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
88 self.assertTrue(receivedResponse)
89 self.assertEquals(expectedResponse, receivedResponse)
90
91 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
92 self.assertTrue(receivedResponse)
93 self.assertEquals(expectedResponse, receivedResponse)
94
95 def testSpoofActionMultiA(self):
96 """
97 Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
98
99 Send an A query for "multispoof.spoofing.tests.powerdns.com.",
100 check that dnsdist sends a spoofed result.
101 """
102 name = 'multispoof.spoofing.tests.powerdns.com.'
103 query = dns.message.make_query(name, 'A', 'IN')
104 # dnsdist set RA = RD for spoofed responses
105 query.flags &= ~dns.flags.RD
106 expectedResponse = dns.message.make_response(query)
107 rrset = dns.rrset.from_text(name,
108 60,
109 dns.rdataclass.IN,
110 dns.rdatatype.A,
111 '192.0.2.2', '192.0.2.1')
112 expectedResponse.answer.append(rrset)
113
114 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
115 self.assertTrue(receivedResponse)
116 self.assertEquals(expectedResponse, receivedResponse)
117
118 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
119 self.assertTrue(receivedResponse)
120 self.assertEquals(expectedResponse, receivedResponse)
121
122 def testSpoofActionMultiAAAA(self):
123 """
124 Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
125
126 Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
127 check that dnsdist sends a spoofed result.
128 """
129 name = 'multispoof.spoofing.tests.powerdns.com.'
130 query = dns.message.make_query(name, 'AAAA', 'IN')
131 # dnsdist set RA = RD for spoofed responses
132 query.flags &= ~dns.flags.RD
133 expectedResponse = dns.message.make_response(query)
134 rrset = dns.rrset.from_text(name,
135 60,
136 dns.rdataclass.IN,
137 dns.rdatatype.AAAA,
138 '2001:DB8::1', '2001:DB8::2')
139 expectedResponse.answer.append(rrset)
140
141 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
142 self.assertTrue(receivedResponse)
143 self.assertEquals(expectedResponse, receivedResponse)
144
145 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
146 self.assertTrue(receivedResponse)
147 self.assertEquals(expectedResponse, receivedResponse)
148
149 def testSpoofActionMultiANY(self):
150 """
151 Spoofing: Spoof multiple addresses via AddDomainSpoof
152
153 Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
154 check that dnsdist sends a spoofed result.
155 """
156 name = 'multispoof.spoofing.tests.powerdns.com.'
157 query = dns.message.make_query(name, 'ANY', 'IN')
158 # dnsdist set RA = RD for spoofed responses
159 query.flags &= ~dns.flags.RD
160 expectedResponse = dns.message.make_response(query)
161
162 rrset = dns.rrset.from_text(name,
163 60,
164 dns.rdataclass.IN,
165 dns.rdatatype.A,
166 '192.0.2.2', '192.0.2.1')
167 expectedResponse.answer.append(rrset)
168
169 rrset = dns.rrset.from_text(name,
170 60,
171 dns.rdataclass.IN,
172 dns.rdatatype.AAAA,
173 '2001:DB8::1', '2001:DB8::2')
174 expectedResponse.answer.append(rrset)
175
176 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
177 self.assertTrue(receivedResponse)
178 self.assertEquals(expectedResponse, receivedResponse)
179
180 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
181 self.assertTrue(receivedResponse)
182 self.assertEquals(expectedResponse, receivedResponse)
183
184 class TestSpoofingLuaSpoof(DNSDistTest):
185
186 _config_template = """
187 function spoof1rule(dq)
188 if(dq.qtype==1) -- A
189 then
190 return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
191 elseif(dq.qtype == 28) -- AAAA
192 then
193 return DNSAction.Spoof, "2001:DB8::1"
194 else
195 return DNSAction.None, ""
196 end
197 end
198 function spoof2rule(dq)
199 return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
200 end
201 addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
202 addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
203 newServer{address="127.0.0.1:%s"}
204 """
205
206 def testLuaSpoofA(self):
207 """
208 Spoofing: Spoofing an A via Lua
209
210 Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
211 check that dnsdist sends a spoofed result.
212 """
213 name = 'luaspoof1.spoofing.tests.powerdns.com.'
214 query = dns.message.make_query(name, 'A', 'IN')
215 # dnsdist set RA = RD for spoofed responses
216 query.flags &= ~dns.flags.RD
217 expectedResponse = dns.message.make_response(query)
218 rrset = dns.rrset.from_text(name,
219 60,
220 dns.rdataclass.IN,
221 dns.rdatatype.A,
222 '192.0.2.1', '192.0.2.2')
223 expectedResponse.answer.append(rrset)
224
225 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
226 self.assertTrue(receivedResponse)
227 self.assertEquals(expectedResponse, receivedResponse)
228
229 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
230 self.assertTrue(receivedResponse)
231 self.assertEquals(expectedResponse, receivedResponse)
232
233 def testLuaSpoofAAAA(self):
234 """
235 Spoofing: Spoofing an AAAA via Lua
236
237 Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
238 check that dnsdist sends a spoofed result.
239 """
240 name = 'luaspoof1.spoofing.tests.powerdns.com.'
241 query = dns.message.make_query(name, 'AAAA', 'IN')
242 # dnsdist set RA = RD for spoofed responses
243 query.flags &= ~dns.flags.RD
244 expectedResponse = dns.message.make_response(query)
245 rrset = dns.rrset.from_text(name,
246 60,
247 dns.rdataclass.IN,
248 dns.rdatatype.AAAA,
249 '2001:DB8::1')
250 expectedResponse.answer.append(rrset)
251
252 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
253 self.assertTrue(receivedResponse)
254 self.assertEquals(expectedResponse, receivedResponse)
255
256 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
257 self.assertTrue(receivedResponse)
258 self.assertEquals(expectedResponse, receivedResponse)
259
260 def testLuaSpoofAWithCNAME(self):
261 """
262 Spoofing: Spoofing an A with a CNAME via Lua
263
264 Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
265 check that dnsdist sends a spoofed result.
266 """
267 name = 'luaspoof2.spoofing.tests.powerdns.com.'
268 query = dns.message.make_query(name, 'A', 'IN')
269 # dnsdist set RA = RD for spoofed responses
270 query.flags &= ~dns.flags.RD
271 expectedResponse = dns.message.make_response(query)
272 rrset = dns.rrset.from_text(name,
273 60,
274 dns.rdataclass.IN,
275 dns.rdatatype.CNAME,
276 'spoofedcname.spoofing.tests.powerdns.com.')
277 expectedResponse.answer.append(rrset)
278
279 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
280 self.assertTrue(receivedResponse)
281 self.assertEquals(expectedResponse, receivedResponse)
282
283 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
284 self.assertTrue(receivedResponse)
285 self.assertEquals(expectedResponse, receivedResponse)
286
287 def testLuaSpoofAAAAWithCNAME(self):
288 """
289 Spoofing: Spoofing an AAAA with a CNAME via Lua
290
291 Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
292 check that dnsdist sends a spoofed result.
293 """
294 name = 'luaspoof2.spoofing.tests.powerdns.com.'
295 query = dns.message.make_query(name, 'AAAA', 'IN')
296 # dnsdist set RA = RD for spoofed responses
297 query.flags &= ~dns.flags.RD
298 expectedResponse = dns.message.make_response(query)
299 rrset = dns.rrset.from_text(name,
300 60,
301 dns.rdataclass.IN,
302 dns.rdatatype.CNAME,
303 'spoofedcname.spoofing.tests.powerdns.com.')
304 expectedResponse.answer.append(rrset)
305
306 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
307 self.assertTrue(receivedResponse)
308 self.assertEquals(expectedResponse, receivedResponse)
309
310 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
311 self.assertTrue(receivedResponse)
312 self.assertEquals(expectedResponse, receivedResponse)
313
314 class TestSpoofingLuaWithStatistics(DNSDistTest):
315
316 _config_template = """
317 function spoof1rule(dq)
318 queriesCount = getStatisticsCounters()['queries']
319 if(queriesCount == 1) then
320 return DNSAction.Spoof, "192.0.2.1"
321 elseif(queriesCount == 2) then
322 return DNSAction.Spoof, "192.0.2.2"
323 else
324 return DNSAction.Spoof, "192.0.2.0"
325 end
326 end
327 addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
328 newServer{address="127.0.0.1:%s"}
329 """
330
331 def testLuaSpoofBasedOnStatistics(self):
332 """
333 Spoofing: Spoofing an A via Lua based on statistics counters
334
335 """
336 name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
337 query = dns.message.make_query(name, 'A', 'IN')
338 # dnsdist set RA = RD for spoofed responses
339 query.flags &= ~dns.flags.RD
340 expectedResponse1 = dns.message.make_response(query)
341 rrset = dns.rrset.from_text(name,
342 60,
343 dns.rdataclass.IN,
344 dns.rdatatype.A,
345 '192.0.2.1')
346 expectedResponse1.answer.append(rrset)
347 expectedResponse2 = dns.message.make_response(query)
348 rrset = dns.rrset.from_text(name,
349 60,
350 dns.rdataclass.IN,
351 dns.rdatatype.A,
352 '192.0.2.2')
353 expectedResponse2.answer.append(rrset)
354 expectedResponseAfterwards = dns.message.make_response(query)
355 rrset = dns.rrset.from_text(name,
356 60,
357 dns.rdataclass.IN,
358 dns.rdatatype.A,
359 '192.0.2.0')
360 expectedResponseAfterwards.answer.append(rrset)
361
362 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
363 self.assertTrue(receivedResponse)
364 self.assertEquals(expectedResponse1, receivedResponse)
365
366 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
367 self.assertTrue(receivedResponse)
368 self.assertEquals(expectedResponse2, receivedResponse)
369
370 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
371 self.assertTrue(receivedResponse)
372 self.assertEquals(expectedResponseAfterwards, receivedResponse)
373
374 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
375 self.assertTrue(receivedResponse)
376 self.assertEquals(expectedResponseAfterwards, receivedResponse)