]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6 from datetime import datetime, timedelta
7
8 class TestEdnsClientSubnetNoOverride(DNSDistTest):
9 """
10 dnsdist is configured to add the EDNS0 Client Subnet
11 option, but only if it's not already present in the
12 original query.
13 """
14
15 _config_template = """
16 truncateTC(true)
17 newServer{address="127.0.0.1:%s", useClientSubnet=true}
18 """
19
20 def testWithoutEDNS(self):
21 """
22 ECS: No existing EDNS
23
24 Send a query without EDNS, check that the query
25 received by the responder has the correct ECS value
26 and that the response received from dnsdist does not
27 have an EDNS pseudo-RR.
28 """
29 name = 'withoutedns.ecs.tests.powerdns.com.'
30 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
31 query = dns.message.make_query(name, 'A', 'IN')
32 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
33 response = dns.message.make_response(expectedQuery)
34 expectedResponse = dns.message.make_response(query)
35 rrset = dns.rrset.from_text(name,
36 3600,
37 dns.rdataclass.IN,
38 dns.rdatatype.A,
39 '127.0.0.1')
40 response.answer.append(rrset)
41 expectedResponse.answer.append(rrset)
42
43 for method in ("sendUDPQuery", "sendTCPQuery"):
44 sender = getattr(self, method)
45 (receivedQuery, receivedResponse) = sender(query, response)
46 self.assertTrue(receivedQuery)
47 self.assertTrue(receivedResponse)
48 receivedQuery.id = expectedQuery.id
49 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
50 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
51
52 def testWithEDNSNoECS(self):
53 """
54 ECS: Existing EDNS without ECS
55
56 Send a query with EDNS but no ECS value.
57 Check that the query received by the responder
58 has a valid ECS value and that the response
59 received from dnsdist contains an EDNS pseudo-RR.
60 """
61 name = 'withednsnoecs.ecs.tests.powerdns.com.'
62 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
63 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
64 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
65 response = dns.message.make_response(expectedQuery)
66 expectedResponse = dns.message.make_response(query)
67 rrset = dns.rrset.from_text(name,
68 3600,
69 dns.rdataclass.IN,
70 dns.rdatatype.A,
71 '127.0.0.1')
72 response.answer.append(rrset)
73 expectedResponse.answer.append(rrset)
74
75 for method in ("sendUDPQuery", "sendTCPQuery"):
76 sender = getattr(self, method)
77 (receivedQuery, receivedResponse) = sender(query, response)
78 self.assertTrue(receivedQuery)
79 self.assertTrue(receivedResponse)
80 receivedQuery.id = expectedQuery.id
81 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
82 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
83
84 def testWithEDNSECS(self):
85 """
86 ECS: Existing EDNS with ECS
87
88 Send a query with EDNS and a crafted ECS value.
89 Check that the query received by the responder
90 has the initial ECS value (not overwritten)
91 and that the response received from dnsdist contains
92 an EDNS pseudo-RR.
93 """
94 name = 'withednsecs.ecs.tests.powerdns.com.'
95 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
96 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
97 response = dns.message.make_response(query)
98 rrset = dns.rrset.from_text(name,
99 3600,
100 dns.rdataclass.IN,
101 dns.rdatatype.A,
102 '127.0.0.1')
103 response.answer.append(rrset)
104
105
106 for method in ("sendUDPQuery", "sendTCPQuery"):
107 sender = getattr(self, method)
108 (receivedQuery, receivedResponse) = sender(query, response)
109 self.assertTrue(receivedQuery)
110 self.assertTrue(receivedResponse)
111 receivedQuery.id = query.id
112 self.checkQueryEDNSWithECS(query, receivedQuery)
113 self.checkResponseEDNSWithoutECS(response, receivedResponse)
114
115 def testWithoutEDNSResponseWithECS(self):
116 """
117 ECS: No existing EDNS (BE returning ECS)
118
119 Send a query without EDNS, check that the query
120 received by the responder has the correct ECS value
121 and that the response received from dnsdist does not
122 have an EDNS pseudo-RR.
123 This time the response returned by the backend contains
124 an ECS option with scope set.
125 """
126 name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
127 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
128 query = dns.message.make_query(name, 'A', 'IN')
129 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
130 response = dns.message.make_response(expectedQuery)
131 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
132 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
133 expectedResponse = dns.message.make_response(query)
134 rrset = dns.rrset.from_text(name,
135 3600,
136 dns.rdataclass.IN,
137 dns.rdatatype.A,
138 '127.0.0.1')
139 response.answer.append(rrset)
140 expectedResponse.answer.append(rrset)
141
142 for method in ("sendUDPQuery", "sendTCPQuery"):
143 sender = getattr(self, method)
144 (receivedQuery, receivedResponse) = sender(query, response)
145 self.assertTrue(receivedQuery)
146 self.assertTrue(receivedResponse)
147 receivedQuery.id = expectedQuery.id
148 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
149 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
150
151 def testWithEDNSNoECSResponseWithECS(self):
152 """
153 ECS: Existing EDNS without ECS (BE returning only the ECS option)
154
155 Send a query with EDNS but no ECS value.
156 Check that the query received by the responder
157 has a valid ECS value and that the response
158 received from dnsdist contains an EDNS pseudo-RR.
159 This time the response returned by the backend contains
160 an ECS option with scope set.
161 """
162 name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
163 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
164 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
165 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
166 response = dns.message.make_response(expectedQuery)
167 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
168 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
169 expectedResponse = dns.message.make_response(query, our_payload=4096)
170 rrset = dns.rrset.from_text(name,
171 3600,
172 dns.rdataclass.IN,
173 dns.rdatatype.A,
174 '127.0.0.1')
175 response.answer.append(rrset)
176 expectedResponse.answer.append(rrset)
177
178 for method in ("sendUDPQuery", "sendTCPQuery"):
179 sender = getattr(self, method)
180 (receivedQuery, receivedResponse) = sender(query, response)
181 self.assertTrue(receivedQuery)
182 self.assertTrue(receivedResponse)
183 receivedQuery.id = expectedQuery.id
184 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
185 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
186
187 def testWithEDNSNoECSResponseWithCookiesThenECS(self):
188 """
189 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
190
191 Send a query with EDNS but no ECS value.
192 Check that the query received by the responder
193 has a valid ECS value and that the response
194 received from dnsdist contains an EDNS pseudo-RR.
195 This time the response returned by the backend contains
196 one cookies then one ECS option.
197 """
198 name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
199 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
200 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
201 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
202 response = dns.message.make_response(expectedQuery)
203 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
204 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
205 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
206 expectedResponse = dns.message.make_response(query)
207 rrset = dns.rrset.from_text(name,
208 3600,
209 dns.rdataclass.IN,
210 dns.rdatatype.A,
211 '127.0.0.1')
212 response.answer.append(rrset)
213 expectedResponse.answer.append(rrset)
214 expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
215
216 for method in ("sendUDPQuery", "sendTCPQuery"):
217 sender = getattr(self, method)
218 (receivedQuery, receivedResponse) = sender(query, response)
219 self.assertTrue(receivedQuery)
220 self.assertTrue(receivedResponse)
221 receivedQuery.id = expectedQuery.id
222 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
223 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
224
225 def testWithEDNSNoECSResponseWithECSThenCookies(self):
226 """
227 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
228
229 Send a query with EDNS but no ECS value.
230 Check that the query received by the responder
231 has a valid ECS value and that the response
232 received from dnsdist contains an EDNS pseudo-RR.
233 This time the response returned by the backend contains
234 one ECS then one Cookies option.
235 """
236 name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
237 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
238 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
239 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
240 response = dns.message.make_response(expectedQuery)
241 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
242 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
243 response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
244 expectedResponse = dns.message.make_response(query, our_payload=4096)
245 rrset = dns.rrset.from_text(name,
246 3600,
247 dns.rdataclass.IN,
248 dns.rdatatype.A,
249 '127.0.0.1')
250 response.answer.append(rrset)
251 expectedResponse.answer.append(rrset)
252 response.use_edns(edns=True, payload=4096, options=[ecoResponse])
253
254 for method in ("sendUDPQuery", "sendTCPQuery"):
255 sender = getattr(self, method)
256 (receivedQuery, receivedResponse) = sender(query, response)
257 self.assertTrue(receivedQuery)
258 self.assertTrue(receivedResponse)
259 receivedQuery.id = expectedQuery.id
260 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
261 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
262
263 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
264 """
265 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
266
267 Send a query with EDNS but no ECS value.
268 Check that the query received by the responder
269 has a valid ECS value and that the response
270 received from dnsdist contains an EDNS pseudo-RR.
271 This time the response returned by the backend contains
272 one Cookies, one ECS then one Cookies option.
273 """
274 name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
275 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
276 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
277 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
278 response = dns.message.make_response(expectedQuery)
279 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
280 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
281 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
282 expectedResponse = dns.message.make_response(query, our_payload=4096)
283 rrset = dns.rrset.from_text(name,
284 3600,
285 dns.rdataclass.IN,
286 dns.rdatatype.A,
287 '127.0.0.1')
288 response.answer.append(rrset)
289 expectedResponse.answer.append(rrset)
290
291 for method in ("sendUDPQuery", "sendTCPQuery"):
292 sender = getattr(self, method)
293 (receivedQuery, receivedResponse) = sender(query, response)
294 self.assertTrue(receivedQuery)
295 self.assertTrue(receivedResponse)
296 receivedQuery.id = expectedQuery.id
297 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
298 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
299
300 class TestEdnsClientSubnetOverride(DNSDistTest):
301 """
302 dnsdist is configured to add the EDNS0 Client Subnet
303 option, overwriting any existing value.
304 """
305
306 _config_template = """
307 truncateTC(true)
308 setECSOverride(true)
309 setECSSourcePrefixV4(24)
310 setECSSourcePrefixV6(56)
311 newServer{address="127.0.0.1:%s", useClientSubnet=true}
312 """
313
314 def testWithoutEDNS(self):
315 """
316 ECS Override: No existing EDNS
317
318 Send a query without EDNS, check that the query
319 received by the responder has the correct ECS value
320 and that the response received from dnsdist does not
321 have an EDNS pseudo-RR.
322 """
323 name = 'withoutedns.overridden.ecs.tests.powerdns.com.'
324 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
325 query = dns.message.make_query(name, 'A', 'IN')
326 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
327 response = dns.message.make_response(expectedQuery)
328 response.use_edns(edns=True, payload=4096, options=[ecso])
329 rrset = dns.rrset.from_text(name,
330 3600,
331 dns.rdataclass.IN,
332 dns.rdatatype.A,
333 '127.0.0.1')
334 response.answer.append(rrset)
335 expectedResponse = dns.message.make_response(query)
336 expectedResponse.answer.append(rrset)
337
338 for method in ("sendUDPQuery", "sendTCPQuery"):
339 sender = getattr(self, method)
340 (receivedQuery, receivedResponse) = sender(query, response)
341 self.assertTrue(receivedQuery)
342 self.assertTrue(receivedResponse)
343 receivedQuery.id = expectedQuery.id
344 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
345 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
346
347 def testWithEDNSNoECS(self):
348 """
349 ECS Override: Existing EDNS without ECS
350
351 Send a query with EDNS but no ECS value.
352 Check that the query received by the responder
353 has a valid ECS value and that the response
354 received from dnsdist contains an EDNS pseudo-RR.
355 """
356 name = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
357 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
358 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
359 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
360 response = dns.message.make_response(expectedQuery)
361 response.use_edns(edns=True, payload=4096, options=[ecso])
362 rrset = dns.rrset.from_text(name,
363 3600,
364 dns.rdataclass.IN,
365 dns.rdatatype.A,
366 '127.0.0.1')
367 response.answer.append(rrset)
368 expectedResponse = dns.message.make_response(query, our_payload=4096)
369 expectedResponse.answer.append(rrset)
370
371 for method in ("sendUDPQuery", "sendTCPQuery"):
372 sender = getattr(self, method)
373 (receivedQuery, receivedResponse) = sender(query, response)
374 self.assertTrue(receivedQuery)
375 self.assertTrue(receivedResponse)
376 receivedQuery.id = expectedQuery.id
377 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
378 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
379
380 def testWithEDNSShorterInitialECS(self):
381 """
382 ECS Override: Existing EDNS with ECS (short)
383
384 Send a query with EDNS and a crafted ECS value.
385 Check that the query received by the responder
386 has an overwritten ECS value (not the initial one)
387 and that the response received from dnsdist contains
388 an EDNS pseudo-RR.
389 The initial ECS value is shorter than the one it will be
390 replaced with.
391 """
392 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
393 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
394 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
395 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
396 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
397 response = dns.message.make_response(query)
398 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
399 rrset = dns.rrset.from_text(name,
400 3600,
401 dns.rdataclass.IN,
402 dns.rdatatype.A,
403 '127.0.0.1')
404 response.answer.append(rrset)
405
406 for method in ("sendUDPQuery", "sendTCPQuery"):
407 sender = getattr(self, method)
408 (receivedQuery, receivedResponse) = sender(query, response)
409 self.assertTrue(receivedQuery)
410 self.assertTrue(receivedResponse)
411 receivedQuery.id = expectedQuery.id
412 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
413 self.checkResponseEDNSWithECS(response, receivedResponse)
414
415 def testWithEDNSLongerInitialECS(self):
416 """
417 ECS Override: Existing EDNS with ECS (long)
418
419 Send a query with EDNS and a crafted ECS value.
420 Check that the query received by the responder
421 has an overwritten ECS value (not the initial one)
422 and that the response received from dnsdist contains
423 an EDNS pseudo-RR.
424 The initial ECS value is longer than the one it will
425 replaced with.
426 """
427 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
428 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
429 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
430 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
431 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
432 response = dns.message.make_response(query)
433 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
434 rrset = dns.rrset.from_text(name,
435 3600,
436 dns.rdataclass.IN,
437 dns.rdatatype.A,
438 '127.0.0.1')
439 response.answer.append(rrset)
440
441 for method in ("sendUDPQuery", "sendTCPQuery"):
442 sender = getattr(self, method)
443 (receivedQuery, receivedResponse) = sender(query, response)
444 self.assertTrue(receivedQuery)
445 self.assertTrue(receivedResponse)
446 receivedQuery.id = expectedQuery.id
447 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
448 self.checkResponseEDNSWithECS(response, receivedResponse)
449
450 def testWithEDNSSameSizeInitialECS(self):
451 """
452 ECS Override: Existing EDNS with ECS (same)
453
454 Send a query with EDNS and a crafted ECS value.
455 Check that the query received by the responder
456 has an overwritten ECS value (not the initial one)
457 and that the response received from dnsdist contains
458 an EDNS pseudo-RR.
459 The initial ECS value is exactly the same size as
460 the one it will replaced with.
461 """
462 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
463 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
464 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
465 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
466 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
467 response = dns.message.make_response(query)
468 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
469 rrset = dns.rrset.from_text(name,
470 3600,
471 dns.rdataclass.IN,
472 dns.rdatatype.A,
473 '127.0.0.1')
474 response.answer.append(rrset)
475
476 for method in ("sendUDPQuery", "sendTCPQuery"):
477 sender = getattr(self, method)
478 (receivedQuery, receivedResponse) = sender(query, response)
479 self.assertTrue(receivedQuery)
480 self.assertTrue(receivedResponse)
481 receivedQuery.id = expectedQuery.id
482 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
483 self.checkResponseEDNSWithECS(response, receivedResponse)
484
485 class TestECSDisabledByRuleOrLua(DNSDistTest):
486 """
487 dnsdist is configured to add the EDNS0 Client Subnet
488 option, but we disable it via DisableECSAction()
489 or Lua.
490 """
491
492 _config_template = """
493 setECSOverride(false)
494 setECSSourcePrefixV4(16)
495 setECSSourcePrefixV6(16)
496 newServer{address="127.0.0.1:%s", useClientSubnet=true}
497 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
498 function disableECSViaLua(dq)
499 dq.useECS = false
500 return DNSAction.None, ""
501 end
502 addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
503 """
504
505 def testWithECSNotDisabled(self):
506 """
507 ECS Disable: ECS enabled in the backend
508 """
509 name = 'notdisabled.ecsrules.tests.powerdns.com.'
510 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
511 query = dns.message.make_query(name, 'A', 'IN')
512 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
513 response = dns.message.make_response(expectedQuery)
514 expectedResponse = dns.message.make_response(query)
515 rrset = dns.rrset.from_text(name,
516 3600,
517 dns.rdataclass.IN,
518 dns.rdatatype.AAAA,
519 '::1')
520 response.answer.append(rrset)
521 expectedResponse.answer.append(rrset)
522
523 for method in ("sendUDPQuery", "sendTCPQuery"):
524 sender = getattr(self, method)
525 (receivedQuery, receivedResponse) = sender(query, response)
526 self.assertTrue(receivedQuery)
527 self.assertTrue(receivedResponse)
528 receivedQuery.id = expectedQuery.id
529 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
530 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
531
532 def testWithECSDisabledViaRule(self):
533 """
534 ECS Disable: ECS enabled in the backend, but disabled by a rule
535 """
536 name = 'disabled.ecsrules.tests.powerdns.com.'
537 query = dns.message.make_query(name, 'A', 'IN')
538 response = dns.message.make_response(query)
539 rrset = dns.rrset.from_text(name,
540 3600,
541 dns.rdataclass.IN,
542 dns.rdatatype.A,
543 '127.0.0.1')
544 response.answer.append(rrset)
545
546 for method in ("sendUDPQuery", "sendTCPQuery"):
547 sender = getattr(self, method)
548 (receivedQuery, receivedResponse) = sender(query, response)
549 self.assertTrue(receivedQuery)
550 self.assertTrue(receivedResponse)
551 receivedQuery.id = query.id
552 self.checkQueryNoEDNS(query, receivedQuery)
553 self.checkResponseNoEDNS(response, receivedResponse)
554
555 def testWithECSDisabledViaLua(self):
556 """
557 ECS Disable: ECS enabled in the backend, but disabled via Lua
558 """
559 name = 'disabledvialua.ecsrules.tests.powerdns.com.'
560 query = dns.message.make_query(name, 'A', 'IN')
561 response = dns.message.make_response(query)
562 rrset = dns.rrset.from_text(name,
563 3600,
564 dns.rdataclass.IN,
565 dns.rdatatype.A,
566 '127.0.0.1')
567 response.answer.append(rrset)
568
569 for method in ("sendUDPQuery", "sendTCPQuery"):
570 sender = getattr(self, method)
571 (receivedQuery, receivedResponse) = sender(query, response)
572 self.assertTrue(receivedQuery)
573 self.assertTrue(receivedResponse)
574 receivedQuery.id = query.id
575 self.checkQueryNoEDNS(query, receivedQuery)
576 self.checkResponseNoEDNS(response, receivedResponse)
577
578 class TestECSOverrideSetByRuleOrLua(DNSDistTest):
579 """
580 dnsdist is configured to set the EDNS0 Client Subnet
581 option without overriding an existing one, but we
582 force the overriding via ECSOverrideAction() or Lua.
583 """
584
585 _config_template = """
586 setECSOverride(false)
587 setECSSourcePrefixV4(24)
588 setECSSourcePrefixV6(56)
589 newServer{address="127.0.0.1:%s", useClientSubnet=true}
590 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
591 function overrideECSViaLua(dq)
592 dq.ecsOverride = true
593 return DNSAction.None, ""
594 end
595 addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
596 """
597
598 def testWithECSOverrideNotSet(self):
599 """
600 ECS Override: not set via Lua or a rule
601 """
602 name = 'notoverridden.ecsrules.tests.powerdns.com.'
603 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
604 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
605 response = dns.message.make_response(query)
606 response.use_edns(edns=True, payload=4096, options=[ecso])
607 rrset = dns.rrset.from_text(name,
608 3600,
609 dns.rdataclass.IN,
610 dns.rdatatype.A,
611 '127.0.0.1')
612 response.answer.append(rrset)
613
614 for method in ("sendUDPQuery", "sendTCPQuery"):
615 sender = getattr(self, method)
616 (receivedQuery, receivedResponse) = sender(query, response)
617 self.assertTrue(receivedQuery)
618 self.assertTrue(receivedResponse)
619 receivedQuery.id = query.id
620 self.checkQueryEDNSWithECS(query, receivedQuery)
621 self.checkResponseEDNSWithECS(response, receivedResponse)
622
623 def testWithECSOverrideSetViaRule(self):
624 """
625 ECS Override: set with a rule
626 """
627 name = 'overridden.ecsrules.tests.powerdns.com.'
628 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
629 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
630 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
631 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
632 response = dns.message.make_response(query)
633 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
634 rrset = dns.rrset.from_text(name,
635 3600,
636 dns.rdataclass.IN,
637 dns.rdatatype.A,
638 '127.0.0.1')
639 response.answer.append(rrset)
640
641 for method in ("sendUDPQuery", "sendTCPQuery"):
642 sender = getattr(self, method)
643 (receivedQuery, receivedResponse) = sender(query, response)
644 self.assertTrue(receivedQuery)
645 self.assertTrue(receivedResponse)
646 receivedQuery.id = expectedQuery.id
647 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
648 self.checkResponseEDNSWithECS(response, receivedResponse)
649
650 def testWithECSOverrideSetViaLua(self):
651 """
652 ECS Override: set via Lua
653 """
654 name = 'overriddenvialua.ecsrules.tests.powerdns.com.'
655 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
656 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
657 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
658 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
659 response = dns.message.make_response(query)
660 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
661 rrset = dns.rrset.from_text(name,
662 3600,
663 dns.rdataclass.IN,
664 dns.rdatatype.A,
665 '127.0.0.1')
666 response.answer.append(rrset)
667
668 for method in ("sendUDPQuery", "sendTCPQuery"):
669 sender = getattr(self, method)
670 (receivedQuery, receivedResponse) = sender(query, response)
671 self.assertTrue(receivedQuery)
672 self.assertTrue(receivedResponse)
673 receivedQuery.id = expectedQuery.id
674 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
675 self.checkResponseEDNSWithECS(response, receivedResponse)
676
677 class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
678 """
679 dnsdist is configured to set the EDNS0 Client Subnet
680 option with a prefix length of 24 for IPv4 and 56 for IPv6,
681 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
682 """
683
684 _config_template = """
685 setECSOverride(false)
686 setECSSourcePrefixV4(24)
687 setECSSourcePrefixV6(56)
688 newServer{address="127.0.0.1:%s", useClientSubnet=true}
689 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
690 function overrideECSPrefixLengthViaLua(dq)
691 dq.ecsPrefixLength = 32
692 return DNSAction.None, ""
693 end
694 addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
695 """
696
697 def testWithECSPrefixLengthNotOverridden(self):
698 """
699 ECS Prefix Length: not overridden via Lua or a rule
700 """
701 name = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
702 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
703 query = dns.message.make_query(name, 'A', 'IN')
704 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
705 response = dns.message.make_response(query)
706 response.use_edns(edns=True, payload=4096, options=[ecso])
707 rrset = dns.rrset.from_text(name,
708 3600,
709 dns.rdataclass.IN,
710 dns.rdatatype.A,
711 '127.0.0.1')
712 response.answer.append(rrset)
713 expectedResponse = dns.message.make_response(query)
714 expectedResponse.answer.append(rrset)
715
716 for method in ("sendUDPQuery", "sendTCPQuery"):
717 sender = getattr(self, method)
718 (receivedQuery, receivedResponse) = sender(query, response)
719 self.assertTrue(receivedQuery)
720 self.assertTrue(receivedResponse)
721 receivedQuery.id = expectedQuery.id
722 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
723 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
724
725 def testWithECSPrefixLengthOverriddenViaRule(self):
726 """
727 ECS Prefix Length: overridden with a rule
728 """
729 name = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
730 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
731 query = dns.message.make_query(name, 'A', 'IN')
732 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
733 response = dns.message.make_response(expectedQuery)
734 rrset = dns.rrset.from_text(name,
735 3600,
736 dns.rdataclass.IN,
737 dns.rdatatype.A,
738 '127.0.0.1')
739 response.answer.append(rrset)
740 expectedResponse = dns.message.make_response(query)
741 expectedResponse.answer.append(rrset)
742
743 for method in ("sendUDPQuery", "sendTCPQuery"):
744 sender = getattr(self, method)
745 (receivedQuery, receivedResponse) = sender(query, response)
746 self.assertTrue(receivedQuery)
747 self.assertTrue(receivedResponse)
748 receivedQuery.id = expectedQuery.id
749 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
750 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
751
752 def testWithECSPrefixLengthOverriddenViaLua(self):
753 """
754 ECS Prefix Length: overridden via Lua
755 """
756 name = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
757 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
758 query = dns.message.make_query(name, 'A', 'IN')
759 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
760 response = dns.message.make_response(expectedQuery)
761 rrset = dns.rrset.from_text(name,
762 3600,
763 dns.rdataclass.IN,
764 dns.rdatatype.A,
765 '127.0.0.1')
766 response.answer.append(rrset)
767 expectedResponse = dns.message.make_response(query)
768 expectedResponse.answer.append(rrset)
769
770 for method in ("sendUDPQuery", "sendTCPQuery"):
771 sender = getattr(self, method)
772 (receivedQuery, receivedResponse) = sender(query, response)
773 self.assertTrue(receivedQuery)
774 self.assertTrue(receivedResponse)
775 receivedQuery.id = expectedQuery.id
776 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
777 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
778
779 class TestECSPrefixSetByRule(DNSDistTest):
780 """
781 dnsdist is configured to set the EDNS0 Client Subnet
782 option for incoming queries to the actual source IP,
783 but we override it for some queries via SetECSAction().
784 """
785
786 _config_template = """
787 setECSOverride(false)
788 setECSSourcePrefixV4(32)
789 setECSSourcePrefixV6(128)
790 newServer{address="127.0.0.1:%s", useClientSubnet=true}
791 addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
792 """
793
794 def testWithRegularECS(self):
795 """
796 ECS Prefix: not set
797 """
798 name = 'notsetecsaction.ecsrules.tests.powerdns.com.'
799 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
800 query = dns.message.make_query(name, 'A', 'IN')
801 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
802 response = dns.message.make_response(query)
803 response.use_edns(edns=True, payload=4096, options=[ecso])
804 rrset = dns.rrset.from_text(name,
805 3600,
806 dns.rdataclass.IN,
807 dns.rdatatype.A,
808 '127.0.0.1')
809 response.answer.append(rrset)
810 expectedResponse = dns.message.make_response(query)
811 expectedResponse.answer.append(rrset)
812
813 for method in ("sendUDPQuery", "sendTCPQuery"):
814 sender = getattr(self, method)
815 (receivedQuery, receivedResponse) = sender(query, response)
816 self.assertTrue(receivedQuery)
817 self.assertTrue(receivedResponse)
818 receivedQuery.id = expectedQuery.id
819 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
820 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
821
822 def testWithECSSetByRule(self):
823 """
824 ECS Prefix: set with SetECSAction
825 """
826 name = 'setecsaction.ecsrules.tests.powerdns.com.'
827 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
828 query = dns.message.make_query(name, 'A', 'IN')
829 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
830 response = dns.message.make_response(expectedQuery)
831 rrset = dns.rrset.from_text(name,
832 3600,
833 dns.rdataclass.IN,
834 dns.rdatatype.A,
835 '127.0.0.1')
836 response.answer.append(rrset)
837 expectedResponse = dns.message.make_response(query)
838 expectedResponse.answer.append(rrset)
839
840 for method in ("sendUDPQuery", "sendTCPQuery"):
841 sender = getattr(self, method)
842 (receivedQuery, receivedResponse) = sender(query, response)
843 self.assertTrue(receivedQuery)
844 self.assertTrue(receivedResponse)
845 receivedQuery.id = expectedQuery.id
846 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
847 self.checkResponseNoEDNS(expectedResponse, receivedResponse)