]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSOptions.py
Merge pull request #9236 from omoerbeek/cachecleaner-cleanup
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSOptions.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6
7 class EDNSOptionsBase(DNSDistTest):
8 _ednsTestFunction = """
9 function testEDNSOptions(dq)
10 local options = dq:getEDNSOptions()
11 local qname = dq.qname:toString()
12
13 if string.match(qname, 'noedns') then
14 if next(options) ~= nil then
15 return DNSAction.Spoof, "192.0.2.255"
16 end
17 end
18
19 if string.match(qname, 'multiplecookies') then
20 if options[EDNSOptionCode.COOKIE] == nil then
21 return DNSAction.Spoof, "192.0.2.1"
22 end
23 if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
24 return DNSAction.Spoof, "192.0.2.2"
25 end
26 if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
27 return DNSAction.Spoof, "192.0.2.3"
28 end
29 if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then
30 return DNSAction.Spoof, "192.0.2.4"
31 end
32 elseif string.match(qname, 'cookie') then
33 if options[EDNSOptionCode.COOKIE] == nil then
34 return DNSAction.Spoof, "192.0.2.1"
35 end
36 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
37 return DNSAction.Spoof, "192.0.2.2"
38 end
39 end
40
41 if string.match(qname, 'ecs4') then
42 if options[EDNSOptionCode.ECS] == nil then
43 return DNSAction.Spoof, "192.0.2.51"
44 end
45 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then
46 return DNSAction.Spoof, "192.0.2.52"
47 end
48 end
49
50 if string.match(qname, 'ecs6') then
51 if options[EDNSOptionCode.ECS] == nil then
52 return DNSAction.Spoof, "192.0.2.101"
53 end
54 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then
55 return DNSAction.Spoof, "192.0.2.102"
56 end
57 end
58
59 return DNSAction.None, ""
60
61 end
62 """
63
64 class TestEDNSOptions(EDNSOptionsBase):
65
66 _config_template = """
67 %s
68
69 addAction(AllRule(), LuaAction(testEDNSOptions))
70
71 newServer{address="127.0.0.1:%s"}
72 """
73 _config_params = ['_ednsTestFunction', '_testServerPort']
74
75 def testWithoutEDNS(self):
76 """
77 EDNS Options: No EDNS
78 """
79 name = 'noedns.ednsoptions.tests.powerdns.com.'
80 query = dns.message.make_query(name, 'A', 'IN')
81 response = dns.message.make_response(query)
82 rrset = dns.rrset.from_text(name,
83 3600,
84 dns.rdataclass.IN,
85 dns.rdatatype.A,
86 '192.0.2.255')
87 response.answer.append(rrset)
88
89 for method in ("sendUDPQuery", "sendTCPQuery"):
90 sender = getattr(self, method)
91 (receivedQuery, receivedResponse) = sender(query, response)
92 self.assertTrue(receivedQuery)
93 self.assertTrue(receivedResponse)
94 receivedQuery.id = query.id
95 self.assertEquals(receivedQuery, query)
96 self.assertEquals(receivedResponse, response)
97
98 def testCookie(self):
99 """
100 EDNS Options: Cookie
101 """
102 name = 'cookie.ednsoptions.tests.powerdns.com.'
103 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
104 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
105 response = dns.message.make_response(query)
106 rrset = dns.rrset.from_text(name,
107 3600,
108 dns.rdataclass.IN,
109 dns.rdatatype.A,
110 '127.0.0.1')
111 response.answer.append(rrset)
112
113 for method in ("sendUDPQuery", "sendTCPQuery"):
114 sender = getattr(self, method)
115 (receivedQuery, receivedResponse) = sender(query, response)
116 self.assertTrue(receivedQuery)
117 self.assertTrue(receivedResponse)
118 receivedQuery.id = query.id
119 self.assertEquals(receivedQuery, query)
120 self.assertEquals(receivedResponse, response)
121
122 def testECS4(self):
123 """
124 EDNS Options: ECS4
125 """
126 name = 'ecs4.ednsoptions.tests.powerdns.com.'
127 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
128 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
129 response = dns.message.make_response(query)
130 rrset = dns.rrset.from_text(name,
131 3600,
132 dns.rdataclass.IN,
133 dns.rdatatype.A,
134 '127.0.0.1')
135 response.answer.append(rrset)
136
137 for method in ("sendUDPQuery", "sendTCPQuery"):
138 sender = getattr(self, method)
139 (receivedQuery, receivedResponse) = sender(query, response)
140 self.assertTrue(receivedQuery)
141 self.assertTrue(receivedResponse)
142 receivedQuery.id = query.id
143 self.assertEquals(receivedQuery, query)
144 self.assertEquals(receivedResponse, response)
145
146 def testECS6(self):
147 """
148 EDNS Options: ECS6
149 """
150 name = 'ecs6.ednsoptions.tests.powerdns.com.'
151 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
152 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
153 response = dns.message.make_response(query)
154 rrset = dns.rrset.from_text(name,
155 3600,
156 dns.rdataclass.IN,
157 dns.rdatatype.A,
158 '127.0.0.1')
159 response.answer.append(rrset)
160
161 for method in ("sendUDPQuery", "sendTCPQuery"):
162 sender = getattr(self, method)
163 (receivedQuery, receivedResponse) = sender(query, response)
164 self.assertTrue(receivedQuery)
165 self.assertTrue(receivedResponse)
166 receivedQuery.id = query.id
167 self.assertEquals(receivedQuery, query)
168 self.assertEquals(receivedResponse, response)
169
170 def testECS6Cookie(self):
171 """
172 EDNS Options: Cookie + ECS6
173 """
174 name = 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
175 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
176 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
177 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
178 response = dns.message.make_response(query)
179 rrset = dns.rrset.from_text(name,
180 3600,
181 dns.rdataclass.IN,
182 dns.rdatatype.A,
183 '127.0.0.1')
184 response.answer.append(rrset)
185
186 for method in ("sendUDPQuery", "sendTCPQuery"):
187 sender = getattr(self, method)
188 (receivedQuery, receivedResponse) = sender(query, response)
189 self.assertTrue(receivedQuery)
190 self.assertTrue(receivedResponse)
191 receivedQuery.id = query.id
192 self.assertEquals(receivedQuery, query)
193 self.assertEquals(receivedResponse, response)
194
195 def testMultiCookiesECS6(self):
196 """
197 EDNS Options: Two Cookies + ECS6
198 """
199 name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
200 eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
201 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
202 eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
203 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
204 response = dns.message.make_response(query)
205 rrset = dns.rrset.from_text(name,
206 3600,
207 dns.rdataclass.IN,
208 dns.rdatatype.A,
209 '127.0.0.1')
210 response.answer.append(rrset)
211
212 for method in ("sendUDPQuery", "sendTCPQuery"):
213 sender = getattr(self, method)
214 (receivedQuery, receivedResponse) = sender(query, response)
215 self.assertTrue(receivedQuery)
216 self.assertTrue(receivedResponse)
217 receivedQuery.id = query.id
218 self.assertEquals(receivedQuery, query)
219 self.assertEquals(receivedResponse, response)
220
221 class TestEDNSOptionsAddingECS(EDNSOptionsBase):
222
223 _config_template = """
224 %s
225
226 addAction(AllRule(), LuaAction(testEDNSOptions))
227
228 newServer{address="127.0.0.1:%s", useClientSubnet=true}
229 """
230 _config_params = ['_ednsTestFunction', '_testServerPort']
231
232 def testWithoutEDNS(self):
233 """
234 EDNS Options: No EDNS (adding ECS)
235 """
236 name = 'noedns.ednsoptions-ecs.tests.powerdns.com.'
237 query = dns.message.make_query(name, 'A', 'IN')
238 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
239 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
240 response = dns.message.make_response(query)
241 rrset = dns.rrset.from_text(name,
242 3600,
243 dns.rdataclass.IN,
244 dns.rdatatype.A,
245 '127.0.0.1')
246 response.answer.append(rrset)
247
248 for method in ("sendUDPQuery", "sendTCPQuery"):
249 sender = getattr(self, method)
250 (receivedQuery, receivedResponse) = sender(query, response)
251 self.assertTrue(receivedQuery)
252 self.assertTrue(receivedResponse)
253 receivedQuery.id = expectedQuery.id
254 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
255 self.checkResponseNoEDNS(response, receivedResponse)
256
257 def testCookie(self):
258 """
259 EDNS Options: Cookie (adding ECS)
260 """
261 name = 'cookie.ednsoptions-ecs.tests.powerdns.com.'
262 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
263 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[eco])
264 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
265 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[eco,ecso], payload=512)
266 response = dns.message.make_response(query)
267 rrset = dns.rrset.from_text(name,
268 3600,
269 dns.rdataclass.IN,
270 dns.rdatatype.A,
271 '127.0.0.1')
272 response.answer.append(rrset)
273
274 for method in ("sendUDPQuery", "sendTCPQuery"):
275 sender = getattr(self, method)
276 (receivedQuery, receivedResponse) = sender(query, response)
277 self.assertTrue(receivedQuery)
278 self.assertTrue(receivedResponse)
279 receivedQuery.id = expectedQuery.id
280 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
281 self.checkResponseEDNSWithoutECS(response, receivedResponse)
282
283 def testECS4(self):
284 """
285 EDNS Options: ECS4 (adding ECS)
286 """
287 name = 'ecs4.ednsoptions-ecs.tests.powerdns.com.'
288 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
289 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
290 ecsoResponse = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24, scope=24)
291 response = dns.message.make_response(query)
292 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
293 rrset = dns.rrset.from_text(name,
294 3600,
295 dns.rdataclass.IN,
296 dns.rdatatype.A,
297 '127.0.0.1')
298 response.answer.append(rrset)
299
300 for method in ("sendUDPQuery", "sendTCPQuery"):
301 sender = getattr(self, method)
302 (receivedQuery, receivedResponse) = sender(query, response)
303 self.assertTrue(receivedQuery)
304 self.assertTrue(receivedResponse)
305 receivedQuery.id = query.id
306 self.checkQueryEDNSWithECS(query, receivedQuery)
307 self.checkResponseEDNSWithECS(response, receivedResponse)
308
309 def testECS6(self):
310 """
311 EDNS Options: ECS6 (adding ECS)
312 """
313 name = 'ecs6.ednsoptions-ecs.tests.powerdns.com.'
314 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
315 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
316 ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
317 response = dns.message.make_response(query)
318 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
319 rrset = dns.rrset.from_text(name,
320 3600,
321 dns.rdataclass.IN,
322 dns.rdatatype.A,
323 '127.0.0.1')
324 response.answer.append(rrset)
325
326 for method in ("sendUDPQuery", "sendTCPQuery"):
327 sender = getattr(self, method)
328 (receivedQuery, receivedResponse) = sender(query, response)
329 self.assertTrue(receivedQuery)
330 self.assertTrue(receivedResponse)
331 receivedQuery.id = query.id
332 self.checkQueryEDNSWithECS(query, receivedQuery)
333 self.checkResponseEDNSWithECS(response, receivedResponse)
334
335 def testECS6Cookie(self):
336 """
337 EDNS Options: Cookie + ECS6 (adding ECS)
338 """
339 name = 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
340 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
341 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
342 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
343 ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
344 response = dns.message.make_response(query)
345 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
346 rrset = dns.rrset.from_text(name,
347 3600,
348 dns.rdataclass.IN,
349 dns.rdatatype.A,
350 '127.0.0.1')
351 response.answer.append(rrset)
352
353 for method in ("sendUDPQuery", "sendTCPQuery"):
354 sender = getattr(self, method)
355 (receivedQuery, receivedResponse) = sender(query, response)
356 self.assertTrue(receivedQuery)
357 self.assertTrue(receivedResponse)
358 receivedQuery.id = query.id
359 self.checkQueryEDNSWithECS(query, receivedQuery, 1)
360 self.checkResponseEDNSWithECS(response, receivedResponse)
361
362 def testMultiCookiesECS6(self):
363 """
364 EDNS Options: Two Cookies + ECS6
365 """
366 name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
367 eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
368 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
369 eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
370 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
371 response = dns.message.make_response(query)
372 rrset = dns.rrset.from_text(name,
373 3600,
374 dns.rdataclass.IN,
375 dns.rdatatype.A,
376 '127.0.0.1')
377 response.answer.append(rrset)
378
379 for method in ("sendUDPQuery", "sendTCPQuery"):
380 sender = getattr(self, method)
381 (receivedQuery, receivedResponse) = sender(query, response)
382 self.assertTrue(receivedQuery)
383 self.assertTrue(receivedResponse)
384 receivedQuery.id = query.id
385 self.assertEquals(receivedQuery, query)
386 self.assertEquals(receivedResponse, response)
387
388 class TestEDNSOptionsLuaFFI(DNSDistTest):
389
390 _config_template = """
391 local ffi = require("ffi")
392
393 function testEDNSOptions(dq)
394 local options_ptr = ffi.new("const dnsdist_ffi_ednsoption_t *[1]")
395 local ret_ptr_param = ffi.cast("const dnsdist_ffi_ednsoption_t **", options_ptr)
396
397 local options_count = tonumber(ffi.C.dnsdist_ffi_dnsquestion_get_edns_options(dq, ret_ptr_param))
398
399 local qname_ptr = ffi.new("const char *[1]")
400 local qname_ptr_param = ffi.cast("const char **", qname_ptr)
401 local qname_size = ffi.new("size_t[1]")
402 local qname_size_param = ffi.cast("size_t*", qname_size)
403 ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, qname_ptr_param, qname_size_param)
404 local qname = ffi.string(qname_ptr[0])
405
406 if string.match(qname, 'noedns') then
407 if options_count ~= 0 then
408 local str = "192.0.2.255"
409 local buf = ffi.new("char[?]", #str + 1)
410 ffi.copy(buf, str)
411 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
412 return DNSAction.Spoof
413 end
414 end
415
416 local cookies_count = 0
417 local ecs_count = 0
418 local ecs_index = -1
419 local first_cookie_index = -1
420 local last_cookie_index = -1
421 if options_count > 0 then
422 for idx = 0, options_count-1 do
423 if options_ptr[0][idx].optionCode == EDNSOptionCode.COOKIE then
424 cookies_count = cookies_count + 1
425 if first_cookie_index == -1 then
426 first_cookie_index = idx
427 end
428 last_cookie_index = idx
429 elseif options_ptr[0][idx].optionCode == EDNSOptionCode.ECS then
430 ecs_count = ecs_count + 1
431 ecs_index = idx
432 end
433 end
434 end
435
436 if string.match(qname, 'multiplecookies') then
437 if cookies_count == 0 then
438 local str = "192.0.2.1"
439 local buf = ffi.new("char[?]", #str + 1)
440 ffi.copy(buf, str)
441 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
442 return DNSAction.Spoof
443 end
444 if cookies_count ~= 2 then
445 local str = "192.0.2.2"
446 local buf = ffi.new("char[?]", #str + 1)
447 ffi.copy(buf, str)
448 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
449 return DNSAction.Spoof
450 end
451 if options_ptr[0][first_cookie_index].len ~= 16 then
452 local str = "192.0.2.3"
453 local buf = ffi.new("char[?]", #str + 1)
454 ffi.copy(buf, str)
455 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
456 return DNSAction.Spoof
457 end
458 if options_ptr[0][last_cookie_index].len ~= 16 then
459 local str = "192.0.2.4"
460 local buf = ffi.new("char[?]", #str + 1)
461 ffi.copy(buf, str)
462 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
463 return DNSAction.Spoof
464 end
465 elseif string.match(qname, 'cookie') then
466
467 if cookies_count == 0 then
468 local str = "192.0.2.1"
469 local buf = ffi.new("char[?]", #str + 1)
470 ffi.copy(buf, str)
471 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
472 return DNSAction.Spoof
473 end
474 if cookies_count ~= 1 or options_ptr[0][first_cookie_index].len ~= 16 then
475 local str = "192.0.2.2"
476 local buf = ffi.new("char[?]", #str + 1)
477 ffi.copy(buf, str)
478 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
479 return DNSAction.Spoof
480 end
481 end
482
483 if string.match(qname, 'ecs4') then
484 if ecs_count == 0 then
485 local str = "192.0.2.51"
486 local buf = ffi.new("char[?]", #str + 1)
487 ffi.copy(buf, str)
488 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
489 return DNSAction.Spoof
490 end
491
492 if ecs_count ~= 1 or options_ptr[0][ecs_index].len ~= 8 then
493 local str = "192.0.2.52"
494 local buf = ffi.new("char[?]", #str + 1)
495 ffi.copy(buf, str)
496 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
497 return DNSAction.Spoof
498 end
499 end
500
501 if string.match(qname, 'ecs6') then
502 if ecs_count == 0 then
503 local str = "192.0.2.101"
504 local buf = ffi.new("char[?]", #str + 1)
505 ffi.copy(buf, str)
506 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
507 return DNSAction.Spoof
508 end
509 if ecs_count ~= 1 or options_ptr[0][ecs_index].len ~= 20 then
510 local str = "192.0.2.102"
511 local buf = ffi.new("char[?]", #str + 1)
512 ffi.copy(buf, str)
513 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
514 return DNSAction.Spoof
515 end
516 end
517
518 return DNSAction.None
519
520 end
521
522 addAction(AllRule(), LuaFFIAction(testEDNSOptions))
523
524 newServer{address="127.0.0.1:%s"}
525 """
526
527 def testWithoutEDNSFFI(self):
528 """
529 EDNS Options: No EDNS (FFI)
530 """
531 name = 'noedns.ednsoptions.tests.powerdns.com.'
532 query = dns.message.make_query(name, 'A', 'IN')
533 response = dns.message.make_response(query)
534 rrset = dns.rrset.from_text(name,
535 3600,
536 dns.rdataclass.IN,
537 dns.rdatatype.A,
538 '192.0.2.255')
539 response.answer.append(rrset)
540
541 for method in ("sendUDPQuery", "sendTCPQuery"):
542 sender = getattr(self, method)
543 (receivedQuery, receivedResponse) = sender(query, response)
544 self.assertTrue(receivedQuery)
545 self.assertTrue(receivedResponse)
546 receivedQuery.id = query.id
547 self.assertEquals(receivedQuery, query)
548 self.assertEquals(receivedResponse, response)
549
550 def testCookieFFI(self):
551 """
552 EDNS Options: Cookie (FFI)
553 """
554 name = 'cookie.ednsoptions.tests.powerdns.com.'
555 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
556 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
557 response = dns.message.make_response(query)
558 rrset = dns.rrset.from_text(name,
559 3600,
560 dns.rdataclass.IN,
561 dns.rdatatype.A,
562 '127.0.0.1')
563 response.answer.append(rrset)
564
565 for method in ("sendUDPQuery", "sendTCPQuery"):
566 sender = getattr(self, method)
567 (receivedQuery, receivedResponse) = sender(query, response)
568 self.assertTrue(receivedQuery)
569 self.assertTrue(receivedResponse)
570 receivedQuery.id = query.id
571 self.assertEquals(receivedQuery, query)
572 self.assertEquals(receivedResponse, response)
573
574 def testECS4FFI(self):
575 """
576 EDNS Options: ECS4 (FFI)
577 """
578 name = 'ecs4.ednsoptions.tests.powerdns.com.'
579 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
580 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
581 response = dns.message.make_response(query)
582 rrset = dns.rrset.from_text(name,
583 3600,
584 dns.rdataclass.IN,
585 dns.rdatatype.A,
586 '127.0.0.1')
587 response.answer.append(rrset)
588
589 for method in ("sendUDPQuery", "sendTCPQuery"):
590 sender = getattr(self, method)
591 (receivedQuery, receivedResponse) = sender(query, response)
592 self.assertTrue(receivedQuery)
593 self.assertTrue(receivedResponse)
594 receivedQuery.id = query.id
595 self.assertEquals(receivedQuery, query)
596 self.assertEquals(receivedResponse, response)
597
598 def testECS6FFI(self):
599 """
600 EDNS Options: ECS6 (FFI)
601 """
602 name = 'ecs6.ednsoptions.tests.powerdns.com.'
603 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
604 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
605 response = dns.message.make_response(query)
606 rrset = dns.rrset.from_text(name,
607 3600,
608 dns.rdataclass.IN,
609 dns.rdatatype.A,
610 '127.0.0.1')
611 response.answer.append(rrset)
612
613 for method in ("sendUDPQuery", "sendTCPQuery"):
614 sender = getattr(self, method)
615 (receivedQuery, receivedResponse) = sender(query, response)
616 self.assertTrue(receivedQuery)
617 self.assertTrue(receivedResponse)
618 receivedQuery.id = query.id
619 self.assertEquals(receivedQuery, query)
620 self.assertEquals(receivedResponse, response)
621
622 def testECS6CookieFFI(self):
623 """
624 EDNS Options: Cookie + ECS6 (FFI)
625 """
626 name = 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
627 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
628 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
629 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
630 response = dns.message.make_response(query)
631 rrset = dns.rrset.from_text(name,
632 3600,
633 dns.rdataclass.IN,
634 dns.rdatatype.A,
635 '127.0.0.1')
636 response.answer.append(rrset)
637
638 for method in ("sendUDPQuery", "sendTCPQuery"):
639 sender = getattr(self, method)
640 (receivedQuery, receivedResponse) = sender(query, response)
641 self.assertTrue(receivedQuery)
642 self.assertTrue(receivedResponse)
643 receivedQuery.id = query.id
644 self.assertEquals(receivedQuery, query)
645 self.assertEquals(receivedResponse, response)
646
647 def testMultiCookiesECS6FFI(self):
648 """
649 EDNS Options: Two Cookies + ECS6 (FFI)
650 """
651 name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
652 eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
653 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
654 eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
655 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
656 response = dns.message.make_response(query)
657 rrset = dns.rrset.from_text(name,
658 3600,
659 dns.rdataclass.IN,
660 dns.rdatatype.A,
661 '127.0.0.1')
662 response.answer.append(rrset)
663
664 for method in ("sendUDPQuery", "sendTCPQuery"):
665 sender = getattr(self, method)
666 (receivedQuery, receivedResponse) = sender(query, response)
667 self.assertTrue(receivedQuery)
668 self.assertTrue(receivedResponse)
669 receivedQuery.id = query.id
670 self.assertEquals(receivedQuery, query)
671 self.assertEquals(receivedResponse, response)