]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
6 from datetime
import datetime
, timedelta
8 class TestEdnsClientSubnetNoOverride(DNSDistTest
):
10 dnsdist is configured to add the EDNS0 Client Subnet
11 option, but only if it's not already present in the
15 _config_template
= """
17 newServer{address="127.0.0.1:%s", useClientSubnet=true}
20 def testWithoutEDNS(self
):
24 Send a query without EDNS, check that the query
25 received by the responder has the correct ECS value
26 and that the response received from dnsdist does not
27 have an EDNS pseudo-RR.
29 name
= 'withoutedns.ecs.tests.powerdns.com.'
30 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
31 query
= dns
.message
.make_query(name
, 'A', 'IN')
32 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
33 response
= dns
.message
.make_response(expectedQuery
)
34 expectedResponse
= dns
.message
.make_response(query
)
35 rrset
= dns
.rrset
.from_text(name
,
40 response
.answer
.append(rrset
)
41 expectedResponse
.answer
.append(rrset
)
43 for method
in ("sendUDPQuery", "sendTCPQuery"):
44 sender
= getattr(self
, method
)
45 (receivedQuery
, receivedResponse
) = sender(query
, response
)
46 self
.assertTrue(receivedQuery
)
47 self
.assertTrue(receivedResponse
)
48 receivedQuery
.id = expectedQuery
.id
49 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
50 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
52 def testWithEDNSNoECS(self
):
54 ECS: Existing EDNS without ECS
56 Send a query with EDNS but no ECS value.
57 Check that the query received by the responder
58 has a valid ECS value and that the response
59 received from dnsdist contains an EDNS pseudo-RR.
61 name
= 'withednsnoecs.ecs.tests.powerdns.com.'
62 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
63 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
64 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
65 response
= dns
.message
.make_response(expectedQuery
)
66 expectedResponse
= dns
.message
.make_response(query
)
67 rrset
= dns
.rrset
.from_text(name
,
72 response
.answer
.append(rrset
)
73 expectedResponse
.answer
.append(rrset
)
75 for method
in ("sendUDPQuery", "sendTCPQuery"):
76 sender
= getattr(self
, method
)
77 (receivedQuery
, receivedResponse
) = sender(query
, response
)
78 self
.assertTrue(receivedQuery
)
79 self
.assertTrue(receivedResponse
)
80 receivedQuery
.id = expectedQuery
.id
81 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
82 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
84 def testWithEDNSECS(self
):
86 ECS: Existing EDNS with ECS
88 Send a query with EDNS and a crafted ECS value.
89 Check that the query received by the responder
90 has the initial ECS value (not overwritten)
91 and that the response received from dnsdist contains
94 name
= 'withednsecs.ecs.tests.powerdns.com.'
95 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24)
96 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
97 response
= dns
.message
.make_response(query
)
98 rrset
= dns
.rrset
.from_text(name
,
103 response
.answer
.append(rrset
)
106 for method
in ("sendUDPQuery", "sendTCPQuery"):
107 sender
= getattr(self
, method
)
108 (receivedQuery
, receivedResponse
) = sender(query
, response
)
109 self
.assertTrue(receivedQuery
)
110 self
.assertTrue(receivedResponse
)
111 receivedQuery
.id = query
.id
112 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
113 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
115 def testWithoutEDNSResponseWithECS(self
):
117 ECS: No existing EDNS (BE returning ECS)
119 Send a query without EDNS, check that the query
120 received by the responder has the correct ECS value
121 and that the response received from dnsdist does not
122 have an EDNS pseudo-RR.
123 This time the response returned by the backend contains
124 an ECS option with scope set.
126 name
= 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
127 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
128 query
= dns
.message
.make_query(name
, 'A', 'IN')
129 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
130 response
= dns
.message
.make_response(expectedQuery
)
131 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
132 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
133 expectedResponse
= dns
.message
.make_response(query
)
134 rrset
= dns
.rrset
.from_text(name
,
139 response
.answer
.append(rrset
)
140 expectedResponse
.answer
.append(rrset
)
142 for method
in ("sendUDPQuery", "sendTCPQuery"):
143 sender
= getattr(self
, method
)
144 (receivedQuery
, receivedResponse
) = sender(query
, response
)
145 self
.assertTrue(receivedQuery
)
146 self
.assertTrue(receivedResponse
)
147 receivedQuery
.id = expectedQuery
.id
148 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
149 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
151 def testWithEDNSNoECSResponseWithECS(self
):
153 ECS: Existing EDNS without ECS (BE returning only the ECS option)
155 Send a query with EDNS but no ECS value.
156 Check that the query received by the responder
157 has a valid ECS value and that the response
158 received from dnsdist contains an EDNS pseudo-RR.
159 This time the response returned by the backend contains
160 an ECS option with scope set.
162 name
= 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
163 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
164 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
165 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
166 response
= dns
.message
.make_response(expectedQuery
)
167 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
168 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
169 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4096)
170 rrset
= dns
.rrset
.from_text(name
,
175 response
.answer
.append(rrset
)
176 expectedResponse
.answer
.append(rrset
)
178 for method
in ("sendUDPQuery", "sendTCPQuery"):
179 sender
= getattr(self
, method
)
180 (receivedQuery
, receivedResponse
) = sender(query
, response
)
181 self
.assertTrue(receivedQuery
)
182 self
.assertTrue(receivedResponse
)
183 receivedQuery
.id = expectedQuery
.id
184 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
185 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
187 def testWithEDNSNoECSResponseWithCookiesThenECS(self
):
189 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
191 Send a query with EDNS but no ECS value.
192 Check that the query received by the responder
193 has a valid ECS value and that the response
194 received from dnsdist contains an EDNS pseudo-RR.
195 This time the response returned by the backend contains
196 one cookies then one ECS option.
198 name
= 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
199 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
200 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
201 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
202 response
= dns
.message
.make_response(expectedQuery
)
203 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
204 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
205 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
])
206 expectedResponse
= dns
.message
.make_response(query
)
207 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
208 rrset
= dns
.rrset
.from_text(name
,
213 response
.answer
.append(rrset
)
214 expectedResponse
.answer
.append(rrset
)
215 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
217 for method
in ("sendUDPQuery", "sendTCPQuery"):
218 sender
= getattr(self
, method
)
219 (receivedQuery
, receivedResponse
) = sender(query
, response
)
220 self
.assertTrue(receivedQuery
)
221 self
.assertTrue(receivedResponse
)
222 receivedQuery
.id = expectedQuery
.id
223 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
224 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
226 def testWithEDNSNoECSResponseWithECSThenCookies(self
):
228 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
230 Send a query with EDNS but no ECS value.
231 Check that the query received by the responder
232 has a valid ECS value and that the response
233 received from dnsdist contains an EDNS pseudo-RR.
234 This time the response returned by the backend contains
235 one ECS then one Cookies option.
237 name
= 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
238 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
239 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
240 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
241 response
= dns
.message
.make_response(expectedQuery
)
242 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
243 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
244 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
, ecoResponse
])
245 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4096)
246 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
247 rrset
= dns
.rrset
.from_text(name
,
252 response
.answer
.append(rrset
)
253 expectedResponse
.answer
.append(rrset
)
254 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
])
256 for method
in ("sendUDPQuery", "sendTCPQuery"):
257 sender
= getattr(self
, method
)
258 (receivedQuery
, receivedResponse
) = sender(query
, response
)
259 self
.assertTrue(receivedQuery
)
260 self
.assertTrue(receivedResponse
)
261 receivedQuery
.id = expectedQuery
.id
262 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
263 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=1)
265 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self
):
267 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
269 Send a query with EDNS but no ECS value.
270 Check that the query received by the responder
271 has a valid ECS value and that the response
272 received from dnsdist contains an EDNS pseudo-RR.
273 This time the response returned by the backend contains
274 one Cookies, one ECS then one Cookies option.
276 name
= 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
277 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
278 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
279 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
280 response
= dns
.message
.make_response(expectedQuery
)
281 ecoResponse
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
282 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24, scope
=24)
283 response
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecsoResponse
, ecoResponse
])
284 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4096)
285 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ecoResponse
, ecoResponse
])
286 rrset
= dns
.rrset
.from_text(name
,
291 response
.answer
.append(rrset
)
292 expectedResponse
.answer
.append(rrset
)
294 for method
in ("sendUDPQuery", "sendTCPQuery"):
295 sender
= getattr(self
, method
)
296 (receivedQuery
, receivedResponse
) = sender(query
, response
)
297 self
.assertTrue(receivedQuery
)
298 self
.assertTrue(receivedResponse
)
299 receivedQuery
.id = expectedQuery
.id
300 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
301 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, withCookies
=2)
303 class TestEdnsClientSubnetOverride(DNSDistTest
):
305 dnsdist is configured to add the EDNS0 Client Subnet
306 option, overwriting any existing value.
309 _config_template
= """
312 setECSSourcePrefixV4(24)
313 setECSSourcePrefixV6(56)
314 newServer{address="127.0.0.1:%s", useClientSubnet=true}
317 def testWithoutEDNS(self
):
319 ECS Override: No existing EDNS
321 Send a query without EDNS, check that the query
322 received by the responder has the correct ECS value
323 and that the response received from dnsdist does not
324 have an EDNS pseudo-RR.
326 name
= 'withoutedns.overridden.ecs.tests.powerdns.com.'
327 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
328 query
= dns
.message
.make_query(name
, 'A', 'IN')
329 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
330 response
= dns
.message
.make_response(expectedQuery
)
331 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
332 rrset
= dns
.rrset
.from_text(name
,
337 response
.answer
.append(rrset
)
338 expectedResponse
= dns
.message
.make_response(query
)
339 expectedResponse
.answer
.append(rrset
)
341 for method
in ("sendUDPQuery", "sendTCPQuery"):
342 sender
= getattr(self
, method
)
343 (receivedQuery
, receivedResponse
) = sender(query
, response
)
344 self
.assertTrue(receivedQuery
)
345 self
.assertTrue(receivedResponse
)
346 receivedQuery
.id = expectedQuery
.id
347 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
348 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
350 def testWithEDNSNoECS(self
):
352 ECS Override: Existing EDNS without ECS
354 Send a query with EDNS but no ECS value.
355 Check that the query received by the responder
356 has a valid ECS value and that the response
357 received from dnsdist contains an EDNS pseudo-RR.
359 name
= 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
360 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
361 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
362 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
363 response
= dns
.message
.make_response(expectedQuery
)
364 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
365 rrset
= dns
.rrset
.from_text(name
,
370 response
.answer
.append(rrset
)
371 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4096)
372 expectedResponse
.answer
.append(rrset
)
374 for method
in ("sendUDPQuery", "sendTCPQuery"):
375 sender
= getattr(self
, method
)
376 (receivedQuery
, receivedResponse
) = sender(query
, response
)
377 self
.assertTrue(receivedQuery
)
378 self
.assertTrue(receivedResponse
)
379 receivedQuery
.id = expectedQuery
.id
380 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
381 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
)
383 def testWithEDNSShorterInitialECS(self
):
385 ECS Override: Existing EDNS with ECS (short)
387 Send a query with EDNS and a crafted ECS value.
388 Check that the query received by the responder
389 has an overwritten ECS value (not the initial one)
390 and that the response received from dnsdist contains
392 The initial ECS value is shorter than the one it will be
395 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
396 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 8)
397 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
398 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
399 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
400 response
= dns
.message
.make_response(query
)
401 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
402 rrset
= dns
.rrset
.from_text(name
,
407 response
.answer
.append(rrset
)
409 for method
in ("sendUDPQuery", "sendTCPQuery"):
410 sender
= getattr(self
, method
)
411 (receivedQuery
, receivedResponse
) = sender(query
, response
)
412 self
.assertTrue(receivedQuery
)
413 self
.assertTrue(receivedResponse
)
414 receivedQuery
.id = expectedQuery
.id
415 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
416 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
418 def testWithEDNSLongerInitialECS(self
):
420 ECS Override: Existing EDNS with ECS (long)
422 Send a query with EDNS and a crafted ECS value.
423 Check that the query received by the responder
424 has an overwritten ECS value (not the initial one)
425 and that the response received from dnsdist contains
427 The initial ECS value is longer than the one it will
430 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
431 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
432 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
433 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
434 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
435 response
= dns
.message
.make_response(query
)
436 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
437 rrset
= dns
.rrset
.from_text(name
,
442 response
.answer
.append(rrset
)
444 for method
in ("sendUDPQuery", "sendTCPQuery"):
445 sender
= getattr(self
, method
)
446 (receivedQuery
, receivedResponse
) = sender(query
, response
)
447 self
.assertTrue(receivedQuery
)
448 self
.assertTrue(receivedResponse
)
449 receivedQuery
.id = expectedQuery
.id
450 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
451 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
453 def testWithEDNSSameSizeInitialECS(self
):
455 ECS Override: Existing EDNS with ECS (same)
457 Send a query with EDNS and a crafted ECS value.
458 Check that the query received by the responder
459 has an overwritten ECS value (not the initial one)
460 and that the response received from dnsdist contains
462 The initial ECS value is exactly the same size as
463 the one it will replaced with.
465 name
= 'withednsecs.overridden.ecs.tests.powerdns.com.'
466 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
467 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
468 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
469 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
470 response
= dns
.message
.make_response(query
)
471 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
472 rrset
= dns
.rrset
.from_text(name
,
477 response
.answer
.append(rrset
)
479 for method
in ("sendUDPQuery", "sendTCPQuery"):
480 sender
= getattr(self
, method
)
481 (receivedQuery
, receivedResponse
) = sender(query
, response
)
482 self
.assertTrue(receivedQuery
)
483 self
.assertTrue(receivedResponse
)
484 receivedQuery
.id = expectedQuery
.id
485 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
486 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
488 def testWithECSFollowedByAnother(self
):
490 ECS: Existing EDNS with ECS, followed by another record
492 Send a query with EDNS and an existing ECS value.
493 The OPT record is not the last one in the query
494 and is followed by another one.
495 Check that the query received by the responder
496 has a valid ECS value and that the response
497 received from dnsdist contains an EDNS pseudo-RR.
499 name
= 'withecs-followedbyanother.ecs.tests.powerdns.com.'
500 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
501 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
502 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
503 rrset
= dns
.rrset
.from_text(name
,
509 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,ecso
,eco
])
510 # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
511 # it while parsing the message in the receiver :-/
512 query
.additional
.append(rrset
)
513 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,eco
,rewrittenEcso
])
514 expectedQuery
.additional
.append(rrset
)
516 response
= dns
.message
.make_response(expectedQuery
)
517 response
.use_edns(edns
=True, payload
=4096, options
=[eco
, ecso
, eco
])
518 expectedResponse
= dns
.message
.make_response(query
)
519 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[eco
, ecso
, eco
])
520 response
.answer
.append(rrset
)
521 response
.additional
.append(rrset
)
522 expectedResponse
.answer
.append(rrset
)
523 expectedResponse
.additional
.append(rrset
)
525 for method
in ("sendUDPQuery", "sendTCPQuery"):
526 sender
= getattr(self
, method
)
527 (receivedQuery
, receivedResponse
) = sender(query
, response
)
528 self
.assertTrue(receivedQuery
)
529 self
.assertTrue(receivedResponse
)
530 receivedQuery
.id = expectedQuery
.id
531 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 2)
532 self
.checkResponseEDNSWithECS(expectedResponse
, receivedResponse
, 2)
534 def testWithAnswerThenECS(self
):
536 ECS: Record in answer followed by an existing EDNS with ECS
538 Send a query with a record in the answer section, EDNS and an existing ECS value.
539 Check that the query received by the responder
540 has a valid ECS value and that the response
541 received from dnsdist contains an EDNS pseudo-RR.
543 name
= 'record-in-an-withecs.ecs.tests.powerdns.com.'
544 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
545 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
546 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
547 rrset
= dns
.rrset
.from_text(name
,
553 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,ecso
,eco
])
554 query
.answer
.append(rrset
)
555 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,eco
,rewrittenEcso
])
556 expectedQuery
.answer
.append(rrset
)
558 response
= dns
.message
.make_response(expectedQuery
)
559 response
.use_edns(edns
=True, payload
=4096, options
=[eco
, ecso
, eco
])
560 expectedResponse
= dns
.message
.make_response(query
)
561 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[eco
, ecso
, eco
])
562 response
.answer
.append(rrset
)
563 response
.additional
.append(rrset
)
564 expectedResponse
.answer
.append(rrset
)
565 expectedResponse
.additional
.append(rrset
)
567 for method
in ("sendUDPQuery", "sendTCPQuery"):
568 sender
= getattr(self
, method
)
569 (receivedQuery
, receivedResponse
) = sender(query
, response
)
570 self
.assertTrue(receivedQuery
)
571 self
.assertTrue(receivedResponse
)
572 receivedQuery
.id = expectedQuery
.id
573 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 2)
574 self
.checkResponseEDNSWithECS(expectedResponse
, receivedResponse
, 2)
576 def testWithAuthThenECS(self
):
578 ECS: Record in authority followed by an existing EDNS with ECS
580 Send a query with a record in the authority section, EDNS and an existing ECS value.
581 Check that the query received by the responder
582 has a valid ECS value and that the response
583 received from dnsdist contains an EDNS pseudo-RR.
585 name
= 'record-in-an-withecs.ecs.tests.powerdns.com.'
586 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
587 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
588 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
589 rrset
= dns
.rrset
.from_text(name
,
595 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,ecso
,eco
])
596 query
.authority
.append(rrset
)
597 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,eco
,rewrittenEcso
])
598 expectedQuery
.authority
.append(rrset
)
600 response
= dns
.message
.make_response(expectedQuery
)
601 response
.use_edns(edns
=True, payload
=4096, options
=[eco
, ecso
, eco
])
602 expectedResponse
= dns
.message
.make_response(query
)
603 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[eco
, ecso
, eco
])
604 response
.answer
.append(rrset
)
605 response
.additional
.append(rrset
)
606 expectedResponse
.answer
.append(rrset
)
607 expectedResponse
.additional
.append(rrset
)
609 for method
in ("sendUDPQuery", "sendTCPQuery"):
610 sender
= getattr(self
, method
)
611 (receivedQuery
, receivedResponse
) = sender(query
, response
)
612 self
.assertTrue(receivedQuery
)
613 self
.assertTrue(receivedResponse
)
614 receivedQuery
.id = expectedQuery
.id
615 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 2)
616 self
.checkResponseEDNSWithECS(expectedResponse
, receivedResponse
, 2)
618 def testWithEDNSNoECSFollowedByAnother(self
):
620 ECS: Existing EDNS without ECS, followed by another record
622 Send a query with EDNS but no ECS value.
623 The OPT record is not the last one in the query
624 and is followed by another one.
625 Check that the query received by the responder
626 has a valid ECS value and that the response
627 received from dnsdist contains an EDNS pseudo-RR.
629 name
= 'withedns-no-ecs-followedbyanother.ecs.tests.powerdns.com.'
630 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
631 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
632 rrset
= dns
.rrset
.from_text(name
,
638 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
639 # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
640 # it while parsing the message in the receiver :-/
641 query
.additional
.append(rrset
)
642 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
,rewrittenEcso
])
643 expectedQuery
.additional
.append(rrset
)
645 response
= dns
.message
.make_response(expectedQuery
)
646 response
.use_edns(edns
=True, payload
=4096, options
=[eco
, rewrittenEcso
, eco
])
647 expectedResponse
= dns
.message
.make_response(query
)
648 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[eco
, eco
])
649 response
.answer
.append(rrset
)
650 response
.additional
.append(rrset
)
651 expectedResponse
.answer
.append(rrset
)
652 expectedResponse
.additional
.append(rrset
)
654 for method
in ("sendUDPQuery", "sendTCPQuery"):
655 sender
= getattr(self
, method
)
656 (receivedQuery
, receivedResponse
) = sender(query
, response
)
657 self
.assertTrue(receivedQuery
)
658 self
.assertTrue(receivedResponse
)
659 receivedQuery
.id = expectedQuery
.id
660 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 1)
661 self
.checkResponseEDNSWithoutECS(expectedResponse
, receivedResponse
, 2)
663 class TestECSDisabledByRuleOrLua(DNSDistTest
):
665 dnsdist is configured to add the EDNS0 Client Subnet
666 option, but we disable it via DisableECSAction()
670 _config_template
= """
671 setECSOverride(false)
672 setECSSourcePrefixV4(16)
673 setECSSourcePrefixV6(16)
674 newServer{address="127.0.0.1:%s", useClientSubnet=true}
675 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
676 function disableECSViaLua(dq)
678 return DNSAction.None, ""
680 addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
683 def testWithECSNotDisabled(self
):
685 ECS Disable: ECS enabled in the backend
687 name
= 'notdisabled.ecsrules.tests.powerdns.com.'
688 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 16)
689 query
= dns
.message
.make_query(name
, 'A', 'IN')
690 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
691 response
= dns
.message
.make_response(expectedQuery
)
692 expectedResponse
= dns
.message
.make_response(query
)
693 rrset
= dns
.rrset
.from_text(name
,
698 response
.answer
.append(rrset
)
699 expectedResponse
.answer
.append(rrset
)
701 for method
in ("sendUDPQuery", "sendTCPQuery"):
702 sender
= getattr(self
, method
)
703 (receivedQuery
, receivedResponse
) = sender(query
, response
)
704 self
.assertTrue(receivedQuery
)
705 self
.assertTrue(receivedResponse
)
706 receivedQuery
.id = expectedQuery
.id
707 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
708 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
710 def testWithECSDisabledViaRule(self
):
712 ECS Disable: ECS enabled in the backend, but disabled by a rule
714 name
= 'disabled.ecsrules.tests.powerdns.com.'
715 query
= dns
.message
.make_query(name
, 'A', 'IN')
716 response
= dns
.message
.make_response(query
)
717 rrset
= dns
.rrset
.from_text(name
,
722 response
.answer
.append(rrset
)
724 for method
in ("sendUDPQuery", "sendTCPQuery"):
725 sender
= getattr(self
, method
)
726 (receivedQuery
, receivedResponse
) = sender(query
, response
)
727 self
.assertTrue(receivedQuery
)
728 self
.assertTrue(receivedResponse
)
729 receivedQuery
.id = query
.id
730 self
.checkQueryNoEDNS(query
, receivedQuery
)
731 self
.checkResponseNoEDNS(response
, receivedResponse
)
733 def testWithECSDisabledViaLua(self
):
735 ECS Disable: ECS enabled in the backend, but disabled via Lua
737 name
= 'disabledvialua.ecsrules.tests.powerdns.com.'
738 query
= dns
.message
.make_query(name
, 'A', 'IN')
739 response
= dns
.message
.make_response(query
)
740 rrset
= dns
.rrset
.from_text(name
,
745 response
.answer
.append(rrset
)
747 for method
in ("sendUDPQuery", "sendTCPQuery"):
748 sender
= getattr(self
, method
)
749 (receivedQuery
, receivedResponse
) = sender(query
, response
)
750 self
.assertTrue(receivedQuery
)
751 self
.assertTrue(receivedResponse
)
752 receivedQuery
.id = query
.id
753 self
.checkQueryNoEDNS(query
, receivedQuery
)
754 self
.checkResponseNoEDNS(response
, receivedResponse
)
756 class TestECSOverrideSetByRuleOrLua(DNSDistTest
):
758 dnsdist is configured to set the EDNS0 Client Subnet
759 option without overriding an existing one, but we
760 force the overriding via ECSOverrideAction() or Lua.
763 _config_template
= """
764 setECSOverride(false)
765 setECSSourcePrefixV4(24)
766 setECSSourcePrefixV6(56)
767 newServer{address="127.0.0.1:%s", useClientSubnet=true}
768 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
769 function overrideECSViaLua(dq)
770 dq.ecsOverride = true
771 return DNSAction.None, ""
773 addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
776 def testWithECSOverrideNotSet(self
):
778 ECS Override: not set via Lua or a rule
780 name
= 'notoverridden.ecsrules.tests.powerdns.com.'
781 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
782 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
783 response
= dns
.message
.make_response(query
)
784 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
785 rrset
= dns
.rrset
.from_text(name
,
790 response
.answer
.append(rrset
)
792 for method
in ("sendUDPQuery", "sendTCPQuery"):
793 sender
= getattr(self
, method
)
794 (receivedQuery
, receivedResponse
) = sender(query
, response
)
795 self
.assertTrue(receivedQuery
)
796 self
.assertTrue(receivedResponse
)
797 receivedQuery
.id = query
.id
798 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
799 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
801 def testWithECSOverrideSetViaRule(self
):
803 ECS Override: set with a rule
805 name
= 'overridden.ecsrules.tests.powerdns.com.'
806 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
807 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
808 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
809 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
810 response
= dns
.message
.make_response(query
)
811 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
812 rrset
= dns
.rrset
.from_text(name
,
817 response
.answer
.append(rrset
)
819 for method
in ("sendUDPQuery", "sendTCPQuery"):
820 sender
= getattr(self
, method
)
821 (receivedQuery
, receivedResponse
) = sender(query
, response
)
822 self
.assertTrue(receivedQuery
)
823 self
.assertTrue(receivedResponse
)
824 receivedQuery
.id = expectedQuery
.id
825 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
826 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
828 def testWithECSOverrideSetViaLua(self
):
830 ECS Override: set via Lua
832 name
= 'overriddenvialua.ecsrules.tests.powerdns.com.'
833 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
834 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
835 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
836 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
837 response
= dns
.message
.make_response(query
)
838 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
839 rrset
= dns
.rrset
.from_text(name
,
844 response
.answer
.append(rrset
)
846 for method
in ("sendUDPQuery", "sendTCPQuery"):
847 sender
= getattr(self
, method
)
848 (receivedQuery
, receivedResponse
) = sender(query
, response
)
849 self
.assertTrue(receivedQuery
)
850 self
.assertTrue(receivedResponse
)
851 receivedQuery
.id = expectedQuery
.id
852 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
853 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
855 class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest
):
857 dnsdist is configured to set the EDNS0 Client Subnet
858 option with a prefix length of 24 for IPv4 and 56 for IPv6,
859 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
862 _config_template
= """
863 setECSOverride(false)
864 setECSSourcePrefixV4(24)
865 setECSSourcePrefixV6(56)
866 newServer{address="127.0.0.1:%s", useClientSubnet=true}
867 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
868 function overrideECSPrefixLengthViaLua(dq)
869 dq.ecsPrefixLength = 32
870 return DNSAction.None, ""
872 addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
875 def testWithECSPrefixLengthNotOverridden(self
):
877 ECS Prefix Length: not overridden via Lua or a rule
879 name
= 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
880 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
881 query
= dns
.message
.make_query(name
, 'A', 'IN')
882 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
883 response
= dns
.message
.make_response(query
)
884 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
885 rrset
= dns
.rrset
.from_text(name
,
890 response
.answer
.append(rrset
)
891 expectedResponse
= dns
.message
.make_response(query
)
892 expectedResponse
.answer
.append(rrset
)
894 for method
in ("sendUDPQuery", "sendTCPQuery"):
895 sender
= getattr(self
, method
)
896 (receivedQuery
, receivedResponse
) = sender(query
, response
)
897 self
.assertTrue(receivedQuery
)
898 self
.assertTrue(receivedResponse
)
899 receivedQuery
.id = expectedQuery
.id
900 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
901 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
903 def testWithECSPrefixLengthOverriddenViaRule(self
):
905 ECS Prefix Length: overridden with a rule
907 name
= 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
908 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
909 query
= dns
.message
.make_query(name
, 'A', 'IN')
910 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
911 response
= dns
.message
.make_response(expectedQuery
)
912 rrset
= dns
.rrset
.from_text(name
,
917 response
.answer
.append(rrset
)
918 expectedResponse
= dns
.message
.make_response(query
)
919 expectedResponse
.answer
.append(rrset
)
921 for method
in ("sendUDPQuery", "sendTCPQuery"):
922 sender
= getattr(self
, method
)
923 (receivedQuery
, receivedResponse
) = sender(query
, response
)
924 self
.assertTrue(receivedQuery
)
925 self
.assertTrue(receivedResponse
)
926 receivedQuery
.id = expectedQuery
.id
927 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
928 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
930 def testWithECSPrefixLengthOverriddenViaLua(self
):
932 ECS Prefix Length: overridden via Lua
934 name
= 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
935 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
936 query
= dns
.message
.make_query(name
, 'A', 'IN')
937 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
938 response
= dns
.message
.make_response(expectedQuery
)
939 rrset
= dns
.rrset
.from_text(name
,
944 response
.answer
.append(rrset
)
945 expectedResponse
= dns
.message
.make_response(query
)
946 expectedResponse
.answer
.append(rrset
)
948 for method
in ("sendUDPQuery", "sendTCPQuery"):
949 sender
= getattr(self
, method
)
950 (receivedQuery
, receivedResponse
) = sender(query
, response
)
951 self
.assertTrue(receivedQuery
)
952 self
.assertTrue(receivedResponse
)
953 receivedQuery
.id = expectedQuery
.id
954 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
955 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
957 class TestECSPrefixSetByRule(DNSDistTest
):
959 dnsdist is configured to set the EDNS0 Client Subnet
960 option for incoming queries to the actual source IP,
961 but we override it for some queries via SetECSAction().
964 _config_template
= """
965 setECSOverride(false)
966 setECSSourcePrefixV4(32)
967 setECSSourcePrefixV6(128)
968 newServer{address="127.0.0.1:%s", useClientSubnet=true}
969 addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
972 def testWithRegularECS(self
):
976 name
= 'notsetecsaction.ecsrules.tests.powerdns.com.'
977 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 32)
978 query
= dns
.message
.make_query(name
, 'A', 'IN')
979 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
980 response
= dns
.message
.make_response(query
)
981 response
.use_edns(edns
=True, payload
=4096, options
=[ecso
])
982 rrset
= dns
.rrset
.from_text(name
,
987 response
.answer
.append(rrset
)
988 expectedResponse
= dns
.message
.make_response(query
)
989 expectedResponse
.answer
.append(rrset
)
991 for method
in ("sendUDPQuery", "sendTCPQuery"):
992 sender
= getattr(self
, method
)
993 (receivedQuery
, receivedResponse
) = sender(query
, response
)
994 self
.assertTrue(receivedQuery
)
995 self
.assertTrue(receivedResponse
)
996 receivedQuery
.id = expectedQuery
.id
997 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
998 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)
1000 def testWithECSSetByRule(self
):
1002 ECS Prefix: set with SetECSAction
1004 name
= 'setecsaction.ecsrules.tests.powerdns.com.'
1005 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
1006 query
= dns
.message
.make_query(name
, 'A', 'IN')
1007 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
1008 response
= dns
.message
.make_response(expectedQuery
)
1009 rrset
= dns
.rrset
.from_text(name
,
1014 response
.answer
.append(rrset
)
1015 expectedResponse
= dns
.message
.make_response(query
)
1016 expectedResponse
.answer
.append(rrset
)
1018 for method
in ("sendUDPQuery", "sendTCPQuery"):
1019 sender
= getattr(self
, method
)
1020 (receivedQuery
, receivedResponse
) = sender(query
, response
)
1021 self
.assertTrue(receivedQuery
)
1022 self
.assertTrue(receivedResponse
)
1023 receivedQuery
.id = expectedQuery
.id
1024 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
1025 self
.checkResponseNoEDNS(expectedResponse
, receivedResponse
)