]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestEdnsClientSubnetNoOverride(DNSDistTest
):
9 dnsdist is configured to add the EDNS0 Client Subnet
10 option, but only if it's not already present in the
14 _config_template
= """
16 newServer{address="127.0.0.1:%s", useClientSubnet=true}
19 def testWithoutEDNS(self
):
23 Send a query without EDNS, check that the query
24 received by the responder has the correct ECS value
25 and that the response received from dnsdist does not
26 have an EDNS pseudo-RR.
28 name
= 'withoutedns.ecs.tests.powerdns.com.'
29 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
30 query
= dns
.message
.make_query(name
, 'A', 'IN')
31 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
32 response
= dns
.message
.make_response(expectedQuery
)
33 expectedResponse
= dns
.message
.make_response(query
)
34 rrset
= dns
.rrset
.from_text(name
,
39 response
.answer
.append(rrset
)
40 expectedResponse
.answer
.append(rrset
)
42 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
43 self
.assertTrue(receivedQuery
)
44 self
.assertTrue(receivedResponse
)
45 receivedQuery
.id = expectedQuery
.id
46 self
.assertEquals(expectedQuery
, receivedQuery
)
47 self
.assertEquals(expectedResponse
, receivedResponse
)
48 self
.assertEquals(receivedResponse
.edns
, -1)
49 self
.assertEquals(len(receivedResponse
.options
), 0)
51 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
52 self
.assertTrue(receivedQuery
)
53 self
.assertTrue(receivedResponse
)
54 receivedQuery
.id = expectedQuery
.id
55 self
.assertEquals(expectedQuery
, receivedQuery
)
56 self
.assertEquals(expectedResponse
, receivedResponse
)
57 self
.assertEquals(receivedResponse
.edns
, -1)
58 self
.assertEquals(len(receivedResponse
.options
), 0)
60 def testWithEDNSNoECS(self
):
62 ECS: Existing EDNS without ECS
64 Send a query with EDNS but no ECS value.
65 Check that the query received by the responder
66 has a valid ECS value and that the response
67 received from dnsdist contains an EDNS pseudo-RR.
69 name
= 'withednsnoecs.ecs.tests.powerdns.com.'
70 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
71 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
72 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
73 response
= dns
.message
.make_response(expectedQuery
)
74 expectedResponse
= dns
.message
.make_response(query
)
75 rrset
= dns
.rrset
.from_text(name
,
80 response
.answer
.append(rrset
)
81 expectedResponse
.answer
.append(rrset
)
83 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
84 self
.assertTrue(receivedQuery
)
85 self
.assertTrue(receivedResponse
)
86 receivedQuery
.id = expectedQuery
.id
87 self
.assertEquals(expectedQuery
, receivedQuery
)
88 self
.assertEquals(expectedResponse
, receivedResponse
)
89 self
.assertEquals(receivedResponse
.edns
, 0)
90 self
.assertEquals(len(receivedResponse
.options
), 0)
92 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
93 self
.assertTrue(receivedQuery
)
94 self
.assertTrue(receivedResponse
)
95 receivedQuery
.id = expectedQuery
.id
96 self
.assertEquals(expectedQuery
, receivedQuery
)
97 self
.assertEquals(expectedResponse
, receivedResponse
)
98 self
.assertEquals(receivedResponse
.edns
, 0)
99 self
.assertEquals(len(receivedResponse
.options
), 0)
101 def testWithEDNSECS(self
):
103 ECS: Existing EDNS with ECS
105 Send a query with EDNS and a crafted ECS value.
106 Check that the query received by the responder
107 has the initial ECS value (not overwritten)
108 and that the response received from dnsdist contains
111 name
= 'withednsecs.ecs.tests.powerdns.com.'
112 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24)
113 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
114 response
= dns
.message
.make_response(query
)
115 rrset
= dns
.rrset
.from_text(name
,
120 response
.answer
.append(rrset
)
122 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
123 self
.assertTrue(receivedQuery
)
124 self
.assertTrue(receivedResponse
)
125 receivedQuery
.id = query
.id
126 self
.assertEquals(query
, receivedQuery
)
127 self
.assertEquals(response
, receivedResponse
)
128 self
.assertEquals(receivedResponse
.edns
, 0)
129 self
.assertEquals(len(receivedResponse
.options
), 0)
131 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
132 self
.assertTrue(receivedQuery
)
133 self
.assertTrue(receivedResponse
)
134 receivedQuery
.id = query
.id
135 self
.assertEquals(query
, receivedQuery
)
136 self
.assertEquals(response
, receivedResponse
)
137 self
.assertEquals(receivedResponse
.edns
, 0)
138 self
.assertEquals(len(receivedResponse
.options
), 0)
140 def testWithoutEDNSResponseWithECS(self
):
142 ECS: No existing EDNS (BE returning ECS)
144 Send a query without EDNS, check that the query
145 received by the responder has the correct ECS value
146 and that the response received from dnsdist does not
147 have an EDNS pseudo-RR.
148 This time the response returned by the backend contains
149 an ECS option with scope set.
151 name
= 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
152 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
153 query
= dns
.message
.make_query(name
, 'A', 'IN')
154 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
155 response
= dns
.message
.make_response(expectedQuery
)
156 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
157 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
158 expectedResponse
= dns
.message
.make_response(query
)
159 rrset
= dns
.rrset
.from_text(name
,
164 response
.answer
.append(rrset
)
165 expectedResponse
.answer
.append(rrset
)
167 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
168 self
.assertTrue(receivedQuery
)
169 self
.assertTrue(receivedResponse
)
170 receivedQuery
.id = expectedQuery
.id
171 self
.assertEquals(expectedQuery
, receivedQuery
)
172 self
.assertEquals(expectedResponse
, receivedResponse
)
173 self
.assertEquals(receivedResponse
.edns
, -1)
174 self
.assertEquals(len(receivedResponse
.options
), 0)
176 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
177 self
.assertTrue(receivedQuery
)
178 self
.assertTrue(receivedResponse
)
179 receivedQuery
.id = expectedQuery
.id
180 self
.assertEquals(expectedQuery
, receivedQuery
)
181 self
.assertEquals(expectedResponse
, receivedResponse
)
182 self
.assertEquals(receivedResponse
.edns
, -1)
183 self
.assertEquals(len(receivedResponse
.options
), 0)
185 def testWithEDNSNoECSResponseWithECS(self
):
187 ECS: Existing EDNS without ECS (BE returning only the ECS option)
189 Send a query with EDNS but no ECS value.
190 Check that the query received by the responder
191 has a valid ECS value and that the response
192 received from dnsdist contains an EDNS pseudo-RR.
193 This time the response returned by the backend contains
194 an ECS option with scope set.
196 name
= 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
197 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
198 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
199 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
200 response
= dns
.message
.make_response(expectedQuery
)
201 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
202 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
203 expectedResponse
= dns
.message
.make_response(query
)
204 rrset
= dns
.rrset
.from_text(name
,
209 response
.answer
.append(rrset
)
210 expectedResponse
.answer
.append(rrset
)
212 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
213 self
.assertTrue(receivedQuery
)
214 self
.assertTrue(receivedResponse
)
215 receivedQuery
.id = expectedQuery
.id
216 self
.assertEquals(expectedQuery
, receivedQuery
)
217 self
.assertEquals(expectedResponse
, receivedResponse
)
218 self
.assertEquals(receivedResponse
.edns
, 0)
219 self
.assertEquals(len(receivedResponse
.options
), 0)
221 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
222 self
.assertTrue(receivedQuery
)
223 self
.assertTrue(receivedResponse
)
224 receivedQuery
.id = expectedQuery
.id
225 self
.assertEquals(expectedQuery
, receivedQuery
)
226 self
.assertEquals(expectedResponse
, receivedResponse
)
227 self
.assertEquals(receivedResponse
.edns
, 0)
228 self
.assertEquals(len(receivedResponse
.options
), 0)
230 def testWithEDNSNoECSResponseWithCookiesThenECS(self
):
232 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
234 Send a query with EDNS but no ECS value.
235 Check that the query received by the responder
236 has a valid ECS value and that the response
237 received from dnsdist contains an EDNS pseudo-RR.
238 This time the response returned by the backend contains
239 one cookies then one ECS option.
241 name
= 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
242 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
243 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
244 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
245 response
= dns
.message
.make_response(expectedQuery
)
246 ecoResponse
= cookiesoption
.CookiesOption('deadbeef', 'deadbeef')
247 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
248 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
])
249 expectedResponse
= dns
.message
.make_response(query
)
250 rrset
= dns
.rrset
.from_text(name
,
255 response
.answer
.append(rrset
)
256 expectedResponse
.answer
.append(rrset
)
258 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
259 self
.assertTrue(receivedQuery
)
260 self
.assertTrue(receivedResponse
)
261 receivedQuery
.id = expectedQuery
.id
262 self
.assertEquals(expectedQuery
, receivedQuery
)
263 self
.assertEquals(expectedResponse
, receivedResponse
)
264 self
.assertEquals(receivedResponse
.edns
, 0)
265 self
.assertEquals(len(receivedResponse
.options
), 1)
267 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
268 self
.assertTrue(receivedQuery
)
269 self
.assertTrue(receivedResponse
)
270 receivedQuery
.id = expectedQuery
.id
271 self
.assertEquals(expectedQuery
, receivedQuery
)
272 self
.assertEquals(expectedResponse
, receivedResponse
)
273 self
.assertEquals(receivedResponse
.edns
, 0)
274 self
.assertEquals(len(receivedResponse
.options
), 1)
276 def testWithEDNSNoECSResponseWithECSThenCookies(self
):
278 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
280 Send a query with EDNS but no ECS value.
281 Check that the query received by the responder
282 has a valid ECS value and that the response
283 received from dnsdist contains an EDNS pseudo-RR.
284 This time the response returned by the backend contains
285 one ECS then one Cookies option.
287 name
= 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
288 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
289 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
290 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
291 response
= dns
.message
.make_response(expectedQuery
)
292 ecoResponse
= cookiesoption
.CookiesOption('deadbeef', 'deadbeef')
293 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
294 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
, ecoResponse
])
295 expectedResponse
= dns
.message
.make_response(query
)
296 rrset
= dns
.rrset
.from_text(name
,
301 response
.answer
.append(rrset
)
302 expectedResponse
.answer
.append(rrset
)
304 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
305 self
.assertTrue(receivedQuery
)
306 self
.assertTrue(receivedResponse
)
307 receivedQuery
.id = expectedQuery
.id
308 self
.assertEquals(expectedQuery
, receivedQuery
)
309 self
.assertEquals(expectedResponse
, receivedResponse
)
310 self
.assertEquals(receivedResponse
.edns
, 0)
311 self
.assertEquals(len(receivedResponse
.options
), 1)
313 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
314 self
.assertTrue(receivedQuery
)
315 self
.assertTrue(receivedResponse
)
316 receivedQuery
.id = expectedQuery
.id
317 self
.assertEquals(expectedQuery
, receivedQuery
)
318 self
.assertEquals(expectedResponse
, receivedResponse
)
319 self
.assertEquals(receivedResponse
.edns
, 0)
320 self
.assertEquals(len(receivedResponse
.options
), 1)
322 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self
):
324 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
326 Send a query with EDNS but no ECS value.
327 Check that the query received by the responder
328 has a valid ECS value and that the response
329 received from dnsdist contains an EDNS pseudo-RR.
330 This time the response returned by the backend contains
331 one Cookies, one ECS then one Cookies option.
333 name
= 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
334 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
335 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
336 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
337 response
= dns
.message
.make_response(expectedQuery
)
338 ecoResponse
= cookiesoption
.CookiesOption('deadbeef', 'deadbeef')
339 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
340 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
, ecoResponse
])
341 expectedResponse
= dns
.message
.make_response(query
)
342 rrset
= dns
.rrset
.from_text(name
,
347 response
.answer
.append(rrset
)
348 expectedResponse
.answer
.append(rrset
)
350 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
351 self
.assertTrue(receivedQuery
)
352 self
.assertTrue(receivedResponse
)
353 receivedQuery
.id = expectedQuery
.id
354 self
.assertEquals(expectedQuery
, receivedQuery
)
355 self
.assertEquals(expectedResponse
, receivedResponse
)
356 self
.assertEquals(receivedResponse
.edns
, 0)
357 self
.assertEquals(len(receivedResponse
.options
), 2)
359 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
360 self
.assertTrue(receivedQuery
)
361 self
.assertTrue(receivedResponse
)
362 receivedQuery
.id = expectedQuery
.id
363 self
.assertEquals(expectedQuery
, receivedQuery
)
364 self
.assertEquals(expectedResponse
, receivedResponse
)
365 self
.assertEquals(receivedResponse
.edns
, 0)
366 self
.assertEquals(len(receivedResponse
.options
), 2)
369 class TestEdnsClientSubnetOverride(DNSDistTest
):
371 dnsdist is configured to add the EDNS0 Client Subnet
372 option, overwriting any existing value.
375 _config_template
= """
378 setECSSourcePrefixV4(24)
379 setECSSourcePrefixV6(56)
380 newServer{address="127.0.0.1:%s", useClientSubnet=true}
383 def testWithoutEDNS(self
):
385 ECS Override: No existing EDNS
387 Send a query without EDNS, check that the query
388 received by the responder has the correct ECS value
389 and that the response received from dnsdist does not
390 have an EDNS pseudo-RR.
392 name
= 'withoutedns.overriden.ecs.tests.powerdns.com.'
393 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
394 query
= dns
.message
.make_query(name
, 'A', 'IN')
395 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
396 response
= dns
.message
.make_response(expectedQuery
)
397 expectedResponse
= dns
.message
.make_response(query
)
398 rrset
= dns
.rrset
.from_text(name
,
403 response
.answer
.append(rrset
)
404 expectedResponse
.answer
.append(rrset
)
406 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
407 self
.assertTrue(receivedQuery
)
408 self
.assertTrue(receivedResponse
)
409 receivedQuery
.id = expectedQuery
.id
410 self
.assertEquals(expectedQuery
, receivedQuery
)
411 self
.assertEquals(expectedResponse
, receivedResponse
)
412 self
.assertEquals(receivedResponse
.edns
, -1)
413 self
.assertEquals(len(receivedResponse
.options
), 0)
415 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
416 self
.assertTrue(receivedQuery
)
417 self
.assertTrue(receivedResponse
)
418 receivedQuery
.id = expectedQuery
.id
419 self
.assertEquals(expectedQuery
, receivedQuery
)
420 self
.assertEquals(expectedResponse
, receivedResponse
)
421 self
.assertEquals(receivedResponse
.edns
, -1)
422 self
.assertEquals(len(receivedResponse
.options
), 0)
424 def testWithEDNSNoECS(self
):
426 ECS Override: Existing EDNS without ECS
428 Send a query with EDNS but no ECS value.
429 Check that the query received by the responder
430 has a valid ECS value and that the response
431 received from dnsdist contains an EDNS pseudo-RR.
433 name
= 'withednsnoecs.overriden.ecs.tests.powerdns.com.'
434 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
435 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
436 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
437 response
= dns
.message
.make_response(expectedQuery
)
438 expectedResponse
= dns
.message
.make_response(query
)
439 rrset
= dns
.rrset
.from_text(name
,
444 response
.answer
.append(rrset
)
445 expectedResponse
.answer
.append(rrset
)
447 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
448 self
.assertTrue(receivedQuery
)
449 self
.assertTrue(receivedResponse
)
450 receivedQuery
.id = expectedQuery
.id
451 self
.assertEquals(expectedQuery
, receivedQuery
)
452 self
.assertEquals(expectedResponse
, receivedResponse
)
453 self
.assertEquals(receivedResponse
.edns
, 0)
454 self
.assertEquals(len(receivedResponse
.options
), 0)
456 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
457 self
.assertTrue(receivedQuery
)
458 self
.assertTrue(receivedResponse
)
459 receivedQuery
.id = expectedQuery
.id
460 self
.assertEquals(expectedQuery
, receivedQuery
)
461 self
.assertEquals(expectedResponse
, receivedResponse
)
462 self
.assertEquals(receivedResponse
.edns
, 0)
463 self
.assertEquals(len(receivedResponse
.options
), 0)
465 def testWithEDNSShorterInitialECS(self
):
467 ECS Override: Existing EDNS with ECS (short)
469 Send a query with EDNS and a crafted ECS value.
470 Check that the query received by the responder
471 has an overwritten ECS value (not the initial one)
472 and that the response received from dnsdist contains
474 The initial ECS value is shorter than the one it will
477 name
= 'withednsecs.overriden.ecs.tests.powerdns.com.'
478 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 8)
479 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
480 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
481 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
482 response
= dns
.message
.make_response(query
)
483 rrset
= dns
.rrset
.from_text(name
,
488 response
.answer
.append(rrset
)
490 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
491 self
.assertTrue(receivedQuery
)
492 self
.assertTrue(receivedResponse
)
493 receivedQuery
.id = expectedQuery
.id
494 self
.assertEquals(expectedQuery
, receivedQuery
)
495 self
.assertEquals(response
, receivedResponse
)
496 self
.assertEquals(receivedResponse
.edns
, 0)
497 self
.assertEquals(len(receivedResponse
.options
), 0)
499 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
500 self
.assertTrue(receivedQuery
)
501 self
.assertTrue(receivedResponse
)
502 receivedQuery
.id = expectedQuery
.id
503 self
.assertEquals(expectedQuery
, receivedQuery
)
504 self
.assertEquals(response
, receivedResponse
)
505 self
.assertEquals(receivedResponse
.edns
, 0)
506 self
.assertEquals(len(receivedResponse
.options
), 0)
508 def testWithEDNSLongerInitialECS(self
):
510 ECS Override: Existing EDNS with ECS (long)
512 Send a query with EDNS and a crafted ECS value.
513 Check that the query received by the responder
514 has an overwritten ECS value (not the initial one)
515 and that the response received from dnsdist contains
517 The initial ECS value is longer than the one it will
520 name
= 'withednsecs.overriden.ecs.tests.powerdns.com.'
521 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
522 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
523 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
524 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
525 response
= dns
.message
.make_response(query
)
526 rrset
= dns
.rrset
.from_text(name
,
531 response
.answer
.append(rrset
)
533 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
534 self
.assertTrue(receivedQuery
)
535 self
.assertTrue(receivedResponse
)
536 receivedQuery
.id = expectedQuery
.id
537 self
.assertEquals(expectedQuery
, receivedQuery
)
538 self
.assertEquals(response
, receivedResponse
)
539 self
.assertEquals(receivedResponse
.edns
, 0)
540 self
.assertEquals(len(receivedResponse
.options
), 0)
542 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
543 self
.assertTrue(receivedQuery
)
544 self
.assertTrue(receivedResponse
)
545 receivedQuery
.id = expectedQuery
.id
546 self
.assertEquals(expectedQuery
, receivedQuery
)
547 self
.assertEquals(response
, receivedResponse
)
548 self
.assertEquals(receivedResponse
.edns
, 0)
549 self
.assertEquals(len(receivedResponse
.options
), 0)
551 def testWithEDNSSameSizeInitialECS(self
):
553 ECS Override: Existing EDNS with ECS (same)
555 Send a query with EDNS and a crafted ECS value.
556 Check that the query received by the responder
557 has an overwritten ECS value (not the initial one)
558 and that the response received from dnsdist contains
560 The initial ECS value is exactly the same size as
561 the one it will replaced with.
563 name
= 'withednsecs.overriden.ecs.tests.powerdns.com.'
564 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
565 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
566 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
567 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
568 response
= dns
.message
.make_response(query
)
569 rrset
= dns
.rrset
.from_text(name
,
574 response
.answer
.append(rrset
)
576 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
577 self
.assertTrue(receivedQuery
)
578 self
.assertTrue(receivedResponse
)
579 receivedQuery
.id = expectedQuery
.id
580 self
.assertEquals(expectedQuery
, receivedQuery
)
581 self
.assertEquals(response
, receivedResponse
)
582 self
.assertEquals(receivedResponse
.edns
, 0)
583 self
.assertEquals(len(receivedResponse
.options
), 0)
585 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
586 self
.assertTrue(receivedQuery
)
587 self
.assertTrue(receivedResponse
)
588 receivedQuery
.id = expectedQuery
.id
589 self
.assertEquals(expectedQuery
, receivedQuery
)
590 self
.assertEquals(response
, receivedResponse
)
591 self
.assertEquals(receivedResponse
.edns
, 0)
592 self
.assertEquals(len(receivedResponse
.options
), 0)