]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSOptions.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class EDNSOptionsBase(DNSDistTest
):
8 _ednsTestFunction
= """
9 function testEDNSOptions(dq)
10 local options = dq:getEDNSOptions()
11 local qname = dq.qname:toString()
13 if string.match(qname, 'noedns') then
14 if next(options) ~= nil then
15 return DNSAction.Spoof, "192.0.2.255"
19 if string.match(qname, 'multiplecookies') then
20 if options[EDNSOptionCode.COOKIE] == nil then
21 return DNSAction.Spoof, "192.0.2.1"
23 if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
24 return DNSAction.Spoof, "192.0.2.2"
26 if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
27 return DNSAction.Spoof, "192.0.2.3"
29 if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then
30 return DNSAction.Spoof, "192.0.2.4"
32 elseif string.match(qname, 'cookie') then
33 if options[EDNSOptionCode.COOKIE] == nil then
34 return DNSAction.Spoof, "192.0.2.1"
36 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
37 return DNSAction.Spoof, "192.0.2.2"
41 if string.match(qname, 'ecs4') then
42 if options[EDNSOptionCode.ECS] == nil then
43 return DNSAction.Spoof, "192.0.2.51"
45 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then
46 return DNSAction.Spoof, "192.0.2.52"
50 if string.match(qname, 'ecs6') then
51 if options[EDNSOptionCode.ECS] == nil then
52 return DNSAction.Spoof, "192.0.2.101"
54 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then
55 return DNSAction.Spoof, "192.0.2.102"
59 return DNSAction.None, ""
64 class TestEDNSOptions(EDNSOptionsBase
):
66 _config_template
= """
69 addAction(AllRule(), LuaAction(testEDNSOptions))
71 newServer{address="127.0.0.1:%s"}
73 _config_params
= ['_ednsTestFunction', '_testServerPort']
75 def testWithoutEDNS(self
):
79 name
= 'noedns.ednsoptions.tests.powerdns.com.'
80 query
= dns
.message
.make_query(name
, 'A', 'IN')
81 response
= dns
.message
.make_response(query
)
82 rrset
= dns
.rrset
.from_text(name
,
87 response
.answer
.append(rrset
)
89 for method
in ("sendUDPQuery", "sendTCPQuery"):
90 sender
= getattr(self
, method
)
91 (receivedQuery
, receivedResponse
) = sender(query
, response
)
92 self
.assertTrue(receivedQuery
)
93 self
.assertTrue(receivedResponse
)
94 receivedQuery
.id = query
.id
95 self
.assertEquals(receivedQuery
, query
)
96 self
.assertEquals(receivedResponse
, response
)
102 name
= 'cookie.ednsoptions.tests.powerdns.com.'
103 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
104 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
105 response
= dns
.message
.make_response(query
)
106 rrset
= dns
.rrset
.from_text(name
,
111 response
.answer
.append(rrset
)
113 for method
in ("sendUDPQuery", "sendTCPQuery"):
114 sender
= getattr(self
, method
)
115 (receivedQuery
, receivedResponse
) = sender(query
, response
)
116 self
.assertTrue(receivedQuery
)
117 self
.assertTrue(receivedResponse
)
118 receivedQuery
.id = query
.id
119 self
.assertEquals(receivedQuery
, query
)
120 self
.assertEquals(receivedResponse
, response
)
126 name
= 'ecs4.ednsoptions.tests.powerdns.com.'
127 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
128 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
129 response
= dns
.message
.make_response(query
)
130 rrset
= dns
.rrset
.from_text(name
,
135 response
.answer
.append(rrset
)
137 for method
in ("sendUDPQuery", "sendTCPQuery"):
138 sender
= getattr(self
, method
)
139 (receivedQuery
, receivedResponse
) = sender(query
, response
)
140 self
.assertTrue(receivedQuery
)
141 self
.assertTrue(receivedResponse
)
142 receivedQuery
.id = query
.id
143 self
.assertEquals(receivedQuery
, query
)
144 self
.assertEquals(receivedResponse
, response
)
150 name
= 'ecs6.ednsoptions.tests.powerdns.com.'
151 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
152 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
153 response
= dns
.message
.make_response(query
)
154 rrset
= dns
.rrset
.from_text(name
,
159 response
.answer
.append(rrset
)
161 for method
in ("sendUDPQuery", "sendTCPQuery"):
162 sender
= getattr(self
, method
)
163 (receivedQuery
, receivedResponse
) = sender(query
, response
)
164 self
.assertTrue(receivedQuery
)
165 self
.assertTrue(receivedResponse
)
166 receivedQuery
.id = query
.id
167 self
.assertEquals(receivedQuery
, query
)
168 self
.assertEquals(receivedResponse
, response
)
170 def testECS6Cookie(self
):
172 EDNS Options: Cookie + ECS6
174 name
= 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
175 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
176 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
177 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
178 response
= dns
.message
.make_response(query
)
179 rrset
= dns
.rrset
.from_text(name
,
184 response
.answer
.append(rrset
)
186 for method
in ("sendUDPQuery", "sendTCPQuery"):
187 sender
= getattr(self
, method
)
188 (receivedQuery
, receivedResponse
) = sender(query
, response
)
189 self
.assertTrue(receivedQuery
)
190 self
.assertTrue(receivedResponse
)
191 receivedQuery
.id = query
.id
192 self
.assertEquals(receivedQuery
, query
)
193 self
.assertEquals(receivedResponse
, response
)
195 def testMultiCookiesECS6(self
):
197 EDNS Options: Two Cookies + ECS6
199 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
200 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
201 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
202 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
203 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
204 response
= dns
.message
.make_response(query
)
205 rrset
= dns
.rrset
.from_text(name
,
210 response
.answer
.append(rrset
)
212 for method
in ("sendUDPQuery", "sendTCPQuery"):
213 sender
= getattr(self
, method
)
214 (receivedQuery
, receivedResponse
) = sender(query
, response
)
215 self
.assertTrue(receivedQuery
)
216 self
.assertTrue(receivedResponse
)
217 receivedQuery
.id = query
.id
218 self
.assertEquals(receivedQuery
, query
)
219 self
.assertEquals(receivedResponse
, response
)
221 class TestEDNSOptionsAddingECS(EDNSOptionsBase
):
223 _config_template
= """
226 addAction(AllRule(), LuaAction(testEDNSOptions))
228 newServer{address="127.0.0.1:%s", useClientSubnet=true}
230 _config_params
= ['_ednsTestFunction', '_testServerPort']
232 def testWithoutEDNS(self
):
234 EDNS Options: No EDNS (adding ECS)
236 name
= 'noedns.ednsoptions-ecs.tests.powerdns.com.'
237 query
= dns
.message
.make_query(name
, 'A', 'IN')
238 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
239 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
240 response
= dns
.message
.make_response(query
)
241 rrset
= dns
.rrset
.from_text(name
,
246 response
.answer
.append(rrset
)
248 for method
in ("sendUDPQuery", "sendTCPQuery"):
249 sender
= getattr(self
, method
)
250 (receivedQuery
, receivedResponse
) = sender(query
, response
)
251 self
.assertTrue(receivedQuery
)
252 self
.assertTrue(receivedResponse
)
253 receivedQuery
.id = expectedQuery
.id
254 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
255 self
.checkResponseNoEDNS(response
, receivedResponse
)
257 def testCookie(self
):
259 EDNS Options: Cookie (adding ECS)
261 name
= 'cookie.ednsoptions-ecs.tests.powerdns.com.'
262 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
263 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[eco
])
264 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
265 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[eco
,ecso
], payload
=512)
266 response
= dns
.message
.make_response(query
)
267 rrset
= dns
.rrset
.from_text(name
,
272 response
.answer
.append(rrset
)
274 for method
in ("sendUDPQuery", "sendTCPQuery"):
275 sender
= getattr(self
, method
)
276 (receivedQuery
, receivedResponse
) = sender(query
, response
)
277 self
.assertTrue(receivedQuery
)
278 self
.assertTrue(receivedResponse
)
279 receivedQuery
.id = expectedQuery
.id
280 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 1)
281 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
285 EDNS Options: ECS4 (adding ECS)
287 name
= 'ecs4.ednsoptions-ecs.tests.powerdns.com.'
288 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
289 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
290 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24, scope
=24)
291 response
= dns
.message
.make_response(query
)
292 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
293 rrset
= dns
.rrset
.from_text(name
,
298 response
.answer
.append(rrset
)
300 for method
in ("sendUDPQuery", "sendTCPQuery"):
301 sender
= getattr(self
, method
)
302 (receivedQuery
, receivedResponse
) = sender(query
, response
)
303 self
.assertTrue(receivedQuery
)
304 self
.assertTrue(receivedResponse
)
305 receivedQuery
.id = query
.id
306 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
307 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
311 EDNS Options: ECS6 (adding ECS)
313 name
= 'ecs6.ednsoptions-ecs.tests.powerdns.com.'
314 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
315 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
316 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128, scope
=56)
317 response
= dns
.message
.make_response(query
)
318 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
319 rrset
= dns
.rrset
.from_text(name
,
324 response
.answer
.append(rrset
)
326 for method
in ("sendUDPQuery", "sendTCPQuery"):
327 sender
= getattr(self
, method
)
328 (receivedQuery
, receivedResponse
) = sender(query
, response
)
329 self
.assertTrue(receivedQuery
)
330 self
.assertTrue(receivedResponse
)
331 receivedQuery
.id = query
.id
332 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
333 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
335 def testECS6Cookie(self
):
337 EDNS Options: Cookie + ECS6 (adding ECS)
339 name
= 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
340 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
341 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
342 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
343 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128, scope
=56)
344 response
= dns
.message
.make_response(query
)
345 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
346 rrset
= dns
.rrset
.from_text(name
,
351 response
.answer
.append(rrset
)
353 for method
in ("sendUDPQuery", "sendTCPQuery"):
354 sender
= getattr(self
, method
)
355 (receivedQuery
, receivedResponse
) = sender(query
, response
)
356 self
.assertTrue(receivedQuery
)
357 self
.assertTrue(receivedResponse
)
358 receivedQuery
.id = query
.id
359 self
.checkQueryEDNSWithECS(query
, receivedQuery
, 1)
360 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
362 def testMultiCookiesECS6(self
):
364 EDNS Options: Two Cookies + ECS6
366 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
367 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
368 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
369 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
370 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
371 response
= dns
.message
.make_response(query
)
372 rrset
= dns
.rrset
.from_text(name
,
377 response
.answer
.append(rrset
)
379 for method
in ("sendUDPQuery", "sendTCPQuery"):
380 sender
= getattr(self
, method
)
381 (receivedQuery
, receivedResponse
) = sender(query
, response
)
382 self
.assertTrue(receivedQuery
)
383 self
.assertTrue(receivedResponse
)
384 receivedQuery
.id = query
.id
385 self
.assertEquals(receivedQuery
, query
)
386 self
.assertEquals(receivedResponse
, response
)
388 class TestEDNSOptionsLuaFFI(DNSDistTest
):
390 _config_template
= """
391 local ffi = require("ffi")
393 function testEDNSOptions(dq)
394 local options_ptr = ffi.new("const dnsdist_ffi_ednsoption_t *[1]")
395 local ret_ptr_param = ffi.cast("const dnsdist_ffi_ednsoption_t **", options_ptr)
397 local options_count = tonumber(ffi.C.dnsdist_ffi_dnsquestion_get_edns_options(dq, ret_ptr_param))
399 local qname_ptr = ffi.new("const char *[1]")
400 local qname_ptr_param = ffi.cast("const char **", qname_ptr)
401 local qname_size = ffi.new("size_t[1]")
402 local qname_size_param = ffi.cast("size_t*", qname_size)
403 ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, qname_ptr_param, qname_size_param)
404 local qname = ffi.string(qname_ptr[0])
406 if string.match(qname, 'noedns') then
407 if options_count ~= 0 then
408 local str = "192.0.2.255"
409 local buf = ffi.new("char[?]", #str + 1)
411 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
412 return DNSAction.Spoof
416 local cookies_count = 0
419 local first_cookie_index = -1
420 local last_cookie_index = -1
421 if options_count > 0 then
422 for idx = 0, options_count-1 do
423 if options_ptr[0][idx].optionCode == EDNSOptionCode.COOKIE then
424 cookies_count = cookies_count + 1
425 if first_cookie_index == -1 then
426 first_cookie_index = idx
428 last_cookie_index = idx
429 elseif options_ptr[0][idx].optionCode == EDNSOptionCode.ECS then
430 ecs_count = ecs_count + 1
436 if string.match(qname, 'multiplecookies') then
437 if cookies_count == 0 then
438 local str = "192.0.2.1"
439 local buf = ffi.new("char[?]", #str + 1)
441 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
442 return DNSAction.Spoof
444 if cookies_count ~= 2 then
445 local str = "192.0.2.2"
446 local buf = ffi.new("char[?]", #str + 1)
448 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
449 return DNSAction.Spoof
451 if options_ptr[0][first_cookie_index].len ~= 16 then
452 local str = "192.0.2.3"
453 local buf = ffi.new("char[?]", #str + 1)
455 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
456 return DNSAction.Spoof
458 if options_ptr[0][last_cookie_index].len ~= 16 then
459 local str = "192.0.2.4"
460 local buf = ffi.new("char[?]", #str + 1)
462 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
463 return DNSAction.Spoof
465 elseif string.match(qname, 'cookie') then
467 if cookies_count == 0 then
468 local str = "192.0.2.1"
469 local buf = ffi.new("char[?]", #str + 1)
471 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
472 return DNSAction.Spoof
474 if cookies_count ~= 1 or options_ptr[0][first_cookie_index].len ~= 16 then
475 local str = "192.0.2.2"
476 local buf = ffi.new("char[?]", #str + 1)
478 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
479 return DNSAction.Spoof
483 if string.match(qname, 'ecs4') then
484 if ecs_count == 0 then
485 local str = "192.0.2.51"
486 local buf = ffi.new("char[?]", #str + 1)
488 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
489 return DNSAction.Spoof
492 if ecs_count ~= 1 or options_ptr[0][ecs_index].len ~= 8 then
493 local str = "192.0.2.52"
494 local buf = ffi.new("char[?]", #str + 1)
496 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
497 return DNSAction.Spoof
501 if string.match(qname, 'ecs6') then
502 if ecs_count == 0 then
503 local str = "192.0.2.101"
504 local buf = ffi.new("char[?]", #str + 1)
506 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
507 return DNSAction.Spoof
509 if ecs_count ~= 1 or options_ptr[0][ecs_index].len ~= 20 then
510 local str = "192.0.2.102"
511 local buf = ffi.new("char[?]", #str + 1)
513 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
514 return DNSAction.Spoof
518 return DNSAction.None
522 addAction(AllRule(), LuaFFIAction(testEDNSOptions))
524 newServer{address="127.0.0.1:%s"}
527 def testWithoutEDNSFFI(self
):
529 EDNS Options: No EDNS (FFI)
531 name
= 'noedns.ednsoptions.tests.powerdns.com.'
532 query
= dns
.message
.make_query(name
, 'A', 'IN')
533 response
= dns
.message
.make_response(query
)
534 rrset
= dns
.rrset
.from_text(name
,
539 response
.answer
.append(rrset
)
541 for method
in ("sendUDPQuery", "sendTCPQuery"):
542 sender
= getattr(self
, method
)
543 (receivedQuery
, receivedResponse
) = sender(query
, response
)
544 self
.assertTrue(receivedQuery
)
545 self
.assertTrue(receivedResponse
)
546 receivedQuery
.id = query
.id
547 self
.assertEquals(receivedQuery
, query
)
548 self
.assertEquals(receivedResponse
, response
)
550 def testCookieFFI(self
):
552 EDNS Options: Cookie (FFI)
554 name
= 'cookie.ednsoptions.tests.powerdns.com.'
555 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
556 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
557 response
= dns
.message
.make_response(query
)
558 rrset
= dns
.rrset
.from_text(name
,
563 response
.answer
.append(rrset
)
565 for method
in ("sendUDPQuery", "sendTCPQuery"):
566 sender
= getattr(self
, method
)
567 (receivedQuery
, receivedResponse
) = sender(query
, response
)
568 self
.assertTrue(receivedQuery
)
569 self
.assertTrue(receivedResponse
)
570 receivedQuery
.id = query
.id
571 self
.assertEquals(receivedQuery
, query
)
572 self
.assertEquals(receivedResponse
, response
)
574 def testECS4FFI(self
):
576 EDNS Options: ECS4 (FFI)
578 name
= 'ecs4.ednsoptions.tests.powerdns.com.'
579 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
580 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
581 response
= dns
.message
.make_response(query
)
582 rrset
= dns
.rrset
.from_text(name
,
587 response
.answer
.append(rrset
)
589 for method
in ("sendUDPQuery", "sendTCPQuery"):
590 sender
= getattr(self
, method
)
591 (receivedQuery
, receivedResponse
) = sender(query
, response
)
592 self
.assertTrue(receivedQuery
)
593 self
.assertTrue(receivedResponse
)
594 receivedQuery
.id = query
.id
595 self
.assertEquals(receivedQuery
, query
)
596 self
.assertEquals(receivedResponse
, response
)
598 def testECS6FFI(self
):
600 EDNS Options: ECS6 (FFI)
602 name
= 'ecs6.ednsoptions.tests.powerdns.com.'
603 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
604 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
605 response
= dns
.message
.make_response(query
)
606 rrset
= dns
.rrset
.from_text(name
,
611 response
.answer
.append(rrset
)
613 for method
in ("sendUDPQuery", "sendTCPQuery"):
614 sender
= getattr(self
, method
)
615 (receivedQuery
, receivedResponse
) = sender(query
, response
)
616 self
.assertTrue(receivedQuery
)
617 self
.assertTrue(receivedResponse
)
618 receivedQuery
.id = query
.id
619 self
.assertEquals(receivedQuery
, query
)
620 self
.assertEquals(receivedResponse
, response
)
622 def testECS6CookieFFI(self
):
624 EDNS Options: Cookie + ECS6 (FFI)
626 name
= 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
627 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
628 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
629 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
630 response
= dns
.message
.make_response(query
)
631 rrset
= dns
.rrset
.from_text(name
,
636 response
.answer
.append(rrset
)
638 for method
in ("sendUDPQuery", "sendTCPQuery"):
639 sender
= getattr(self
, method
)
640 (receivedQuery
, receivedResponse
) = sender(query
, response
)
641 self
.assertTrue(receivedQuery
)
642 self
.assertTrue(receivedResponse
)
643 receivedQuery
.id = query
.id
644 self
.assertEquals(receivedQuery
, query
)
645 self
.assertEquals(receivedResponse
, response
)
647 def testMultiCookiesECS6FFI(self
):
649 EDNS Options: Two Cookies + ECS6 (FFI)
651 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
652 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
653 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
654 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
655 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
656 response
= dns
.message
.make_response(query
)
657 rrset
= dns
.rrset
.from_text(name
,
662 response
.answer
.append(rrset
)
664 for method
in ("sendUDPQuery", "sendTCPQuery"):
665 sender
= getattr(self
, method
)
666 (receivedQuery
, receivedResponse
) = sender(query
, response
)
667 self
.assertTrue(receivedQuery
)
668 self
.assertTrue(receivedResponse
)
669 receivedQuery
.id = query
.id
670 self
.assertEquals(receivedQuery
, query
)
671 self
.assertEquals(receivedResponse
, response
)