]>
Commit | Line | Data |
---|---|---|
903853f4 RG |
1 | #!/usr/bin/env python |
2 | import dns | |
3 | from dnsdisttests import DNSDistTest | |
4 | ||
5 | class TestSpoofingSpoof(DNSDistTest): | |
6 | ||
7 | _config_template = """ | |
903853f4 RG |
8 | addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1")) |
9 | addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com.")) | |
6bb38cd6 | 10 | addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"})) |
903853f4 RG |
11 | newServer{address="127.0.0.1:%s"} |
12 | """ | |
13 | ||
903853f4 RG |
14 | def testSpoofActionA(self): |
15 | """ | |
16 | Spoofing: Spoof A via Action | |
17 | ||
18 | Send an A query to "spoofaction.spoofing.tests.powerdns.com.", | |
19 | check that dnsdist sends a spoofed result. | |
20 | """ | |
21 | name = 'spoofaction.spoofing.tests.powerdns.com.' | |
22 | query = dns.message.make_query(name, 'A', 'IN') | |
23 | # dnsdist set RA = RD for spoofed responses | |
24 | query.flags &= ~dns.flags.RD | |
25 | expectedResponse = dns.message.make_response(query) | |
26 | rrset = dns.rrset.from_text(name, | |
27 | 60, | |
28 | dns.rdataclass.IN, | |
29 | dns.rdatatype.A, | |
30 | '192.0.2.1') | |
31 | expectedResponse.answer.append(rrset) | |
32 | ||
6ca2e796 RG |
33 | for method in ("sendUDPQuery", "sendTCPQuery"): |
34 | sender = getattr(self, method) | |
35 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
36 | self.assertTrue(receivedResponse) | |
37 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
38 | |
39 | def testSpoofActionAAAA(self): | |
40 | """ | |
41 | Spoofing: Spoof AAAA via Action | |
42 | ||
43 | Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.", | |
44 | check that dnsdist sends a spoofed result. | |
45 | """ | |
46 | name = 'spoofaction.spoofing.tests.powerdns.com.' | |
47 | query = dns.message.make_query(name, 'AAAA', 'IN') | |
48 | # dnsdist set RA = RD for spoofed responses | |
49 | query.flags &= ~dns.flags.RD | |
50 | expectedResponse = dns.message.make_response(query) | |
51 | rrset = dns.rrset.from_text(name, | |
52 | 60, | |
53 | dns.rdataclass.IN, | |
54 | dns.rdatatype.AAAA, | |
55 | '2001:DB8::1') | |
56 | expectedResponse.answer.append(rrset) | |
57 | ||
6ca2e796 RG |
58 | for method in ("sendUDPQuery", "sendTCPQuery"): |
59 | sender = getattr(self, method) | |
60 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
61 | self.assertTrue(receivedResponse) | |
62 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
63 | |
64 | def testSpoofActionCNAME(self): | |
65 | """ | |
66 | Spoofing: Spoof CNAME via Action | |
67 | ||
68 | Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.", | |
69 | check that dnsdist sends a spoofed result. | |
70 | """ | |
71 | name = 'cnamespoofaction.spoofing.tests.powerdns.com.' | |
72 | query = dns.message.make_query(name, 'A', 'IN') | |
73 | # dnsdist set RA = RD for spoofed responses | |
74 | query.flags &= ~dns.flags.RD | |
75 | expectedResponse = dns.message.make_response(query) | |
76 | rrset = dns.rrset.from_text(name, | |
77 | 60, | |
78 | dns.rdataclass.IN, | |
79 | dns.rdatatype.CNAME, | |
80 | 'cnameaction.spoofing.tests.powerdns.com.') | |
81 | expectedResponse.answer.append(rrset) | |
82 | ||
6ca2e796 RG |
83 | for method in ("sendUDPQuery", "sendTCPQuery"): |
84 | sender = getattr(self, method) | |
85 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
86 | self.assertTrue(receivedResponse) | |
87 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
88 | |
89 | def testSpoofActionMultiA(self): | |
90 | """ | |
91 | Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof | |
92 | ||
93 | Send an A query for "multispoof.spoofing.tests.powerdns.com.", | |
94 | check that dnsdist sends a spoofed result. | |
95 | """ | |
96 | name = 'multispoof.spoofing.tests.powerdns.com.' | |
97 | query = dns.message.make_query(name, 'A', 'IN') | |
98 | # dnsdist set RA = RD for spoofed responses | |
99 | query.flags &= ~dns.flags.RD | |
100 | expectedResponse = dns.message.make_response(query) | |
101 | rrset = dns.rrset.from_text(name, | |
102 | 60, | |
103 | dns.rdataclass.IN, | |
104 | dns.rdatatype.A, | |
105 | '192.0.2.2', '192.0.2.1') | |
106 | expectedResponse.answer.append(rrset) | |
107 | ||
6ca2e796 RG |
108 | for method in ("sendUDPQuery", "sendTCPQuery"): |
109 | sender = getattr(self, method) | |
110 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
111 | self.assertTrue(receivedResponse) | |
112 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
113 | |
114 | def testSpoofActionMultiAAAA(self): | |
115 | """ | |
116 | Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof | |
117 | ||
118 | Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.", | |
119 | check that dnsdist sends a spoofed result. | |
120 | """ | |
121 | name = 'multispoof.spoofing.tests.powerdns.com.' | |
122 | query = dns.message.make_query(name, 'AAAA', 'IN') | |
123 | # dnsdist set RA = RD for spoofed responses | |
124 | query.flags &= ~dns.flags.RD | |
125 | expectedResponse = dns.message.make_response(query) | |
126 | rrset = dns.rrset.from_text(name, | |
127 | 60, | |
128 | dns.rdataclass.IN, | |
129 | dns.rdatatype.AAAA, | |
130 | '2001:DB8::1', '2001:DB8::2') | |
131 | expectedResponse.answer.append(rrset) | |
132 | ||
6ca2e796 RG |
133 | for method in ("sendUDPQuery", "sendTCPQuery"): |
134 | sender = getattr(self, method) | |
135 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
136 | self.assertTrue(receivedResponse) | |
137 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
138 | |
139 | def testSpoofActionMultiANY(self): | |
140 | """ | |
141 | Spoofing: Spoof multiple addresses via AddDomainSpoof | |
142 | ||
143 | Send an ANY query for "multispoof.spoofing.tests.powerdns.com.", | |
144 | check that dnsdist sends a spoofed result. | |
145 | """ | |
146 | name = 'multispoof.spoofing.tests.powerdns.com.' | |
147 | query = dns.message.make_query(name, 'ANY', 'IN') | |
148 | # dnsdist set RA = RD for spoofed responses | |
149 | query.flags &= ~dns.flags.RD | |
150 | expectedResponse = dns.message.make_response(query) | |
fe1c60f2 | 151 | |
903853f4 RG |
152 | rrset = dns.rrset.from_text(name, |
153 | 60, | |
154 | dns.rdataclass.IN, | |
155 | dns.rdatatype.A, | |
156 | '192.0.2.2', '192.0.2.1') | |
157 | expectedResponse.answer.append(rrset) | |
fe1c60f2 | 158 | |
903853f4 RG |
159 | rrset = dns.rrset.from_text(name, |
160 | 60, | |
161 | dns.rdataclass.IN, | |
162 | dns.rdatatype.AAAA, | |
163 | '2001:DB8::1', '2001:DB8::2') | |
164 | expectedResponse.answer.append(rrset) | |
165 | ||
6ca2e796 RG |
166 | for method in ("sendUDPQuery", "sendTCPQuery"): |
167 | sender = getattr(self, method) | |
168 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
169 | self.assertTrue(receivedResponse) | |
170 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
171 | |
172 | class TestSpoofingLuaSpoof(DNSDistTest): | |
173 | ||
174 | _config_template = """ | |
175 | function spoof1rule(dq) | |
176 | if(dq.qtype==1) -- A | |
177 | then | |
614239d0 | 178 | return DNSAction.Spoof, "192.0.2.1,192.0.2.2" |
903853f4 RG |
179 | elseif(dq.qtype == 28) -- AAAA |
180 | then | |
181 | return DNSAction.Spoof, "2001:DB8::1" | |
182 | else | |
183 | return DNSAction.None, "" | |
184 | end | |
185 | end | |
186 | function spoof2rule(dq) | |
187 | return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com." | |
188 | end | |
a2ff35e3 RG |
189 | addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule)) |
190 | addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule)) | |
903853f4 RG |
191 | newServer{address="127.0.0.1:%s"} |
192 | """ | |
193 | ||
194 | def testLuaSpoofA(self): | |
195 | """ | |
196 | Spoofing: Spoofing an A via Lua | |
197 | ||
198 | Send an A query to "luaspoof1.spoofing.tests.powerdns.com.", | |
199 | check that dnsdist sends a spoofed result. | |
200 | """ | |
201 | name = 'luaspoof1.spoofing.tests.powerdns.com.' | |
202 | query = dns.message.make_query(name, 'A', 'IN') | |
203 | # dnsdist set RA = RD for spoofed responses | |
204 | query.flags &= ~dns.flags.RD | |
205 | expectedResponse = dns.message.make_response(query) | |
206 | rrset = dns.rrset.from_text(name, | |
207 | 60, | |
208 | dns.rdataclass.IN, | |
209 | dns.rdatatype.A, | |
614239d0 | 210 | '192.0.2.1', '192.0.2.2') |
903853f4 RG |
211 | expectedResponse.answer.append(rrset) |
212 | ||
6ca2e796 RG |
213 | for method in ("sendUDPQuery", "sendTCPQuery"): |
214 | sender = getattr(self, method) | |
215 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
216 | self.assertTrue(receivedResponse) | |
217 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
218 | |
219 | def testLuaSpoofAAAA(self): | |
220 | """ | |
221 | Spoofing: Spoofing an AAAA via Lua | |
222 | ||
223 | Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.", | |
224 | check that dnsdist sends a spoofed result. | |
225 | """ | |
226 | name = 'luaspoof1.spoofing.tests.powerdns.com.' | |
227 | query = dns.message.make_query(name, 'AAAA', 'IN') | |
228 | # dnsdist set RA = RD for spoofed responses | |
229 | query.flags &= ~dns.flags.RD | |
230 | expectedResponse = dns.message.make_response(query) | |
231 | rrset = dns.rrset.from_text(name, | |
232 | 60, | |
233 | dns.rdataclass.IN, | |
234 | dns.rdatatype.AAAA, | |
235 | '2001:DB8::1') | |
236 | expectedResponse.answer.append(rrset) | |
237 | ||
6ca2e796 RG |
238 | for method in ("sendUDPQuery", "sendTCPQuery"): |
239 | sender = getattr(self, method) | |
240 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
241 | self.assertTrue(receivedResponse) | |
242 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
243 | |
244 | def testLuaSpoofAWithCNAME(self): | |
245 | """ | |
246 | Spoofing: Spoofing an A with a CNAME via Lua | |
247 | ||
248 | Send an A query to "luaspoof2.spoofing.tests.powerdns.com.", | |
249 | check that dnsdist sends a spoofed result. | |
250 | """ | |
251 | name = 'luaspoof2.spoofing.tests.powerdns.com.' | |
252 | query = dns.message.make_query(name, 'A', 'IN') | |
253 | # dnsdist set RA = RD for spoofed responses | |
254 | query.flags &= ~dns.flags.RD | |
255 | expectedResponse = dns.message.make_response(query) | |
256 | rrset = dns.rrset.from_text(name, | |
257 | 60, | |
258 | dns.rdataclass.IN, | |
259 | dns.rdatatype.CNAME, | |
260 | 'spoofedcname.spoofing.tests.powerdns.com.') | |
261 | expectedResponse.answer.append(rrset) | |
262 | ||
6ca2e796 RG |
263 | for method in ("sendUDPQuery", "sendTCPQuery"): |
264 | sender = getattr(self, method) | |
265 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
266 | self.assertTrue(receivedResponse) | |
267 | self.assertEquals(expectedResponse, receivedResponse) | |
903853f4 RG |
268 | |
269 | def testLuaSpoofAAAAWithCNAME(self): | |
270 | """ | |
271 | Spoofing: Spoofing an AAAA with a CNAME via Lua | |
272 | ||
273 | Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.", | |
274 | check that dnsdist sends a spoofed result. | |
275 | """ | |
276 | name = 'luaspoof2.spoofing.tests.powerdns.com.' | |
277 | query = dns.message.make_query(name, 'AAAA', 'IN') | |
278 | # dnsdist set RA = RD for spoofed responses | |
279 | query.flags &= ~dns.flags.RD | |
280 | expectedResponse = dns.message.make_response(query) | |
281 | rrset = dns.rrset.from_text(name, | |
282 | 60, | |
283 | dns.rdataclass.IN, | |
284 | dns.rdatatype.CNAME, | |
285 | 'spoofedcname.spoofing.tests.powerdns.com.') | |
286 | expectedResponse.answer.append(rrset) | |
287 | ||
6ca2e796 RG |
288 | for method in ("sendUDPQuery", "sendTCPQuery"): |
289 | sender = getattr(self, method) | |
290 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
291 | self.assertTrue(receivedResponse) | |
292 | self.assertEquals(expectedResponse, receivedResponse) | |
86baa8df RG |
293 | |
294 | class TestSpoofingLuaWithStatistics(DNSDistTest): | |
295 | ||
296 | _config_template = """ | |
297 | function spoof1rule(dq) | |
298 | queriesCount = getStatisticsCounters()['queries'] | |
299 | if(queriesCount == 1) then | |
300 | return DNSAction.Spoof, "192.0.2.1" | |
301 | elseif(queriesCount == 2) then | |
302 | return DNSAction.Spoof, "192.0.2.2" | |
303 | else | |
304 | return DNSAction.Spoof, "192.0.2.0" | |
305 | end | |
306 | end | |
a2ff35e3 | 307 | addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule)) |
86baa8df RG |
308 | newServer{address="127.0.0.1:%s"} |
309 | """ | |
310 | ||
311 | def testLuaSpoofBasedOnStatistics(self): | |
312 | """ | |
313 | Spoofing: Spoofing an A via Lua based on statistics counters | |
314 | ||
315 | """ | |
316 | name = 'luaspoofwithstats.spoofing.tests.powerdns.com.' | |
317 | query = dns.message.make_query(name, 'A', 'IN') | |
318 | # dnsdist set RA = RD for spoofed responses | |
319 | query.flags &= ~dns.flags.RD | |
320 | expectedResponse1 = dns.message.make_response(query) | |
321 | rrset = dns.rrset.from_text(name, | |
322 | 60, | |
323 | dns.rdataclass.IN, | |
324 | dns.rdatatype.A, | |
325 | '192.0.2.1') | |
326 | expectedResponse1.answer.append(rrset) | |
327 | expectedResponse2 = dns.message.make_response(query) | |
328 | rrset = dns.rrset.from_text(name, | |
329 | 60, | |
330 | dns.rdataclass.IN, | |
331 | dns.rdatatype.A, | |
332 | '192.0.2.2') | |
333 | expectedResponse2.answer.append(rrset) | |
334 | expectedResponseAfterwards = dns.message.make_response(query) | |
335 | rrset = dns.rrset.from_text(name, | |
336 | 60, | |
337 | dns.rdataclass.IN, | |
338 | dns.rdatatype.A, | |
339 | '192.0.2.0') | |
340 | expectedResponseAfterwards.answer.append(rrset) | |
341 | ||
342 | (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) | |
343 | self.assertTrue(receivedResponse) | |
344 | self.assertEquals(expectedResponse1, receivedResponse) | |
345 | ||
346 | (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) | |
347 | self.assertTrue(receivedResponse) | |
348 | self.assertEquals(expectedResponse2, receivedResponse) | |
349 | ||
6ca2e796 RG |
350 | for method in ("sendUDPQuery", "sendTCPQuery"): |
351 | sender = getattr(self, method) | |
352 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
353 | self.assertTrue(receivedResponse) | |
354 | self.assertEquals(expectedResponseAfterwards, receivedResponse) |