]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
6 from datetime
import datetime
, timedelta
class TestEdnsClientSubnetNoOverride(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet
    option, but only if it's not already present in the
    original query.
    """

    # NOTE(review): the reviewed copy of this file had lines dropped by
    # extraction. The first line of the config template and the TTL / class /
    # type / rdata arguments of every dns.rrset.from_text() call below were
    # reconstructed (truncateTC(true); 3600, IN, A, '127.0.0.1') - confirm
    # against the repository before merging.
    _config_template = """
    truncateTC(true)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    def testWithoutEDNS(self):
        """
        ECS: No existing EDNS

        Send a query without EDNS, check that the query
        received by the responder has the correct ECS value
        and that the response received from dnsdist does not
        have an EDNS pseudo-RR.
        """
        name = 'withoutedns.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The forwarded query carries its own ID; align it before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECS(self):
        """
        ECS: Existing EDNS without ECS

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        """
        name = 'withednsnoecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSECS(self):
        """
        ECS: Existing EDNS with ECS

        Send a query with EDNS and a crafted ECS value.
        Check that the query received by the responder
        has the initial ECS value (not overwritten)
        and that the response received from dnsdist contains
        an EDNS pseudo-RR.
        """
        name = 'withednsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            # The client's own ECS value must reach the backend untouched.
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testWithoutEDNSResponseWithECS(self):
        """
        ECS: No existing EDNS (BE returning ECS)

        Send a query without EDNS, check that the query
        received by the responder has the correct ECS value
        and that the response received from dnsdist does not
        have an EDNS pseudo-RR.
        This time the response returned by the backend contains
        an ECS option with scope set.
        """
        name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        response = dns.message.make_response(expectedQuery)
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECSResponseWithECS(self):
        """
        ECS: Existing EDNS without ECS (BE returning only the ECS option)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        an ECS option with scope set.
        """
        name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSNoECSResponseWithCookiesThenECS(self):
        """
        ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one cookies then one ECS option.
        """
        name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)
        # Only the cookie is expected to survive in the client-facing response.
        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)

    def testWithEDNSNoECSResponseWithECSThenCookies(self):
        """
        ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one ECS then one Cookies option.
        """
        name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)
        # Fix: this call used to target `response`, which replaced the
        # [ECS, Cookies] options set above - the backend then never returned
        # an ECS option and the scenario in the test name was not exercised.
        # The cookie-only option list belongs on the *expected* response,
        # exactly as in testWithEDNSNoECSResponseWithCookiesThenECS.
        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)

    def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
        """
        ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one Cookies, one ECS then one Cookies option.
        """
        name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
class TestEdnsClientSubnetOverride(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet
    option, overwriting any existing value.
    """

    # NOTE(review): the reviewed copy of this file had lines dropped by
    # extraction. The first two lines of the config template
    # (truncateTC(true), setECSOverride(true)) and the TTL / class / type /
    # rdata arguments of dns.rrset.from_text() were reconstructed - confirm
    # against the repository before merging.
    _config_template = """
    truncateTC(true)
    setECSOverride(true)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    def testWithoutEDNS(self):
        """
        ECS Override: No existing EDNS

        A plain query (no EDNS) goes in; the backend must see the ECS
        option added by dnsdist, and the client must get back a response
        without any EDNS pseudo-RR.
        """
        qname = 'withoutedns.overridden.ecs.tests.powerdns.com.'
        ecsOption = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, options=[ecsOption], payload=512)

        answer = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(expectedQuery)
        response.use_edns(edns=True, payload=4096, options=[ecsOption])
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECS(self):
        """
        ECS Override: Existing EDNS without ECS

        The query has EDNS but no ECS option; the backend must see an
        ECS option, and the client must get an EDNS response without ECS.
        """
        qname = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
        ecsOption = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

        query = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[ecsOption])

        answer = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(expectedQuery)
        response.use_edns(edns=True, payload=4096, options=[ecsOption])
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSShorterInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (short)

        The query carries a crafted ECS value (192.0.2.1/8) whose prefix
        is shorter than the 127.0.0.1/24 value the backend is expected
        to receive instead. The response must keep its ECS option.
        """
        qname = 'withednsecs.overridden.ecs.tests.powerdns.com.'
        initialEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
        rewrittenEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

        query = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[initialEcs])
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcs])

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcs])
        response.answer.append(dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithEDNSLongerInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (long)

        The query carries a crafted ECS value (192.0.2.1/32) whose prefix
        is longer than the 127.0.0.1/24 value the backend is expected
        to receive instead. The response must keep its ECS option.
        """
        qname = 'withednsecs.overridden.ecs.tests.powerdns.com.'
        initialEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        rewrittenEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

        query = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[initialEcs])
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcs])

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcs])
        response.answer.append(dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithEDNSSameSizeInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (same)

        The query carries a crafted ECS value (192.0.2.1/24) with the
        same prefix length as the 127.0.0.1/24 value the backend is
        expected to receive instead. The response must keep its ECS option.
        """
        qname = 'withednsecs.overridden.ecs.tests.powerdns.com.'
        initialEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        rewrittenEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

        query = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[initialEcs])
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcs])

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcs])
        response.answer.append(dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)
class TestECSDisabledByRuleOrLua(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet
    option, but we disable it via DisableECSAction()
    or Lua.
    """

    # NOTE(review): the reviewed copy of this file had lines dropped by
    # extraction. The body line of disableECSViaLua (dq.useECS = false), the
    # Lua `end`, and the TTL / class / type / rdata arguments of
    # dns.rrset.from_text() were reconstructed - confirm against the
    # repository before merging.
    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(16)
    setECSSourcePrefixV6(16)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
    function disableECSViaLua(dq)
        dq.useECS = false
        return DNSAction.None, ""
    end
    addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
    """

    def testWithECSNotDisabled(self):
        """
        ECS Disable: ECS enabled in the backend
        """
        qname = 'notdisabled.ecsrules.tests.powerdns.com.'
        ecsOption = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)

        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, options=[ecsOption], payload=512)

        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)

        answer = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSDisabledViaRule(self):
        """
        ECS Disable: ECS enabled in the backend, but disabled by a rule
        """
        qname = 'disabled.ecsrules.tests.powerdns.com.'

        query = dns.message.make_query(qname, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            # The query must reach the backend without any EDNS added.
            self.checkQueryNoEDNS(query, receivedQuery)
            self.checkResponseNoEDNS(response, receivedResponse)

    def testWithECSDisabledViaLua(self):
        """
        ECS Disable: ECS enabled in the backend, but disabled via Lua
        """
        qname = 'disabledvialua.ecsrules.tests.powerdns.com.'

        query = dns.message.make_query(qname, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            # The query must reach the backend without any EDNS added.
            self.checkQueryNoEDNS(query, receivedQuery)
            self.checkResponseNoEDNS(response, receivedResponse)
class TestECSOverrideSetByRuleOrLua(DNSDistTest):
    """
    dnsdist is configured to set the EDNS0 Client Subnet
    option without overriding an existing one, but we
    force the overriding via ECSOverrideAction() or Lua.
    """

    # NOTE(review): the reviewed copy of this file had lines dropped by
    # extraction. The Lua `end` closing overrideECSViaLua and the TTL /
    # class / type / rdata arguments of dns.rrset.from_text() were
    # reconstructed - confirm against the repository before merging.
    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
    function overrideECSViaLua(dq)
        dq.ecsOverride = true
        return DNSAction.None, ""
    end
    addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
    """

    def testWithECSOverrideNotSet(self):
        """
        ECS Override: not set via Lua or a rule
        """
        qname = 'notoverridden.ecsrules.tests.powerdns.com.'
        clientEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)

        query = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[clientEcs])

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[clientEcs])
        response.answer.append(dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            # The client-supplied ECS value is expected to be forwarded as is.
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithECSOverrideSetViaRule(self):
        """
        ECS Override: set with a rule
        """
        qname = 'overridden.ecsrules.tests.powerdns.com.'
        clientEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        rewrittenEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

        query = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[clientEcs])
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcs])

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcs])
        response.answer.append(dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithECSOverrideSetViaLua(self):
        """
        ECS Override: set via Lua
        """
        qname = 'overriddenvialua.ecsrules.tests.powerdns.com.'
        clientEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        rewrittenEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

        query = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[clientEcs])
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcs])

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcs])
        response.answer.append(dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1'))

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)
class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
    """
    dnsdist is configured to set the EDNS0 Client Subnet
    option with a prefix length of 24 for IPv4 and 56 for IPv6,
    but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
    """

    # NOTE(review): the reviewed copy of this file had lines dropped by
    # extraction. The Lua `end` closing overrideECSPrefixLengthViaLua and the
    # TTL / class / type / rdata arguments of dns.rrset.from_text() were
    # reconstructed - confirm against the repository before merging.
    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
    function overrideECSPrefixLengthViaLua(dq)
        dq.ecsPrefixLength = 32
        return DNSAction.None, ""
    end
    addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
    """

    def testWithECSPrefixLengthNotOverridden(self):
        """
        ECS Prefix Length: not overridden via Lua or a rule
        """
        qname = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
        ecsOption = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, options=[ecsOption], payload=512)

        answer = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[ecsOption])
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSPrefixLengthOverriddenViaRule(self):
        """
        ECS Prefix Length: overridden with a rule
        """
        qname = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
        # The backend is expected to see a /32 source prefix for this name.
        ecsOption = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)

        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, options=[ecsOption], payload=512)

        answer = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(expectedQuery)
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSPrefixLengthOverriddenViaLua(self):
        """
        ECS Prefix Length: overridden via Lua
        """
        qname = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
        # The backend is expected to see a /32 source prefix for this name.
        ecsOption = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)

        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, options=[ecsOption], payload=512)

        answer = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(expectedQuery)
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)
class TestECSPrefixSetByRule(DNSDistTest):
    """
    dnsdist is configured to set the EDNS0 Client Subnet
    option for incoming queries to the actual source IP,
    but we override it for some queries via SetECSAction().
    """

    # NOTE(review): the reviewed copy of this file had lines dropped by
    # extraction. The docstring of testWithRegularECS and the TTL / class /
    # type / rdata arguments of dns.rrset.from_text() were reconstructed -
    # confirm against the repository before merging.
    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(32)
    setECSSourcePrefixV6(128)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
    """

    def testWithRegularECS(self):
        """
        ECS Prefix: not set
        """
        qname = 'notsetecsaction.ecsrules.tests.powerdns.com.'
        ecsOption = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)

        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, options=[ecsOption], payload=512)

        answer = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[ecsOption])
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSSetByRule(self):
        """
        ECS Prefix: set with SetECSAction
        """
        qname = 'setecsaction.ecsrules.tests.powerdns.com.'
        # Matches the subnet given to SetECSAction() in the config template.
        ecsOption = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)

        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(
            qname, 'A', 'IN', use_edns=True, options=[ecsOption], payload=512)

        answer = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(expectedQuery)
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)