]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #4508 from Habbie/wrong
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6
class TestEdnsClientSubnet(DNSDistTest):
    """
    Shared assertion helpers for the EDNS Client Subnet (ECS) tests.

    The check* methods compare an expected DNS message with the one
    actually received and additionally verify the EDNS state of the
    received message (no EDNS, EDNS without ECS, EDNS with ECS).
    """

    def compareOptions(self, a, b):
        """
        Assert that the two option sequences have the same length
        and are equal element by element.
        """
        self.assertEqual(len(a), len(b))
        # Lengths are known to match here, so zip() visits every pair.
        for expectedOption, receivedOption in zip(a, b):
            self.assertEqual(expectedOption, receivedOption)

    def checkMessageNoEDNS(self, expected, received):
        """
        Assert that received equals expected and carries no EDNS
        information at all (edns level -1, no options).
        """
        self.assertEqual(expected, received)
        self.assertEqual(received.edns, -1)
        self.assertEqual(len(received.options), 0)

    def checkMessageEDNSWithoutECS(self, expected, received, withCookies=0):
        """
        Assert that received equals expected, uses EDNS version 0 and
        carries exactly withCookies options, all of them with option
        code 10 (the EDNS COOKIE option), i.e. no ECS option is left.
        """
        self.assertEqual(expected, received)
        self.assertEqual(received.edns, 0)
        self.assertEqual(len(received.options), withCookies)
        if withCookies:
            for option in received.options:
                self.assertEqual(option.otype, 10)

    def checkMessageEDNSWithECS(self, expected, received):
        """
        Assert that received equals expected, uses EDNS version 0 and
        carries exactly one option, an ECS option identical to the one
        in expected.
        """
        self.assertEqual(expected, received)
        self.assertEqual(received.edns, 0)
        self.assertEqual(len(received.options), 1)
        self.assertEqual(received.options[0].otype, clientsubnetoption.ASSIGNED_OPTION_CODE)
        self.compareOptions(expected.options, received.options)

    # The query/response variants below are thin aliases kept so that the
    # test bodies state which side of the exchange is being checked.

    def checkQueryEDNSWithECS(self, expected, received):
        """Check a query that must carry a single ECS option."""
        self.checkMessageEDNSWithECS(expected, received)

    def checkResponseEDNSWithECS(self, expected, received):
        """Check a response that must carry a single ECS option."""
        self.checkMessageEDNSWithECS(expected, received)

    def checkQueryEDNSWithoutECS(self, expected, received):
        """Check a query that must use EDNS but carry no option."""
        self.checkMessageEDNSWithoutECS(expected, received)

    def checkResponseEDNSWithoutECS(self, expected, received, withCookies=0):
        """
        Check a response that must use EDNS without ECS, optionally
        carrying withCookies cookie options.
        """
        self.checkMessageEDNSWithoutECS(expected, received, withCookies)

    def checkQueryNoEDNS(self, expected, received):
        """Check a query that must not use EDNS."""
        self.checkMessageNoEDNS(expected, received)

    def checkResponseNoEDNS(self, expected, received):
        """Check a response that must not use EDNS."""
        self.checkMessageNoEDNS(expected, received)
50
class TestEdnsClientSubnetNoOverride(TestEdnsClientSubnet):
    """
    dnsdist is configured to add the EDNS0 Client Subnet
    option, but only if it's not already present in the
    original query.
    """

    _config_template = """
    truncateTC(true)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    def testWithoutEDNS(self):
        """
        ECS: No existing EDNS

        Send a query without EDNS, check that the query
        received by the responder has the correct ECS value
        and that the response received from dnsdist does not
        have an EDNS pseudo-RR.
        """
        name = 'withoutedns.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        # Same exchange over UDP first, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The ID seen by the backend is not the one we generated.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECS(self):
        """
        ECS: Existing EDNS without ECS

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        """
        name = 'withednsnoecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        # Same exchange over UDP first, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSECS(self):
        """
        ECS: Existing EDNS with ECS

        Send a query with EDNS and a crafted ECS value.
        Check that the query received by the responder
        has the initial ECS value (not overwritten)
        and that the response received from dnsdist contains
        an EDNS pseudo-RR.
        """
        name = 'withednsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # The client-supplied ECS must reach the backend untouched, so the
        # original query doubles as the expected one.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testWithoutEDNSResponseWithECS(self):
        """
        ECS: No existing EDNS (BE returning ECS)

        Send a query without EDNS, check that the query
        received by the responder has the correct ECS value
        and that the response received from dnsdist does not
        have an EDNS pseudo-RR.
        This time the response returned by the backend contains
        an ECS option with scope set.
        """
        name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        response = dns.message.make_response(expectedQuery)
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        # Same exchange over UDP first, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoECSResponseWithECS(self):
        """
        ECS: Existing EDNS without ECS (BE returning only the ECS option)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        an ECS option with scope set.
        """
        name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        # Same exchange over UDP first, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)

    def testWithEDNSNoECSResponseWithCookiesThenECS(self):
        """
        ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one cookies then one ECS option.
        """
        name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)
        # Only the cookie is expected to survive on the client side.
        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])

        # Same exchange over UDP first, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)

    def testWithEDNSNoECSResponseWithECSThenCookies(self):
        """
        ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one ECS then one Cookies option.
        """
        name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)
        # This must be applied to expectedResponse, not response: calling
        # use_edns() on response again would replace the [ECS, Cookies]
        # options set above with a lone cookie, so the backend would never
        # return the ECS option this test is about.
        expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])

        # Same exchange over UDP first, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)

    def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
        """
        ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)

        Send a query with EDNS but no ECS value.
        Check that the query received by the responder
        has a valid ECS value and that the response
        received from dnsdist contains an EDNS pseudo-RR.
        This time the response returned by the backend contains
        one Cookies, one ECS then one Cookies option.
        """
        name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = dns.message.make_response(expectedQuery)
        ecoResponse = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
        ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
        response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse.answer.append(rrset)

        # Same exchange over UDP first, then TCP; both cookies must remain.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
382
383
class TestEdnsClientSubnetOverride(TestEdnsClientSubnet):
    """
    dnsdist is configured to add the EDNS0 Client Subnet option to
    forwarded queries, replacing any value sent by the client.
    """

    _config_template = """
    truncateTC(true)
    setECSOverride(true)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    def testWithoutEDNS(self):
        """
        ECS Override: No existing EDNS

        The client query has no EDNS at all. The backend must see
        the ECS value added by dnsdist, and the response handed back
        to the client must not contain an EDNS pseudo-RR.
        """
        name = 'withoutedns.overridden.ecs.tests.powerdns.com.'
        addedEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[addedEcs], payload=512)
        backendResponse = dns.message.make_response(expectedQuery)
        backendResponse.use_edns(edns=True, payload=4096, options=[addedEcs])
        clientResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        for message in (backendResponse, clientResponse):
            message.answer.append(answer)

        # UDP first, then TCP.
        for send in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = send(query, backendResponse)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(clientResponse, receivedResponse)

    def testWithEDNSNoECS(self):
        """
        ECS Override: Existing EDNS without ECS

        The client query uses EDNS but has no ECS option. The backend
        must see the ECS value added by dnsdist, and the response handed
        back to the client must keep its EDNS pseudo-RR without ECS.
        """
        name = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
        addedEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[addedEcs])
        backendResponse = dns.message.make_response(expectedQuery)
        backendResponse.use_edns(edns=True, payload=4096, options=[addedEcs])
        clientResponse = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        for message in (backendResponse, clientResponse):
            message.answer.append(answer)

        # UDP first, then TCP.
        for send in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = send(query, backendResponse)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithoutECS(clientResponse, receivedResponse)

    def _checkInitialECSIsOverridden(self, initialPrefixLength):
        """
        Send a query carrying an ECS option for 192.0.2.1 with the given
        source prefix length, over UDP then TCP, and check that the
        backend receives 127.0.0.1/24 instead and that the client gets
        a response with an ECS option.
        """
        name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
        initialEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', initialPrefixLength)
        overriddenEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[initialEcs])
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[overriddenEcs])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[overriddenEcs])
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)

        for send in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = send(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testWithEDNSShorterInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (short)

        The client sends a crafted ECS value whose source prefix (/8)
        is shorter than the one dnsdist replaces it with. The backend
        must see the overwritten value and the client response must
        contain an EDNS pseudo-RR.
        """
        self._checkInitialECSIsOverridden(8)

    def testWithEDNSLongerInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (long)

        The client sends a crafted ECS value whose source prefix (/32)
        is longer than the one dnsdist replaces it with. The backend
        must see the overwritten value and the client response must
        contain an EDNS pseudo-RR.
        """
        self._checkInitialECSIsOverridden(32)

    def testWithEDNSSameSizeInitialECS(self):
        """
        ECS Override: Existing EDNS with ECS (same)

        The client sends a crafted ECS value whose source prefix (/24)
        is exactly as long as the one dnsdist replaces it with. The
        backend must see the overwritten value and the client response
        must contain an EDNS pseudo-RR.
        """
        self._checkInitialECSIsOverridden(24)
593
class TestECSDisabledByRuleOrLua(TestEdnsClientSubnet):
    """
    The backend is declared with useClientSubnet=true, but ECS is
    switched off for selected names, either through DisableECSAction()
    or from a Lua action.
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(16)
    setECSSourcePrefixV6(16)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
    function disableECSViaLua(dq)
        dq.useECS = false
        return DNSAction.None, ""
    end
    addLuaAction("disabledvialua.ecsrules.tests.powerdns.com.", disableECSViaLua)
    """

    def testWithECSNotDisabled(self):
        """
        ECS Disable: ECS enabled in the backend
        """
        name = 'notdisabled.ecsrules.tests.powerdns.com.'
        addedEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
        query = dns.message.make_query(name, 'A', 'IN')
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[addedEcs], payload=512)
        backendResponse = dns.message.make_response(expectedQuery)
        clientResponse = dns.message.make_response(query)
        # NOTE(review): the answer is an AAAA record although the query
        # asks for A -- kept as in the original, confirm it is intended.
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.AAAA, '::1')
        for message in (backendResponse, clientResponse):
            message.answer.append(answer)

        # UDP first, then TCP.
        for send in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = send(query, backendResponse)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(clientResponse, receivedResponse)

    def _checkNoECSAdded(self, name):
        """
        Send an EDNS-less A query for name over UDP then TCP and check
        that neither the query seen by the backend nor the response seen
        by the client carries any EDNS information.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        answer = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)

        for send in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = send(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.checkQueryNoEDNS(query, receivedQuery)
            self.checkResponseNoEDNS(response, receivedResponse)

    def testWithECSDisabledViaRule(self):
        """
        ECS Disable: ECS enabled in the backend, but disabled by a rule
        """
        self._checkNoECSAdded('disabled.ecsrules.tests.powerdns.com.')

    def testWithECSDisabledViaLua(self):
        """
        ECS Disable: ECS enabled in the backend, but disabled via Lua
        """
        self._checkNoECSAdded('disabledvialua.ecsrules.tests.powerdns.com.')
701
702 class TestECSOverrideSetByRuleOrLua(TestEdnsClientSubnet):
703 """
704 dnsdist is configured to set the EDNS0 Client Subnet
705 option without overriding an existing one, but we
706 force the overriding via ECSOverrideAction() or Lua.
707 """
708
709 _config_template = """
710 setECSOverride(false)
711 setECSSourcePrefixV4(24)
712 setECSSourcePrefixV6(56)
713 newServer{address="127.0.0.1:%s", useClientSubnet=true}
714 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
715 function overrideECSViaLua(dq)
716 dq.ecsOverride = true
717 return DNSAction.None, ""
718 end
719 addLuaAction("overriddenvialua.ecsrules.tests.powerdns.com.", overrideECSViaLua)
720 """
721
722 def testWithECSOverrideNotSet(self):
723 """
724 ECS Override: not set via Lua or a rule
725 """
726 name = 'notoverridden.ecsrules.tests.powerdns.com.'
727 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
728 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
729 response = dns.message.make_response(query)
730 response.use_edns(edns=True, payload=4096, options=[ecso])
731 rrset = dns.rrset.from_text(name,
732 3600,
733 dns.rdataclass.IN,
734 dns.rdatatype.A,
735 '127.0.0.1')
736 response.answer.append(rrset)
737
738 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
739 self.assertTrue(receivedQuery)
740 self.assertTrue(receivedResponse)
741 receivedQuery.id = query.id
742 self.checkQueryEDNSWithECS(query, receivedQuery)
743 self.checkResponseEDNSWithECS(response, receivedResponse)
744
745 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
746 self.assertTrue(receivedQuery)
747 self.assertTrue(receivedResponse)
748 receivedQuery.id = query.id
749 self.checkQueryEDNSWithECS(query, receivedQuery)
750 self.checkResponseEDNSWithECS(response, receivedResponse)
751
def testWithECSOverrideSetViaRule(self):
    """
    ECS Override: set with a rule

    The client query carries an ECS option for 192.0.2.1/24. The query
    seen by the backend is expected to carry 127.0.0.1/24 instead, and
    the response handed back to the client is expected to carry that
    same 127.0.0.1/24 option. The exchange is checked over UDP and then
    over TCP.
    NOTE(review): the rule doing the override presumably lives in the
    class configuration, which is outside this excerpt -- confirm.
    """
    qname = 'overridden.ecsrules.tests.powerdns.com.'
    clientEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
    overriddenEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

    # What the client sends versus what the backend should receive.
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True,
                                   payload=4096, options=[clientEcs])
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True,
                                           payload=4096, options=[overriddenEcs])

    # Backend answer: a single A record plus the overridden ECS option.
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[overriddenEcs])
    response.answer.append(dns.rrset.from_text(qname,
                                               3600,
                                               dns.rdataclass.IN,
                                               dns.rdatatype.A,
                                               '127.0.0.1'))

    # Same assertions for both transports, UDP first.
    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the ID so the message comparison is not tripped by it.
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
783
def testWithECSOverrideSetViaLua(self):
    """
    ECS Override: set via Lua

    The client query carries an ECS option for 192.0.2.1/24. The query
    seen by the backend is expected to carry 127.0.0.1/24 instead, and
    the response handed back to the client is expected to carry that
    same 127.0.0.1/24 option. The exchange is checked over UDP and then
    over TCP.
    NOTE(review): the Lua hook doing the override presumably lives in
    the class configuration, which is outside this excerpt -- confirm.
    """
    qname = 'overriddenvialua.ecsrules.tests.powerdns.com.'
    clientEcs = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
    overriddenEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)

    # What the client sends versus what the backend should receive.
    query = dns.message.make_query(qname, 'A', 'IN', use_edns=True,
                                   payload=4096, options=[clientEcs])
    expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True,
                                           payload=4096, options=[overriddenEcs])

    # Backend answer: a single A record plus the overridden ECS option.
    response = dns.message.make_response(query)
    response.use_edns(edns=True, payload=4096, options=[overriddenEcs])
    response.answer.append(dns.rrset.from_text(qname,
                                               3600,
                                               dns.rdataclass.IN,
                                               dns.rdatatype.A,
                                               '127.0.0.1'))

    # Same assertions for both transports, UDP first.
    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the ID so the message comparison is not tripped by it.
        receivedQuery.id = expectedQuery.id
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
815
class TestECSPrefixLengthSetByRuleOrLua(TestEdnsClientSubnet):
    """
    dnsdist is configured to set the EDNS0 Client Subnet
    option with a prefix length of 24 for IPv4 and 56 for IPv6.
    For the matching names that default is overridden, either by a
    rule using ECSPrefixLengthAction(32, 128) or by a Lua hook that
    sets dq.ecsPrefixLength to 32.
    """

    _config_template = """
    setECSOverride(false)
    setECSSourcePrefixV4(24)
    setECSSourcePrefixV6(56)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
    function overrideECSPrefixLengthViaLua(dq)
        dq.ecsPrefixLength = 32
        return DNSAction.None, ""
    end
    addLuaAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", overrideECSPrefixLengthViaLua)
    """

    def testWithECSPrefixLengthNotOverridden(self):
        """
        ECS Prefix Length: not overridden via Lua or a rule

        The client query has no EDNS. The backend is expected to see an
        ECS option for 127.0.0.1/24 with a payload size of 512, and the
        response returned to the client is verified with
        checkResponseNoEDNS even though the backend answered with EDNS
        and ECS. Checked over UDP and then over TCP.
        """
        qname = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
        addedEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True,
                                               options=[addedEcs], payload=512)

        # Backend answer echoes the ECS option back.
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[addedEcs])
        answer = dns.rrset.from_text(qname,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
        response.answer.append(answer)

        # What the EDNS-less client should get back.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID so the message comparison is not tripped by it.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSPrefixLengthOverriddenViaRule(self):
        """
        ECS Prefix Length: overridden with a rule

        The client query has no EDNS. Because the name matches the
        ECSPrefixLengthAction(32, 128) rule, the backend is expected to
        see an ECS option for 127.0.0.1/32 with a payload size of 512.
        The response returned to the client is verified with
        checkResponseNoEDNS. Checked over UDP and then over TCP.
        """
        qname = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
        addedEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True,
                                               options=[addedEcs], payload=512)

        # Backend answer is derived from the query the backend should see.
        response = dns.message.make_response(expectedQuery)
        answer = dns.rrset.from_text(qname,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
        response.answer.append(answer)

        # What the EDNS-less client should get back.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID so the message comparison is not tripped by it.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)

    def testWithECSPrefixLengthOverriddenViaLua(self):
        """
        ECS Prefix Length: overridden via Lua

        The client query has no EDNS. Because the name matches the Lua
        action that sets dq.ecsPrefixLength to 32, the backend is
        expected to see an ECS option for 127.0.0.1/32 with a payload
        size of 512. The response returned to the client is verified
        with checkResponseNoEDNS. Checked over UDP and then over TCP.
        """
        qname = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
        addedEcs = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
        query = dns.message.make_query(qname, 'A', 'IN')
        expectedQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True,
                                               options=[addedEcs], payload=512)

        # Backend answer is derived from the query the backend should see.
        response = dns.message.make_response(expectedQuery)
        answer = dns.rrset.from_text(qname,
                                     3600,
                                     dns.rdataclass.IN,
                                     dns.rdatatype.A,
                                     '127.0.0.1')
        response.answer.append(answer)

        # What the EDNS-less client should get back.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID so the message comparison is not tripped by it.
            receivedQuery.id = expectedQuery.id
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(expectedResponse, receivedResponse)