]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSOptions.py
Merge pull request #13567 from omoerbeek/rec-disable-sl-deprecated
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSOptions.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6
7 class EDNSOptionsBase(DNSDistTest):
8 _ednsTestFunction = """
9 function testEDNSOptions(dq)
10 local options = dq:getEDNSOptions()
11 local qname = tostring(dq.qname)
12
13 if string.match(qname, 'noedns') then
14 if next(options) ~= nil then
15 return DNSAction.Spoof, "192.0.2.255"
16 end
17 end
18
19 if string.match(qname, 'multiplecookies') then
20 if options[EDNSOptionCode.COOKIE] == nil then
21 return DNSAction.Spoof, "192.0.2.1"
22 end
23 if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
24 return DNSAction.Spoof, "192.0.2.2"
25 end
26 if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
27 return DNSAction.Spoof, "192.0.2.3"
28 end
29 if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then
30 return DNSAction.Spoof, "192.0.2.4"
31 end
32 elseif string.match(qname, 'cookie') then
33 if options[EDNSOptionCode.COOKIE] == nil then
34 return DNSAction.Spoof, "192.0.2.1"
35 end
36 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
37 return DNSAction.Spoof, "192.0.2.2"
38 end
39 end
40
41 if string.match(qname, 'ecs4') then
42 if options[EDNSOptionCode.ECS] == nil then
43 return DNSAction.Spoof, "192.0.2.51"
44 end
45 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then
46 return DNSAction.Spoof, "192.0.2.52"
47 end
48 end
49
50 if string.match(qname, 'ecs6') then
51 if options[EDNSOptionCode.ECS] == nil then
52 return DNSAction.Spoof, "192.0.2.101"
53 end
54 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then
55 return DNSAction.Spoof, "192.0.2.102"
56 end
57 end
58
59 return DNSAction.None, ""
60
61 end
62
63 function testEDNSOptionsInResponses(dr)
64 local options = dr:getEDNSOptions()
65 local qname = tostring(dr.qname)
66
67 if string.match(qname, 'multiplecookies') then
68 return DNSAction.None, ""
69 elseif string.match(qname, 'cookie') then
70 if options[EDNSOptionCode.COOKIE] == nil then
71 return DNSAction.Spoof, "192.0.2.1"
72 end
73 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
74 return DNSAction.Spoof, "192.0.2.2"
75 end
76 end
77
78 if string.match(qname, 'ecs4') then
79 if options[EDNSOptionCode.ECS] == nil then
80 return DNSAction.Spoof, "192.0.2.51"
81 end
82 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 7 then
83 return DNSAction.Spoof, "192.0.2.52"
84 end
85 end
86
87 if string.match(qname, 'ecs6') then
88 if options[EDNSOptionCode.ECS] == nil then
89 return DNSAction.Spoof, "192.0.2.101"
90 end
91 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 11 then
92 return DNSAction.Spoof, "192.0.2.102"
93 end
94 end
95
96 return DNSAction.None, ""
97
98 end
99 """
100
101 class TestEDNSOptions(EDNSOptionsBase):
102
103 _config_template = """
104 %s
105
106 addAction(AllRule(), LuaAction(testEDNSOptions))
107
108 newServer{address="127.0.0.1:%s"}
109 """
110 _config_params = ['_ednsTestFunction', '_testServerPort']
111
112 def testWithoutEDNS(self):
113 """
114 EDNS Options: No EDNS
115 """
116 name = 'noedns.ednsoptions.tests.powerdns.com.'
117 query = dns.message.make_query(name, 'A', 'IN')
118 response = dns.message.make_response(query)
119 rrset = dns.rrset.from_text(name,
120 3600,
121 dns.rdataclass.IN,
122 dns.rdatatype.A,
123 '192.0.2.255')
124 response.answer.append(rrset)
125
126 for method in ("sendUDPQuery", "sendTCPQuery"):
127 sender = getattr(self, method)
128 (receivedQuery, receivedResponse) = sender(query, response)
129 self.assertTrue(receivedQuery)
130 self.assertTrue(receivedResponse)
131 receivedQuery.id = query.id
132 self.assertEqual(receivedQuery, query)
133 self.assertEqual(receivedResponse, response)
134
135 def testCookie(self):
136 """
137 EDNS Options: Cookie
138 """
139 name = 'cookie.ednsoptions.tests.powerdns.com.'
140 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
141 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
142 response = dns.message.make_response(query)
143 rrset = dns.rrset.from_text(name,
144 3600,
145 dns.rdataclass.IN,
146 dns.rdatatype.A,
147 '127.0.0.1')
148 response.answer.append(rrset)
149
150 for method in ("sendUDPQuery", "sendTCPQuery"):
151 sender = getattr(self, method)
152 (receivedQuery, receivedResponse) = sender(query, response)
153 self.assertTrue(receivedQuery)
154 self.assertTrue(receivedResponse)
155 receivedQuery.id = query.id
156 self.assertEqual(receivedQuery, query)
157 self.assertEqual(receivedResponse, response)
158
159 def testECS4(self):
160 """
161 EDNS Options: ECS4
162 """
163 name = 'ecs4.ednsoptions.tests.powerdns.com.'
164 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
165 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
166 response = dns.message.make_response(query)
167 rrset = dns.rrset.from_text(name,
168 3600,
169 dns.rdataclass.IN,
170 dns.rdatatype.A,
171 '127.0.0.1')
172 response.answer.append(rrset)
173
174 for method in ("sendUDPQuery", "sendTCPQuery"):
175 sender = getattr(self, method)
176 (receivedQuery, receivedResponse) = sender(query, response)
177 self.assertTrue(receivedQuery)
178 self.assertTrue(receivedResponse)
179 receivedQuery.id = query.id
180 self.assertEqual(receivedQuery, query)
181 self.assertEqual(receivedResponse, response)
182
183 def testECS6(self):
184 """
185 EDNS Options: ECS6
186 """
187 name = 'ecs6.ednsoptions.tests.powerdns.com.'
188 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
189 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
190 response = dns.message.make_response(query)
191 rrset = dns.rrset.from_text(name,
192 3600,
193 dns.rdataclass.IN,
194 dns.rdatatype.A,
195 '127.0.0.1')
196 response.answer.append(rrset)
197
198 for method in ("sendUDPQuery", "sendTCPQuery"):
199 sender = getattr(self, method)
200 (receivedQuery, receivedResponse) = sender(query, response)
201 self.assertTrue(receivedQuery)
202 self.assertTrue(receivedResponse)
203 receivedQuery.id = query.id
204 self.assertEqual(receivedQuery, query)
205 self.assertEqual(receivedResponse, response)
206
207 def testECS6Cookie(self):
208 """
209 EDNS Options: Cookie + ECS6
210 """
211 name = 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
212 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
213 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
214 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
215 response = dns.message.make_response(query)
216 rrset = dns.rrset.from_text(name,
217 3600,
218 dns.rdataclass.IN,
219 dns.rdatatype.A,
220 '127.0.0.1')
221 response.answer.append(rrset)
222
223 for method in ("sendUDPQuery", "sendTCPQuery"):
224 sender = getattr(self, method)
225 (receivedQuery, receivedResponse) = sender(query, response)
226 self.assertTrue(receivedQuery)
227 self.assertTrue(receivedResponse)
228 receivedQuery.id = query.id
229 self.assertEqual(receivedQuery, query)
230 self.assertEqual(receivedResponse, response)
231
232 def testMultiCookiesECS6(self):
233 """
234 EDNS Options: Two Cookies + ECS6
235 """
236 name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
237 eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
238 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
239 eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
240 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
241 response = dns.message.make_response(query)
242 rrset = dns.rrset.from_text(name,
243 3600,
244 dns.rdataclass.IN,
245 dns.rdatatype.A,
246 '127.0.0.1')
247 response.answer.append(rrset)
248
249 for method in ("sendUDPQuery", "sendTCPQuery"):
250 sender = getattr(self, method)
251 (receivedQuery, receivedResponse) = sender(query, response)
252 self.assertTrue(receivedQuery)
253 self.assertTrue(receivedResponse)
254 receivedQuery.id = query.id
255 self.assertEqual(receivedQuery, query)
256 self.assertEqual(receivedResponse, response)
257
258 class TestEDNSOptionsAddingECS(EDNSOptionsBase):
259
260 _config_template = """
261 %s
262
263 addAction(AllRule(), LuaAction(testEDNSOptions))
264 addResponseAction("ednsoptions-ecs.tests.powerdns.com.", LuaResponseAction(testEDNSOptionsInResponses))
265
266 newServer{address="127.0.0.1:%s", useClientSubnet=true}
267 """
268 _config_params = ['_ednsTestFunction', '_testServerPort']
269
270 def testWithoutEDNS(self):
271 """
272 EDNS Options: No EDNS (adding ECS)
273 """
274 name = 'noedns.ednsoptions-ecs.tests.powerdns.com.'
275 query = dns.message.make_query(name, 'A', 'IN')
276 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
277 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
278 response = dns.message.make_response(query)
279 rrset = dns.rrset.from_text(name,
280 3600,
281 dns.rdataclass.IN,
282 dns.rdatatype.A,
283 '127.0.0.1')
284 response.answer.append(rrset)
285
286 for method in ("sendUDPQuery", "sendTCPQuery"):
287 sender = getattr(self, method)
288 (receivedQuery, receivedResponse) = sender(query, response)
289 self.assertTrue(receivedQuery)
290 self.assertTrue(receivedResponse)
291 receivedQuery.id = expectedQuery.id
292 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
293 self.checkResponseNoEDNS(response, receivedResponse)
294
295 def testCookie(self):
296 """
297 EDNS Options: Cookie (adding ECS)
298 """
299 name = 'cookie.ednsoptions-ecs.tests.powerdns.com.'
300 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
301 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[eco])
302 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
303 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[eco,ecso], payload=512)
304 response = dns.message.make_response(query)
305 rrset = dns.rrset.from_text(name,
306 3600,
307 dns.rdataclass.IN,
308 dns.rdatatype.A,
309 '127.0.0.1')
310 response.answer.append(rrset)
311
312 for method in ("sendUDPQuery", "sendTCPQuery"):
313 sender = getattr(self, method)
314 (receivedQuery, receivedResponse) = sender(query, response)
315 self.assertTrue(receivedQuery)
316 self.assertTrue(receivedResponse)
317 receivedQuery.id = expectedQuery.id
318 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
319 self.checkResponseEDNSWithoutECS(response, receivedResponse)
320
321 def testECS4(self):
322 """
323 EDNS Options: ECS4 (adding ECS)
324 """
325 name = 'ecs4.ednsoptions-ecs.tests.powerdns.com.'
326 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
327 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
328 ecsoResponse = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24, scope=24)
329 response = dns.message.make_response(query)
330 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
331 rrset = dns.rrset.from_text(name,
332 3600,
333 dns.rdataclass.IN,
334 dns.rdatatype.A,
335 '127.0.0.1')
336 response.answer.append(rrset)
337
338 for method in ("sendUDPQuery", "sendTCPQuery"):
339 sender = getattr(self, method)
340 (receivedQuery, receivedResponse) = sender(query, response)
341 self.assertTrue(receivedQuery)
342 self.assertTrue(receivedResponse)
343 receivedQuery.id = query.id
344 self.checkQueryEDNSWithECS(query, receivedQuery)
345 self.checkResponseEDNSWithECS(response, receivedResponse)
346
347 def testECS6(self):
348 """
349 EDNS Options: ECS6 (adding ECS)
350 """
351 name = 'ecs6.ednsoptions-ecs.tests.powerdns.com.'
352 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
353 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
354 ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
355 response = dns.message.make_response(query)
356 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
357 rrset = dns.rrset.from_text(name,
358 3600,
359 dns.rdataclass.IN,
360 dns.rdatatype.A,
361 '127.0.0.1')
362 response.answer.append(rrset)
363
364 for method in ("sendUDPQuery", "sendTCPQuery"):
365 sender = getattr(self, method)
366 (receivedQuery, receivedResponse) = sender(query, response)
367 self.assertTrue(receivedQuery)
368 self.assertTrue(receivedResponse)
369 receivedQuery.id = query.id
370 self.checkQueryEDNSWithECS(query, receivedQuery)
371 self.checkResponseEDNSWithECS(response, receivedResponse)
372
373 def testECS6Cookie(self):
374 """
375 EDNS Options: Cookie + ECS6 (adding ECS)
376 """
377 name = 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
378 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
379 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
380 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
381 ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
382 response = dns.message.make_response(query)
383 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
384 rrset = dns.rrset.from_text(name,
385 3600,
386 dns.rdataclass.IN,
387 dns.rdatatype.A,
388 '127.0.0.1')
389 response.answer.append(rrset)
390
391 for method in ("sendUDPQuery", "sendTCPQuery"):
392 sender = getattr(self, method)
393 (receivedQuery, receivedResponse) = sender(query, response)
394 self.assertTrue(receivedQuery)
395 self.assertTrue(receivedResponse)
396 receivedQuery.id = query.id
397 self.checkQueryEDNSWithECS(query, receivedQuery, 1)
398 self.checkResponseEDNSWithECS(response, receivedResponse)
399
400 def testMultiCookiesECS6(self):
401 """
402 EDNS Options: Two Cookies + ECS6
403 """
404 name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
405 eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
406 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
407 eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
408 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
409 response = dns.message.make_response(query)
410 rrset = dns.rrset.from_text(name,
411 3600,
412 dns.rdataclass.IN,
413 dns.rdatatype.A,
414 '127.0.0.1')
415 response.answer.append(rrset)
416
417 for method in ("sendUDPQuery", "sendTCPQuery"):
418 sender = getattr(self, method)
419 (receivedQuery, receivedResponse) = sender(query, response)
420 self.assertTrue(receivedQuery)
421 self.assertTrue(receivedResponse)
422 receivedQuery.id = query.id
423 self.assertEqual(receivedQuery, query)
424 self.assertEqual(receivedResponse, response)
425
426 class TestEDNSOptionsLuaFFI(DNSDistTest):
427
428 _config_template = """
429 local ffi = require("ffi")
430
431 function testEDNSOptions(dq)
432 local options_ptr = ffi.new("const dnsdist_ffi_ednsoption_t *[1]")
433 local ret_ptr_param = ffi.cast("const dnsdist_ffi_ednsoption_t **", options_ptr)
434
435 local options_count = tonumber(ffi.C.dnsdist_ffi_dnsquestion_get_edns_options(dq, ret_ptr_param))
436
437 local qname_ptr = ffi.new("const char *[1]")
438 local qname_ptr_param = ffi.cast("const char **", qname_ptr)
439 local qname_size = ffi.new("size_t[1]")
440 local qname_size_param = ffi.cast("size_t*", qname_size)
441 ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, qname_ptr_param, qname_size_param)
442 local qname = ffi.string(qname_ptr[0])
443
444 if string.match(qname, 'noedns') then
445 if options_count ~= 0 then
446 local str = "192.0.2.255"
447 local buf = ffi.new("char[?]", #str + 1)
448 ffi.copy(buf, str)
449 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
450 return DNSAction.Spoof
451 end
452 end
453
454 local cookies_count = 0
455 local ecs_count = 0
456 local ecs_index = -1
457 local first_cookie_index = -1
458 local last_cookie_index = -1
459 if options_count > 0 then
460 for idx = 0, options_count-1 do
461 if options_ptr[0][idx].optionCode == EDNSOptionCode.COOKIE then
462 cookies_count = cookies_count + 1
463 if first_cookie_index == -1 then
464 first_cookie_index = idx
465 end
466 last_cookie_index = idx
467 elseif options_ptr[0][idx].optionCode == EDNSOptionCode.ECS then
468 ecs_count = ecs_count + 1
469 ecs_index = idx
470 end
471 end
472 end
473
474 if string.match(qname, 'multiplecookies') then
475 if cookies_count == 0 then
476 local str = "192.0.2.1"
477 local buf = ffi.new("char[?]", #str + 1)
478 ffi.copy(buf, str)
479 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
480 return DNSAction.Spoof
481 end
482 if cookies_count ~= 2 then
483 local str = "192.0.2.2"
484 local buf = ffi.new("char[?]", #str + 1)
485 ffi.copy(buf, str)
486 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
487 return DNSAction.Spoof
488 end
489 if options_ptr[0][first_cookie_index].len ~= 16 then
490 local str = "192.0.2.3"
491 local buf = ffi.new("char[?]", #str + 1)
492 ffi.copy(buf, str)
493 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
494 return DNSAction.Spoof
495 end
496 if options_ptr[0][last_cookie_index].len ~= 16 then
497 local str = "192.0.2.4"
498 local buf = ffi.new("char[?]", #str + 1)
499 ffi.copy(buf, str)
500 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
501 return DNSAction.Spoof
502 end
503 elseif string.match(qname, 'cookie') then
504
505 if cookies_count == 0 then
506 local str = "192.0.2.1"
507 local buf = ffi.new("char[?]", #str + 1)
508 ffi.copy(buf, str)
509 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
510 return DNSAction.Spoof
511 end
512 if cookies_count ~= 1 or options_ptr[0][first_cookie_index].len ~= 16 then
513 local str = "192.0.2.2"
514 local buf = ffi.new("char[?]", #str + 1)
515 ffi.copy(buf, str)
516 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
517 return DNSAction.Spoof
518 end
519 end
520
521 if string.match(qname, 'ecs4') then
522 if ecs_count == 0 then
523 local str = "192.0.2.51"
524 local buf = ffi.new("char[?]", #str + 1)
525 ffi.copy(buf, str)
526 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
527 return DNSAction.Spoof
528 end
529
530 if ecs_count ~= 1 or options_ptr[0][ecs_index].len ~= 8 then
531 local str = "192.0.2.52"
532 local buf = ffi.new("char[?]", #str + 1)
533 ffi.copy(buf, str)
534 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
535 return DNSAction.Spoof
536 end
537 end
538
539 if string.match(qname, 'ecs6') then
540 if ecs_count == 0 then
541 local str = "192.0.2.101"
542 local buf = ffi.new("char[?]", #str + 1)
543 ffi.copy(buf, str)
544 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
545 return DNSAction.Spoof
546 end
547 if ecs_count ~= 1 or options_ptr[0][ecs_index].len ~= 20 then
548 local str = "192.0.2.102"
549 local buf = ffi.new("char[?]", #str + 1)
550 ffi.copy(buf, str)
551 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
552 return DNSAction.Spoof
553 end
554 end
555
556 return DNSAction.None
557
558 end
559
560 addAction(AllRule(), LuaFFIAction(testEDNSOptions))
561
562 newServer{address="127.0.0.1:%s"}
563 """
564
565 def testWithoutEDNSFFI(self):
566 """
567 EDNS Options: No EDNS (FFI)
568 """
569 name = 'noedns.ednsoptions.tests.powerdns.com.'
570 query = dns.message.make_query(name, 'A', 'IN')
571 response = dns.message.make_response(query)
572 rrset = dns.rrset.from_text(name,
573 3600,
574 dns.rdataclass.IN,
575 dns.rdatatype.A,
576 '192.0.2.255')
577 response.answer.append(rrset)
578
579 for method in ("sendUDPQuery", "sendTCPQuery"):
580 sender = getattr(self, method)
581 (receivedQuery, receivedResponse) = sender(query, response)
582 self.assertTrue(receivedQuery)
583 self.assertTrue(receivedResponse)
584 receivedQuery.id = query.id
585 self.assertEqual(receivedQuery, query)
586 self.assertEqual(receivedResponse, response)
587
588 def testCookieFFI(self):
589 """
590 EDNS Options: Cookie (FFI)
591 """
592 name = 'cookie.ednsoptions.tests.powerdns.com.'
593 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
594 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
595 response = dns.message.make_response(query)
596 rrset = dns.rrset.from_text(name,
597 3600,
598 dns.rdataclass.IN,
599 dns.rdatatype.A,
600 '127.0.0.1')
601 response.answer.append(rrset)
602
603 for method in ("sendUDPQuery", "sendTCPQuery"):
604 sender = getattr(self, method)
605 (receivedQuery, receivedResponse) = sender(query, response)
606 self.assertTrue(receivedQuery)
607 self.assertTrue(receivedResponse)
608 receivedQuery.id = query.id
609 self.assertEqual(receivedQuery, query)
610 self.assertEqual(receivedResponse, response)
611
612 def testECS4FFI(self):
613 """
614 EDNS Options: ECS4 (FFI)
615 """
616 name = 'ecs4.ednsoptions.tests.powerdns.com.'
617 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
618 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
619 response = dns.message.make_response(query)
620 rrset = dns.rrset.from_text(name,
621 3600,
622 dns.rdataclass.IN,
623 dns.rdatatype.A,
624 '127.0.0.1')
625 response.answer.append(rrset)
626
627 for method in ("sendUDPQuery", "sendTCPQuery"):
628 sender = getattr(self, method)
629 (receivedQuery, receivedResponse) = sender(query, response)
630 self.assertTrue(receivedQuery)
631 self.assertTrue(receivedResponse)
632 receivedQuery.id = query.id
633 self.assertEqual(receivedQuery, query)
634 self.assertEqual(receivedResponse, response)
635
636 def testECS6FFI(self):
637 """
638 EDNS Options: ECS6 (FFI)
639 """
640 name = 'ecs6.ednsoptions.tests.powerdns.com.'
641 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
642 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
643 response = dns.message.make_response(query)
644 rrset = dns.rrset.from_text(name,
645 3600,
646 dns.rdataclass.IN,
647 dns.rdatatype.A,
648 '127.0.0.1')
649 response.answer.append(rrset)
650
651 for method in ("sendUDPQuery", "sendTCPQuery"):
652 sender = getattr(self, method)
653 (receivedQuery, receivedResponse) = sender(query, response)
654 self.assertTrue(receivedQuery)
655 self.assertTrue(receivedResponse)
656 receivedQuery.id = query.id
657 self.assertEqual(receivedQuery, query)
658 self.assertEqual(receivedResponse, response)
659
660 def testECS6CookieFFI(self):
661 """
662 EDNS Options: Cookie + ECS6 (FFI)
663 """
664 name = 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
665 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
666 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
667 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
668 response = dns.message.make_response(query)
669 rrset = dns.rrset.from_text(name,
670 3600,
671 dns.rdataclass.IN,
672 dns.rdatatype.A,
673 '127.0.0.1')
674 response.answer.append(rrset)
675
676 for method in ("sendUDPQuery", "sendTCPQuery"):
677 sender = getattr(self, method)
678 (receivedQuery, receivedResponse) = sender(query, response)
679 self.assertTrue(receivedQuery)
680 self.assertTrue(receivedResponse)
681 receivedQuery.id = query.id
682 self.assertEqual(receivedQuery, query)
683 self.assertEqual(receivedResponse, response)
684
685 def testMultiCookiesECS6FFI(self):
686 """
687 EDNS Options: Two Cookies + ECS6 (FFI)
688 """
689 name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
690 eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
691 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
692 eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
693 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
694 response = dns.message.make_response(query)
695 rrset = dns.rrset.from_text(name,
696 3600,
697 dns.rdataclass.IN,
698 dns.rdatatype.A,
699 '127.0.0.1')
700 response.answer.append(rrset)
701
702 for method in ("sendUDPQuery", "sendTCPQuery"):
703 sender = getattr(self, method)
704 (receivedQuery, receivedResponse) = sender(query, response)
705 self.assertTrue(receivedQuery)
706 self.assertTrue(receivedResponse)
707 receivedQuery.id = query.id
708 self.assertEqual(receivedQuery, query)
709 self.assertEqual(receivedResponse, response)