git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #5916 from pieterlexis/rm-wiki
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6
class TestEdnsClientSubnetNoOverride(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet (ECS)
    option to the queries it forwards, but only when the client
    query does not already carry one.

    Every test runs the same exchange over UDP and then over TCP.
    """

    _config_template = """
    truncateTC(true)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    def testWithoutEDNS(self):
        """
        ECS: No existing EDNS

        The client query has no EDNS at all. The query seen by
        the responder must carry an ECS option for 127.0.0.1/24,
        and the response handed back to the client must not
        contain an EDNS pseudo-RR.
        """
        name = 'withoutedns.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[addedECS], payload=512)
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECS(self):
        """
        ECS: Existing EDNS without ECS

        The client query has EDNS but no ECS option. The query
        seen by the responder must carry an ECS option for
        127.0.0.1/24, and the response handed back to the client
        must contain an EDNS pseudo-RR without ECS.
        """
        name = 'withednsnoecs.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[addedECS])
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSECS(self):
        """
        ECS: Existing EDNS with ECS

        The client query has EDNS and a crafted ECS option
        (1.2.3.4/24). The query seen by the responder must still
        carry that initial value (not overwritten), and the
        response handed back to the client must contain an EDNS
        pseudo-RR.
        """
        name = 'withednsecs.ecs.tests.powerdns.com.'
        clientECS = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[clientECS])
        response = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The query is expected to go through unchanged, so it
            # doubles as the expected query.
            receivedQuery.id = query.id
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testWithoutEDNSResponseWithECS(self):
        """
        ECS: No existing EDNS (BE returning ECS)

        Same as the "No existing EDNS" case, except that the
        response returned by the backend contains an ECS option
        with the scope set. The response handed back to the
        client must still not contain an EDNS pseudo-RR.
        """
        name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[addedECS], payload=512)
        response = dns.message.make_response(expectedQuery)
        backendECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[backendECS])
        expectedResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECSResponseWithECS(self):
        """
        ECS: Existing EDNS without ECS (BE returning only the ECS option)

        Same as the "Existing EDNS without ECS" case, except that
        the response returned by the backend contains an ECS
        option with the scope set. The response handed back to
        the client must contain an EDNS pseudo-RR without ECS.
        """
        name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[addedECS])
        response = dns.message.make_response(expectedQuery)
        backendECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[backendECS])
        expectedResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSNoECSResponseWithCookiesThenECS(self):
        """
        ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)

        The client query has EDNS but no ECS option. The backend
        answers with one Cookies option followed by one ECS
        option. The response handed back to the client must
        contain an EDNS pseudo-RR holding the Cookies option only.
        """
        name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[addedECS])
        response = dns.message.make_response(expectedQuery)
        backendCookie = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
        backendECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[backendCookie, backendECS])
        expectedResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)
        expectedResponse.use_edns(edns=True, payload=4096, options=[backendCookie])

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)

    def testWithEDNSNoECSResponseWithECSThenCookies(self):
        """
        ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)

        The client query has EDNS but no ECS option. The backend
        answers with one ECS option followed by one Cookies
        option. The response handed back to the client must
        contain an EDNS pseudo-RR holding the Cookies option only.
        """
        name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[addedECS])
        response = dns.message.make_response(expectedQuery)
        backendCookie = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
        backendECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[backendECS, backendCookie])
        expectedResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)
        # Only the expectation is reduced to the Cookies option. The
        # backend response must keep its "ECS then Cookies" options,
        # otherwise this ordering is never exercised.
        expectedResponse.use_edns(edns=True, payload=4096, options=[backendCookie])

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)

    def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
        """
        ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)

        The client query has EDNS but no ECS option. The backend
        answers with one Cookies option, one ECS option and then
        another Cookies option. The response handed back to the
        client must contain an EDNS pseudo-RR with two Cookies
        options and no ECS.
        """
        name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[addedECS])
        response = dns.message.make_response(expectedQuery)
        backendCookie = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
        backendECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[backendCookie, backendECS, backendCookie])
        expectedResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
338
339
class TestEdnsClientSubnetOverride(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet (ECS)
    option to the queries it forwards, replacing any ECS value
    already present in the client query.

    Every test runs the same exchange over UDP and then over TCP.
    """

    _config_template = """
    truncateTC(true)
    setECSOverride(true)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    def testWithoutEDNS(self):
        """
        ECS Override: No existing EDNS

        The client query has no EDNS at all. The query seen by
        the responder must carry an ECS option for 127.0.0.1/24,
        and the response handed back to the client must not
        contain an EDNS pseudo-RR.
        """
        name = 'withoutedns.overridden.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[addedECS], payload=512)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(expectedQuery)
        response.use_edns(edns=True, payload=4096, options=[addedECS])
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECS(self):
        """
        ECS Override: Existing EDNS without ECS

        The client query has EDNS but no ECS option. The query
        seen by the responder must carry an ECS option for
        127.0.0.1/24, and the response handed back to the client
        must contain an EDNS pseudo-RR without ECS.
        """
        name = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[addedECS])
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(expectedQuery)
        response.use_edns(edns=True, payload=4096, options=[addedECS])
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSShorterInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (short)

        The client query has EDNS and a crafted ECS option
        (192.0.2.1/8), i.e. shorter than the 127.0.0.1/24 value
        it will be replaced with. The query seen by the responder
        must carry the overwritten value, and the response handed
        back to the client must contain an EDNS pseudo-RR.
        """
        name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
        clientECS = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
        rewrittenECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[clientECS])
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenECS])
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenECS])
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithEDNSLongerInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (long)

        The client query has EDNS and a crafted ECS option
        (192.0.2.1/32), i.e. longer than the 127.0.0.1/24 value
        it will be replaced with. The query seen by the responder
        must carry the overwritten value, and the response handed
        back to the client must contain an EDNS pseudo-RR.
        """
        name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
        clientECS = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        rewrittenECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[clientECS])
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenECS])
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenECS])
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithEDNSSameSizeInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (same)

        The client query has EDNS and a crafted ECS option
        (192.0.2.1/24), i.e. exactly the same size as the
        127.0.0.1/24 value it will be replaced with. The query
        seen by the responder must carry the overwritten value,
        and the response handed back to the client must contain
        an EDNS pseudo-RR.
        """
        name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
        clientECS = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        rewrittenECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[clientECS])
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenECS])
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenECS])
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)
549
class TestECSDisabledByRuleOrLua(DNSDistTest):
    """
    dnsdist is configured to add the EDNS0 Client Subnet (ECS)
    option, but ECS is switched off for selected names, either
    with DisableECSAction() or from a Lua function.

    Every test runs the same exchange over UDP and then over TCP.
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(16)
    setECSSourcePrefixV6(16)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
    function disableECSViaLua(dq)
        dq.useECS = false
        return DNSAction.None, ""
    end
    addLuaAction("disabledvialua.ecsrules.tests.powerdns.com.", disableECSViaLua)
    """

    def testWithECSNotDisabled(self):
        """
        ECS Disable: ECS enabled in the backend

        The name matches neither the rule nor the Lua action, so
        the query seen by the responder must carry an ECS option
        for 127.0.0.1/16, and the response handed back to the
        client must not contain an EDNS pseudo-RR.
        """
        name = 'notdisabled.ecsrules.tests.powerdns.com.'
        addedECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[addedECS], payload=512)
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): the question asks for an A record while this
        # answer is an AAAA record; confirm the mismatch is intended.
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.AAAA, '::1')
        response.answer.append(answer)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSDisabledViaRule(self):
        """
        ECS Disable: ECS enabled in the backend, but disabled by a rule

        The name matches the DisableECSAction() rule, so both the
        query seen by the responder and the response handed back
        to the client must be free of EDNS.
        """
        name = 'disabled.ecsrules.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The query is expected to go through unchanged, so it
            # doubles as the expected query.
            receivedQuery.id = query.id
            self.checkQueryNoEDNS(query, receivedQuery)
            self.checkResponseNoEDNS(response, receivedResponse)

    def testWithECSDisabledViaLua(self):
        """
        ECS Disable: ECS enabled in the backend, but disabled via Lua

        The name matches the Lua action that sets dq.useECS to
        false, so both the query seen by the responder and the
        response handed back to the client must be free of EDNS.
        """
        name = 'disabledvialua.ecsrules.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The query is expected to go through unchanged, so it
            # doubles as the expected query.
            receivedQuery.id = query.id
            self.checkQueryNoEDNS(query, receivedQuery)
            self.checkResponseNoEDNS(response, receivedResponse)
657
class TestECSOverrideSetByRuleOrLua(DNSDistTest):
    """
    dnsdist is configured to set the EDNS0 Client Subnet (ECS)
    option without overriding an existing one, but overriding is
    forced for selected names, either with ECSOverrideAction()
    or from a Lua function.

    Every test runs the same exchange over UDP and then over TCP.
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
    function overrideECSViaLua(dq)
        dq.ecsOverride = true
        return DNSAction.None, ""
    end
    addLuaAction("overriddenvialua.ecsrules.tests.powerdns.com.", overrideECSViaLua)
    """

    def testWithECSOverrideNotSet(self):
        """
        ECS Override: not set via Lua or a rule

        The name matches neither the rule nor the Lua action, so
        the ECS option sent by the client (192.0.2.1/24) must
        reach the responder untouched.
        """
        name = 'notoverridden.ecsrules.tests.powerdns.com.'
        clientECS = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[clientECS])
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[clientECS])
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The query is expected to go through unchanged, so it
            # doubles as the expected query.
            receivedQuery.id = query.id
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithECSOverrideSetViaRule(self):
        """
        ECS Override: set with a rule

        The name matches the ECSOverrideAction(true) rule, so the
        ECS option sent by the client (192.0.2.1/24) must have
        been replaced with 127.0.0.1/24 in the query seen by the
        responder.
        """
        name = 'overridden.ecsrules.tests.powerdns.com.'
        clientECS = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        rewrittenECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[clientECS])
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenECS])
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenECS])
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithECSOverrideSetViaLua(self):
        """
        ECS Override: set via Lua

        The name matches the Lua action that sets dq.ecsOverride
        to true, so the ECS option sent by the client
        (192.0.2.1/24) must have been replaced with 127.0.0.1/24
        in the query seen by the responder.
        """
        name = 'overriddenvialua.ecsrules.tests.powerdns.com.'
        clientECS = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        rewrittenECS = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[clientECS])
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenECS])
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenECS])
        response.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID with the expected query before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)
771
class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
    """
    dnsdist adds the EDNS0 Client Subnet option using a source
    prefix length of 24 for IPv4 and 56 for IPv6 by default.
    For selected names that length is raised to 32 (and 128 for
    IPv6) through ECSPrefixLengthAction() or through a Lua action.
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
    function overrideECSPrefixLengthViaLua(dq)
      dq.ecsPrefixLength = 32
      return DNSAction.None, ""
    end
    addLuaAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", overrideECSPrefixLengthViaLua)
    """

    def testWithECSPrefixLengthNotOverridden(self):
        """
        ECS Prefix Length: not overridden via Lua or a rule

        The name matches neither the rule nor the Lua action, so
        the query received by the responder is compared against
        one carrying 127.0.0.1/24. The client query has no EDNS,
        so the response from dnsdist is checked with
        checkResponseNoEDNS, over UDP and then TCP.
        """
        name = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
        defaultEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        answer = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')

        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[defaultEcso], payload=512)

        # The responder answers with EDNS and the ECS option set.
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[defaultEcso])
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sendQuery in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sendQuery(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # expectedQuery was built separately from the query that
            # was actually sent, so copy its ID over before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSPrefixLengthOverriddenViaRule(self):
        """
        ECS Prefix Length: overridden with a rule

        The name matches the ECSPrefixLengthAction(32, 128) rule,
        so the query received by the responder is compared against
        one carrying 127.0.0.1/32. The response from dnsdist is
        checked with checkResponseNoEDNS, over UDP and then TCP.
        """
        name = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
        fullLengthEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
        answer = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')

        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[fullLengthEcso], payload=512)

        response = dns.message.make_response(expectedQuery)
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sendQuery in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sendQuery(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # expectedQuery was built separately from the query that
            # was actually sent, so copy its ID over before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSPrefixLengthOverriddenViaLua(self):
        """
        ECS Prefix Length: overridden via Lua

        The name matches the Lua action that sets
        dq.ecsPrefixLength to 32, so the query received by the
        responder is compared against one carrying 127.0.0.1/32.
        The response from dnsdist is checked with
        checkResponseNoEDNS, over UDP and then TCP.
        """
        name = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
        fullLengthEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
        answer = dns.rrset.from_text(name,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')

        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[fullLengthEcso], payload=512)

        response = dns.message.make_response(expectedQuery)
        response.answer.append(answer)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sendQuery in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sendQuery(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # expectedQuery was built separately from the query that
            # was actually sent, so copy its ID over before comparing.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)