]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Spoofing.py
Merge pull request #14078 from rgacogne/ddist-harvest-quic
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Spoofing.py
CommitLineData
903853f4
RG
1#!/usr/bin/env python
2import dns
3from dnsdisttests import DNSDistTest
4
class TestSpoofingSpoof(DNSDistTest):
    """
    Regression tests for the SpoofAction, SpoofCNAMEAction and
    SpoofRawAction rules declared in the Lua configuration below.

    Every test sends its query over both UDP and TCP and expects dnsdist
    to answer by itself (no backend response is queued).
    """

    # dnsdist Lua configuration. The single %s placeholder is filled in by
    # the test framework (presumably with the backend port -- see DNSDistTest).
    _config_template = """
    addAction(SuffixMatchNodeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}))
    addAction(SuffixMatchNodeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {aa=true}))
    addAction(SuffixMatchNodeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ad=true}))
    addAction(SuffixMatchNodeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=true}))
    addAction(SuffixMatchNodeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ra=false}))
    addAction(SuffixMatchNodeRule("spoofaction-ttl.spoofing.tests.powerdns.com."), SpoofAction({"192.0.2.1", "2001:DB8::1"}, {ttl=1500}))
    addAction(SuffixMatchNodeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
    addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
    addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction("\\192\\000\\002\\001"))
    addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
    addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))
    addAction(AndRule{SuffixMatchNodeRule("rawchaos.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT), QClassRule(DNSClass.CHAOS)}, SpoofRawAction("\\005chaos"))
    addAction(AndRule{SuffixMatchNodeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction({"\\003aaa\\004bbbb", "\\011ccccccccccc"}))
    addAction(AndRule{SuffixMatchNodeRule("multiraw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.A)}, SpoofRawAction({"\\192\\000\\002\\001", "\\192\\000\\002\\002"}))
    -- rfc8482
    addAction(AndRule{SuffixMatchNodeRule("raw-any.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.ANY)}, SpoofRawAction("\\007rfc\\056\\052\\056\\050\\000", { typeForAny=DNSQType.HINFO }))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """
        Send `query` over UDP then TCP and check the spoofed answer.

        For each transport the response must be non-empty and equal to
        `expectedResponse`. When `expectedTTL` is not None, the TTL of the
        first answer rrset is checked as well.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # no backend response is queued: dnsdist must answer by itself
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testSpoofActionA(self):
        """
        Spoofing: Spoof A via Action

        Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionAAAA(self):
        """
        Spoofing: Spoof AAAA via Action

        Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'spoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionCNAME(self):
        """
        Spoofing: Spoof CNAME via Action

        Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                'cnameaction.spoofing.tests.powerdns.com.'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiA(self):
        """
        Spoofing: Spoof multiple IPv4 addresses via SpoofAction

        Send an A query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.2', '192.0.2.1'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiAAAA(self):
        """
        Spoofing: Spoof multiple IPv6 addresses via SpoofAction

        Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1', '2001:DB8::2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionMultiANY(self):
        """
        Spoofing: Spoof multiple addresses via SpoofAction

        Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'multispoof.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)

        # an ANY query is expected to get both the A and the AAAA records
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.2', '192.0.2.1'))
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1', '2001:DB8::2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testSpoofActionSetAA(self):
        """
        Spoofing: Spoof via Action, setting AA=1
        """
        name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetAD(self):
        """
        Spoofing: Spoof via Action, setting AD=1
        """
        name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.AD
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetRA(self):
        """
        Spoofing: Spoof via Action, setting RA=1
        """
        name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # RD is cleared, so RA can only come from the {ra=true} option
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetNoRA(self):
        """
        Spoofing: Spoof via Action, setting RA=0
        """
        name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
        # RD is deliberately left set on the query: RA must still be cleared
        query = dns.message.make_query(name, 'AAAA', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.RA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofActionSetTTL(self):
        """
        Spoofing: Spoof via Action, setting the TTL to 1500
        """
        name = 'spoofaction-ttl.spoofing.tests.powerdns.com.'
        # RD is left set on the query, so the response carries RA as well
        query = dns.message.make_query(name, 'AAAA', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        # The expected rrset carries the configured TTL. The message
        # comparison apparently ignores TTLs (the test used to pass with 60
        # here), which is why the TTL is also checked explicitly below.
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 1500, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=1500)

    def testSpoofRawAction(self):
        """
        Spoofing: Spoof a response from raw bytes
        """
        name = 'raw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"aaa" "bbbb" "ccccccccccc"'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.SRV,
                                '0 0 65535 srv.powerdns.com.'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=3600)

    def testSpoofRawChaosAction(self):
        """
        Spoofing: Spoof a response from raw bytes in QClass CH
        """
        name = 'rawchaos.spoofing.tests.powerdns.com.'

        # TXT CH
        query = dns.message.make_query(name, 'TXT', 'CH')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.CH, dns.rdatatype.TXT,
                                '"chaos"'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofRawANYAction(self):
        """
        Spoofing: Spoof a HINFO response for ANY queries
        """
        name = 'raw-any.spoofing.tests.powerdns.com.'

        query = dns.message.make_query(name, 'ANY', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        # the rule is configured with typeForAny=DNSQType.HINFO
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.HINFO,
                                '"rfc8482" ""'))

        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

    def testSpoofRawActionMulti(self):
        """
        Spoofing: Spoof a response from several raw bytes
        """
        name = 'multiraw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.2'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"aaa" "bbbb"', '"ccccccccccc"'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)
b85487c3 457
903853f4
RG
class TestSpoofingLuaSpoof(DNSDistTest):
    """
    Regression tests for spoofing driven by Lua rules (LuaAction) that
    return DNSAction.Spoof or DNSAction.SpoofRaw.

    Every test sends its query over both UDP and TCP and expects dnsdist
    to answer by itself (no backend response is queued).
    """

    # dnsdist Lua configuration. The single %s placeholder is filled in by
    # the test framework (presumably with the backend port -- see DNSDistTest).
    _config_template = """
    function spoof1rule(dq)
        if(dq.qtype==1) -- A
        then
            return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
        elseif(dq.qtype == 28) -- AAAA
        then
            return DNSAction.Spoof, "2001:DB8::1"
        else
            return DNSAction.None, ""
        end
    end

    function spoof2rule(dq)
        return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
    end

    addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.TXT)}, SpoofRawAction("\\003aaa\\004bbbb\\011ccccccccccc"))
    addAction(AndRule{SuffixMatchNodeRule("raw.spoofing.tests.powerdns.com"), QTypeRule(DNSQType.SRV)}, SpoofRawAction("\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000", { aa=true, ttl=3600 }))

    function spoofrawrule(dq)
        if dq.qtype == DNSQType.A then
            return DNSAction.SpoofRaw, "\\192\\000\\002\\001"
        elseif dq.qtype == DNSQType.TXT then
            return DNSAction.SpoofRaw, "\\003aaa\\004bbbb\\011ccccccccccc"
        elseif dq.qtype == DNSQType.SRV then
            dq.dh:setAA(true)
            return DNSAction.SpoofRaw, "\\000\\000\\000\\000\\255\\255\\003srv\\008powerdns\\003com\\000"
        end
        return DNSAction.None, ""
    end

    addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
    addAction("lua-raw.spoofing.tests.powerdns.com.", LuaAction(spoofrawrule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """
        Send `query` over UDP then TCP and check the spoofed answer.

        For each transport the response must be non-empty and equal to
        `expectedResponse`. When `expectedTTL` is not None, the TTL of the
        first answer rrset is checked as well.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # no backend response is queued: dnsdist must answer by itself
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLuaSpoofA(self):
        """
        Spoofing: Spoofing an A via Lua

        Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAAAA(self):
        """
        Spoofing: Spoofing an AAAA via Lua

        Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAWithCNAME(self):
        """
        Spoofing: Spoofing an A with a CNAME via Lua

        Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                'spoofedcname.spoofing.tests.powerdns.com.'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofAAAAWithCNAME(self):
        """
        Spoofing: Spoofing an AAAA with a CNAME via Lua

        Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof2.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.CNAME,
                                'spoofedcname.spoofing.tests.powerdns.com.'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofRawAction(self):
        """
        Spoofing: Spoof a response from raw bytes via Lua
        """
        name = 'lua-raw.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"aaa" "bbbb" "ccccccccccc"'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.SRV,
                                '0 0 65535 srv.powerdns.com.'))
        # sorry, we can't set the TTL from the Lua API right now, so the
        # 3600 TTL is deliberately not asserted here
        self._checkSpoofedResponse(query, expectedResponse)
4efa18f1
SH
662
class TestSpoofingLuaSpoofMulti(DNSDistTest):
    """
    Regression tests for spoofing several records at once from Lua rules
    that call dq:spoof() and return DNSAction.HeaderModify.

    Every test sends its query over both UDP and TCP and expects dnsdist
    to answer by itself (no backend response is queued).
    """

    # dnsdist Lua configuration. The single %s placeholder is filled in by
    # the test framework (presumably with the backend port -- see DNSDistTest).
    _config_template = """
    function spoof1multirule(dq)
        if(dq.qtype==1) -- A
        then
            dq:spoof({ newCA("192.0.2.1"), newCA("192.0.2.2") })
            return DNSAction.HeaderModify
        elseif(dq.qtype == 28) -- AAAA
        then
            dq:spoof({ newCA("2001:DB8::1"), newCA("2001:DB8::2") })
            return DNSAction.HeaderModify
        else
            return DNSAction.None, ""
        end
    end

    function spoofrawmultirule(dq)
        if dq.qtype == DNSQType.A then
            dq:spoof({ "\\192\\000\\002\\001", "\\192\\000\\002\\002" })
            return DNSAction.HeaderModify
        elseif dq.qtype == DNSQType.TXT then
            dq:spoof({ "\\003aaa\\004bbbb", "\\011ccccccccccc" })
            return DNSAction.HeaderModify
        elseif dq.qtype == DNSQType.SRV then
            dq.dh:setAA(true)
            dq:spoof({ "\\000\\000\\000\\000\\255\\255\\004srv1\\008powerdns\\003com\\000","\\000\\000\\000\\000\\255\\255\\004srv2\\008powerdns\\003com\\000" })
            return DNSAction.HeaderModify
        end
        return DNSAction.None, ""
    end

    addAction("luaspoof1multi.spoofing.tests.powerdns.com.", LuaAction(spoof1multirule))
    addAction("lua-raw-multi.spoofing.tests.powerdns.com.", LuaAction(spoofrawmultirule))
    newServer{address="127.0.0.1:%s"}
    """

    def _checkSpoofedResponse(self, query, expectedResponse, expectedTTL=None):
        """
        Send `query` over UDP then TCP and check the spoofed answer.

        For each transport the response must be non-empty and equal to
        `expectedResponse`. When `expectedTTL` is not None, the TTL of the
        first answer rrset is checked as well.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # no backend response is queued: dnsdist must answer by itself
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
            if expectedTTL is not None:
                self.assertEqual(receivedResponse.answer[0].ttl, expectedTTL)

    def testLuaSpoofMultiA(self):
        """
        Spoofing: Spoofing multiple A via Lua dq:spoof

        Send an A query to "luaspoof1multi.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofMultiAAAA(self):
        """
        Spoofing: Spoofing multiple AAAA via Lua dq:spoof

        Send an AAAA query to "luaspoof1multi.spoofing.tests.powerdns.com.",
        check that dnsdist sends a spoofed result.
        """
        name = 'luaspoof1multi.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(name, 'AAAA', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA,
                                '2001:DB8::1', '2001:DB8::2'))

        self._checkSpoofedResponse(query, expectedResponse)

    def testLuaSpoofMultiRawAction(self):
        """
        Spoofing: Spoof responses from raw bytes via Lua dq:spoof
        """
        name = 'lua-raw-multi.spoofing.tests.powerdns.com.'

        # A
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                '192.0.2.1', '192.0.2.2'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # TXT
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags &= ~dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.TXT,
                                '"aaa" "bbbb"', '"ccccccccccc"'))
        self._checkSpoofedResponse(query, expectedResponse, expectedTTL=60)

        # SRV
        query = dns.message.make_query(name, 'SRV', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # this one should have the AA flag set
        expectedResponse.flags |= dns.flags.AA
        expectedResponse.answer.append(
            dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.SRV,
                                '0 0 65535 srv1.powerdns.com.', '0 0 65535 srv2.powerdns.com.'))
        # sorry, we can't set the TTL from the Lua API right now, so the
        # 3600 TTL is deliberately not asserted here
        self._checkSpoofedResponse(query, expectedResponse)
202c4ab9 814
4b9274b6
RG
class TestSpoofingLuaFFISpoofMulti(DNSDistTest):
    # The Lua FFI rule below fills an array of two dnsdist_ffi_raw_value_t
    # entries (raw rdata bytes plus their length) for A and TXT queries and
    # passes it to dnsdist_ffi_dnsquestion_spoof_raw(); any other query type
    # falls through with DNSAction.None.

    _config_template = """
    local ffi = require("ffi")

    function spoofrawmultirule(dq)
        local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)

        if qtype == DNSQType.A then
            local records = ffi.new("dnsdist_ffi_raw_value_t [2]")

            local str = "\\192\\000\\002\\001"
            records[0].size = #str
            records[0].value = str

            local str = "\\192\\000\\002\\255"
            records[1].value = str
            records[1].size = #str

            ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
            return DNSAction.HeaderModify
        elseif qtype == DNSQType.TXT then
            local records = ffi.new("dnsdist_ffi_raw_value_t [2]")

            local str = "\\033this text has a comma at the end,"
            records[0].size = #str
            records[0].value = str

            local str = "\\003aaa\\004bbbb"
            records[1].size = #str
            records[1].value = str

            ffi.C.dnsdist_ffi_dnsquestion_spoof_raw(dq, records, 2)
            return DNSAction.HeaderModify
        end

        return DNSAction.None, ""
    end

    addAction("lua-raw-multi.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofrawmultirule))
    newServer{address="127.0.0.1:%s"}
    """
    _verboseMode = True

    def testLuaSpoofMultiRawAction(self):
        """
        Spoofing via Lua FFI: Spoof several raw records in one response via Lua FFI
        """
        qname = 'lua-raw-multi.ffi-spoofing.tests.powerdns.com.'

        # One entry per query type handled by the Lua rule:
        # (query type, rdata type of the answer, expected records in text form).
        cases = (
            ('A', dns.rdatatype.A,
             ('192.0.2.1', '192.0.2.255')),
            ('TXT', dns.rdatatype.TXT,
             ('"this text has a comma at the end,"', '"aaa" "bbbb"')),
        )

        for qtype, rdtype, records in cases:
            query = dns.message.make_query(qname, qtype, 'IN')
            query.flags &= ~dns.flags.RD

            expected = dns.message.make_response(query)
            expected.flags &= ~dns.flags.AA
            expected.answer.append(
                dns.rrset.from_text(qname, 60, dns.rdataclass.IN, rdtype, *records)
            )

            # No backend response is queued: the answer has to come from
            # dnsdist itself, over UDP and then over TCP.
            for sender in (self.sendUDPQuery, self.sendTCPQuery):
                (_, received) = sender(query, response=None, useQueue=False)
                self.assertTrue(received)
                self.assertEqual(expected, received)
                self.assertEqual(received.answer[0].ttl, 60)
86baa8df
RG
class TestSpoofingLuaWithStatistics(DNSDistTest):
    # The Lua rule reads the 'queries' statistics counter and spoofs a
    # different address for the first query, the second query, and every
    # query after that.

    _config_template = """
    function spoof1rule(dq)
        queriesCount = getStatisticsCounters()['queries']
        if(queriesCount == 1) then
            return DNSAction.Spoof, "192.0.2.1"
        elseif(queriesCount == 2) then
            return DNSAction.Spoof, "192.0.2.2"
        else
            return DNSAction.Spoof, "192.0.2.0"
        end
    end
    addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
    newServer{address="127.0.0.1:%s"}
    """

    def testLuaSpoofBasedOnStatistics(self):
        """
        Spoofing: Spoofed A answer selected in Lua from the statistics counters

        """
        qname = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        def answering(address):
            # Expected response to `query` carrying one A record for `address`.
            response = dns.message.make_response(query)
            response.answer.append(
                dns.rrset.from_text(qname, 60, dns.rdataclass.IN, dns.rdatatype.A, address)
            )
            return response

        expectedFirst = answering('192.0.2.1')
        expectedSecond = answering('192.0.2.2')
        expectedAfterwards = answering('192.0.2.0')

        # The order of the queries matters: the rule keys on how many queries
        # have been counted so far. The first two are sent over UDP only.
        for expected in (expectedFirst, expectedSecond):
            (_, received) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertTrue(received)
            self.assertEqual(expected, received)

        # Every later query, over either transport, gets the fallback address.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, received) = sender(query, response=None, useQueue=False)
            self.assertTrue(received)
            self.assertEqual(expectedAfterwards, received)
1907b330
CHB
964
class TestSpoofingLuaSpoofPacket(DNSDistTest):
    # Three ways of answering with a pre-built raw DNS packet are configured:
    # a LuaAction returning DNSAction.SpoofPacket, a SpoofPacketAction rule,
    # and a LuaFFIAction calling dnsdist_ffi_dnsquestion_spoof_packet().
    # In each packet the two flag bytes (129, 133 = 0x81 0x85) encode
    # QR + RD + RA with rcode REFUSED, which is what the tests expect back.

    _config_template = """

    function spoofpacket(dq)
        if dq.qtype == DNSQType.A then
            return DNSAction.SpoofPacket, "\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
        end
        return DNSAction.None, ""
    end

    addAction("lua-raw-packet.spoofing.tests.powerdns.com.", LuaAction(spoofpacket))
    local rawResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\019rule\\045lua\\045raw\\045packet\\008spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"
    addAction(AndRule{QTypeRule(DNSQType.A), SuffixMatchNodeRule("rule-lua-raw-packet.spoofing.tests.powerdns.com.")}, SpoofPacketAction(rawResponse, string.len(rawResponse)))

    local ffi = require("ffi")

    function spoofpacketffi(dq)
        local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
        if qtype == DNSQType.A then
            -- REFUSED answer
            local refusedResponse="\\000\\000\\129\\133\\000\\001\\000\\000\\000\\000\\000\\000\\014lua\\045raw\\045packet\\012ffi\\045spoofing\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001"

            ffi.C.dnsdist_ffi_dnsquestion_spoof_packet(dq, refusedResponse, string.len(refusedResponse))
            return DNSAction.HeaderModify
        end
        return DNSAction.None, ""
    end

    addAction("lua-raw-packet.ffi-spoofing.tests.powerdns.com.", LuaFFIAction(spoofpacketffi))
    newServer{address="127.0.0.1:%s"}
    """
    _verboseMode = True

    def testLuaSpoofPacket(self):
        """
        Spoofing via Lua: Spoof raw response packet via LuaAction and SpoofPacketAction
        """
        spoofedNames = (
            'lua-raw-packet.spoofing.tests.powerdns.com.',       # LuaAction rule
            'rule-lua-raw-packet.spoofing.tests.powerdns.com.',  # SpoofPacketAction rule
        )

        for qname in spoofedNames:
            query = dns.message.make_query(qname, 'A', 'IN')

            # The expected answer mirrors the query, plus RA and rcode REFUSED.
            expected = dns.message.make_response(query)
            expected.flags |= dns.flags.RA
            expected.set_rcode(dns.rcode.REFUSED)

            for sender in (self.sendUDPQuery, self.sendTCPQuery):
                (_, received) = sender(query, response=None, useQueue=False)
                self.assertTrue(received)
                self.assertEqual(expected, received)

    def testLuaFFISpoofPacket(self):
        """
        Spoofing via Lua FFI: Spoof raw response packet via Lua FFI
        """
        qname = 'lua-raw-packet.ffi-spoofing.tests.powerdns.com.'

        query = dns.message.make_query(qname, 'A', 'IN')

        # The expected answer mirrors the query, plus RA and rcode REFUSED.
        expected = dns.message.make_response(query)
        expected.flags |= dns.flags.RA
        expected.set_rcode(dns.rcode.REFUSED)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, received) = sender(query, response=None, useQueue=False)
            self.assertTrue(received)
            self.assertEqual(expected, received)