]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
6 from datetime
import datetime
, timedelta
8 class TestEdnsClientSubnetNoOverride(DNSDistTest
):
10 dnsdist is configured to add the EDNS0 Client Subnet
11 option, but only if it's not already present in the
15 _config_template
= """
17 newServer{address="127.0.0.1:%s", useClientSubnet=true}
20 def testWithoutEDNS(self
):
24 Send a query without EDNS, check that the query
25 received by the responder has the correct ECS value
26 and that the response received from dnsdist does not
27 have an EDNS pseudo-RR.
29 name
= 'withoutedns.ecs.tests.powerdns.com.'
30 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
31 query
= dns
.message
.make_query(name
, 'A', 'IN')
32 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
33 response
= dns
.message
.make_response(expectedQuery
)
34 expectedResponse
= dns
.message
.make_response(query
)
35 rrset
= dns
.rrset
.from_text(name
,
40 response
.answer
.append(rrset
)
41 expectedResponse
.answer
.append(rrset
)
43 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
44 self
.assertTrue(receivedQuery
)
45 self
.assertTrue(receivedResponse
)
46 receivedQuery
.id = expectedQuery
.id
47 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
48 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
50 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
51 self
.assertTrue(receivedQuery
)
52 self
.assertTrue(receivedResponse
)
53 receivedQuery
.id = expectedQuery
.id
54 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
55 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
57 def testWithEDNSNoECS(self
):
59 ECS: Existing EDNS without ECS
61 Send a query with EDNS but no ECS value.
62 Check that the query received by the responder
63 has a valid ECS value and that the response
64 received from dnsdist contains an EDNS pseudo-RR.
66 name
= 'withednsnoecs.ecs.tests.powerdns.com.'
67 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
68 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
69 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
70 response
= dns
.message
.make_response(expectedQuery
)
71 expectedResponse
= dns
.message
.make_response(query
)
72 rrset
= dns
.rrset
.from_text(name
,
77 response
.answer
.append(rrset
)
78 expectedResponse
.answer
.append(rrset
)
80 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
81 self
.assertTrue(receivedQuery
)
82 self
.assertTrue(receivedResponse
)
83 receivedQuery
.id = expectedQuery
.id
84 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
85 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
87 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
88 self
.assertTrue(receivedQuery
)
89 self
.assertTrue(receivedResponse
)
90 receivedQuery
.id = expectedQuery
.id
91 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
92 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
94 def testWithEDNSECS(self
):
96 ECS: Existing EDNS with ECS
98 Send a query with EDNS and a crafted ECS value.
99 Check that the query received by the responder
100 has the initial ECS value (not overwritten)
101 and that the response received from dnsdist contains
104 name
= 'withednsecs.ecs.tests.powerdns.com.'
105 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24)
106 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
107 response
= dns
.message
.make_response(query
)
108 rrset
= dns
.rrset
.from_text(name
,
113 response
.answer
.append(rrset
)
116 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
117 self
.assertTrue(receivedQuery
)
118 self
.assertTrue(receivedResponse
)
119 receivedQuery
.id = query
.id
120 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
121 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
123 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
124 self
.assertTrue(receivedQuery
)
125 self
.assertTrue(receivedResponse
)
126 receivedQuery
.id = query
.id
127 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
128 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
130 def testWithoutEDNSResponseWithECS(self
):
132 ECS: No existing EDNS (BE returning ECS)
134 Send a query without EDNS, check that the query
135 received by the responder has the correct ECS value
136 and that the response received from dnsdist does not
137 have an EDNS pseudo-RR.
138 This time the response returned by the backend contains
139 an ECS option with scope set.
141 name
= 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
142 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
143 query
= dns
.message
.make_query(name
, 'A', 'IN')
144 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
145 response
= dns
.message
.make_response(expectedQuery
)
146 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
147 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
148 expectedResponse
= dns
.message
.make_response(query
)
149 rrset
= dns
.rrset
.from_text(name
,
154 response
.answer
.append(rrset
)
155 expectedResponse
.answer
.append(rrset
)
157 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
158 self
.assertTrue(receivedQuery
)
159 self
.assertTrue(receivedResponse
)
160 receivedQuery
.id = expectedQuery
.id
161 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
162 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
164 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
165 self
.assertTrue(receivedQuery
)
166 self
.assertTrue(receivedResponse
)
167 receivedQuery
.id = expectedQuery
.id
168 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
169 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
171 def testWithEDNSNoECSResponseWithECS(self
):
173 ECS: Existing EDNS without ECS (BE returning only the ECS option)
175 Send a query with EDNS but no ECS value.
176 Check that the query received by the responder
177 has a valid ECS value and that the response
178 received from dnsdist contains an EDNS pseudo-RR.
179 This time the response returned by the backend contains
180 an ECS option with scope set.
182 name
= 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
183 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
184 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
185 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
186 response
= dns
.message
.make_response(expectedQuery
)
187 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
188 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
189 expectedResponse
= dns
.message
.make_response(query
)
190 rrset
= dns
.rrset
.from_text(name
,
195 response
.answer
.append(rrset
)
196 expectedResponse
.answer
.append(rrset
)
198 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
199 self
.assertTrue(receivedQuery
)
200 self
.assertTrue(receivedResponse
)
201 receivedQuery
.id = expectedQuery
.id
202 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
203 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
205 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
206 self
.assertTrue(receivedQuery
)
207 self
.assertTrue(receivedResponse
)
208 receivedQuery
.id = expectedQuery
.id
209 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
210 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
212 def testWithEDNSNoECSResponseWithCookiesThenECS(self
):
214 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
216 Send a query with EDNS but no ECS value.
217 Check that the query received by the responder
218 has a valid ECS value and that the response
219 received from dnsdist contains an EDNS pseudo-RR.
220 This time the response returned by the backend contains
221 one cookies then one ECS option.
223 name
= 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
224 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
225 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
226 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
227 response
= dns
.message
.make_response(expectedQuery
)
228 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
229 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
230 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
])
231 expectedResponse
= dns
.message
.make_response(query
)
232 rrset
= dns
.rrset
.from_text(name
,
237 response
.answer
.append(rrset
)
238 expectedResponse
.answer
.append(rrset
)
239 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
241 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
242 self
.assertTrue(receivedQuery
)
243 self
.assertTrue(receivedResponse
)
244 receivedQuery
.id = expectedQuery
.id
245 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
246 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
248 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
249 self
.assertTrue(receivedQuery
)
250 self
.assertTrue(receivedResponse
)
251 receivedQuery
.id = expectedQuery
.id
252 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
253 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
255 def testWithEDNSNoECSResponseWithECSThenCookies(self
):
257 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
259 Send a query with EDNS but no ECS value.
260 Check that the query received by the responder
261 has a valid ECS value and that the response
262 received from dnsdist contains an EDNS pseudo-RR.
263 This time the response returned by the backend contains
264 one ECS then one Cookies option.
266 name
= 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
267 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
268 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
269 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
270 response
= dns
.message
.make_response(expectedQuery
)
271 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
272 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
273 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
, ecoResponse
])
274 expectedResponse
= dns
.message
.make_response(query
)
275 rrset
= dns
.rrset
.from_text(name
,
280 response
.answer
.append(rrset
)
281 expectedResponse
.answer
.append(rrset
)
282 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
284 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
285 self
.assertTrue(receivedQuery
)
286 self
.assertTrue(receivedResponse
)
287 receivedQuery
.id = expectedQuery
.id
288 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
289 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
291 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
292 self
.assertTrue(receivedQuery
)
293 self
.assertTrue(receivedResponse
)
294 receivedQuery
.id = expectedQuery
.id
295 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
296 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
298 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self
):
300 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
302 Send a query with EDNS but no ECS value.
303 Check that the query received by the responder
304 has a valid ECS value and that the response
305 received from dnsdist contains an EDNS pseudo-RR.
306 This time the response returned by the backend contains
307 one Cookies, one ECS then one Cookies option.
309 name
= 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
310 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
311 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
312 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
313 response
= dns
.message
.make_response(expectedQuery
)
314 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
315 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
316 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
, ecoResponse
])
317 expectedResponse
= dns
.message
.make_response(query
)
318 rrset
= dns
.rrset
.from_text(name
,
323 response
.answer
.append(rrset
)
324 expectedResponse
.answer
.append(rrset
)
326 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
327 self
.assertTrue(receivedQuery
)
328 self
.assertTrue(receivedResponse
)
329 receivedQuery
.id = expectedQuery
.id
330 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
331 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=2)
333 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
334 self
.assertTrue(receivedQuery
)
335 self
.assertTrue(receivedResponse
)
336 receivedQuery
.id = expectedQuery
.id
337 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
338 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=2)
341 class TestEdnsClientSubnetOverride(DNSDistTest
):
343 dnsdist is configured to add the EDNS0 Client Subnet
344 option, overwriting any existing value.
347 _config_template
= """
350 setECSSourcePrefixV4(24)
351 setECSSourcePrefixV6(56)
352 newServer{address="127.0.0.1:%s", useClientSubnet=true}
355 def testWithoutEDNS(self
):
357 ECS Override: No existing EDNS
359 Send a query without EDNS, check that the query
360 received by the responder has the correct ECS value
361 and that the response received from dnsdist does not
362 have an EDNS pseudo-RR.
364 name
= 'withoutedns.overridden.ecs.tests.powerdns.com.'
365 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
366 query
= dns
.message
.make_query(name
, 'A', 'IN')
367 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
368 response
= dns
.message
.make_response(expectedQuery
)
369 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
370 rrset
= dns
.rrset
.from_text(name
,
375 response
.answer
.append(rrset
)
376 expectedResponse
= dns
.message
.make_response(query
)
377 expectedResponse
.answer
.append(rrset
)
379 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
380 self
.assertTrue(receivedQuery
)
381 self
.assertTrue(receivedResponse
)
382 receivedQuery
.id = expectedQuery
.id
383 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
384 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
386 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
387 self
.assertTrue(receivedQuery
)
388 self
.assertTrue(receivedResponse
)
389 receivedQuery
.id = expectedQuery
.id
390 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
391 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
393 def testWithEDNSNoECS(self
):
395 ECS Override: Existing EDNS without ECS
397 Send a query with EDNS but no ECS value.
398 Check that the query received by the responder
399 has a valid ECS value and that the response
400 received from dnsdist contains an EDNS pseudo-RR.
402 name
= 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
403 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
404 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
405 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
406 response
= dns
.message
.make_response(expectedQuery
)
407 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
408 rrset
= dns
.rrset
.from_text(name
,
413 response
.answer
.append(rrset
)
414 expectedResponse
= dns
.message
.make_response(query
)
415 expectedResponse
.answer
.append(rrset
)
417 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
418 self
.assertTrue(receivedQuery
)
419 self
.assertTrue(receivedResponse
)
420 receivedQuery
.id = expectedQuery
.id
421 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
422 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
424 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
425 self
.assertTrue(receivedQuery
)
426 self
.assertTrue(receivedResponse
)
427 receivedQuery
.id = expectedQuery
.id
428 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
429 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
431 def testWithEDNSShorterInitialECS(self
):
433 ECS Override: Existing EDNS with ECS (short)
435 Send a query with EDNS and a crafted ECS value.
436 Check that the query received by the responder
437 has an overwritten ECS value (not the initial one)
438 and that the response received from dnsdist contains
440 The initial ECS value is shorter than the one it will be
443 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
444 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 8)
445 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
446 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
447 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
448 response
= dns
.message
.make_response(query
)
449 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
450 rrset
= dns
.rrset
.from_text(name
,
455 response
.answer
.append(rrset
)
457 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
458 self
.assertTrue(receivedQuery
)
459 self
.assertTrue(receivedResponse
)
460 receivedQuery
.id = expectedQuery
.id
461 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
462 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
464 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
465 self
.assertTrue(receivedQuery
)
466 self
.assertTrue(receivedResponse
)
467 receivedQuery
.id = expectedQuery
.id
468 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
469 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
471 def testWithEDNSLongerInitialECS(self
):
473 ECS Override: Existing EDNS with ECS (long)
475 Send a query with EDNS and a crafted ECS value.
476 Check that the query received by the responder
477 has an overwritten ECS value (not the initial one)
478 and that the response received from dnsdist contains
480 The initial ECS value is longer than the one it will
483 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
484 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
485 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
486 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
487 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
488 response
= dns
.message
.make_response(query
)
489 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
490 rrset
= dns
.rrset
.from_text(name
,
495 response
.answer
.append(rrset
)
497 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
498 self
.assertTrue(receivedQuery
)
499 self
.assertTrue(receivedResponse
)
500 receivedQuery
.id = expectedQuery
.id
501 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
502 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
504 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
505 self
.assertTrue(receivedQuery
)
506 self
.assertTrue(receivedResponse
)
507 receivedQuery
.id = expectedQuery
.id
508 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
509 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
511 def testWithEDNSSameSizeInitialECS(self
):
513 ECS Override: Existing EDNS with ECS (same)
515 Send a query with EDNS and a crafted ECS value.
516 Check that the query received by the responder
517 has an overwritten ECS value (not the initial one)
518 and that the response received from dnsdist contains
520 The initial ECS value is exactly the same size as
521 the one it will replaced with.
523 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
524 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
525 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
526 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
527 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
528 response
= dns
.message
.make_response(query
)
529 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
530 rrset
= dns
.rrset
.from_text(name
,
535 response
.answer
.append(rrset
)
537 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
538 self
.assertTrue(receivedQuery
)
539 self
.assertTrue(receivedResponse
)
540 receivedQuery
.id = expectedQuery
.id
541 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
542 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
544 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
545 self
.assertTrue(receivedQuery
)
546 self
.assertTrue(receivedResponse
)
547 receivedQuery
.id = expectedQuery
.id
548 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
549 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
551 class TestECSDisabledByRuleOrLua(DNSDistTest
):
553 dnsdist is configured to add the EDNS0 Client Subnet
554 option, but we disable it via DisableECSAction()
558 _config_template
= """
559 setECSOverride(false)
560 setECSSourcePrefixV4(16)
561 setECSSourcePrefixV6(16)
562 newServer{address="127.0.0.1:%s", useClientSubnet=true}
563 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
564 function disableECSViaLua(dq)
566 return DNSAction.None, ""
568 addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
571 def testWithECSNotDisabled(self
):
573 ECS Disable: ECS enabled in the backend
575 name
= 'notdisabled.ecsrules.tests.powerdns.com.'
576 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 16)
577 query
= dns
.message
.make_query(name
, 'A', 'IN')
578 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
579 response
= dns
.message
.make_response(expectedQuery
)
580 expectedResponse
= dns
.message
.make_response(query
)
581 rrset
= dns
.rrset
.from_text(name
,
586 response
.answer
.append(rrset
)
587 expectedResponse
.answer
.append(rrset
)
589 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
590 self
.assertTrue(receivedQuery
)
591 self
.assertTrue(receivedResponse
)
592 receivedQuery
.id = expectedQuery
.id
593 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
594 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
596 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
597 self
.assertTrue(receivedQuery
)
598 self
.assertTrue(receivedResponse
)
599 receivedQuery
.id = expectedQuery
.id
600 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
601 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
603 def testWithECSDisabledViaRule(self
):
605 ECS Disable: ECS enabled in the backend, but disabled by a rule
607 name
= 'disabled.ecsrules.tests.powerdns.com.'
608 query
= dns
.message
.make_query(name
, 'A', 'IN')
609 response
= dns
.message
.make_response(query
)
610 rrset
= dns
.rrset
.from_text(name
,
615 response
.answer
.append(rrset
)
617 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
618 self
.assertTrue(receivedQuery
)
619 self
.assertTrue(receivedResponse
)
620 receivedQuery
.id = query
.id
621 self
.checkQueryNoEDNS(query
, receivedQuery
)
622 self
.checkResponseNoEDNS(response
, receivedResponse
)
624 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
625 self
.assertTrue(receivedQuery
)
626 self
.assertTrue(receivedResponse
)
627 receivedQuery
.id = query
.id
628 self
.checkQueryNoEDNS(query
, receivedQuery
)
629 self
.checkResponseNoEDNS(response
, receivedResponse
)
631 def testWithECSDisabledViaLua(self
):
633 ECS Disable: ECS enabled in the backend, but disabled via Lua
635 name
= 'disabledvialua.ecsrules.tests.powerdns.com.'
636 query
= dns
.message
.make_query(name
, 'A', 'IN')
637 response
= dns
.message
.make_response(query
)
638 rrset
= dns
.rrset
.from_text(name
,
643 response
.answer
.append(rrset
)
645 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
646 self
.assertTrue(receivedQuery
)
647 self
.assertTrue(receivedResponse
)
648 receivedQuery
.id = query
.id
649 self
.checkQueryNoEDNS(query
, receivedQuery
)
650 self
.checkResponseNoEDNS(response
, receivedResponse
)
652 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
653 self
.assertTrue(receivedQuery
)
654 self
.assertTrue(receivedResponse
)
655 receivedQuery
.id = query
.id
656 self
.checkQueryNoEDNS(query
, receivedQuery
)
657 self
.checkResponseNoEDNS(response
, receivedResponse
)
659 class TestECSOverrideSetByRuleOrLua(DNSDistTest
):
661 dnsdist is configured to set the EDNS0 Client Subnet
662 option without overriding an existing one, but we
663 force the overriding via ECSOverrideAction() or Lua.
666 _config_template
= """
667 setECSOverride(false)
668 setECSSourcePrefixV4(24)
669 setECSSourcePrefixV6(56)
670 newServer{address="127.0.0.1:%s", useClientSubnet=true}
671 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
672 function overrideECSViaLua(dq)
673 dq.ecsOverride = true
674 return DNSAction.None, ""
676 addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
679 def testWithECSOverrideNotSet(self
):
681 ECS Override: not set via Lua or a rule
683 name
= 'notoverridden.ecsrules.tests.powerdns.com.'
684 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
685 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
686 response
= dns
.message
.make_response(query
)
687 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
688 rrset
= dns
.rrset
.from_text(name
,
693 response
.answer
.append(rrset
)
695 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
696 self
.assertTrue(receivedQuery
)
697 self
.assertTrue(receivedResponse
)
698 receivedQuery
.id = query
.id
699 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
700 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
702 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
703 self
.assertTrue(receivedQuery
)
704 self
.assertTrue(receivedResponse
)
705 receivedQuery
.id = query
.id
706 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
707 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
709 def testWithECSOverrideSetViaRule(self
):
711 ECS Override: set with a rule
713 name
= 'overridden.ecsrules.tests.powerdns.com.'
714 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
715 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
716 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
717 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
718 response
= dns
.message
.make_response(query
)
719 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
720 rrset
= dns
.rrset
.from_text(name
,
725 response
.answer
.append(rrset
)
727 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
728 self
.assertTrue(receivedQuery
)
729 self
.assertTrue(receivedResponse
)
730 receivedQuery
.id = expectedQuery
.id
731 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
732 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
734 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
735 self
.assertTrue(receivedQuery
)
736 self
.assertTrue(receivedResponse
)
737 receivedQuery
.id = expectedQuery
.id
738 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
739 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
741 def testWithECSOverrideSetViaLua(self
):
743 ECS Override: set via Lua
745 name
= 'overriddenvialua.ecsrules.tests.powerdns.com.'
746 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
747 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
748 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
749 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
750 response
= dns
.message
.make_response(query
)
751 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
752 rrset
= dns
.rrset
.from_text(name
,
757 response
.answer
.append(rrset
)
759 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
760 self
.assertTrue(receivedQuery
)
761 self
.assertTrue(receivedResponse
)
762 receivedQuery
.id = expectedQuery
.id
763 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
764 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
766 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
767 self
.assertTrue(receivedQuery
)
768 self
.assertTrue(receivedResponse
)
769 receivedQuery
.id = expectedQuery
.id
770 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
771 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
773 class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest
):
775 dnsdist is configured to set the EDNS0 Client Subnet
776 option with a prefix length of 24 for IPv4 and 56 for IPv6,
777 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
780 _config_template
= """
781 setECSOverride(false)
782 setECSSourcePrefixV4(24)
783 setECSSourcePrefixV6(56)
784 newServer{address="127.0.0.1:%s", useClientSubnet=true}
785 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
786 function overrideECSPrefixLengthViaLua(dq)
787 dq.ecsPrefixLength = 32
788 return DNSAction.None, ""
790 addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
793 def testWithECSPrefixLengthNotOverridden(self
):
795 ECS Prefix Length: not overridden via Lua or a rule
797 name
= 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
798 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
799 query
= dns
.message
.make_query(name
, 'A', 'IN')
800 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
801 response
= dns
.message
.make_response(query
)
802 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
803 rrset
= dns
.rrset
.from_text(name
,
808 response
.answer
.append(rrset
)
809 expectedResponse
= dns
.message
.make_response(query
)
810 expectedResponse
.answer
.append(rrset
)
812 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
813 self
.assertTrue(receivedQuery
)
814 self
.assertTrue(receivedResponse
)
815 receivedQuery
.id = expectedQuery
.id
816 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
817 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
819 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
820 self
.assertTrue(receivedQuery
)
821 self
.assertTrue(receivedResponse
)
822 receivedQuery
.id = expectedQuery
.id
823 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
824 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
826 def testWithECSPrefixLengthOverriddenViaRule(self
):
828 ECS Prefix Length: overridden with a rule
830 name
= 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
831 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
832 query
= dns
.message
.make_query(name
, 'A', 'IN')
833 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
834 response
= dns
.message
.make_response(expectedQuery
)
835 rrset
= dns
.rrset
.from_text(name
,
840 response
.answer
.append(rrset
)
841 expectedResponse
= dns
.message
.make_response(query
)
842 expectedResponse
.answer
.append(rrset
)
844 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
845 self
.assertTrue(receivedQuery
)
846 self
.assertTrue(receivedResponse
)
847 receivedQuery
.id = expectedQuery
.id
848 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
849 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
851 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
852 self
.assertTrue(receivedQuery
)
853 self
.assertTrue(receivedResponse
)
854 receivedQuery
.id = expectedQuery
.id
855 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
856 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
858 def testWithECSPrefixLengthOverriddenViaLua(self
):
860 ECS Prefix Length: overridden via Lua
862 name
= 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
863 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
864 query
= dns
.message
.make_query(name
, 'A', 'IN')
865 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
866 response
= dns
.message
.make_response(expectedQuery
)
867 rrset
= dns
.rrset
.from_text(name
,
872 response
.answer
.append(rrset
)
873 expectedResponse
= dns
.message
.make_response(query
)
874 expectedResponse
.answer
.append(rrset
)
876 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
877 self
.assertTrue(receivedQuery
)
878 self
.assertTrue(receivedResponse
)
879 receivedQuery
.id = expectedQuery
.id
880 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
881 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
883 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
884 self
.assertTrue(receivedQuery
)
885 self
.assertTrue(receivedResponse
)
886 receivedQuery
.id = expectedQuery
.id
887 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
888 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
890 class TestECSPrefixSetByRule(DNSDistTest
):
892 dnsdist is configured to set the EDNS0 Client Subnet
893 option for incoming queries to the actual source IP,
894 but we override it for some queries via SetECSAction().
897 _config_template
= """
898 setECSOverride(false)
899 setECSSourcePrefixV4(32)
900 setECSSourcePrefixV6(128)
901 newServer{address="127.0.0.1:%s", useClientSubnet=true}
902 addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
905 def testWithRegularECS(self
):
909 name
= 'notsetecsaction.ecsrules.tests.powerdns.com.'
910 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
911 query
= dns
.message
.make_query(name
, 'A', 'IN')
912 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
913 response
= dns
.message
.make_response(query
)
914 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
915 rrset
= dns
.rrset
.from_text(name
,
920 response
.answer
.append(rrset
)
921 expectedResponse
= dns
.message
.make_response(query
)
922 expectedResponse
.answer
.append(rrset
)
924 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
925 self
.assertTrue(receivedQuery
)
926 self
.assertTrue(receivedResponse
)
927 receivedQuery
.id = expectedQuery
.id
928 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
929 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
931 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
932 self
.assertTrue(receivedQuery
)
933 self
.assertTrue(receivedResponse
)
934 receivedQuery
.id = expectedQuery
.id
935 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
936 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
938 def testWithECSSetByRule(self
):
940 ECS Prefix: set with SetECSAction
942 name
= 'setecsaction.ecsrules.tests.powerdns.com.'
943 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
944 query
= dns
.message
.make_query(name
, 'A', 'IN')
945 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
946 response
= dns
.message
.make_response(expectedQuery
)
947 rrset
= dns
.rrset
.from_text(name
,
952 response
.answer
.append(rrset
)
953 expectedResponse
= dns
.message
.make_response(query
)
954 expectedResponse
.answer
.append(rrset
)
956 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
957 self
.assertTrue(receivedQuery
)
958 self
.assertTrue(receivedResponse
)
959 receivedQuery
.id = expectedQuery
.id
960 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
961 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
963 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
964 self
.assertTrue(receivedQuery
)
965 self
.assertTrue(receivedResponse
)
966 receivedQuery
.id = expectedQuery
.id
967 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
968 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)