]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #8688 from omoerbeek/rec-socketdir-message
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
CommitLineData
903853f4
RG
1#!/usr/bin/env python
2import dns
3from dnsdisttests import DNSDistTest
4
class TestSpoofingSpoof(DNSDistTest):
    """
    Spoofed answers generated by SpoofAction and SpoofCNAMEAction rules.

    The configuration below installs three rules:
    - "spoofaction..." answers with a single A / AAAA address,
    - "cnamespoofaction..." answers with a CNAME,
    - "multispoof..." answers with several A and AAAA addresses.
    """

    _config_template = """
    addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
    addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
    newServer{address="127.0.0.1:%s"}
    """

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'cnameaction.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via SpoofAction

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # only the IPv4 addresses of the rule are expected for an A query
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via SpoofAction

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # only the IPv6 addresses of the rule are expected for an AAAA query
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via SpoofAction

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # an ANY query is expected to return both the A and the AAAA sets
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2', '192.0.2.1')
        expectedResponse.answer.append(rrset)

        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1', '2001:DB8::2')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
903853f4
RG
171
class TestSpoofingLuaSpoof(DNSDistTest):
    """
    Spoofed answers generated by Lua functions returning DNSAction.Spoof.

    The configuration below defines two Lua rules:
    - spoof1rule answers A queries with two IPv4 addresses, AAAA queries
      with one IPv6 address, and takes no action for other types,
    - spoof2rule always answers with a CNAME target.
    """

    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype==1) -- A
        then
                return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
                return DNSAction.Spoof, "2001:DB8::1"
        else
                return DNSAction.None, ""
        end
    end
    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end
    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # both addresses of the comma-separated Lua return value are expected
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1', '192.0.2.2')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '2001:DB8::1')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected response is derived from
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # the CNAME is expected regardless of the queried type
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    'spoofedcname.spoofing.tests.powerdns.com.')
        expectedResponse.answer.append(rrset)

        # the same answer is expected over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
86baa8df
RG
293
class TestSpoofingLuaWithStatistics(DNSDistTest):
    """
    Spoofed answers chosen by a Lua rule from the 'queries' statistics counter.

    The Lua rule below returns 192.0.2.1 when the counter equals 1,
    192.0.2.2 when it equals 2, and 192.0.2.0 for any other value.
    """

    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
                return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
                return DNSAction.Spoof, "192.0.2.2"
        else
                return DNSAction.Spoof, "192.0.2.0"
        end
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofing an A via Lua based on statistics counters

        Send the same A query several times to
        "luaspoofwithstats.spoofing.tests.powerdns.com." and check that
        the spoofed address changes as the queries counter increases.
        """
        name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD in the
        # query the expected responses are derived from
        query.flags &= ~dns.flags.RD

        # answer expected for the first query
        expectedResponse1 = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        expectedResponse1.answer.append(rrset)

        # answer expected for the second query
        expectedResponse2 = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.2')
        expectedResponse2.answer.append(rrset)

        # answer expected for every later query
        expectedResponseAfterwards = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.0')
        expectedResponseAfterwards.answer.append(rrset)

        # NOTE(review): the first two assertions assume these are the first
        # two queries counted by this dnsdist instance - confirm that the
        # harness starts a fresh instance for this class
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse1, receivedResponse)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse2, receivedResponse)

        # from the third query on, both transports get the fallback address
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponseAfterwards, receivedResponse)