]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestEdnsClientSubnet(DNSDistTest
):
8 def compareOptions(self
, a
, b
):
9 self
.assertEquals(len(a
), len(b
))
10 for idx
in xrange(len(a
)):
11 self
.assertEquals(a
[idx
], b
[idx
])
13 def checkMessageNoEDNS(self
, expected
, received
):
14 self
.assertEquals(expected
, received
)
15 self
.assertEquals(received
.edns
, -1)
16 self
.assertEquals(len(received
.options
), 0)
18 def checkMessageEDNSWithoutECS(self
, expected
, received
, withCookies
=0):
19 self
.assertEquals(expected
, received
)
20 self
.assertEquals(received
.edns
, 0)
21 self
.assertEquals(len(received
.options
), withCookies
)
23 for option
in received
.options
:
24 self
.assertEquals(option
.otype
, 10)
26 def checkMessageEDNSWithECS(self
, expected
, received
):
27 self
.assertEquals(expected
, received
)
28 self
.assertEquals(received
.edns
, 0)
29 self
.assertEquals(len(received
.options
), 1)
30 self
.assertEquals(received
.options
[0].otype
, clientsubnetoption
.ASSIGNED_OPTION_CODE
)
31 self
.compareOptions(expected
.options
, received
.options
)
33 def checkQueryEDNSWithECS(self
, expected
, received
):
34 self
.checkMessageEDNSWithECS(expected
, received
)
36 def checkResponseEDNSWithECS(self
, expected
, received
):
37 self
.checkMessageEDNSWithECS(expected
, received
)
39 def checkQueryEDNSWithoutECS(self
, expected
, received
):
40 self
.checkMessageEDNSWithoutECS(expected
, received
)
42 def checkResponseEDNSWithoutECS(self
, expected
, received
, withCookies
=0):
43 self
.checkMessageEDNSWithoutECS(expected
, received
, withCookies
)
45 def checkQueryNoEDNS(self
, expected
, received
):
46 self
.checkMessageNoEDNS(expected
, received
)
48 def checkResponseNoEDNS(self
, expected
, received
):
49 self
.checkMessageNoEDNS(expected
, received
)
51 class TestEdnsClientSubnetNoOverride(TestEdnsClientSubnet
):
53 dnsdist is configured to add the EDNS0 Client Subnet
54 option, but only if it's not already present in the
58 _config_template
= """
60 newServer{address="127.0.0.1:%s", useClientSubnet=true}
63 def testWithoutEDNS(self
):
67 Send a query without EDNS, check that the query
68 received by the responder has the correct ECS value
69 and that the response received from dnsdist does not
70 have an EDNS pseudo-RR.
72 name
= 'withoutedns.ecs.tests.powerdns.com.'
73 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
74 query
= dns
.message
.make_query(name
, 'A', 'IN')
75 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
76 response
= dns
.message
.make_response(expectedQuery
)
77 expectedResponse
= dns
.message
.make_response(query
)
78 rrset
= dns
.rrset
.from_text(name
,
83 response
.answer
.append(rrset
)
84 expectedResponse
.answer
.append(rrset
)
86 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
87 self
.assertTrue(receivedQuery
)
88 self
.assertTrue(receivedResponse
)
89 receivedQuery
.id = expectedQuery
.id
90 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
91 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
93 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
94 self
.assertTrue(receivedQuery
)
95 self
.assertTrue(receivedResponse
)
96 receivedQuery
.id = expectedQuery
.id
97 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
98 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
100 def testWithEDNSNoECS(self
):
102 ECS: Existing EDNS without ECS
104 Send a query with EDNS but no ECS value.
105 Check that the query received by the responder
106 has a valid ECS value and that the response
107 received from dnsdist contains an EDNS pseudo-RR.
109 name
= 'withednsnoecs.ecs.tests.powerdns.com.'
110 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
111 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
112 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
113 response
= dns
.message
.make_response(expectedQuery
)
114 expectedResponse
= dns
.message
.make_response(query
)
115 rrset
= dns
.rrset
.from_text(name
,
120 response
.answer
.append(rrset
)
121 expectedResponse
.answer
.append(rrset
)
123 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
124 self
.assertTrue(receivedQuery
)
125 self
.assertTrue(receivedResponse
)
126 receivedQuery
.id = expectedQuery
.id
127 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
128 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
130 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
131 self
.assertTrue(receivedQuery
)
132 self
.assertTrue(receivedResponse
)
133 receivedQuery
.id = expectedQuery
.id
134 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
135 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
137 def testWithEDNSECS(self
):
139 ECS: Existing EDNS with ECS
141 Send a query with EDNS and a crafted ECS value.
142 Check that the query received by the responder
143 has the initial ECS value (not overwritten)
144 and that the response received from dnsdist contains
147 name
= 'withednsecs.ecs.tests.powerdns.com.'
148 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24)
149 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
150 response
= dns
.message
.make_response(query
)
151 rrset
= dns
.rrset
.from_text(name
,
156 response
.answer
.append(rrset
)
159 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
160 self
.assertTrue(receivedQuery
)
161 self
.assertTrue(receivedResponse
)
162 receivedQuery
.id = query
.id
163 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
164 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
166 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
167 self
.assertTrue(receivedQuery
)
168 self
.assertTrue(receivedResponse
)
169 receivedQuery
.id = query
.id
170 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
171 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
173 def testWithoutEDNSResponseWithECS(self
):
175 ECS: No existing EDNS (BE returning ECS)
177 Send a query without EDNS, check that the query
178 received by the responder has the correct ECS value
179 and that the response received from dnsdist does not
180 have an EDNS pseudo-RR.
181 This time the response returned by the backend contains
182 an ECS option with scope set.
184 name
= 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
185 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
186 query
= dns
.message
.make_query(name
, 'A', 'IN')
187 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
188 response
= dns
.message
.make_response(expectedQuery
)
189 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
190 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
191 expectedResponse
= dns
.message
.make_response(query
)
192 rrset
= dns
.rrset
.from_text(name
,
197 response
.answer
.append(rrset
)
198 expectedResponse
.answer
.append(rrset
)
200 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
201 self
.assertTrue(receivedQuery
)
202 self
.assertTrue(receivedResponse
)
203 receivedQuery
.id = expectedQuery
.id
204 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
205 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
207 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
208 self
.assertTrue(receivedQuery
)
209 self
.assertTrue(receivedResponse
)
210 receivedQuery
.id = expectedQuery
.id
211 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
212 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
214 def testWithEDNSNoECSResponseWithECS(self
):
216 ECS: Existing EDNS without ECS (BE returning only the ECS option)
218 Send a query with EDNS but no ECS value.
219 Check that the query received by the responder
220 has a valid ECS value and that the response
221 received from dnsdist contains an EDNS pseudo-RR.
222 This time the response returned by the backend contains
223 an ECS option with scope set.
225 name
= 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
226 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
227 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
228 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
229 response
= dns
.message
.make_response(expectedQuery
)
230 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
231 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
232 expectedResponse
= dns
.message
.make_response(query
)
233 rrset
= dns
.rrset
.from_text(name
,
238 response
.answer
.append(rrset
)
239 expectedResponse
.answer
.append(rrset
)
241 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
242 self
.assertTrue(receivedQuery
)
243 self
.assertTrue(receivedResponse
)
244 receivedQuery
.id = expectedQuery
.id
245 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
246 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
248 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
249 self
.assertTrue(receivedQuery
)
250 self
.assertTrue(receivedResponse
)
251 receivedQuery
.id = expectedQuery
.id
252 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
253 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
255 def testWithEDNSNoECSResponseWithCookiesThenECS(self
):
257 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
259 Send a query with EDNS but no ECS value.
260 Check that the query received by the responder
261 has a valid ECS value and that the response
262 received from dnsdist contains an EDNS pseudo-RR.
263 This time the response returned by the backend contains
264 one cookies then one ECS option.
266 name
= 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
267 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
268 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
269 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
270 response
= dns
.message
.make_response(expectedQuery
)
271 ecoResponse
= cookiesoption
.CookiesOption('deadbeef', 'deadbeef')
272 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
273 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
])
274 expectedResponse
= dns
.message
.make_response(query
)
275 rrset
= dns
.rrset
.from_text(name
,
280 response
.answer
.append(rrset
)
281 expectedResponse
.answer
.append(rrset
)
282 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
284 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
285 self
.assertTrue(receivedQuery
)
286 self
.assertTrue(receivedResponse
)
287 receivedQuery
.id = expectedQuery
.id
288 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
289 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
291 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
292 self
.assertTrue(receivedQuery
)
293 self
.assertTrue(receivedResponse
)
294 receivedQuery
.id = expectedQuery
.id
295 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
296 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
298 def testWithEDNSNoECSResponseWithECSThenCookies(self
):
300 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
302 Send a query with EDNS but no ECS value.
303 Check that the query received by the responder
304 has a valid ECS value and that the response
305 received from dnsdist contains an EDNS pseudo-RR.
306 This time the response returned by the backend contains
307 one ECS then one Cookies option.
309 name
= 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
310 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
311 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
312 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
313 response
= dns
.message
.make_response(expectedQuery
)
314 ecoResponse
= cookiesoption
.CookiesOption('deadbeef', 'deadbeef')
315 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
316 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
, ecoResponse
])
317 expectedResponse
= dns
.message
.make_response(query
)
318 rrset
= dns
.rrset
.from_text(name
,
323 response
.answer
.append(rrset
)
324 expectedResponse
.answer
.append(rrset
)
325 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
327 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
328 self
.assertTrue(receivedQuery
)
329 self
.assertTrue(receivedResponse
)
330 receivedQuery
.id = expectedQuery
.id
331 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
332 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
334 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
335 self
.assertTrue(receivedQuery
)
336 self
.assertTrue(receivedResponse
)
337 receivedQuery
.id = expectedQuery
.id
338 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
339 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
341 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self
):
343 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
345 Send a query with EDNS but no ECS value.
346 Check that the query received by the responder
347 has a valid ECS value and that the response
348 received from dnsdist contains an EDNS pseudo-RR.
349 This time the response returned by the backend contains
350 one Cookies, one ECS then one Cookies option.
352 name
= 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
353 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
354 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
355 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
356 response
= dns
.message
.make_response(expectedQuery
)
357 ecoResponse
= cookiesoption
.CookiesOption('deadbeef', 'deadbeef')
358 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
359 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
, ecoResponse
])
360 expectedResponse
= dns
.message
.make_response(query
)
361 rrset
= dns
.rrset
.from_text(name
,
366 response
.answer
.append(rrset
)
367 expectedResponse
.answer
.append(rrset
)
369 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
370 self
.assertTrue(receivedQuery
)
371 self
.assertTrue(receivedResponse
)
372 receivedQuery
.id = expectedQuery
.id
373 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
374 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=2)
376 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
377 self
.assertTrue(receivedQuery
)
378 self
.assertTrue(receivedResponse
)
379 receivedQuery
.id = expectedQuery
.id
380 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
381 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=2)
384 class TestEdnsClientSubnetOverride(TestEdnsClientSubnet
):
386 dnsdist is configured to add the EDNS0 Client Subnet
387 option, overwriting any existing value.
390 _config_template
= """
393 setECSSourcePrefixV4(24)
394 setECSSourcePrefixV6(56)
395 newServer{address="127.0.0.1:%s", useClientSubnet=true}
398 def testWithoutEDNS(self
):
400 ECS Override: No existing EDNS
402 Send a query without EDNS, check that the query
403 received by the responder has the correct ECS value
404 and that the response received from dnsdist does not
405 have an EDNS pseudo-RR.
407 name
= 'withoutedns.overridden.ecs.tests.powerdns.com.'
408 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
409 query
= dns
.message
.make_query(name
, 'A', 'IN')
410 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
411 response
= dns
.message
.make_response(expectedQuery
)
412 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
413 rrset
= dns
.rrset
.from_text(name
,
418 response
.answer
.append(rrset
)
419 expectedResponse
= dns
.message
.make_response(query
)
420 expectedResponse
.answer
.append(rrset
)
422 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
423 self
.assertTrue(receivedQuery
)
424 self
.assertTrue(receivedResponse
)
425 receivedQuery
.id = expectedQuery
.id
426 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
427 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
429 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
430 self
.assertTrue(receivedQuery
)
431 self
.assertTrue(receivedResponse
)
432 receivedQuery
.id = expectedQuery
.id
433 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
434 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
436 def testWithEDNSNoECS(self
):
438 ECS Override: Existing EDNS without ECS
440 Send a query with EDNS but no ECS value.
441 Check that the query received by the responder
442 has a valid ECS value and that the response
443 received from dnsdist contains an EDNS pseudo-RR.
445 name
= 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
446 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
447 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
448 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
449 response
= dns
.message
.make_response(expectedQuery
)
450 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
451 rrset
= dns
.rrset
.from_text(name
,
456 response
.answer
.append(rrset
)
457 expectedResponse
= dns
.message
.make_response(query
)
458 expectedResponse
.answer
.append(rrset
)
460 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
461 self
.assertTrue(receivedQuery
)
462 self
.assertTrue(receivedResponse
)
463 receivedQuery
.id = expectedQuery
.id
464 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
465 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
467 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
468 self
.assertTrue(receivedQuery
)
469 self
.assertTrue(receivedResponse
)
470 receivedQuery
.id = expectedQuery
.id
471 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
472 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
474 def testWithEDNSShorterInitialECS(self
):
476 ECS Override: Existing EDNS with ECS (short)
478 Send a query with EDNS and a crafted ECS value.
479 Check that the query received by the responder
480 has an overwritten ECS value (not the initial one)
481 and that the response received from dnsdist contains
483 The initial ECS value is shorter than the one it will be
486 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
487 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 8)
488 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
489 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
490 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
491 response
= dns
.message
.make_response(query
)
492 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
493 rrset
= dns
.rrset
.from_text(name
,
498 response
.answer
.append(rrset
)
500 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
501 self
.assertTrue(receivedQuery
)
502 self
.assertTrue(receivedResponse
)
503 receivedQuery
.id = expectedQuery
.id
504 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
505 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
507 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
508 self
.assertTrue(receivedQuery
)
509 self
.assertTrue(receivedResponse
)
510 receivedQuery
.id = expectedQuery
.id
511 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
512 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
514 def testWithEDNSLongerInitialECS(self
):
516 ECS Override: Existing EDNS with ECS (long)
518 Send a query with EDNS and a crafted ECS value.
519 Check that the query received by the responder
520 has an overwritten ECS value (not the initial one)
521 and that the response received from dnsdist contains
523 The initial ECS value is longer than the one it will
526 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
527 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
528 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
529 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
530 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
531 response
= dns
.message
.make_response(query
)
532 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
533 rrset
= dns
.rrset
.from_text(name
,
538 response
.answer
.append(rrset
)
540 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
541 self
.assertTrue(receivedQuery
)
542 self
.assertTrue(receivedResponse
)
543 receivedQuery
.id = expectedQuery
.id
544 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
545 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
547 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
548 self
.assertTrue(receivedQuery
)
549 self
.assertTrue(receivedResponse
)
550 receivedQuery
.id = expectedQuery
.id
551 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
552 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
554 def testWithEDNSSameSizeInitialECS(self
):
556 ECS Override: Existing EDNS with ECS (same)
558 Send a query with EDNS and a crafted ECS value.
559 Check that the query received by the responder
560 has an overwritten ECS value (not the initial one)
561 and that the response received from dnsdist contains
563 The initial ECS value is exactly the same size as
564 the one it will replaced with.
566 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
567 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
568 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
569 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
570 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
571 response
= dns
.message
.make_response(query
)
572 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
573 rrset
= dns
.rrset
.from_text(name
,
578 response
.answer
.append(rrset
)
580 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
581 self
.assertTrue(receivedQuery
)
582 self
.assertTrue(receivedResponse
)
583 receivedQuery
.id = expectedQuery
.id
584 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
585 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
587 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
588 self
.assertTrue(receivedQuery
)
589 self
.assertTrue(receivedResponse
)
590 receivedQuery
.id = expectedQuery
.id
591 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
592 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
594 class TestECSDisabledByRuleOrLua(TestEdnsClientSubnet
):
596 dnsdist is configured to add the EDNS0 Client Subnet
597 option, but we disable it via DisableECSAction()
601 _config_template
= """
602 setECSOverride(false)
603 setECSSourcePrefixV4(16)
604 setECSSourcePrefixV6(16)
605 newServer{address="127.0.0.1:%s", useClientSubnet=true}
606 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
607 function disableECSViaLua(dq)
609 return DNSAction.None, ""
611 addLuaAction("disabledvialua.ecsrules.tests.powerdns.com.", disableECSViaLua)
614 def testWithECSNotDisabled(self
):
616 ECS Disable: ECS enabled in the backend
618 name
= 'notdisabled.ecsrules.tests.powerdns.com.'
619 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 16)
620 query
= dns
.message
.make_query(name
, 'A', 'IN')
621 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
622 response
= dns
.message
.make_response(expectedQuery
)
623 expectedResponse
= dns
.message
.make_response(query
)
624 rrset
= dns
.rrset
.from_text(name
,
629 response
.answer
.append(rrset
)
630 expectedResponse
.answer
.append(rrset
)
632 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
633 self
.assertTrue(receivedQuery
)
634 self
.assertTrue(receivedResponse
)
635 receivedQuery
.id = expectedQuery
.id
636 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
637 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
639 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
640 self
.assertTrue(receivedQuery
)
641 self
.assertTrue(receivedResponse
)
642 receivedQuery
.id = expectedQuery
.id
643 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
644 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
646 def testWithECSDisabledViaRule(self
):
648 ECS Disable: ECS enabled in the backend, but disabled by a rule
650 name
= 'disabled.ecsrules.tests.powerdns.com.'
651 query
= dns
.message
.make_query(name
, 'A', 'IN')
652 response
= dns
.message
.make_response(query
)
653 rrset
= dns
.rrset
.from_text(name
,
658 response
.answer
.append(rrset
)
660 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
661 self
.assertTrue(receivedQuery
)
662 self
.assertTrue(receivedResponse
)
663 receivedQuery
.id = query
.id
664 self
.checkQueryNoEDNS(query
, receivedQuery
)
665 self
.checkResponseNoEDNS(response
, receivedResponse
)
667 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
668 self
.assertTrue(receivedQuery
)
669 self
.assertTrue(receivedResponse
)
670 receivedQuery
.id = query
.id
671 self
.checkQueryNoEDNS(query
, receivedQuery
)
672 self
.checkResponseNoEDNS(response
, receivedResponse
)
674 def testWithECSDisabledViaLua(self
):
676 ECS Disable: ECS enabled in the backend, but disabled via Lua
678 name
= 'disabledvialua.ecsrules.tests.powerdns.com.'
679 query
= dns
.message
.make_query(name
, 'A', 'IN')
680 response
= dns
.message
.make_response(query
)
681 rrset
= dns
.rrset
.from_text(name
,
686 response
.answer
.append(rrset
)
688 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
689 self
.assertTrue(receivedQuery
)
690 self
.assertTrue(receivedResponse
)
691 receivedQuery
.id = query
.id
692 self
.checkQueryNoEDNS(query
, receivedQuery
)
693 self
.checkResponseNoEDNS(response
, receivedResponse
)
695 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
696 self
.assertTrue(receivedQuery
)
697 self
.assertTrue(receivedResponse
)
698 receivedQuery
.id = query
.id
699 self
.checkQueryNoEDNS(query
, receivedQuery
)
700 self
.checkResponseNoEDNS(response
, receivedResponse
)
702 class TestECSOverrideSetByRuleOrLua(TestEdnsClientSubnet
):
704 dnsdist is configured to set the EDNS0 Client Subnet
705 option without overriding an existing one, but we
706 force the overriding via ECSOverrideAction() or Lua.
709 _config_template
= """
710 setECSOverride(false)
711 setECSSourcePrefixV4(24)
712 setECSSourcePrefixV6(56)
713 newServer{address="127.0.0.1:%s", useClientSubnet=true}
714 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
715 function overrideECSViaLua(dq)
716 dq.ecsOverride = true
717 return DNSAction.None, ""
719 addLuaAction("overriddenvialua.ecsrules.tests.powerdns.com.", overrideECSViaLua)
722 def testWithECSOverrideNotSet(self
):
724 ECS Override: not set via Lua or a rule
726 name
= 'notoverridden.ecsrules.tests.powerdns.com.'
727 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
728 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
729 response
= dns
.message
.make_response(query
)
730 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
731 rrset
= dns
.rrset
.from_text(name
,
736 response
.answer
.append(rrset
)
738 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
739 self
.assertTrue(receivedQuery
)
740 self
.assertTrue(receivedResponse
)
741 receivedQuery
.id = query
.id
742 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
743 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
745 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
746 self
.assertTrue(receivedQuery
)
747 self
.assertTrue(receivedResponse
)
748 receivedQuery
.id = query
.id
749 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
750 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
752 def testWithECSOverrideSetViaRule(self
):
754 ECS Override: set with a rule
756 name
= 'overridden.ecsrules.tests.powerdns.com.'
757 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
758 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
759 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
760 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
761 response
= dns
.message
.make_response(query
)
762 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
763 rrset
= dns
.rrset
.from_text(name
,
768 response
.answer
.append(rrset
)
770 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
771 self
.assertTrue(receivedQuery
)
772 self
.assertTrue(receivedResponse
)
773 receivedQuery
.id = expectedQuery
.id
774 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
775 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
777 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
778 self
.assertTrue(receivedQuery
)
779 self
.assertTrue(receivedResponse
)
780 receivedQuery
.id = expectedQuery
.id
781 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
782 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
784 def testWithECSOverrideSetViaLua(self
):
786 ECS Override: set via Lua
788 name
= 'overriddenvialua.ecsrules.tests.powerdns.com.'
789 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
790 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
791 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
792 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
793 response
= dns
.message
.make_response(query
)
794 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
795 rrset
= dns
.rrset
.from_text(name
,
800 response
.answer
.append(rrset
)
802 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
803 self
.assertTrue(receivedQuery
)
804 self
.assertTrue(receivedResponse
)
805 receivedQuery
.id = expectedQuery
.id
806 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
807 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
809 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
810 self
.assertTrue(receivedQuery
)
811 self
.assertTrue(receivedResponse
)
812 receivedQuery
.id = expectedQuery
.id
813 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
814 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
816 class TestECSPrefixLengthSetByRuleOrLua(TestEdnsClientSubnet
):
818 dnsdist is configured to set the EDNS0 Client Subnet
819 option with a prefix length of 24 for IPv4 and 56 for IPv6,
820 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
823 _config_template
= """
824 setECSOverride(false)
825 setECSSourcePrefixV4(24)
826 setECSSourcePrefixV6(56)
827 newServer{address="127.0.0.1:%s", useClientSubnet=true}
828 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
829 function overrideECSPrefixLengthViaLua(dq)
830 dq.ecsPrefixLength = 32
831 return DNSAction.None, ""
833 addLuaAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", overrideECSPrefixLengthViaLua)
836 def testWithECSPrefixLengthNotOverridden(self
):
838 ECS Prefix Length: not overridden via Lua or a rule
840 name
= 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
841 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
842 query
= dns
.message
.make_query(name
, 'A', 'IN')
843 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
844 response
= dns
.message
.make_response(query
)
845 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
846 rrset
= dns
.rrset
.from_text(name
,
851 response
.answer
.append(rrset
)
852 expectedResponse
= dns
.message
.make_response(query
)
853 expectedResponse
.answer
.append(rrset
)
855 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
856 self
.assertTrue(receivedQuery
)
857 self
.assertTrue(receivedResponse
)
858 receivedQuery
.id = expectedQuery
.id
859 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
860 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
862 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
863 self
.assertTrue(receivedQuery
)
864 self
.assertTrue(receivedResponse
)
865 receivedQuery
.id = expectedQuery
.id
866 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
867 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
869 def testWithECSPrefixLengthOverriddenViaRule(self
):
871 ECS Prefix Length: overridden with a rule
873 name
= 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
874 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
875 query
= dns
.message
.make_query(name
, 'A', 'IN')
876 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
877 response
= dns
.message
.make_response(expectedQuery
)
878 rrset
= dns
.rrset
.from_text(name
,
883 response
.answer
.append(rrset
)
884 expectedResponse
= dns
.message
.make_response(query
)
885 expectedResponse
.answer
.append(rrset
)
887 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
888 self
.assertTrue(receivedQuery
)
889 self
.assertTrue(receivedResponse
)
890 receivedQuery
.id = expectedQuery
.id
891 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
892 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
894 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
895 self
.assertTrue(receivedQuery
)
896 self
.assertTrue(receivedResponse
)
897 receivedQuery
.id = expectedQuery
.id
898 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
899 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
901 def testWithECSPrefixLengthOverriddenViaLua(self
):
903 ECS Prefix Length: overridden via Lua
905 name
= 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
906 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
907 query
= dns
.message
.make_query(name
, 'A', 'IN')
908 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
909 response
= dns
.message
.make_response(expectedQuery
)
910 rrset
= dns
.rrset
.from_text(name
,
915 response
.answer
.append(rrset
)
916 expectedResponse
= dns
.message
.make_response(query
)
917 expectedResponse
.answer
.append(rrset
)
919 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
920 self
.assertTrue(receivedQuery
)
921 self
.assertTrue(receivedResponse
)
922 receivedQuery
.id = expectedQuery
.id
923 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
924 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
926 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
927 self
.assertTrue(receivedQuery
)
928 self
.assertTrue(receivedResponse
)
929 receivedQuery
.id = expectedQuery
.id
930 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
931 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)