]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #8945 from rgacogne/ddist-x-forwarded-for
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6 from datetime import datetime, timedelta
7
class TestEdnsClientSubnetNoOverride(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet
    option, but only if it's not already present in the
    original query.
    """

    _config_template = """
    truncateTC(true)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    @staticmethod
    def _aRecord(name):
        """
        Return the A rrset (TTL 3600, IN, 127.0.0.1) that the
        tests of this class use as the answer for `name`.
        """
        return dns.rrset.from_text(name,
                                   3600,
                                   dns.rdataclass.IN,
                                   dns.rdatatype.A,
                                   '127.0.0.1')

    def _exchangeOverUDPAndTCP(self, query, response):
        """
        Pass `query` and `response` to sendUDPQuery(), then to
        sendTCPQuery(), and yield the (receivedQuery, receivedResponse)
        pair each of them returns.

        Both elements are asserted to be truthy before the pair is
        yielded. This is a generator, so the TCP exchange only
        happens once the caller is done checking the UDP one.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            yield (receivedQuery, receivedResponse)

    def testWithoutEDNS(self):
        """
        ECS: No existing EDNS

        Send a query without EDNS, check that the query
        received by the responder has the correct ECS value
        and that the response received from dnsdist does not
        have an EDNS pseudo-RR.
        """
        name = 'withoutedns.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        rrset = self._aRecord(name)
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECS(self):
        """
        ECS: Existing EDNS without ECS

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        """
        name = 'withednsnoecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        rrset = self._aRecord(name)
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSECS(self):
        """
        ECS: Existing EDNS with ECS

        Send a query with EDNS and a crafted ECS value.
        Check that the query received by the responder
        has the initial ECS value (not overwritten)
        and that the response received from dnsdist contains
        an EDNS pseudo-RR.
        """
        name = 'withednsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(query)
        response.answer.append(self._aRecord(name))

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            # the query must reach the responder unmodified
            receivedQuery.id = query.id
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testWithoutEDNSResponseWithECS(self):
        """
        ECS: No existing EDNS (BE returning ECS)

        Send a query without EDNS, check that the query
        received by the responder has the correct ECS value
        and that the response received from dnsdist does not
        have an EDNS pseudo-RR.
        This time the response returned by the backend contains
        an ECS option with scope set.
        """
        name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        response = dns.message.make_response(expectedQuery)
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = self._aRecord(name)
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECSResponseWithECS(self):
        """
        ECS: Existing EDNS without ECS (BE returning only the ECS option)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        an ECS option with scope set.
        """
        name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
        expectedResponse = dns.message.make_response(query, our_payload=4096)
        rrset = self._aRecord(name)
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSNoECSResponseWithCookiesThenECS(self):
        """
        ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one cookies then one ECS option.
        """
        name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
        expectedResponse = dns.message.make_response(query)
        # only the Cookies option is expected to be left in the response
        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
        rrset = self._aRecord(name)
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)

    def testWithEDNSNoECSResponseWithECSThenCookies(self):
        """
        ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one ECS then one Cookies option.
        """
        name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        # The backend response must keep its ECS option in front of the
        # Cookies one: do not call response.use_edns() again with only
        # the Cookies option, otherwise the case described above is
        # never exercised.
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
        expectedResponse = dns.message.make_response(query, our_payload=4096)
        # only the Cookies option is expected to be left in the response
        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
        rrset = self._aRecord(name)
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)

    def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
        """
        ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one Cookies, one ECS then one Cookies option.
        """
        name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
        expectedResponse = dns.message.make_response(query, our_payload=4096)
        # both Cookies options are expected to be left in the response
        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse, ecoResponse])
        rrset = self._aRecord(name)
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
302
class TestEdnsClientSubnetOverride(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet
    option, overwriting any existing value.
    """

    _config_template = """
    truncateTC(true)
    setECSOverride(true)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    @staticmethod
    def _aRecord(name):
        """
        Return the A rrset (TTL 3600, IN, 127.0.0.1) that the
        tests of this class attach to their messages for `name`.
        """
        return dns.rrset.from_text(name,
                                   3600,
                                   dns.rdataclass.IN,
                                   dns.rdatatype.A,
                                   '127.0.0.1')

    def _exchangeOverUDPAndTCP(self, query, response):
        """
        Pass `query` and `response` to sendUDPQuery(), then to
        sendTCPQuery(), and yield the (receivedQuery, receivedResponse)
        pair each of them returns.

        Both elements are asserted to be truthy before the pair is
        yielded. This is a generator, so the TCP exchange only
        happens once the caller is done checking the UDP one.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            yield (receivedQuery, receivedResponse)

    def _checkInitialECSIsOverridden(self, initialPrefixLength):
        """
        Send a query carrying an ECS option for 192.0.2.1 with the
        given source prefix length, and check that the query
        received by the responder carries 127.0.0.1/24 instead,
        and that the response still has an ECS option.
        """
        name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', initialPrefixLength)
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        response.answer.append(self._aRecord(name))

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def _checkECSOverriddenWithExtraRecord(self, name, section):
        """
        Send a query with Cookies, ECS and Cookies options plus an
        A record appended to the given section ('answer',
        'authority' or 'additional') of the query.

        The query received by the responder is expected to carry
        the two Cookies options followed by the rewritten ECS one,
        and the response to come back with its three options.
        """
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        rrset = self._aRecord(name)

        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
        getattr(query, section).append(rrset)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
        getattr(expectedQuery, section).append(rrset)

        response = dns.message.make_response(expectedQuery)
        response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
        expectedResponse = dns.message.make_response(query)
        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
        for message in (response, expectedResponse):
            message.answer.append(rrset)
            message.additional.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
            self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)

    def testWithoutEDNS(self):
        """
        ECS Override: No existing EDNS

        Send a query without EDNS, check that the query
        received by the responder has the correct ECS value
        and that the response received from dnsdist does not
        have an EDNS pseudo-RR.
        """
        name = 'withoutedns.overridden.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        rrset = self._aRecord(name)

        response = dns.message.make_response(expectedQuery)
        response.use_edns(edns=True, payload=4096, options=[ecso])
        response.answer.append(rrset)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECS(self):
        """
        ECS Override: Existing EDNS without ECS

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        """
        name = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        rrset = self._aRecord(name)

        response = dns.message.make_response(expectedQuery)
        response.use_edns(edns=True, payload=4096, options=[ecso])
        response.answer.append(rrset)

        expectedResponse = dns.message.make_response(query, our_payload=4096)
        expectedResponse.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSShorterInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (short)

        Send a query with EDNS and a crafted ECS value.
        Check that the query received by the responder
        has an overwritten ECS value (not the initial one)
        and that the response received from dnsdist contains
        an EDNS pseudo-RR.
        The initial ECS value is shorter than the one it will be
        replaced with.
        """
        self._checkInitialECSIsOverridden(8)

    def testWithEDNSLongerInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (long)

        Send a query with EDNS and a crafted ECS value.
        Check that the query received by the responder
        has an overwritten ECS value (not the initial one)
        and that the response received from dnsdist contains
        an EDNS pseudo-RR.
        The initial ECS value is longer than the one it will be
        replaced with.
        """
        self._checkInitialECSIsOverridden(32)

    def testWithEDNSSameSizeInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (same)

        Send a query with EDNS and a crafted ECS value.
        Check that the query received by the responder
        has an overwritten ECS value (not the initial one)
        and that the response received from dnsdist contains
        an EDNS pseudo-RR.
        The initial ECS value is exactly the same size as
        the one it will be replaced with.
        """
        self._checkInitialECSIsOverridden(24)

    def testWithECSFollowedByAnother(self):
        """
        ECS: Existing EDNS with ECS, followed by another record

        Send a query with EDNS and an existing ECS value.
        The OPT record is not the last one in the query
        and is followed by another one.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        """
        # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
        # it while parsing the message in the receiver :-/
        # So the extra record is a plain A record in the additional section.
        self._checkECSOverriddenWithExtraRecord('withecs-followedbyanother.ecs.tests.powerdns.com.', 'additional')

    def testWithAnswerThenECS(self):
        """
        ECS: Record in answer followed by an existing EDNS with ECS

        Send a query with a record in the answer section, EDNS and an existing ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        """
        self._checkECSOverriddenWithExtraRecord('record-in-an-withecs.ecs.tests.powerdns.com.', 'answer')

    def testWithAuthThenECS(self):
        """
        ECS: Record in authority followed by an existing EDNS with ECS

        Send a query with a record in the authority section, EDNS and an existing ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        """
        self._checkECSOverriddenWithExtraRecord('record-in-an-withecs.ecs.tests.powerdns.com.', 'authority')

    def testWithEDNSNoECSFollowedByAnother(self):
        """
        ECS: Existing EDNS without ECS, followed by another record

        Send a query with EDNS but no ECS value.
        The OPT record is not the last one in the query
        and is followed by another one.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        """
        name = 'withedns-no-ecs-followedbyanother.ecs.tests.powerdns.com.'
        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        rrset = self._aRecord(name)

        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
        # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
        # it while parsing the message in the receiver :-/
        query.additional.append(rrset)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,rewrittenEcso])
        expectedQuery.additional.append(rrset)

        response = dns.message.make_response(expectedQuery)
        response.use_edns(edns=True, payload=4096, options=[eco, rewrittenEcso, eco])
        expectedResponse = dns.message.make_response(query)
        # the ECS option is not expected in the response, only the two Cookies ones
        expectedResponse.use_edns(edns=True, payload=4096, options=[eco, eco])
        for message in (response, expectedResponse):
            message.answer.append(rrset)
            message.additional.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, 2)
662
class TestECSDisabledByRuleOrLua(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet
    option, but we disable it via DisableECSAction()
    or Lua.
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(16)
    setECSSourcePrefixV6(16)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
    function disableECSViaLua(dq)
        dq.useECS = false
        return DNSAction.None, ""
    end
    addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
    """

    def _exchangeOverUDPAndTCP(self, query, response):
        """
        Pass `query` and `response` to sendUDPQuery(), then to
        sendTCPQuery(), and yield the (receivedQuery, receivedResponse)
        pair each of them returns.

        Both elements are asserted to be truthy before the pair is
        yielded. This is a generator, so the TCP exchange only
        happens once the caller is done checking the UDP one.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            yield (receivedQuery, receivedResponse)

    def _checkECSNotAdded(self, name):
        """
        Send a query without EDNS for `name` and check that
        neither the query received by the responder nor the
        response received from dnsdist has EDNS.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = query.id
            self.checkQueryNoEDNS(query, receivedQuery)
            self.checkResponseNoEDNS(response, receivedResponse)

    def testWithECSNotDisabled(self):
        """
        ECS Disable: ECS enabled in the backend
        """
        name = 'notdisabled.ecsrules.tests.powerdns.com.'
        # /16, matching setECSSourcePrefixV4(16) in the configuration
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.AAAA,
                                    '::1')
        for message in (response, expectedResponse):
            message.answer.append(rrset)

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response):
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSDisabledViaRule(self):
        """
        ECS Disable: ECS enabled in the backend, but disabled by a rule
        """
        self._checkECSNotAdded('disabled.ecsrules.tests.powerdns.com.')

    def testWithECSDisabledViaLua(self):
        """
        ECS Disable: ECS enabled in the backend, but disabled via Lua
        """
        self._checkECSNotAdded('disabledvialua.ecsrules.tests.powerdns.com.')
755
class TestECSOverrideSetByRuleOrLua(DNSDistTest):
    """
    dnsdist adds the EDNS0 Client Subnet option with the global
    override turned off (setECSOverride(false)), and overriding
    an existing option is then forced for selected names through
    ECSOverrideAction() or a Lua action.
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
    function overrideECSViaLua(dq)
        dq.ecsOverride = true
        return DNSAction.None, ""
    end
    addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
    """

    def testWithECSOverrideNotSet(self):
        """
        ECS Override: not set via Lua or a rule

        The queried name matches neither override action, so the
        responder is expected to receive the client-supplied ECS
        option (192.0.2.1/24) and the response carrying that same
        option is expected back.
        """
        qname = 'notoverridden.ecsrules.tests.powerdns.com.'
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[clientSubnet])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[clientSubnet])
        response.answer.append(dns.rrset.from_text(qname,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            receivedQuery, receivedResponse = getattr(self, transport)(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing the two queries.
            receivedQuery.id = query.id
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithECSOverrideSetViaRule(self):
        """
        ECS Override: set with a rule

        The queried name is the one targeted by ECSOverrideAction(true):
        the client sends ECS 192.0.2.1/24 and the responder is expected
        to see it rewritten to 127.0.0.1/24.
        """
        qname = 'overridden.ecsrules.tests.powerdns.com.'
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        rewrittenSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[clientSubnet])
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenSubnet])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenSubnet])
        response.answer.append(dns.rrset.from_text(qname,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            receivedQuery, receivedResponse = getattr(self, transport)(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing against the expected query.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithECSOverrideSetViaLua(self):
        """
        ECS Override: set via Lua

        The queried name is the one handled by overrideECSViaLua():
        the client sends ECS 192.0.2.1/24 and the responder is expected
        to see it rewritten to 127.0.0.1/24.
        """
        qname = 'overriddenvialua.ecsrules.tests.powerdns.com.'
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        rewrittenSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[clientSubnet])
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenSubnet])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenSubnet])
        response.answer.append(dns.rrset.from_text(qname,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            receivedQuery, receivedResponse = getattr(self, transport)(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing against the expected query.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)
854
class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
    """
    dnsdist adds the EDNS0 Client Subnet option using source prefix
    lengths of 24 (IPv4) and 56 (IPv6), and those lengths are raised
    to 32 and 128 for selected names through ECSPrefixLengthAction()
    or a Lua action.
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
    function overrideECSPrefixLengthViaLua(dq)
        dq.ecsPrefixLength = 32
        return DNSAction.None, ""
    end
    addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
    """

    def testWithECSPrefixLengthNotOverridden(self):
        """
        ECS Prefix Length: not overridden via Lua or a rule

        The queried name matches neither prefix-length action, so the
        responder is expected to receive ECS 127.0.0.1/24. The backend
        answers with EDNS and ECS, while the response we get back is
        checked with checkResponseNoEDNS since our query had no EDNS.
        """
        qname = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
        addedSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[addedSubnet], payload=512)
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[addedSubnet])
        answer = dns.rrset.from_text(qname,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
        response.answer.append(answer)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            receivedQuery, receivedResponse = getattr(self, transport)(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing against the expected query.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSPrefixLengthOverriddenViaRule(self):
        """
        ECS Prefix Length: overridden with a rule

        The queried name is the one targeted by
        ECSPrefixLengthAction(32, 128), so the responder is expected
        to receive ECS 127.0.0.1/32 instead of the default /24.
        """
        qname = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
        addedSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[addedSubnet], payload=512)
        response = dns.message.make_response(expectedQuery)
        answer = dns.rrset.from_text(qname,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
        response.answer.append(answer)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            receivedQuery, receivedResponse = getattr(self, transport)(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing against the expected query.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSPrefixLengthOverriddenViaLua(self):
        """
        ECS Prefix Length: overridden via Lua

        The queried name is the one handled by
        overrideECSPrefixLengthViaLua(), so the responder is expected
        to receive ECS 127.0.0.1/32 instead of the default /24.
        """
        qname = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
        addedSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[addedSubnet], payload=512)
        response = dns.message.make_response(expectedQuery)
        answer = dns.rrset.from_text(qname,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
        response.answer.append(answer)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            receivedQuery, receivedResponse = getattr(self, transport)(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing against the expected query.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)
956
class TestECSPrefixSetByRule(DNSDistTest):
    """
    dnsdist adds the EDNS0 Client Subnet option to incoming queries
    using the actual source IP (full /32 and /128 prefixes), and the
    value is replaced for selected names through SetECSAction().
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(32)
    setECSSourcePrefixV6(128)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
    """

    def testWithRegularECS(self):
        """
        ECS Prefix: not set

        The queried name does not match the SetECSAction() rule, so
        the responder is expected to receive ECS 127.0.0.1/32. The
        backend answers with EDNS and ECS, while the response we get
        back is checked with checkResponseNoEDNS since our query had
        no EDNS.
        """
        qname = 'notsetecsaction.ecsrules.tests.powerdns.com.'
        addedSubnet = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[addedSubnet], payload=512)
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[addedSubnet])
        answer = dns.rrset.from_text(qname,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
        response.answer.append(answer)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            receivedQuery, receivedResponse = getattr(self, transport)(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing against the expected query.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSSetByRule(self):
        """
        ECS Prefix: set with SetECSAction

        The queried name is the one targeted by
        SetECSAction("192.0.2.1/32"), so the responder is expected to
        receive ECS 192.0.2.1/32 rather than the source address.
        """
        qname = 'setecsaction.ecsrules.tests.powerdns.com.'
        forcedSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, options=[forcedSubnet], payload=512)
        response = dns.message.make_response(expectedQuery)
        answer = dns.rrset.from_text(qname,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
        response.answer.append(answer)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            receivedQuery, receivedResponse = getattr(self, transport)(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing against the expected query.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)