]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
dnsdist tests: make py3k compatible and pick py3k if available
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6 from datetime import datetime, timedelta
7
8 class TestEdnsClientSubnetNoOverride(DNSDistTest):
9 """
10 dnsdist is configured to add the EDNS0 Client Subnet
11 option, but only if it's not already present in the
12 original query.
13 """
14
15 _config_template = """
16 truncateTC(true)
17 newServer{address="127.0.0.1:%s", useClientSubnet=true}
18 """
19
20 def testWithoutEDNS(self):
21 """
22 ECS: No existing EDNS
23
24 Send a query without EDNS, check that the query
25 received by the responder has the correct ECS value
26 and that the response received from dnsdist does not
27 have an EDNS pseudo-RR.
28 """
29 name = 'withoutedns.ecs.tests.powerdns.com.'
30 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
31 query = dns.message.make_query(name, 'A', 'IN')
32 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
33 response = dns.message.make_response(expectedQuery)
34 expectedResponse = dns.message.make_response(query)
35 rrset = dns.rrset.from_text(name,
36 3600,
37 dns.rdataclass.IN,
38 dns.rdatatype.A,
39 '127.0.0.1')
40 response.answer.append(rrset)
41 expectedResponse.answer.append(rrset)
42
43 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
44 self.assertTrue(receivedQuery)
45 self.assertTrue(receivedResponse)
46 receivedQuery.id = expectedQuery.id
47 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
48 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
49
50 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
51 self.assertTrue(receivedQuery)
52 self.assertTrue(receivedResponse)
53 receivedQuery.id = expectedQuery.id
54 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
55 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
56
57 def testWithEDNSNoECS(self):
58 """
59 ECS: Existing EDNS without ECS
60
61 Send a query with EDNS but no ECS value.
62 Check that the query received by the responder
63 has a valid ECS value and that the response
64 received from dnsdist contains an EDNS pseudo-RR.
65 """
66 name = 'withednsnoecs.ecs.tests.powerdns.com.'
67 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
68 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
69 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
70 response = dns.message.make_response(expectedQuery)
71 expectedResponse = dns.message.make_response(query)
72 rrset = dns.rrset.from_text(name,
73 3600,
74 dns.rdataclass.IN,
75 dns.rdatatype.A,
76 '127.0.0.1')
77 response.answer.append(rrset)
78 expectedResponse.answer.append(rrset)
79
80 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
81 self.assertTrue(receivedQuery)
82 self.assertTrue(receivedResponse)
83 receivedQuery.id = expectedQuery.id
84 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
85 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
86
87 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
88 self.assertTrue(receivedQuery)
89 self.assertTrue(receivedResponse)
90 receivedQuery.id = expectedQuery.id
91 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
92 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
93
94 def testWithEDNSECS(self):
95 """
96 ECS: Existing EDNS with ECS
97
98 Send a query with EDNS and a crafted ECS value.
99 Check that the query received by the responder
100 has the initial ECS value (not overwritten)
101 and that the response received from dnsdist contains
102 an EDNS pseudo-RR.
103 """
104 name = 'withednsecs.ecs.tests.powerdns.com.'
105 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
106 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
107 response = dns.message.make_response(query)
108 rrset = dns.rrset.from_text(name,
109 3600,
110 dns.rdataclass.IN,
111 dns.rdatatype.A,
112 '127.0.0.1')
113 response.answer.append(rrset)
114
115
116 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
117 self.assertTrue(receivedQuery)
118 self.assertTrue(receivedResponse)
119 receivedQuery.id = query.id
120 self.checkQueryEDNSWithECS(query, receivedQuery)
121 self.checkResponseEDNSWithoutECS(response, receivedResponse)
122
123 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
124 self.assertTrue(receivedQuery)
125 self.assertTrue(receivedResponse)
126 receivedQuery.id = query.id
127 self.checkQueryEDNSWithECS(query, receivedQuery)
128 self.checkResponseEDNSWithoutECS(response, receivedResponse)
129
130 def testWithoutEDNSResponseWithECS(self):
131 """
132 ECS: No existing EDNS (BE returning ECS)
133
134 Send a query without EDNS, check that the query
135 received by the responder has the correct ECS value
136 and that the response received from dnsdist does not
137 have an EDNS pseudo-RR.
138 This time the response returned by the backend contains
139 an ECS option with scope set.
140 """
141 name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
142 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
143 query = dns.message.make_query(name, 'A', 'IN')
144 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
145 response = dns.message.make_response(expectedQuery)
146 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
147 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
148 expectedResponse = dns.message.make_response(query)
149 rrset = dns.rrset.from_text(name,
150 3600,
151 dns.rdataclass.IN,
152 dns.rdatatype.A,
153 '127.0.0.1')
154 response.answer.append(rrset)
155 expectedResponse.answer.append(rrset)
156
157 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
158 self.assertTrue(receivedQuery)
159 self.assertTrue(receivedResponse)
160 receivedQuery.id = expectedQuery.id
161 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
162 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
163
164 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
165 self.assertTrue(receivedQuery)
166 self.assertTrue(receivedResponse)
167 receivedQuery.id = expectedQuery.id
168 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
169 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
170
171 def testWithEDNSNoECSResponseWithECS(self):
172 """
173 ECS: Existing EDNS without ECS (BE returning only the ECS option)
174
175 Send a query with EDNS but no ECS value.
176 Check that the query received by the responder
177 has a valid ECS value and that the response
178 received from dnsdist contains an EDNS pseudo-RR.
179 This time the response returned by the backend contains
180 an ECS option with scope set.
181 """
182 name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
183 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
184 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
185 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
186 response = dns.message.make_response(expectedQuery)
187 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
188 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
189 expectedResponse = dns.message.make_response(query)
190 rrset = dns.rrset.from_text(name,
191 3600,
192 dns.rdataclass.IN,
193 dns.rdatatype.A,
194 '127.0.0.1')
195 response.answer.append(rrset)
196 expectedResponse.answer.append(rrset)
197
198 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
199 self.assertTrue(receivedQuery)
200 self.assertTrue(receivedResponse)
201 receivedQuery.id = expectedQuery.id
202 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
203 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
204
205 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
206 self.assertTrue(receivedQuery)
207 self.assertTrue(receivedResponse)
208 receivedQuery.id = expectedQuery.id
209 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
210 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
211
212 def testWithEDNSNoECSResponseWithCookiesThenECS(self):
213 """
214 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
215
216 Send a query with EDNS but no ECS value.
217 Check that the query received by the responder
218 has a valid ECS value and that the response
219 received from dnsdist contains an EDNS pseudo-RR.
220 This time the response returned by the backend contains
221 one cookies then one ECS option.
222 """
223 name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
224 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
225 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
226 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
227 response = dns.message.make_response(expectedQuery)
228 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
229 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
230 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
231 expectedResponse = dns.message.make_response(query)
232 rrset = dns.rrset.from_text(name,
233 3600,
234 dns.rdataclass.IN,
235 dns.rdatatype.A,
236 '127.0.0.1')
237 response.answer.append(rrset)
238 expectedResponse.answer.append(rrset)
239 expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
240
241 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
242 self.assertTrue(receivedQuery)
243 self.assertTrue(receivedResponse)
244 receivedQuery.id = expectedQuery.id
245 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
246 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
247
248 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
249 self.assertTrue(receivedQuery)
250 self.assertTrue(receivedResponse)
251 receivedQuery.id = expectedQuery.id
252 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
253 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
254
255 def testWithEDNSNoECSResponseWithECSThenCookies(self):
256 """
257 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
258
259 Send a query with EDNS but no ECS value.
260 Check that the query received by the responder
261 has a valid ECS value and that the response
262 received from dnsdist contains an EDNS pseudo-RR.
263 This time the response returned by the backend contains
264 one ECS then one Cookies option.
265 """
266 name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
267 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
268 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
269 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
270 response = dns.message.make_response(expectedQuery)
271 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
272 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
273 response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
274 expectedResponse = dns.message.make_response(query)
275 rrset = dns.rrset.from_text(name,
276 3600,
277 dns.rdataclass.IN,
278 dns.rdatatype.A,
279 '127.0.0.1')
280 response.answer.append(rrset)
281 expectedResponse.answer.append(rrset)
282 response.use_edns(edns=True, payload=4096, options=[ecoResponse])
283
284 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
285 self.assertTrue(receivedQuery)
286 self.assertTrue(receivedResponse)
287 receivedQuery.id = expectedQuery.id
288 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
289 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
290
291 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
292 self.assertTrue(receivedQuery)
293 self.assertTrue(receivedResponse)
294 receivedQuery.id = expectedQuery.id
295 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
296 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
297
298 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
299 """
300 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
301
302 Send a query with EDNS but no ECS value.
303 Check that the query received by the responder
304 has a valid ECS value and that the response
305 received from dnsdist contains an EDNS pseudo-RR.
306 This time the response returned by the backend contains
307 one Cookies, one ECS then one Cookies option.
308 """
309 name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
310 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
311 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
312 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
313 response = dns.message.make_response(expectedQuery)
314 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
315 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
316 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
317 expectedResponse = dns.message.make_response(query)
318 rrset = dns.rrset.from_text(name,
319 3600,
320 dns.rdataclass.IN,
321 dns.rdatatype.A,
322 '127.0.0.1')
323 response.answer.append(rrset)
324 expectedResponse.answer.append(rrset)
325
326 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
327 self.assertTrue(receivedQuery)
328 self.assertTrue(receivedResponse)
329 receivedQuery.id = expectedQuery.id
330 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
331 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
332
333 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
334 self.assertTrue(receivedQuery)
335 self.assertTrue(receivedResponse)
336 receivedQuery.id = expectedQuery.id
337 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
338 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
339
340
341 class TestEdnsClientSubnetOverride(DNSDistTest):
342 """
343 dnsdist is configured to add the EDNS0 Client Subnet
344 option, overwriting any existing value.
345 """
346
347 _config_template = """
348 truncateTC(true)
349 setECSOverride(true)
350 setECSSourcePrefixV4(24)
351 setECSSourcePrefixV6(56)
352 newServer{address="127.0.0.1:%s", useClientSubnet=true}
353 """
354
355 def testWithoutEDNS(self):
356 """
357 ECS Override: No existing EDNS
358
359 Send a query without EDNS, check that the query
360 received by the responder has the correct ECS value
361 and that the response received from dnsdist does not
362 have an EDNS pseudo-RR.
363 """
364 name = 'withoutedns.overridden.ecs.tests.powerdns.com.'
365 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
366 query = dns.message.make_query(name, 'A', 'IN')
367 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
368 response = dns.message.make_response(expectedQuery)
369 response.use_edns(edns=True, payload=4096, options=[ecso])
370 rrset = dns.rrset.from_text(name,
371 3600,
372 dns.rdataclass.IN,
373 dns.rdatatype.A,
374 '127.0.0.1')
375 response.answer.append(rrset)
376 expectedResponse = dns.message.make_response(query)
377 expectedResponse.answer.append(rrset)
378
379 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
380 self.assertTrue(receivedQuery)
381 self.assertTrue(receivedResponse)
382 receivedQuery.id = expectedQuery.id
383 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
384 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
385
386 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
387 self.assertTrue(receivedQuery)
388 self.assertTrue(receivedResponse)
389 receivedQuery.id = expectedQuery.id
390 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
391 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
392
393 def testWithEDNSNoECS(self):
394 """
395 ECS Override: Existing EDNS without ECS
396
397 Send a query with EDNS but no ECS value.
398 Check that the query received by the responder
399 has a valid ECS value and that the response
400 received from dnsdist contains an EDNS pseudo-RR.
401 """
402 name = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
403 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
404 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
405 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
406 response = dns.message.make_response(expectedQuery)
407 response.use_edns(edns=True, payload=4096, options=[ecso])
408 rrset = dns.rrset.from_text(name,
409 3600,
410 dns.rdataclass.IN,
411 dns.rdatatype.A,
412 '127.0.0.1')
413 response.answer.append(rrset)
414 expectedResponse = dns.message.make_response(query)
415 expectedResponse.answer.append(rrset)
416
417 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
418 self.assertTrue(receivedQuery)
419 self.assertTrue(receivedResponse)
420 receivedQuery.id = expectedQuery.id
421 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
422 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
423
424 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
425 self.assertTrue(receivedQuery)
426 self.assertTrue(receivedResponse)
427 receivedQuery.id = expectedQuery.id
428 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
429 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
430
431 def testWithEDNSShorterInitialECS(self):
432 """
433 ECS Override: Existing EDNS with ECS (short)
434
435 Send a query with EDNS and a crafted ECS value.
436 Check that the query received by the responder
437 has an overwritten ECS value (not the initial one)
438 and that the response received from dnsdist contains
439 an EDNS pseudo-RR.
440 The initial ECS value is shorter than the one it will be
441 replaced with.
442 """
443 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
444 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
445 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
446 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
447 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
448 response = dns.message.make_response(query)
449 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
450 rrset = dns.rrset.from_text(name,
451 3600,
452 dns.rdataclass.IN,
453 dns.rdatatype.A,
454 '127.0.0.1')
455 response.answer.append(rrset)
456
457 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
458 self.assertTrue(receivedQuery)
459 self.assertTrue(receivedResponse)
460 receivedQuery.id = expectedQuery.id
461 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
462 self.checkResponseEDNSWithECS(response, receivedResponse)
463
464 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
465 self.assertTrue(receivedQuery)
466 self.assertTrue(receivedResponse)
467 receivedQuery.id = expectedQuery.id
468 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
469 self.checkResponseEDNSWithECS(response, receivedResponse)
470
471 def testWithEDNSLongerInitialECS(self):
472 """
473 ECS Override: Existing EDNS with ECS (long)
474
475 Send a query with EDNS and a crafted ECS value.
476 Check that the query received by the responder
477 has an overwritten ECS value (not the initial one)
478 and that the response received from dnsdist contains
479 an EDNS pseudo-RR.
480 The initial ECS value is longer than the one it will
481 replaced with.
482 """
483 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
484 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
485 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
486 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
487 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
488 response = dns.message.make_response(query)
489 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
490 rrset = dns.rrset.from_text(name,
491 3600,
492 dns.rdataclass.IN,
493 dns.rdatatype.A,
494 '127.0.0.1')
495 response.answer.append(rrset)
496
497 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
498 self.assertTrue(receivedQuery)
499 self.assertTrue(receivedResponse)
500 receivedQuery.id = expectedQuery.id
501 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
502 self.checkResponseEDNSWithECS(response, receivedResponse)
503
504 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
505 self.assertTrue(receivedQuery)
506 self.assertTrue(receivedResponse)
507 receivedQuery.id = expectedQuery.id
508 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
509 self.checkResponseEDNSWithECS(response, receivedResponse)
510
511 def testWithEDNSSameSizeInitialECS(self):
512 """
513 ECS Override: Existing EDNS with ECS (same)
514
515 Send a query with EDNS and a crafted ECS value.
516 Check that the query received by the responder
517 has an overwritten ECS value (not the initial one)
518 and that the response received from dnsdist contains
519 an EDNS pseudo-RR.
520 The initial ECS value is exactly the same size as
521 the one it will replaced with.
522 """
523 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
524 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
525 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
526 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
527 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
528 response = dns.message.make_response(query)
529 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
530 rrset = dns.rrset.from_text(name,
531 3600,
532 dns.rdataclass.IN,
533 dns.rdatatype.A,
534 '127.0.0.1')
535 response.answer.append(rrset)
536
537 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
538 self.assertTrue(receivedQuery)
539 self.assertTrue(receivedResponse)
540 receivedQuery.id = expectedQuery.id
541 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
542 self.checkResponseEDNSWithECS(response, receivedResponse)
543
544 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
545 self.assertTrue(receivedQuery)
546 self.assertTrue(receivedResponse)
547 receivedQuery.id = expectedQuery.id
548 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
549 self.checkResponseEDNSWithECS(response, receivedResponse)
550
551 class TestECSDisabledByRuleOrLua(DNSDistTest):
552 """
553 dnsdist is configured to add the EDNS0 Client Subnet
554 option, but we disable it via DisableECSAction()
555 or Lua.
556 """
557
558 _config_template = """
559 setECSOverride(false)
560 setECSSourcePrefixV4(16)
561 setECSSourcePrefixV6(16)
562 newServer{address="127.0.0.1:%s", useClientSubnet=true}
563 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
564 function disableECSViaLua(dq)
565 dq.useECS = false
566 return DNSAction.None, ""
567 end
568 addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
569 """
570
571 def testWithECSNotDisabled(self):
572 """
573 ECS Disable: ECS enabled in the backend
574 """
575 name = 'notdisabled.ecsrules.tests.powerdns.com.'
576 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
577 query = dns.message.make_query(name, 'A', 'IN')
578 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
579 response = dns.message.make_response(expectedQuery)
580 expectedResponse = dns.message.make_response(query)
581 rrset = dns.rrset.from_text(name,
582 3600,
583 dns.rdataclass.IN,
584 dns.rdatatype.AAAA,
585 '::1')
586 response.answer.append(rrset)
587 expectedResponse.answer.append(rrset)
588
589 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
590 self.assertTrue(receivedQuery)
591 self.assertTrue(receivedResponse)
592 receivedQuery.id = expectedQuery.id
593 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
594 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
595
596 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
597 self.assertTrue(receivedQuery)
598 self.assertTrue(receivedResponse)
599 receivedQuery.id = expectedQuery.id
600 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
601 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
602
603 def testWithECSDisabledViaRule(self):
604 """
605 ECS Disable: ECS enabled in the backend, but disabled by a rule
606 """
607 name = 'disabled.ecsrules.tests.powerdns.com.'
608 query = dns.message.make_query(name, 'A', 'IN')
609 response = dns.message.make_response(query)
610 rrset = dns.rrset.from_text(name,
611 3600,
612 dns.rdataclass.IN,
613 dns.rdatatype.A,
614 '127.0.0.1')
615 response.answer.append(rrset)
616
617 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
618 self.assertTrue(receivedQuery)
619 self.assertTrue(receivedResponse)
620 receivedQuery.id = query.id
621 self.checkQueryNoEDNS(query, receivedQuery)
622 self.checkResponseNoEDNS(response, receivedResponse)
623
624 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
625 self.assertTrue(receivedQuery)
626 self.assertTrue(receivedResponse)
627 receivedQuery.id = query.id
628 self.checkQueryNoEDNS(query, receivedQuery)
629 self.checkResponseNoEDNS(response, receivedResponse)
630
631 def testWithECSDisabledViaLua(self):
632 """
633 ECS Disable: ECS enabled in the backend, but disabled via Lua
634 """
635 name = 'disabledvialua.ecsrules.tests.powerdns.com.'
636 query = dns.message.make_query(name, 'A', 'IN')
637 response = dns.message.make_response(query)
638 rrset = dns.rrset.from_text(name,
639 3600,
640 dns.rdataclass.IN,
641 dns.rdatatype.A,
642 '127.0.0.1')
643 response.answer.append(rrset)
644
645 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
646 self.assertTrue(receivedQuery)
647 self.assertTrue(receivedResponse)
648 receivedQuery.id = query.id
649 self.checkQueryNoEDNS(query, receivedQuery)
650 self.checkResponseNoEDNS(response, receivedResponse)
651
652 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
653 self.assertTrue(receivedQuery)
654 self.assertTrue(receivedResponse)
655 receivedQuery.id = query.id
656 self.checkQueryNoEDNS(query, receivedQuery)
657 self.checkResponseNoEDNS(response, receivedResponse)
658
659 class TestECSOverrideSetByRuleOrLua(DNSDistTest):
660 """
661 dnsdist is configured to set the EDNS0 Client Subnet
662 option without overriding an existing one, but we
663 force the overriding via ECSOverrideAction() or Lua.
664 """
665
666 _config_template = """
667 setECSOverride(false)
668 setECSSourcePrefixV4(24)
669 setECSSourcePrefixV6(56)
670 newServer{address="127.0.0.1:%s", useClientSubnet=true}
671 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
672 function overrideECSViaLua(dq)
673 dq.ecsOverride = true
674 return DNSAction.None, ""
675 end
676 addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
677 """
678
679 def testWithECSOverrideNotSet(self):
680 """
681 ECS Override: not set via Lua or a rule
682 """
683 name = 'notoverridden.ecsrules.tests.powerdns.com.'
684 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
685 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
686 response = dns.message.make_response(query)
687 response.use_edns(edns=True, payload=4096, options=[ecso])
688 rrset = dns.rrset.from_text(name,
689 3600,
690 dns.rdataclass.IN,
691 dns.rdatatype.A,
692 '127.0.0.1')
693 response.answer.append(rrset)
694
695 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
696 self.assertTrue(receivedQuery)
697 self.assertTrue(receivedResponse)
698 receivedQuery.id = query.id
699 self.checkQueryEDNSWithECS(query, receivedQuery)
700 self.checkResponseEDNSWithECS(response, receivedResponse)
701
702 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
703 self.assertTrue(receivedQuery)
704 self.assertTrue(receivedResponse)
705 receivedQuery.id = query.id
706 self.checkQueryEDNSWithECS(query, receivedQuery)
707 self.checkResponseEDNSWithECS(response, receivedResponse)
708
709 def testWithECSOverrideSetViaRule(self):
710 """
711 ECS Override: set with a rule
712 """
713 name = 'overridden.ecsrules.tests.powerdns.com.'
714 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
715 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
716 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
717 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
718 response = dns.message.make_response(query)
719 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
720 rrset = dns.rrset.from_text(name,
721 3600,
722 dns.rdataclass.IN,
723 dns.rdatatype.A,
724 '127.0.0.1')
725 response.answer.append(rrset)
726
727 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
728 self.assertTrue(receivedQuery)
729 self.assertTrue(receivedResponse)
730 receivedQuery.id = expectedQuery.id
731 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
732 self.checkResponseEDNSWithECS(response, receivedResponse)
733
734 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
735 self.assertTrue(receivedQuery)
736 self.assertTrue(receivedResponse)
737 receivedQuery.id = expectedQuery.id
738 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
739 self.checkResponseEDNSWithECS(response, receivedResponse)
740
741 def testWithECSOverrideSetViaLua(self):
742 """
743 ECS Override: set via Lua
744 """
745 name = 'overriddenvialua.ecsrules.tests.powerdns.com.'
746 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
747 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
748 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
749 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
750 response = dns.message.make_response(query)
751 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
752 rrset = dns.rrset.from_text(name,
753 3600,
754 dns.rdataclass.IN,
755 dns.rdatatype.A,
756 '127.0.0.1')
757 response.answer.append(rrset)
758
759 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
760 self.assertTrue(receivedQuery)
761 self.assertTrue(receivedResponse)
762 receivedQuery.id = expectedQuery.id
763 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
764 self.checkResponseEDNSWithECS(response, receivedResponse)
765
766 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
767 self.assertTrue(receivedQuery)
768 self.assertTrue(receivedResponse)
769 receivedQuery.id = expectedQuery.id
770 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
771 self.checkResponseEDNSWithECS(response, receivedResponse)
772
773 class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
774 """
775 dnsdist is configured to set the EDNS0 Client Subnet
776 option with a prefix length of 24 for IPv4 and 56 for IPv6,
777 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
778 """
779
780 _config_template = """
781 setECSOverride(false)
782 setECSSourcePrefixV4(24)
783 setECSSourcePrefixV6(56)
784 newServer{address="127.0.0.1:%s", useClientSubnet=true}
785 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
786 function overrideECSPrefixLengthViaLua(dq)
787 dq.ecsPrefixLength = 32
788 return DNSAction.None, ""
789 end
790 addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
791 """
792
793 def testWithECSPrefixLengthNotOverridden(self):
794 """
795 ECS Prefix Length: not overridden via Lua or a rule
796 """
797 name = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
798 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
799 query = dns.message.make_query(name, 'A', 'IN')
800 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
801 response = dns.message.make_response(query)
802 response.use_edns(edns=True, payload=4096, options=[ecso])
803 rrset = dns.rrset.from_text(name,
804 3600,
805 dns.rdataclass.IN,
806 dns.rdatatype.A,
807 '127.0.0.1')
808 response.answer.append(rrset)
809 expectedResponse = dns.message.make_response(query)
810 expectedResponse.answer.append(rrset)
811
812 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
813 self.assertTrue(receivedQuery)
814 self.assertTrue(receivedResponse)
815 receivedQuery.id = expectedQuery.id
816 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
817 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
818
819 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
820 self.assertTrue(receivedQuery)
821 self.assertTrue(receivedResponse)
822 receivedQuery.id = expectedQuery.id
823 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
824 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
825
826 def testWithECSPrefixLengthOverriddenViaRule(self):
827 """
828 ECS Prefix Length: overridden with a rule
829 """
830 name = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
831 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
832 query = dns.message.make_query(name, 'A', 'IN')
833 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
834 response = dns.message.make_response(expectedQuery)
835 rrset = dns.rrset.from_text(name,
836 3600,
837 dns.rdataclass.IN,
838 dns.rdatatype.A,
839 '127.0.0.1')
840 response.answer.append(rrset)
841 expectedResponse = dns.message.make_response(query)
842 expectedResponse.answer.append(rrset)
843
844 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
845 self.assertTrue(receivedQuery)
846 self.assertTrue(receivedResponse)
847 receivedQuery.id = expectedQuery.id
848 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
849 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
850
851 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
852 self.assertTrue(receivedQuery)
853 self.assertTrue(receivedResponse)
854 receivedQuery.id = expectedQuery.id
855 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
856 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
857
858 def testWithECSPrefixLengthOverriddenViaLua(self):
859 """
860 ECS Prefix Length: overridden via Lua
861 """
862 name = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
863 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
864 query = dns.message.make_query(name, 'A', 'IN')
865 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
866 response = dns.message.make_response(expectedQuery)
867 rrset = dns.rrset.from_text(name,
868 3600,
869 dns.rdataclass.IN,
870 dns.rdatatype.A,
871 '127.0.0.1')
872 response.answer.append(rrset)
873 expectedResponse = dns.message.make_response(query)
874 expectedResponse.answer.append(rrset)
875
876 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
877 self.assertTrue(receivedQuery)
878 self.assertTrue(receivedResponse)
879 receivedQuery.id = expectedQuery.id
880 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
881 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
882
883 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
884 self.assertTrue(receivedQuery)
885 self.assertTrue(receivedResponse)
886 receivedQuery.id = expectedQuery.id
887 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
888 self.checkResponseNoEDNS(expectedResponse, receivedResponse)