]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
6 from datetime
import datetime
, timedelta
8 class TestEdnsClientSubnetNoOverride(DNSDistTest
):
10 dnsdist is configured to add the EDNS0 Client Subnet
11 option, but only if it's not already present in the
15 _config_template
= """
17 newServer{address="127.0.0.1:%s", useClientSubnet=true}
20 def testWithoutEDNS(self
):
24 Send a query without EDNS, check that the query
25 received by the responder has the correct ECS value
26 and that the response received from dnsdist does not
27 have an EDNS pseudo-RR.
29 name
= 'withoutedns.ecs.tests.powerdns.com.'
30 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
31 query
= dns
.message
.make_query(name
, 'A', 'IN')
32 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
33 response
= dns
.message
.make_response(expectedQuery
)
34 expectedResponse
= dns
.message
.make_response(query
)
35 rrset
= dns
.rrset
.from_text(name
,
40 response
.answer
.append(rrset
)
41 expectedResponse
.answer
.append(rrset
)
43 for method
in ("sendUDPQuery", "sendTCPQuery"):
44 sender
= getattr(self
, method
)
45 (receivedQuery
, receivedResponse
) = sender(query
, response
)
46 self
.assertTrue(receivedQuery
)
47 self
.assertTrue(receivedResponse
)
48 receivedQuery
.id = expectedQuery
.id
49 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
50 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
52 def testWithEDNSNoECS(self
):
54 ECS: Existing EDNS without ECS
56 Send a query with EDNS but no ECS value.
57 Check that the query received by the responder
58 has a valid ECS value and that the response
59 received from dnsdist contains an EDNS pseudo-RR.
61 name
= 'withednsnoecs.ecs.tests.powerdns.com.'
62 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
63 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
64 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
65 response
= dns
.message
.make_response(expectedQuery
)
66 expectedResponse
= dns
.message
.make_response(query
)
67 rrset
= dns
.rrset
.from_text(name
,
72 response
.answer
.append(rrset
)
73 expectedResponse
.answer
.append(rrset
)
75 for method
in ("sendUDPQuery", "sendTCPQuery"):
76 sender
= getattr(self
, method
)
77 (receivedQuery
, receivedResponse
) = sender(query
, response
)
78 self
.assertTrue(receivedQuery
)
79 self
.assertTrue(receivedResponse
)
80 receivedQuery
.id = expectedQuery
.id
81 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
82 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
84 def testWithEDNSECS(self
):
86 ECS: Existing EDNS with ECS
88 Send a query with EDNS and a crafted ECS value.
89 Check that the query received by the responder
90 has the initial ECS value (not overwritten)
91 and that the response received from dnsdist contains
94 name
= 'withednsecs.ecs.tests.powerdns.com.'
95 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24)
96 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
97 response
= dns
.message
.make_response(query
)
98 rrset
= dns
.rrset
.from_text(name
,
103 response
.answer
.append(rrset
)
106 for method
in ("sendUDPQuery", "sendTCPQuery"):
107 sender
= getattr(self
, method
)
108 (receivedQuery
, receivedResponse
) = sender(query
, response
)
109 self
.assertTrue(receivedQuery
)
110 self
.assertTrue(receivedResponse
)
111 receivedQuery
.id = query
.id
112 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
113 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
115 def testWithoutEDNSResponseWithECS(self
):
117 ECS: No existing EDNS (BE returning ECS)
119 Send a query without EDNS, check that the query
120 received by the responder has the correct ECS value
121 and that the response received from dnsdist does not
122 have an EDNS pseudo-RR.
123 This time the response returned by the backend contains
124 an ECS option with scope set.
126 name
= 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
127 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
128 query
= dns
.message
.make_query(name
, 'A', 'IN')
129 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
130 response
= dns
.message
.make_response(expectedQuery
)
131 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
132 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
133 expectedResponse
= dns
.message
.make_response(query
)
134 rrset
= dns
.rrset
.from_text(name
,
139 response
.answer
.append(rrset
)
140 expectedResponse
.answer
.append(rrset
)
142 for method
in ("sendUDPQuery", "sendTCPQuery"):
143 sender
= getattr(self
, method
)
144 (receivedQuery
, receivedResponse
) = sender(query
, response
)
145 self
.assertTrue(receivedQuery
)
146 self
.assertTrue(receivedResponse
)
147 receivedQuery
.id = expectedQuery
.id
148 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
149 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
151 def testWithEDNSNoECSResponseWithECS(self
):
153 ECS: Existing EDNS without ECS (BE returning only the ECS option)
155 Send a query with EDNS but no ECS value.
156 Check that the query received by the responder
157 has a valid ECS value and that the response
158 received from dnsdist contains an EDNS pseudo-RR.
159 This time the response returned by the backend contains
160 an ECS option with scope set.
162 name
= 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
163 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
164 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
165 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
166 response
= dns
.message
.make_response(expectedQuery
)
167 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
168 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
169 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4096)
170 rrset
= dns
.rrset
.from_text(name
,
175 response
.answer
.append(rrset
)
176 expectedResponse
.answer
.append(rrset
)
178 for method
in ("sendUDPQuery", "sendTCPQuery"):
179 sender
= getattr(self
, method
)
180 (receivedQuery
, receivedResponse
) = sender(query
, response
)
181 self
.assertTrue(receivedQuery
)
182 self
.assertTrue(receivedResponse
)
183 receivedQuery
.id = expectedQuery
.id
184 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
185 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
187 def testWithEDNSNoECSResponseWithCookiesThenECS(self
):
189 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
191 Send a query with EDNS but no ECS value.
192 Check that the query received by the responder
193 has a valid ECS value and that the response
194 received from dnsdist contains an EDNS pseudo-RR.
195 This time the response returned by the backend contains
196 one cookies then one ECS option.
198 name
= 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
199 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
200 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
201 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
202 response
= dns
.message
.make_response(expectedQuery
)
203 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
204 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
205 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
])
206 expectedResponse
= dns
.message
.make_response(query
)
207 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
208 rrset
= dns
.rrset
.from_text(name
,
213 response
.answer
.append(rrset
)
214 expectedResponse
.answer
.append(rrset
)
215 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
217 for method
in ("sendUDPQuery", "sendTCPQuery"):
218 sender
= getattr(self
, method
)
219 (receivedQuery
, receivedResponse
) = sender(query
, response
)
220 self
.assertTrue(receivedQuery
)
221 self
.assertTrue(receivedResponse
)
222 receivedQuery
.id = expectedQuery
.id
223 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
224 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
226 def testWithEDNSNoECSResponseWithECSThenCookies(self
):
228 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
230 Send a query with EDNS but no ECS value.
231 Check that the query received by the responder
232 has a valid ECS value and that the response
233 received from dnsdist contains an EDNS pseudo-RR.
234 This time the response returned by the backend contains
235 one ECS then one Cookies option.
237 name
= 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
238 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
239 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
240 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
241 response
= dns
.message
.make_response(expectedQuery
)
242 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
243 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
244 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
, ecoResponse
])
245 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4096)
246 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
247 rrset
= dns
.rrset
.from_text(name
,
252 response
.answer
.append(rrset
)
253 expectedResponse
.answer
.append(rrset
)
254 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
256 for method
in ("sendUDPQuery", "sendTCPQuery"):
257 sender
= getattr(self
, method
)
258 (receivedQuery
, receivedResponse
) = sender(query
, response
)
259 self
.assertTrue(receivedQuery
)
260 self
.assertTrue(receivedResponse
)
261 receivedQuery
.id = expectedQuery
.id
262 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
263 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
265 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self
):
267 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
269 Send a query with EDNS but no ECS value.
270 Check that the query received by the responder
271 has a valid ECS value and that the response
272 received from dnsdist contains an EDNS pseudo-RR.
273 This time the response returned by the backend contains
274 one Cookies, one ECS then one Cookies option.
276 name
= 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
277 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
278 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
279 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
280 response
= dns
.message
.make_response(expectedQuery
)
281 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
282 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
283 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
, ecoResponse
])
284 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4096)
285 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecoResponse
])
286 rrset
= dns
.rrset
.from_text(name
,
291 response
.answer
.append(rrset
)
292 expectedResponse
.answer
.append(rrset
)
294 for method
in ("sendUDPQuery", "sendTCPQuery"):
295 sender
= getattr(self
, method
)
296 (receivedQuery
, receivedResponse
) = sender(query
, response
)
297 self
.assertTrue(receivedQuery
)
298 self
.assertTrue(receivedResponse
)
299 receivedQuery
.id = expectedQuery
.id
300 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
301 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=2)
303 class TestEdnsClientSubnetOverride(DNSDistTest
):
305 dnsdist is configured to add the EDNS0 Client Subnet
306 option, overwriting any existing value.
309 _config_template
= """
312 setECSSourcePrefixV4(24)
313 setECSSourcePrefixV6(56)
314 newServer{address="127.0.0.1:%s", useClientSubnet=true}
317 def testWithoutEDNS(self
):
319 ECS Override: No existing EDNS
321 Send a query without EDNS, check that the query
322 received by the responder has the correct ECS value
323 and that the response received from dnsdist does not
324 have an EDNS pseudo-RR.
326 name
= 'withoutedns.overridden.ecs.tests.powerdns.com.'
327 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
328 query
= dns
.message
.make_query(name
, 'A', 'IN')
329 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
330 response
= dns
.message
.make_response(expectedQuery
)
331 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
332 rrset
= dns
.rrset
.from_text(name
,
337 response
.answer
.append(rrset
)
338 expectedResponse
= dns
.message
.make_response(query
)
339 expectedResponse
.answer
.append(rrset
)
341 for method
in ("sendUDPQuery", "sendTCPQuery"):
342 sender
= getattr(self
, method
)
343 (receivedQuery
, receivedResponse
) = sender(query
, response
)
344 self
.assertTrue(receivedQuery
)
345 self
.assertTrue(receivedResponse
)
346 receivedQuery
.id = expectedQuery
.id
347 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
348 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
350 def testWithEDNSNoECS(self
):
352 ECS Override: Existing EDNS without ECS
354 Send a query with EDNS but no ECS value.
355 Check that the query received by the responder
356 has a valid ECS value and that the response
357 received from dnsdist contains an EDNS pseudo-RR.
359 name
= 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
360 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
361 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
362 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
363 response
= dns
.message
.make_response(expectedQuery
)
364 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
365 rrset
= dns
.rrset
.from_text(name
,
370 response
.answer
.append(rrset
)
371 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4096)
372 expectedResponse
.answer
.append(rrset
)
374 for method
in ("sendUDPQuery", "sendTCPQuery"):
375 sender
= getattr(self
, method
)
376 (receivedQuery
, receivedResponse
) = sender(query
, response
)
377 self
.assertTrue(receivedQuery
)
378 self
.assertTrue(receivedResponse
)
379 receivedQuery
.id = expectedQuery
.id
380 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
381 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
383 def testWithEDNSShorterInitialECS(self
):
385 ECS Override: Existing EDNS with ECS (short)
387 Send a query with EDNS and a crafted ECS value.
388 Check that the query received by the responder
389 has an overwritten ECS value (not the initial one)
390 and that the response received from dnsdist contains
392 The initial ECS value is shorter than the one it will be
395 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
396 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 8)
397 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
398 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
399 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
400 response
= dns
.message
.make_response(query
)
401 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
402 rrset
= dns
.rrset
.from_text(name
,
407 response
.answer
.append(rrset
)
409 for method
in ("sendUDPQuery", "sendTCPQuery"):
410 sender
= getattr(self
, method
)
411 (receivedQuery
, receivedResponse
) = sender(query
, response
)
412 self
.assertTrue(receivedQuery
)
413 self
.assertTrue(receivedResponse
)
414 receivedQuery
.id = expectedQuery
.id
415 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
416 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
418 def testWithEDNSLongerInitialECS(self
):
420 ECS Override: Existing EDNS with ECS (long)
422 Send a query with EDNS and a crafted ECS value.
423 Check that the query received by the responder
424 has an overwritten ECS value (not the initial one)
425 and that the response received from dnsdist contains
427 The initial ECS value is longer than the one it will
430 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
431 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
432 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
433 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
434 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
435 response
= dns
.message
.make_response(query
)
436 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
437 rrset
= dns
.rrset
.from_text(name
,
442 response
.answer
.append(rrset
)
444 for method
in ("sendUDPQuery", "sendTCPQuery"):
445 sender
= getattr(self
, method
)
446 (receivedQuery
, receivedResponse
) = sender(query
, response
)
447 self
.assertTrue(receivedQuery
)
448 self
.assertTrue(receivedResponse
)
449 receivedQuery
.id = expectedQuery
.id
450 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
451 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
453 def testWithEDNSSameSizeInitialECS(self
):
455 ECS Override: Existing EDNS with ECS (same)
457 Send a query with EDNS and a crafted ECS value.
458 Check that the query received by the responder
459 has an overwritten ECS value (not the initial one)
460 and that the response received from dnsdist contains
462 The initial ECS value is exactly the same size as
463 the one it will replaced with.
465 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
466 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
467 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
468 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
469 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
470 response
= dns
.message
.make_response(query
)
471 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
472 rrset
= dns
.rrset
.from_text(name
,
477 response
.answer
.append(rrset
)
479 for method
in ("sendUDPQuery", "sendTCPQuery"):
480 sender
= getattr(self
, method
)
481 (receivedQuery
, receivedResponse
) = sender(query
, response
)
482 self
.assertTrue(receivedQuery
)
483 self
.assertTrue(receivedResponse
)
484 receivedQuery
.id = expectedQuery
.id
485 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
486 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
488 def testWithECSFollowedByAnother(self
):
490 ECS: Existing EDNS with ECS, followed by another record
492 Send a query with EDNS and an existing ECS value.
493 The OPT record is not the last one in the query
494 and is followed by another one.
495 Check that the query received by the responder
496 has a valid ECS value and that the response
497 received from dnsdist contains an EDNS pseudo-RR.
499 name
= 'withecs-followedbyanother.ecs.tests.powerdns.com.'
500 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
501 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
502 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
503 rrset
= dns
.rrset
.from_text(name
,
509 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,ecso
,eco
])
510 # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
511 # it while parsing the message in the receiver :-/
512 query
.additional
.append(rrset
)
513 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,eco
,rewrittenEcso
])
514 expectedQuery
.additional
.append(rrset
)
516 response
= dns
.message
.make_response(expectedQuery
)
517 response
.use_edns(edns
=True, payload
=4096, options
=[eco
, ecso
, eco
])
518 expectedResponse
= dns
.message
.make_response(query
)
519 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[eco
, ecso
, eco
])
520 response
.answer
.append(rrset
)
521 response
.additional
.append(rrset
)
522 expectedResponse
.answer
.append(rrset
)
523 expectedResponse
.additional
.append(rrset
)
525 for method
in ("sendUDPQuery", "sendTCPQuery"):
526 sender
= getattr(self
, method
)
527 (receivedQuery
, receivedResponse
) = sender(query
, response
)
528 self
.assertTrue(receivedQuery
)
529 self
.assertTrue(receivedResponse
)
530 receivedQuery
.id = expectedQuery
.id
531 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 2)
532 self
.checkResponseEDNSWithECS(expectedResponse
, receivedResponse
, 2)
534 def testWithEDNSNoECSFollowedByAnother(self
):
536 ECS: Existing EDNS without ECS, followed by another record
538 Send a query with EDNS but no ECS value.
539 The OPT record is not the last one in the query
540 and is followed by another one.
541 Check that the query received by the responder
542 has a valid ECS value and that the response
543 received from dnsdist contains an EDNS pseudo-RR.
545 name
= 'withedns-no-ecs-followedbyanother.ecs.tests.powerdns.com.'
546 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
547 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
548 rrset
= dns
.rrset
.from_text(name
,
554 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
555 # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
556 # it while parsing the message in the receiver :-/
557 query
.additional
.append(rrset
)
558 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,rewrittenEcso
])
559 expectedQuery
.additional
.append(rrset
)
561 response
= dns
.message
.make_response(expectedQuery
)
562 response
.use_edns(edns
=True, payload
=4096, options
=[eco
, rewrittenEcso
, eco
])
563 expectedResponse
= dns
.message
.make_response(query
)
564 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[eco
, eco
])
565 response
.answer
.append(rrset
)
566 response
.additional
.append(rrset
)
567 expectedResponse
.answer
.append(rrset
)
568 expectedResponse
.additional
.append(rrset
)
570 for method
in ("sendUDPQuery", "sendTCPQuery"):
571 sender
= getattr(self
, method
)
572 (receivedQuery
, receivedResponse
) = sender(query
, response
)
573 self
.assertTrue(receivedQuery
)
574 self
.assertTrue(receivedResponse
)
575 receivedQuery
.id = expectedQuery
.id
576 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 1)
577 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, 2)
579 class TestECSDisabledByRuleOrLua(DNSDistTest
):
581 dnsdist is configured to add the EDNS0 Client Subnet
582 option, but we disable it via DisableECSAction()
586 _config_template
= """
587 setECSOverride(false)
588 setECSSourcePrefixV4(16)
589 setECSSourcePrefixV6(16)
590 newServer{address="127.0.0.1:%s", useClientSubnet=true}
591 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
592 function disableECSViaLua(dq)
594 return DNSAction.None, ""
596 addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
599 def testWithECSNotDisabled(self
):
601 ECS Disable: ECS enabled in the backend
603 name
= 'notdisabled.ecsrules.tests.powerdns.com.'
604 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 16)
605 query
= dns
.message
.make_query(name
, 'A', 'IN')
606 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
607 response
= dns
.message
.make_response(expectedQuery
)
608 expectedResponse
= dns
.message
.make_response(query
)
609 rrset
= dns
.rrset
.from_text(name
,
614 response
.answer
.append(rrset
)
615 expectedResponse
.answer
.append(rrset
)
617 for method
in ("sendUDPQuery", "sendTCPQuery"):
618 sender
= getattr(self
, method
)
619 (receivedQuery
, receivedResponse
) = sender(query
, response
)
620 self
.assertTrue(receivedQuery
)
621 self
.assertTrue(receivedResponse
)
622 receivedQuery
.id = expectedQuery
.id
623 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
624 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
626 def testWithECSDisabledViaRule(self
):
628 ECS Disable: ECS enabled in the backend, but disabled by a rule
630 name
= 'disabled.ecsrules.tests.powerdns.com.'
631 query
= dns
.message
.make_query(name
, 'A', 'IN')
632 response
= dns
.message
.make_response(query
)
633 rrset
= dns
.rrset
.from_text(name
,
638 response
.answer
.append(rrset
)
640 for method
in ("sendUDPQuery", "sendTCPQuery"):
641 sender
= getattr(self
, method
)
642 (receivedQuery
, receivedResponse
) = sender(query
, response
)
643 self
.assertTrue(receivedQuery
)
644 self
.assertTrue(receivedResponse
)
645 receivedQuery
.id = query
.id
646 self
.checkQueryNoEDNS(query
, receivedQuery
)
647 self
.checkResponseNoEDNS(response
, receivedResponse
)
649 def testWithECSDisabledViaLua(self
):
651 ECS Disable: ECS enabled in the backend, but disabled via Lua
653 name
= 'disabledvialua.ecsrules.tests.powerdns.com.'
654 query
= dns
.message
.make_query(name
, 'A', 'IN')
655 response
= dns
.message
.make_response(query
)
656 rrset
= dns
.rrset
.from_text(name
,
661 response
.answer
.append(rrset
)
663 for method
in ("sendUDPQuery", "sendTCPQuery"):
664 sender
= getattr(self
, method
)
665 (receivedQuery
, receivedResponse
) = sender(query
, response
)
666 self
.assertTrue(receivedQuery
)
667 self
.assertTrue(receivedResponse
)
668 receivedQuery
.id = query
.id
669 self
.checkQueryNoEDNS(query
, receivedQuery
)
670 self
.checkResponseNoEDNS(response
, receivedResponse
)
672 class TestECSOverrideSetByRuleOrLua(DNSDistTest
):
674 dnsdist is configured to set the EDNS0 Client Subnet
675 option without overriding an existing one, but we
676 force the overriding via ECSOverrideAction() or Lua.
679 _config_template
= """
680 setECSOverride(false)
681 setECSSourcePrefixV4(24)
682 setECSSourcePrefixV6(56)
683 newServer{address="127.0.0.1:%s", useClientSubnet=true}
684 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
685 function overrideECSViaLua(dq)
686 dq.ecsOverride = true
687 return DNSAction.None, ""
689 addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
692 def testWithECSOverrideNotSet(self
):
694 ECS Override: not set via Lua or a rule
696 name
= 'notoverridden.ecsrules.tests.powerdns.com.'
697 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
698 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
699 response
= dns
.message
.make_response(query
)
700 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
701 rrset
= dns
.rrset
.from_text(name
,
706 response
.answer
.append(rrset
)
708 for method
in ("sendUDPQuery", "sendTCPQuery"):
709 sender
= getattr(self
, method
)
710 (receivedQuery
, receivedResponse
) = sender(query
, response
)
711 self
.assertTrue(receivedQuery
)
712 self
.assertTrue(receivedResponse
)
713 receivedQuery
.id = query
.id
714 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
715 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
717 def testWithECSOverrideSetViaRule(self
):
719 ECS Override: set with a rule
721 name
= 'overridden.ecsrules.tests.powerdns.com.'
722 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
723 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
724 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
725 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
726 response
= dns
.message
.make_response(query
)
727 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
728 rrset
= dns
.rrset
.from_text(name
,
733 response
.answer
.append(rrset
)
735 for method
in ("sendUDPQuery", "sendTCPQuery"):
736 sender
= getattr(self
, method
)
737 (receivedQuery
, receivedResponse
) = sender(query
, response
)
738 self
.assertTrue(receivedQuery
)
739 self
.assertTrue(receivedResponse
)
740 receivedQuery
.id = expectedQuery
.id
741 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
742 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
744 def testWithECSOverrideSetViaLua(self
):
746 ECS Override: set via Lua
748 name
= 'overriddenvialua.ecsrules.tests.powerdns.com.'
749 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
750 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
751 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
752 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
753 response
= dns
.message
.make_response(query
)
754 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
755 rrset
= dns
.rrset
.from_text(name
,
760 response
.answer
.append(rrset
)
762 for method
in ("sendUDPQuery", "sendTCPQuery"):
763 sender
= getattr(self
, method
)
764 (receivedQuery
, receivedResponse
) = sender(query
, response
)
765 self
.assertTrue(receivedQuery
)
766 self
.assertTrue(receivedResponse
)
767 receivedQuery
.id = expectedQuery
.id
768 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
769 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
771 class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest
):
773 dnsdist is configured to set the EDNS0 Client Subnet
774 option with a prefix length of 24 for IPv4 and 56 for IPv6,
775 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
778 _config_template
= """
779 setECSOverride(false)
780 setECSSourcePrefixV4(24)
781 setECSSourcePrefixV6(56)
782 newServer{address="127.0.0.1:%s", useClientSubnet=true}
783 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
784 function overrideECSPrefixLengthViaLua(dq)
785 dq.ecsPrefixLength = 32
786 return DNSAction.None, ""
788 addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
791 def testWithECSPrefixLengthNotOverridden(self
):
793 ECS Prefix Length: not overridden via Lua or a rule
795 name
= 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
796 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
797 query
= dns
.message
.make_query(name
, 'A', 'IN')
798 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
799 response
= dns
.message
.make_response(query
)
800 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
801 rrset
= dns
.rrset
.from_text(name
,
806 response
.answer
.append(rrset
)
807 expectedResponse
= dns
.message
.make_response(query
)
808 expectedResponse
.answer
.append(rrset
)
810 for method
in ("sendUDPQuery", "sendTCPQuery"):
811 sender
= getattr(self
, method
)
812 (receivedQuery
, receivedResponse
) = sender(query
, response
)
813 self
.assertTrue(receivedQuery
)
814 self
.assertTrue(receivedResponse
)
815 receivedQuery
.id = expectedQuery
.id
816 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
817 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
819 def testWithECSPrefixLengthOverriddenViaRule(self
):
821 ECS Prefix Length: overridden with a rule
823 name
= 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
824 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
825 query
= dns
.message
.make_query(name
, 'A', 'IN')
826 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
827 response
= dns
.message
.make_response(expectedQuery
)
828 rrset
= dns
.rrset
.from_text(name
,
833 response
.answer
.append(rrset
)
834 expectedResponse
= dns
.message
.make_response(query
)
835 expectedResponse
.answer
.append(rrset
)
837 for method
in ("sendUDPQuery", "sendTCPQuery"):
838 sender
= getattr(self
, method
)
839 (receivedQuery
, receivedResponse
) = sender(query
, response
)
840 self
.assertTrue(receivedQuery
)
841 self
.assertTrue(receivedResponse
)
842 receivedQuery
.id = expectedQuery
.id
843 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
844 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
846 def testWithECSPrefixLengthOverriddenViaLua(self
):
848 ECS Prefix Length: overridden via Lua
850 name
= 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
851 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
852 query
= dns
.message
.make_query(name
, 'A', 'IN')
853 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
854 response
= dns
.message
.make_response(expectedQuery
)
855 rrset
= dns
.rrset
.from_text(name
,
860 response
.answer
.append(rrset
)
861 expectedResponse
= dns
.message
.make_response(query
)
862 expectedResponse
.answer
.append(rrset
)
864 for method
in ("sendUDPQuery", "sendTCPQuery"):
865 sender
= getattr(self
, method
)
866 (receivedQuery
, receivedResponse
) = sender(query
, response
)
867 self
.assertTrue(receivedQuery
)
868 self
.assertTrue(receivedResponse
)
869 receivedQuery
.id = expectedQuery
.id
870 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
871 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
873 class TestECSPrefixSetByRule(DNSDistTest
):
875 dnsdist is configured to set the EDNS0 Client Subnet
876 option for incoming queries to the actual source IP,
877 but we override it for some queries via SetECSAction().
880 _config_template
= """
881 setECSOverride(false)
882 setECSSourcePrefixV4(32)
883 setECSSourcePrefixV6(128)
884 newServer{address="127.0.0.1:%s", useClientSubnet=true}
885 addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
888 def testWithRegularECS(self
):
892 name
= 'notsetecsaction.ecsrules.tests.powerdns.com.'
893 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
894 query
= dns
.message
.make_query(name
, 'A', 'IN')
895 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
896 response
= dns
.message
.make_response(query
)
897 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
898 rrset
= dns
.rrset
.from_text(name
,
903 response
.answer
.append(rrset
)
904 expectedResponse
= dns
.message
.make_response(query
)
905 expectedResponse
.answer
.append(rrset
)
907 for method
in ("sendUDPQuery", "sendTCPQuery"):
908 sender
= getattr(self
, method
)
909 (receivedQuery
, receivedResponse
) = sender(query
, response
)
910 self
.assertTrue(receivedQuery
)
911 self
.assertTrue(receivedResponse
)
912 receivedQuery
.id = expectedQuery
.id
913 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
914 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
916 def testWithECSSetByRule(self
):
918 ECS Prefix: set with SetECSAction
920 name
= 'setecsaction.ecsrules.tests.powerdns.com.'
921 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
922 query
= dns
.message
.make_query(name
, 'A', 'IN')
923 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
924 response
= dns
.message
.make_response(expectedQuery
)
925 rrset
= dns
.rrset
.from_text(name
,
930 response
.answer
.append(rrset
)
931 expectedResponse
= dns
.message
.make_response(query
)
932 expectedResponse
.answer
.append(rrset
)
934 for method
in ("sendUDPQuery", "sendTCPQuery"):
935 sender
= getattr(self
, method
)
936 (receivedQuery
, receivedResponse
) = sender(query
, response
)
937 self
.assertTrue(receivedQuery
)
938 self
.assertTrue(receivedResponse
)
939 receivedQuery
.id = expectedQuery
.id
940 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
941 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)