]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSOptions.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class EDNSOptionsBase(DNSDistTest
):
8 _ednsTestFunction
= """
9 function testEDNSOptions(dq)
10 local options = dq:getEDNSOptions()
11 local qname = tostring(dq.qname)
13 if string.match(qname, 'noedns') then
14 if next(options) ~= nil then
15 return DNSAction.Spoof, "192.0.2.255"
19 if string.match(qname, 'multiplecookies') then
20 if options[EDNSOptionCode.COOKIE] == nil then
21 return DNSAction.Spoof, "192.0.2.1"
23 if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
24 return DNSAction.Spoof, "192.0.2.2"
26 if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
27 return DNSAction.Spoof, "192.0.2.3"
29 if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then
30 return DNSAction.Spoof, "192.0.2.4"
32 elseif string.match(qname, 'cookie') then
33 if options[EDNSOptionCode.COOKIE] == nil then
34 return DNSAction.Spoof, "192.0.2.1"
36 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
37 return DNSAction.Spoof, "192.0.2.2"
41 if string.match(qname, 'ecs4') then
42 if options[EDNSOptionCode.ECS] == nil then
43 return DNSAction.Spoof, "192.0.2.51"
45 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then
46 return DNSAction.Spoof, "192.0.2.52"
50 if string.match(qname, 'ecs6') then
51 if options[EDNSOptionCode.ECS] == nil then
52 return DNSAction.Spoof, "192.0.2.101"
54 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then
55 return DNSAction.Spoof, "192.0.2.102"
59 return DNSAction.None, ""
63 function testEDNSOptionsInResponses(dr)
64 local options = dr:getEDNSOptions()
65 local qname = tostring(dr.qname)
67 if string.match(qname, 'multiplecookies') then
68 return DNSAction.None, ""
69 elseif string.match(qname, 'cookie') then
70 if options[EDNSOptionCode.COOKIE] == nil then
71 return DNSAction.Spoof, "192.0.2.1"
73 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
74 return DNSAction.Spoof, "192.0.2.2"
78 if string.match(qname, 'ecs4') then
79 if options[EDNSOptionCode.ECS] == nil then
80 return DNSAction.Spoof, "192.0.2.51"
82 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 7 then
83 return DNSAction.Spoof, "192.0.2.52"
87 if string.match(qname, 'ecs6') then
88 if options[EDNSOptionCode.ECS] == nil then
89 return DNSAction.Spoof, "192.0.2.101"
91 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 11 then
92 return DNSAction.Spoof, "192.0.2.102"
96 return DNSAction.None, ""
101 class TestEDNSOptions(EDNSOptionsBase
):
103 _config_template
= """
106 addAction(AllRule(), LuaAction(testEDNSOptions))
108 newServer{address="127.0.0.1:%s"}
110 _config_params
= ['_ednsTestFunction', '_testServerPort']
112 def testWithoutEDNS(self
):
114 EDNS Options: No EDNS
116 name
= 'noedns.ednsoptions.tests.powerdns.com.'
117 query
= dns
.message
.make_query(name
, 'A', 'IN')
118 response
= dns
.message
.make_response(query
)
119 rrset
= dns
.rrset
.from_text(name
,
124 response
.answer
.append(rrset
)
126 for method
in ("sendUDPQuery", "sendTCPQuery"):
127 sender
= getattr(self
, method
)
128 (receivedQuery
, receivedResponse
) = sender(query
, response
)
129 self
.assertTrue(receivedQuery
)
130 self
.assertTrue(receivedResponse
)
131 receivedQuery
.id = query
.id
132 self
.assertEqual(receivedQuery
, query
)
133 self
.assertEqual(receivedResponse
, response
)
135 def testCookie(self
):
139 name
= 'cookie.ednsoptions.tests.powerdns.com.'
140 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
141 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
142 response
= dns
.message
.make_response(query
)
143 rrset
= dns
.rrset
.from_text(name
,
148 response
.answer
.append(rrset
)
150 for method
in ("sendUDPQuery", "sendTCPQuery"):
151 sender
= getattr(self
, method
)
152 (receivedQuery
, receivedResponse
) = sender(query
, response
)
153 self
.assertTrue(receivedQuery
)
154 self
.assertTrue(receivedResponse
)
155 receivedQuery
.id = query
.id
156 self
.assertEqual(receivedQuery
, query
)
157 self
.assertEqual(receivedResponse
, response
)
163 name
= 'ecs4.ednsoptions.tests.powerdns.com.'
164 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
165 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
166 response
= dns
.message
.make_response(query
)
167 rrset
= dns
.rrset
.from_text(name
,
172 response
.answer
.append(rrset
)
174 for method
in ("sendUDPQuery", "sendTCPQuery"):
175 sender
= getattr(self
, method
)
176 (receivedQuery
, receivedResponse
) = sender(query
, response
)
177 self
.assertTrue(receivedQuery
)
178 self
.assertTrue(receivedResponse
)
179 receivedQuery
.id = query
.id
180 self
.assertEqual(receivedQuery
, query
)
181 self
.assertEqual(receivedResponse
, response
)
187 name
= 'ecs6.ednsoptions.tests.powerdns.com.'
188 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
189 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
190 response
= dns
.message
.make_response(query
)
191 rrset
= dns
.rrset
.from_text(name
,
196 response
.answer
.append(rrset
)
198 for method
in ("sendUDPQuery", "sendTCPQuery"):
199 sender
= getattr(self
, method
)
200 (receivedQuery
, receivedResponse
) = sender(query
, response
)
201 self
.assertTrue(receivedQuery
)
202 self
.assertTrue(receivedResponse
)
203 receivedQuery
.id = query
.id
204 self
.assertEqual(receivedQuery
, query
)
205 self
.assertEqual(receivedResponse
, response
)
207 def testECS6Cookie(self
):
209 EDNS Options: Cookie + ECS6
211 name
= 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
212 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
213 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
214 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
215 response
= dns
.message
.make_response(query
)
216 rrset
= dns
.rrset
.from_text(name
,
221 response
.answer
.append(rrset
)
223 for method
in ("sendUDPQuery", "sendTCPQuery"):
224 sender
= getattr(self
, method
)
225 (receivedQuery
, receivedResponse
) = sender(query
, response
)
226 self
.assertTrue(receivedQuery
)
227 self
.assertTrue(receivedResponse
)
228 receivedQuery
.id = query
.id
229 self
.assertEqual(receivedQuery
, query
)
230 self
.assertEqual(receivedResponse
, response
)
232 def testMultiCookiesECS6(self
):
234 EDNS Options: Two Cookies + ECS6
236 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
237 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
238 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
239 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
240 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
241 response
= dns
.message
.make_response(query
)
242 rrset
= dns
.rrset
.from_text(name
,
247 response
.answer
.append(rrset
)
249 for method
in ("sendUDPQuery", "sendTCPQuery"):
250 sender
= getattr(self
, method
)
251 (receivedQuery
, receivedResponse
) = sender(query
, response
)
252 self
.assertTrue(receivedQuery
)
253 self
.assertTrue(receivedResponse
)
254 receivedQuery
.id = query
.id
255 self
.assertEqual(receivedQuery
, query
)
256 self
.assertEqual(receivedResponse
, response
)
258 class TestEDNSOptionsAddingECS(EDNSOptionsBase
):
260 _config_template
= """
263 addAction(AllRule(), LuaAction(testEDNSOptions))
264 addResponseAction("ednsoptions-ecs.tests.powerdns.com.", LuaResponseAction(testEDNSOptionsInResponses))
266 newServer{address="127.0.0.1:%s", useClientSubnet=true}
268 _config_params
= ['_ednsTestFunction', '_testServerPort']
270 def testWithoutEDNS(self
):
272 EDNS Options: No EDNS (adding ECS)
274 name
= 'noedns.ednsoptions-ecs.tests.powerdns.com.'
275 query
= dns
.message
.make_query(name
, 'A', 'IN')
276 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
277 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
278 response
= dns
.message
.make_response(query
)
279 rrset
= dns
.rrset
.from_text(name
,
284 response
.answer
.append(rrset
)
286 for method
in ("sendUDPQuery", "sendTCPQuery"):
287 sender
= getattr(self
, method
)
288 (receivedQuery
, receivedResponse
) = sender(query
, response
)
289 self
.assertTrue(receivedQuery
)
290 self
.assertTrue(receivedResponse
)
291 receivedQuery
.id = expectedQuery
.id
292 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
293 self
.checkResponseNoEDNS(response
, receivedResponse
)
295 def testCookie(self
):
297 EDNS Options: Cookie (adding ECS)
299 name
= 'cookie.ednsoptions-ecs.tests.powerdns.com.'
300 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
301 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[eco
])
302 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
303 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[eco
,ecso
], payload
=512)
304 response
= dns
.message
.make_response(query
)
305 rrset
= dns
.rrset
.from_text(name
,
310 response
.answer
.append(rrset
)
312 for method
in ("sendUDPQuery", "sendTCPQuery"):
313 sender
= getattr(self
, method
)
314 (receivedQuery
, receivedResponse
) = sender(query
, response
)
315 self
.assertTrue(receivedQuery
)
316 self
.assertTrue(receivedResponse
)
317 receivedQuery
.id = expectedQuery
.id
318 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 1)
319 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
323 EDNS Options: ECS4 (adding ECS)
325 name
= 'ecs4.ednsoptions-ecs.tests.powerdns.com.'
326 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
327 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
328 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24, scope
=24)
329 response
= dns
.message
.make_response(query
)
330 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
331 rrset
= dns
.rrset
.from_text(name
,
336 response
.answer
.append(rrset
)
338 for method
in ("sendUDPQuery", "sendTCPQuery"):
339 sender
= getattr(self
, method
)
340 (receivedQuery
, receivedResponse
) = sender(query
, response
)
341 self
.assertTrue(receivedQuery
)
342 self
.assertTrue(receivedResponse
)
343 receivedQuery
.id = query
.id
344 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
345 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
349 EDNS Options: ECS6 (adding ECS)
351 name
= 'ecs6.ednsoptions-ecs.tests.powerdns.com.'
352 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
353 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
354 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128, scope
=56)
355 response
= dns
.message
.make_response(query
)
356 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
357 rrset
= dns
.rrset
.from_text(name
,
362 response
.answer
.append(rrset
)
364 for method
in ("sendUDPQuery", "sendTCPQuery"):
365 sender
= getattr(self
, method
)
366 (receivedQuery
, receivedResponse
) = sender(query
, response
)
367 self
.assertTrue(receivedQuery
)
368 self
.assertTrue(receivedResponse
)
369 receivedQuery
.id = query
.id
370 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
371 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
373 def testECS6Cookie(self
):
375 EDNS Options: Cookie + ECS6 (adding ECS)
377 name
= 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
378 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
379 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
380 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
381 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128, scope
=56)
382 response
= dns
.message
.make_response(query
)
383 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
384 rrset
= dns
.rrset
.from_text(name
,
389 response
.answer
.append(rrset
)
391 for method
in ("sendUDPQuery", "sendTCPQuery"):
392 sender
= getattr(self
, method
)
393 (receivedQuery
, receivedResponse
) = sender(query
, response
)
394 self
.assertTrue(receivedQuery
)
395 self
.assertTrue(receivedResponse
)
396 receivedQuery
.id = query
.id
397 self
.checkQueryEDNSWithECS(query
, receivedQuery
, 1)
398 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
400 def testMultiCookiesECS6(self
):
402 EDNS Options: Two Cookies + ECS6
404 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
405 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
406 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
407 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
408 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
409 response
= dns
.message
.make_response(query
)
410 rrset
= dns
.rrset
.from_text(name
,
415 response
.answer
.append(rrset
)
417 for method
in ("sendUDPQuery", "sendTCPQuery"):
418 sender
= getattr(self
, method
)
419 (receivedQuery
, receivedResponse
) = sender(query
, response
)
420 self
.assertTrue(receivedQuery
)
421 self
.assertTrue(receivedResponse
)
422 receivedQuery
.id = query
.id
423 self
.assertEqual(receivedQuery
, query
)
424 self
.assertEqual(receivedResponse
, response
)
426 class TestEDNSOptionsLuaFFI(DNSDistTest
):
428 _config_template
= """
429 local ffi = require("ffi")
431 function testEDNSOptions(dq)
432 local options_ptr = ffi.new("const dnsdist_ffi_ednsoption_t *[1]")
433 local ret_ptr_param = ffi.cast("const dnsdist_ffi_ednsoption_t **", options_ptr)
435 local options_count = tonumber(ffi.C.dnsdist_ffi_dnsquestion_get_edns_options(dq, ret_ptr_param))
437 local qname_ptr = ffi.new("const char *[1]")
438 local qname_ptr_param = ffi.cast("const char **", qname_ptr)
439 local qname_size = ffi.new("size_t[1]")
440 local qname_size_param = ffi.cast("size_t*", qname_size)
441 ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, qname_ptr_param, qname_size_param)
442 local qname = ffi.string(qname_ptr[0])
444 if string.match(qname, 'noedns') then
445 if options_count ~= 0 then
446 local str = "192.0.2.255"
447 local buf = ffi.new("char[?]", #str + 1)
449 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
450 return DNSAction.Spoof
454 local cookies_count = 0
457 local first_cookie_index = -1
458 local last_cookie_index = -1
459 if options_count > 0 then
460 for idx = 0, options_count-1 do
461 if options_ptr[0][idx].optionCode == EDNSOptionCode.COOKIE then
462 cookies_count = cookies_count + 1
463 if first_cookie_index == -1 then
464 first_cookie_index = idx
466 last_cookie_index = idx
467 elseif options_ptr[0][idx].optionCode == EDNSOptionCode.ECS then
468 ecs_count = ecs_count + 1
474 if string.match(qname, 'multiplecookies') then
475 if cookies_count == 0 then
476 local str = "192.0.2.1"
477 local buf = ffi.new("char[?]", #str + 1)
479 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
480 return DNSAction.Spoof
482 if cookies_count ~= 2 then
483 local str = "192.0.2.2"
484 local buf = ffi.new("char[?]", #str + 1)
486 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
487 return DNSAction.Spoof
489 if options_ptr[0][first_cookie_index].len ~= 16 then
490 local str = "192.0.2.3"
491 local buf = ffi.new("char[?]", #str + 1)
493 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
494 return DNSAction.Spoof
496 if options_ptr[0][last_cookie_index].len ~= 16 then
497 local str = "192.0.2.4"
498 local buf = ffi.new("char[?]", #str + 1)
500 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
501 return DNSAction.Spoof
503 elseif string.match(qname, 'cookie') then
505 if cookies_count == 0 then
506 local str = "192.0.2.1"
507 local buf = ffi.new("char[?]", #str + 1)
509 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
510 return DNSAction.Spoof
512 if cookies_count ~= 1 or options_ptr[0][first_cookie_index].len ~= 16 then
513 local str = "192.0.2.2"
514 local buf = ffi.new("char[?]", #str + 1)
516 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
517 return DNSAction.Spoof
521 if string.match(qname, 'ecs4') then
522 if ecs_count == 0 then
523 local str = "192.0.2.51"
524 local buf = ffi.new("char[?]", #str + 1)
526 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
527 return DNSAction.Spoof
530 if ecs_count ~= 1 or options_ptr[0][ecs_index].len ~= 8 then
531 local str = "192.0.2.52"
532 local buf = ffi.new("char[?]", #str + 1)
534 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
535 return DNSAction.Spoof
539 if string.match(qname, 'ecs6') then
540 if ecs_count == 0 then
541 local str = "192.0.2.101"
542 local buf = ffi.new("char[?]", #str + 1)
544 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
545 return DNSAction.Spoof
547 if ecs_count ~= 1 or options_ptr[0][ecs_index].len ~= 20 then
548 local str = "192.0.2.102"
549 local buf = ffi.new("char[?]", #str + 1)
551 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
552 return DNSAction.Spoof
556 return DNSAction.None
560 addAction(AllRule(), LuaFFIAction(testEDNSOptions))
562 newServer{address="127.0.0.1:%s"}
565 def testWithoutEDNSFFI(self
):
567 EDNS Options: No EDNS (FFI)
569 name
= 'noedns.ednsoptions.tests.powerdns.com.'
570 query
= dns
.message
.make_query(name
, 'A', 'IN')
571 response
= dns
.message
.make_response(query
)
572 rrset
= dns
.rrset
.from_text(name
,
577 response
.answer
.append(rrset
)
579 for method
in ("sendUDPQuery", "sendTCPQuery"):
580 sender
= getattr(self
, method
)
581 (receivedQuery
, receivedResponse
) = sender(query
, response
)
582 self
.assertTrue(receivedQuery
)
583 self
.assertTrue(receivedResponse
)
584 receivedQuery
.id = query
.id
585 self
.assertEqual(receivedQuery
, query
)
586 self
.assertEqual(receivedResponse
, response
)
588 def testCookieFFI(self
):
590 EDNS Options: Cookie (FFI)
592 name
= 'cookie.ednsoptions.tests.powerdns.com.'
593 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
594 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
595 response
= dns
.message
.make_response(query
)
596 rrset
= dns
.rrset
.from_text(name
,
601 response
.answer
.append(rrset
)
603 for method
in ("sendUDPQuery", "sendTCPQuery"):
604 sender
= getattr(self
, method
)
605 (receivedQuery
, receivedResponse
) = sender(query
, response
)
606 self
.assertTrue(receivedQuery
)
607 self
.assertTrue(receivedResponse
)
608 receivedQuery
.id = query
.id
609 self
.assertEqual(receivedQuery
, query
)
610 self
.assertEqual(receivedResponse
, response
)
612 def testECS4FFI(self
):
614 EDNS Options: ECS4 (FFI)
616 name
= 'ecs4.ednsoptions.tests.powerdns.com.'
617 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
618 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
619 response
= dns
.message
.make_response(query
)
620 rrset
= dns
.rrset
.from_text(name
,
625 response
.answer
.append(rrset
)
627 for method
in ("sendUDPQuery", "sendTCPQuery"):
628 sender
= getattr(self
, method
)
629 (receivedQuery
, receivedResponse
) = sender(query
, response
)
630 self
.assertTrue(receivedQuery
)
631 self
.assertTrue(receivedResponse
)
632 receivedQuery
.id = query
.id
633 self
.assertEqual(receivedQuery
, query
)
634 self
.assertEqual(receivedResponse
, response
)
636 def testECS6FFI(self
):
638 EDNS Options: ECS6 (FFI)
640 name
= 'ecs6.ednsoptions.tests.powerdns.com.'
641 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
642 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
643 response
= dns
.message
.make_response(query
)
644 rrset
= dns
.rrset
.from_text(name
,
649 response
.answer
.append(rrset
)
651 for method
in ("sendUDPQuery", "sendTCPQuery"):
652 sender
= getattr(self
, method
)
653 (receivedQuery
, receivedResponse
) = sender(query
, response
)
654 self
.assertTrue(receivedQuery
)
655 self
.assertTrue(receivedResponse
)
656 receivedQuery
.id = query
.id
657 self
.assertEqual(receivedQuery
, query
)
658 self
.assertEqual(receivedResponse
, response
)
660 def testECS6CookieFFI(self
):
662 EDNS Options: Cookie + ECS6 (FFI)
664 name
= 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
665 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
666 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
667 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
668 response
= dns
.message
.make_response(query
)
669 rrset
= dns
.rrset
.from_text(name
,
674 response
.answer
.append(rrset
)
676 for method
in ("sendUDPQuery", "sendTCPQuery"):
677 sender
= getattr(self
, method
)
678 (receivedQuery
, receivedResponse
) = sender(query
, response
)
679 self
.assertTrue(receivedQuery
)
680 self
.assertTrue(receivedResponse
)
681 receivedQuery
.id = query
.id
682 self
.assertEqual(receivedQuery
, query
)
683 self
.assertEqual(receivedResponse
, response
)
685 def testMultiCookiesECS6FFI(self
):
687 EDNS Options: Two Cookies + ECS6 (FFI)
689 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
690 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
691 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
692 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
693 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
694 response
= dns
.message
.make_response(query
)
695 rrset
= dns
.rrset
.from_text(name
,
700 response
.answer
.append(rrset
)
702 for method
in ("sendUDPQuery", "sendTCPQuery"):
703 sender
= getattr(self
, method
)
704 (receivedQuery
, receivedResponse
) = sender(query
, response
)
705 self
.assertTrue(receivedQuery
)
706 self
.assertTrue(receivedResponse
)
707 receivedQuery
.id = query
.id
708 self
.assertEqual(receivedQuery
, query
)
709 self
.assertEqual(receivedResponse
, response
)