]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestEdnsClientSubnetNoOverride(DNSDistTest
):
9 dnsdist is configured to add the EDNS0 Client Subnet
10 option, but only if it's not already present in the
14 _config_template
= """
16 newServer{address="127.0.0.1:%s", useClientSubnet=true}
def testWithoutEDNS(self):
    """
    ECS: No existing EDNS

    Send a query without EDNS, check that the query
    received by the responder has the correct ECS value
    and that the response received from dnsdist does not
    have an EDNS pseudo-RR. Runs over UDP, then TCP.
    """
    qname = 'withoutedns.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN')
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[subnet], payload=512)
    response = dns.message.make_response(expectedQuery)
    expectedResponse = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the IDs so the comparison only covers the message content.
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseNoEDNS(expectedResponse, receivedResponse)
def testWithEDNSNoECS(self):
    """
    ECS: Existing EDNS without ECS

    Send a query with EDNS but no ECS value.
    Check that the query received by the responder
    has a valid ECS value and that the response
    received from dnsdist contains an EDNS pseudo-RR.
    Runs over UDP, then TCP.
    """
    qname = 'withednsnoecs.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096)
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[subnet])
    response = dns.message.make_response(expectedQuery)
    expectedResponse = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
def testWithEDNSECS(self):
    """
    ECS: Existing EDNS with ECS

    Send a query with EDNS and a crafted ECS value (1.2.3.4/24).
    Check that the query received by the responder
    has the initial ECS value (not overwritten), then run
    checkResponseEDNSWithoutECS on the response received from
    dnsdist. Runs over UDP, then TCP.
    """
    qname = 'withednsecs.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[subnet])
    response = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # The query is expected to pass through unchanged, so it is its own reference.
        receivedQuery.id = query.id
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)
def testWithoutEDNSResponseWithECS(self):
    """
    ECS: No existing EDNS (BE returning ECS)

    Send a query without EDNS, check that the query
    received by the responder has the correct ECS value
    and that the response received from dnsdist does not
    have an EDNS pseudo-RR.
    This time the response returned by the backend contains
    an ECS option with scope set. Runs over UDP, then TCP.
    """
    qname = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN')
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[subnet], payload=512)
    response = dns.message.make_response(expectedQuery)
    scopedSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
    response.use_edns(edns=True, payload=4096, options=[scopedSubnet])
    expectedResponse = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseNoEDNS(expectedResponse, receivedResponse)
def testWithEDNSNoECSResponseWithECS(self):
    """
    ECS: Existing EDNS without ECS (BE returning only the ECS option)

    Send a query with EDNS but no ECS value.
    Check that the query received by the responder
    has a valid ECS value and that the response
    received from dnsdist contains an EDNS pseudo-RR.
    This time the response returned by the backend contains
    an ECS option with scope set. Runs over UDP, then TCP.
    """
    qname = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096)
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[subnet])
    response = dns.message.make_response(expectedQuery)
    scopedSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
    response.use_edns(edns=True, payload=4096, options=[scopedSubnet])
    expectedResponse = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
def testWithEDNSNoECSResponseWithCookiesThenECS(self):
    """
    ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)

    Send a query with EDNS but no ECS value.
    Check that the query received by the responder
    has a valid ECS value and that the response
    received from dnsdist contains an EDNS pseudo-RR.
    This time the response returned by the backend contains
    one cookies then one ECS option. Runs over UDP, then TCP.
    """
    qname = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096)
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[subnet])
    response = dns.message.make_response(expectedQuery)
    cookie = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
    scopedSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
    # Backend answer carries the cookie first, then the ECS option.
    response.use_edns(edns=True, payload=4096, options=[cookie, scopedSubnet])
    expectedResponse = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse.answer.append(answer)
    # The client-side expectation keeps only the cookie.
    expectedResponse.use_edns(edns=True, payload=4096, options=[cookie])

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
def testWithEDNSNoECSResponseWithECSThenCookies(self):
    """
    ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)

    Send a query with EDNS but no ECS value.
    Check that the query received by the responder
    has a valid ECS value and that the response
    received from dnsdist contains an EDNS pseudo-RR.
    This time the response returned by the backend contains
    one ECS then one Cookies option. Runs over UDP, then TCP.
    """
    name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
    ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
    expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
    response = dns.message.make_response(expectedQuery)
    ecoResponse = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
    ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
    # Backend answer carries the ECS option first, then the cookie.
    response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
    expectedResponse = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(rrset)
    expectedResponse.answer.append(rrset)
    # Fix: this second use_edns() used to be applied to `response`, which
    # replaced the [ECS, Cookies] options set above with [Cookies] only, so
    # the backend never returned an ECS option and the scenario named in the
    # docstring was not exercised. The cookie-only option list belongs on the
    # expected client-side response, as in the Cookies-then-ECS sibling test.
    expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        (receivedQuery, receivedResponse) = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
    """
    ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)

    Send a query with EDNS but no ECS value.
    Check that the query received by the responder
    has a valid ECS value and that the response
    received from dnsdist contains an EDNS pseudo-RR.
    This time the response returned by the backend contains
    one Cookies, one ECS then one Cookies option.
    Runs over UDP, then TCP.
    """
    qname = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096)
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[subnet])
    response = dns.message.make_response(expectedQuery)
    cookie = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
    scopedSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
    # The same cookie object is placed on both sides of the ECS option.
    response.use_edns(edns=True, payload=4096, options=[cookie, scopedSubnet, cookie])
    expectedResponse = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
340 class TestEdnsClientSubnetOverride(DNSDistTest
):
342 dnsdist is configured to add the EDNS0 Client Subnet
343 option, overwriting any existing value.
346 _config_template
= """
349 setECSSourcePrefixV4(24)
350 setECSSourcePrefixV6(56)
351 newServer{address="127.0.0.1:%s", useClientSubnet=true}
def testWithoutEDNS(self):
    """
    ECS Override: No existing EDNS

    Send a query without EDNS, check that the query
    received by the responder has the correct ECS value
    and that the response received from dnsdist does not
    have an EDNS pseudo-RR. Runs over UDP, then TCP.
    """
    qname = 'withoutedns.overridden.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN')
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[subnet], payload=512)
    response = dns.message.make_response(expectedQuery)
    response.use_edns(edns=True, payload=4096, options=[subnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse = dns.message.make_response(query)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseNoEDNS(expectedResponse, receivedResponse)
def testWithEDNSNoECS(self):
    """
    ECS Override: Existing EDNS without ECS

    Send a query with EDNS but no ECS value.
    Check that the query received by the responder
    has a valid ECS value and that the response
    received from dnsdist contains an EDNS pseudo-RR.
    Runs over UDP, then TCP.
    """
    qname = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096)
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[subnet])
    response = dns.message.make_response(expectedQuery)
    response.use_edns(edns=True, payload=4096, options=[subnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse = dns.message.make_response(query)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
def testWithEDNSShorterInitialECS(self):
    """
    ECS Override: Existing EDNS with ECS (short)

    Send a query with EDNS and a crafted ECS value.
    Check that the query received by the responder
    has an overwritten ECS value (not the initial one),
    then run checkResponseEDNSWithECS on the response
    received from dnsdist.
    The initial ECS value (192.0.2.1/8) has a shorter prefix than
    the replacement (127.0.0.1/24). Runs over UDP, then TCP.
    """
    qname = 'withednsecs.overridden.ecs.tests.powerdns.com.'
    initialSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
    rewrittenSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[initialSubnet])
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenSubnet])
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[rewrittenSubnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
def testWithEDNSLongerInitialECS(self):
    """
    ECS Override: Existing EDNS with ECS (long)

    Send a query with EDNS and a crafted ECS value.
    Check that the query received by the responder
    has an overwritten ECS value (not the initial one),
    then run checkResponseEDNSWithECS on the response
    received from dnsdist.
    The initial ECS value (192.0.2.1/32) has a longer prefix than
    the replacement (127.0.0.1/24). Runs over UDP, then TCP.
    """
    qname = 'withednsecs.overridden.ecs.tests.powerdns.com.'
    initialSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
    rewrittenSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[initialSubnet])
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenSubnet])
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[rewrittenSubnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
def testWithEDNSSameSizeInitialECS(self):
    """
    ECS Override: Existing EDNS with ECS (same)

    Send a query with EDNS and a crafted ECS value.
    Check that the query received by the responder
    has an overwritten ECS value (not the initial one),
    then run checkResponseEDNSWithECS on the response
    received from dnsdist.
    The initial ECS value is exactly the same size as
    the one it will be replaced with. Runs over UDP, then TCP.
    """
    qname = 'withednsecs.overridden.ecs.tests.powerdns.com.'
    initialSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
    rewrittenSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[initialSubnet])
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenSubnet])
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[rewrittenSubnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
550 class TestECSDisabledByRuleOrLua(DNSDistTest
):
552 dnsdist is configured to add the EDNS0 Client Subnet
553 option, but we disable it via DisableECSAction()
557 _config_template
= """
558 setECSOverride(false)
559 setECSSourcePrefixV4(16)
560 setECSSourcePrefixV6(16)
561 newServer{address="127.0.0.1:%s", useClientSubnet=true}
562 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
563 function disableECSViaLua(dq)
565 return DNSAction.None, ""
567 addLuaAction("disabledvialua.ecsrules.tests.powerdns.com.", disableECSViaLua)
def testWithECSNotDisabled(self):
    """
    ECS Disable: ECS enabled in the backend

    A query without EDNS for a name matched by neither disabling rule
    is expected to reach the responder with an ECS option for
    127.0.0.1/16, and the response seen by the client is checked with
    checkResponseNoEDNS. Runs over UDP, then TCP.
    """
    qname = 'notdisabled.ecsrules.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
    query = dns.message.make_query(qname, 'A', 'IN')
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[subnet], payload=512)
    response = dns.message.make_response(expectedQuery)
    expectedResponse = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseNoEDNS(expectedResponse, receivedResponse)
def testWithECSDisabledViaRule(self):
    """
    ECS Disable: ECS enabled in the backend, but disabled by a rule

    A query without EDNS for the name targeted by the
    DisableECSAction() rule is expected to reach the responder
    unchanged (no EDNS), and the response is checked with
    checkResponseNoEDNS. Runs over UDP, then TCP.
    """
    qname = 'disabled.ecsrules.tests.powerdns.com.'
    query = dns.message.make_query(qname, 'A', 'IN')
    response = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.checkQueryNoEDNS(query, receivedQuery)
        self.checkResponseNoEDNS(response, receivedResponse)
def testWithECSDisabledViaLua(self):
    """
    ECS Disable: ECS enabled in the backend, but disabled via Lua

    A query without EDNS for the name handled by the
    disableECSViaLua Lua action is expected to reach the responder
    unchanged (no EDNS), and the response is checked with
    checkResponseNoEDNS. Runs over UDP, then TCP.
    """
    qname = 'disabledvialua.ecsrules.tests.powerdns.com.'
    query = dns.message.make_query(qname, 'A', 'IN')
    response = dns.message.make_response(query)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.checkQueryNoEDNS(query, receivedQuery)
        self.checkResponseNoEDNS(response, receivedResponse)
658 class TestECSOverrideSetByRuleOrLua(DNSDistTest
):
660 dnsdist is configured to set the EDNS0 Client Subnet
661 option without overriding an existing one, but we
662 force the overriding via ECSOverrideAction() or Lua.
665 _config_template
= """
666 setECSOverride(false)
667 setECSSourcePrefixV4(24)
668 setECSSourcePrefixV6(56)
669 newServer{address="127.0.0.1:%s", useClientSubnet=true}
670 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
671 function overrideECSViaLua(dq)
672 dq.ecsOverride = true
673 return DNSAction.None, ""
675 addLuaAction("overriddenvialua.ecsrules.tests.powerdns.com.", overrideECSViaLua)
def testWithECSOverrideNotSet(self):
    """
    ECS Override: not set via Lua or a rule

    A query carrying ECS 192.0.2.1/24 for a name matched by neither
    override rule is expected to reach the responder with that ECS
    value intact, and the response is checked with
    checkResponseEDNSWithECS. Runs over UDP, then TCP.
    """
    qname = 'notoverridden.ecsrules.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[subnet])
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[subnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
def testWithECSOverrideSetViaRule(self):
    """
    ECS Override: set with a rule

    A query carrying ECS 192.0.2.1/24 for the name targeted by the
    ECSOverrideAction(true) rule is expected to reach the responder
    with the ECS value rewritten to 127.0.0.1/24, and the response
    is checked with checkResponseEDNSWithECS. Runs over UDP, then TCP.
    """
    qname = 'overridden.ecsrules.tests.powerdns.com.'
    initialSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
    rewrittenSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[initialSubnet])
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenSubnet])
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[rewrittenSubnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
def testWithECSOverrideSetViaLua(self):
    """
    ECS Override: set via Lua

    A query carrying ECS 192.0.2.1/24 for the name handled by the
    overrideECSViaLua Lua action is expected to reach the responder
    with the ECS value rewritten to 127.0.0.1/24, and the response
    is checked with checkResponseEDNSWithECS. Runs over UDP, then TCP.
    """
    qname = 'overriddenvialua.ecsrules.tests.powerdns.com.'
    initialSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
    rewrittenSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[initialSubnet])
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenSubnet])
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[rewrittenSubnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
772 class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest
):
774 dnsdist is configured to set the EDNS0 Client Subnet
775 option with a prefix length of 24 for IPv4 and 56 for IPv6,
776 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
779 _config_template
= """
780 setECSOverride(false)
781 setECSSourcePrefixV4(24)
782 setECSSourcePrefixV6(56)
783 newServer{address="127.0.0.1:%s", useClientSubnet=true}
784 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
785 function overrideECSPrefixLengthViaLua(dq)
786 dq.ecsPrefixLength = 32
787 return DNSAction.None, ""
789 addLuaAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", overrideECSPrefixLengthViaLua)
def testWithECSPrefixLengthNotOverridden(self):
    """
    ECS Prefix Length: not overridden via Lua or a rule

    A query without EDNS for a name matched by neither prefix-length
    rule is expected to reach the responder with an ECS option for
    127.0.0.1/24, and the response seen by the client is checked
    with checkResponseNoEDNS. Runs over UDP, then TCP.
    """
    qname = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
    query = dns.message.make_query(qname, 'A', 'IN')
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[subnet], payload=512)
    # The backend response here is derived from the plain query, then given EDNS explicitly.
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[subnet])
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse = dns.message.make_response(query)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseNoEDNS(expectedResponse, receivedResponse)
def testWithECSPrefixLengthOverriddenViaRule(self):
    """
    ECS Prefix Length: overridden with a rule

    A query without EDNS for the name targeted by the
    ECSPrefixLengthAction(32, 128) rule is expected to reach the
    responder with an ECS option for 127.0.0.1/32, and the response
    seen by the client is checked with checkResponseNoEDNS.
    Runs over UDP, then TCP.
    """
    qname = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
    query = dns.message.make_query(qname, 'A', 'IN')
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[subnet], payload=512)
    response = dns.message.make_response(expectedQuery)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse = dns.message.make_response(query)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseNoEDNS(expectedResponse, receivedResponse)
def testWithECSPrefixLengthOverriddenViaLua(self):
    """
    ECS Prefix Length: overridden via Lua

    A query without EDNS for the name handled by the
    overrideECSPrefixLengthViaLua Lua action is expected to reach the
    responder with an ECS option for 127.0.0.1/32, and the response
    seen by the client is checked with checkResponseNoEDNS.
    Runs over UDP, then TCP.
    """
    qname = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
    subnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
    query = dns.message.make_query(qname, 'A', 'IN')
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[subnet], payload=512)
    response = dns.message.make_response(expectedQuery)
    # NOTE(review): only `name` survived extraction for this call; the TTL,
    # class, type and address below are reconstructed - confirm.
    answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
    response.answer.append(answer)
    expectedResponse = dns.message.make_response(query)
    expectedResponse.answer.append(answer)

    for send in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = send(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseNoEDNS(expectedResponse, receivedResponse)