]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
CommitLineData
ca404e94 1#!/usr/bin/env python
b1bec9f0
RG
2import dns
3import clientsubnetoption
ff73f02b 4import cookiesoption
ca404e94 5from dnsdisttests import DNSDistTest
d83feb68 6from datetime import datetime, timedelta
ca404e94 7
5df86a8a 8class TestEdnsClientSubnetNoOverride(DNSDistTest):
ca404e94 9 """
ec5f5c6b 10 dnsdist is configured to add the EDNS0 Client Subnet
ca404e94
RG
11 option, but only if it's not already present in the
12 original query.
13 """
14
ca404e94
RG
15 _config_template = """
16 truncateTC(true)
ca404e94
RG
17 newServer{address="127.0.0.1:%s", useClientSubnet=true}
18 """
19
ca404e94
RG
20 def testWithoutEDNS(self):
21 """
617dfe22
RG
22 ECS: No existing EDNS
23
ca404e94
RG
24 Send a query without EDNS, check that the query
25 received by the responder has the correct ECS value
26 and that the response received from dnsdist does not
27 have an EDNS pseudo-RR.
28 """
29 name = 'withoutedns.ecs.tests.powerdns.com.'
30 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
31 query = dns.message.make_query(name, 'A', 'IN')
32 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
33 response = dns.message.make_response(expectedQuery)
34 expectedResponse = dns.message.make_response(query)
35 rrset = dns.rrset.from_text(name,
36 3600,
37 dns.rdataclass.IN,
38 dns.rdatatype.A,
39 '127.0.0.1')
40 response.answer.append(rrset)
41 expectedResponse.answer.append(rrset)
42
6ca2e796
RG
43 for method in ("sendUDPQuery", "sendTCPQuery"):
44 sender = getattr(self, method)
45 (receivedQuery, receivedResponse) = sender(query, response)
46 self.assertTrue(receivedQuery)
47 self.assertTrue(receivedResponse)
48 receivedQuery.id = expectedQuery.id
49 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
50 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
51
52 def testWithEDNSNoECS(self):
53 """
617dfe22
RG
54 ECS: Existing EDNS without ECS
55
ca404e94
RG
56 Send a query with EDNS but no ECS value.
57 Check that the query received by the responder
58 has a valid ECS value and that the response
59 received from dnsdist contains an EDNS pseudo-RR.
60 """
61 name = 'withednsnoecs.ecs.tests.powerdns.com.'
62 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
63 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
64 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
65 response = dns.message.make_response(expectedQuery)
66 expectedResponse = dns.message.make_response(query)
67 rrset = dns.rrset.from_text(name,
68 3600,
69 dns.rdataclass.IN,
70 dns.rdatatype.A,
71 '127.0.0.1')
72 response.answer.append(rrset)
73 expectedResponse.answer.append(rrset)
74
6ca2e796
RG
75 for method in ("sendUDPQuery", "sendTCPQuery"):
76 sender = getattr(self, method)
77 (receivedQuery, receivedResponse) = sender(query, response)
78 self.assertTrue(receivedQuery)
79 self.assertTrue(receivedResponse)
80 receivedQuery.id = expectedQuery.id
81 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
82 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ca404e94
RG
83
84 def testWithEDNSECS(self):
85 """
617dfe22
RG
86 ECS: Existing EDNS with ECS
87
ca404e94
RG
88 Send a query with EDNS and a crafted ECS value.
89 Check that the query received by the responder
90 has the initial ECS value (not overwritten)
91 and that the response received from dnsdist contains
92 an EDNS pseudo-RR.
93 """
94 name = 'withednsecs.ecs.tests.powerdns.com.'
95 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
96 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
97 response = dns.message.make_response(query)
98 rrset = dns.rrset.from_text(name,
99 3600,
100 dns.rdataclass.IN,
101 dns.rdatatype.A,
102 '127.0.0.1')
103 response.answer.append(rrset)
104
ff0902ec 105
6ca2e796
RG
106 for method in ("sendUDPQuery", "sendTCPQuery"):
107 sender = getattr(self, method)
108 (receivedQuery, receivedResponse) = sender(query, response)
109 self.assertTrue(receivedQuery)
110 self.assertTrue(receivedResponse)
111 receivedQuery.id = query.id
112 self.checkQueryEDNSWithECS(query, receivedQuery)
113 self.checkResponseEDNSWithoutECS(response, receivedResponse)
ff73f02b
RG
114
115 def testWithoutEDNSResponseWithECS(self):
116 """
117 ECS: No existing EDNS (BE returning ECS)
118
119 Send a query without EDNS, check that the query
120 received by the responder has the correct ECS value
121 and that the response received from dnsdist does not
122 have an EDNS pseudo-RR.
123 This time the response returned by the backend contains
124 an ECS option with scope set.
125 """
126 name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
127 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
128 query = dns.message.make_query(name, 'A', 'IN')
129 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
130 response = dns.message.make_response(expectedQuery)
131 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
132 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
133 expectedResponse = dns.message.make_response(query)
134 rrset = dns.rrset.from_text(name,
135 3600,
136 dns.rdataclass.IN,
137 dns.rdatatype.A,
138 '127.0.0.1')
139 response.answer.append(rrset)
140 expectedResponse.answer.append(rrset)
141
6ca2e796
RG
142 for method in ("sendUDPQuery", "sendTCPQuery"):
143 sender = getattr(self, method)
144 (receivedQuery, receivedResponse) = sender(query, response)
145 self.assertTrue(receivedQuery)
146 self.assertTrue(receivedResponse)
147 receivedQuery.id = expectedQuery.id
148 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
149 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff73f02b
RG
150
151 def testWithEDNSNoECSResponseWithECS(self):
152 """
153 ECS: Existing EDNS without ECS (BE returning only the ECS option)
154
155 Send a query with EDNS but no ECS value.
156 Check that the query received by the responder
157 has a valid ECS value and that the response
158 received from dnsdist contains an EDNS pseudo-RR.
159 This time the response returned by the backend contains
160 an ECS option with scope set.
161 """
162 name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
163 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
164 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
165 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
166 response = dns.message.make_response(expectedQuery)
167 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
168 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
d70d5ad3 169 expectedResponse = dns.message.make_response(query, our_payload=4096)
ff73f02b
RG
170 rrset = dns.rrset.from_text(name,
171 3600,
172 dns.rdataclass.IN,
173 dns.rdatatype.A,
174 '127.0.0.1')
175 response.answer.append(rrset)
176 expectedResponse.answer.append(rrset)
177
6ca2e796
RG
178 for method in ("sendUDPQuery", "sendTCPQuery"):
179 sender = getattr(self, method)
180 (receivedQuery, receivedResponse) = sender(query, response)
181 self.assertTrue(receivedQuery)
182 self.assertTrue(receivedResponse)
183 receivedQuery.id = expectedQuery.id
184 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
185 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ff73f02b
RG
186
187 def testWithEDNSNoECSResponseWithCookiesThenECS(self):
188 """
189 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
190
191 Send a query with EDNS but no ECS value.
192 Check that the query received by the responder
193 has a valid ECS value and that the response
194 received from dnsdist contains an EDNS pseudo-RR.
195 This time the response returned by the backend contains
196 one cookies then one ECS option.
197 """
198 name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
199 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
200 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
201 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
202 response = dns.message.make_response(expectedQuery)
b4f23783 203 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
204 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
205 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
206 expectedResponse = dns.message.make_response(query)
207 rrset = dns.rrset.from_text(name,
208 3600,
209 dns.rdataclass.IN,
210 dns.rdatatype.A,
211 '127.0.0.1')
212 response.answer.append(rrset)
213 expectedResponse.answer.append(rrset)
ff0902ec 214 expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
ff73f02b 215
6ca2e796
RG
216 for method in ("sendUDPQuery", "sendTCPQuery"):
217 sender = getattr(self, method)
218 (receivedQuery, receivedResponse) = sender(query, response)
219 self.assertTrue(receivedQuery)
220 self.assertTrue(receivedResponse)
221 receivedQuery.id = expectedQuery.id
222 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
223 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
ff73f02b
RG
224
225 def testWithEDNSNoECSResponseWithECSThenCookies(self):
226 """
227 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
228
229 Send a query with EDNS but no ECS value.
230 Check that the query received by the responder
231 has a valid ECS value and that the response
232 received from dnsdist contains an EDNS pseudo-RR.
233 This time the response returned by the backend contains
234 one ECS then one Cookies option.
235 """
236 name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
237 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
238 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
239 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
240 response = dns.message.make_response(expectedQuery)
b4f23783 241 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
242 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
243 response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
d70d5ad3 244 expectedResponse = dns.message.make_response(query, our_payload=4096)
ff73f02b
RG
245 rrset = dns.rrset.from_text(name,
246 3600,
247 dns.rdataclass.IN,
248 dns.rdatatype.A,
249 '127.0.0.1')
250 response.answer.append(rrset)
251 expectedResponse.answer.append(rrset)
ff0902ec 252 response.use_edns(edns=True, payload=4096, options=[ecoResponse])
ff73f02b 253
6ca2e796
RG
254 for method in ("sendUDPQuery", "sendTCPQuery"):
255 sender = getattr(self, method)
256 (receivedQuery, receivedResponse) = sender(query, response)
257 self.assertTrue(receivedQuery)
258 self.assertTrue(receivedResponse)
259 receivedQuery.id = expectedQuery.id
260 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
261 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
ff73f02b
RG
262
263 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
264 """
265 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
266
267 Send a query with EDNS but no ECS value.
268 Check that the query received by the responder
269 has a valid ECS value and that the response
270 received from dnsdist contains an EDNS pseudo-RR.
271 This time the response returned by the backend contains
272 one Cookies, one ECS then one Cookies option.
273 """
274 name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
275 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
276 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
277 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
278 response = dns.message.make_response(expectedQuery)
b4f23783 279 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
280 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
281 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
d70d5ad3 282 expectedResponse = dns.message.make_response(query, our_payload=4096)
ff73f02b
RG
283 rrset = dns.rrset.from_text(name,
284 3600,
285 dns.rdataclass.IN,
286 dns.rdatatype.A,
287 '127.0.0.1')
288 response.answer.append(rrset)
289 expectedResponse.answer.append(rrset)
290
6ca2e796
RG
291 for method in ("sendUDPQuery", "sendTCPQuery"):
292 sender = getattr(self, method)
293 (receivedQuery, receivedResponse) = sender(query, response)
294 self.assertTrue(receivedQuery)
295 self.assertTrue(receivedResponse)
296 receivedQuery.id = expectedQuery.id
297 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
298 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
ca404e94 299
5df86a8a 300class TestEdnsClientSubnetOverride(DNSDistTest):
ca404e94 301 """
ec5f5c6b 302 dnsdist is configured to add the EDNS0 Client Subnet
ca404e94
RG
303 option, overwriting any existing value.
304 """
305
ca404e94
RG
306 _config_template = """
307 truncateTC(true)
ca404e94
RG
308 setECSOverride(true)
309 setECSSourcePrefixV4(24)
310 setECSSourcePrefixV6(56)
311 newServer{address="127.0.0.1:%s", useClientSubnet=true}
312 """
313
ca404e94
RG
314 def testWithoutEDNS(self):
315 """
617dfe22
RG
316 ECS Override: No existing EDNS
317
ca404e94
RG
318 Send a query without EDNS, check that the query
319 received by the responder has the correct ECS value
320 and that the response received from dnsdist does not
321 have an EDNS pseudo-RR.
322 """
ff0902ec 323 name = 'withoutedns.overridden.ecs.tests.powerdns.com.'
ca404e94
RG
324 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
325 query = dns.message.make_query(name, 'A', 'IN')
326 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
327 response = dns.message.make_response(expectedQuery)
ff0902ec 328 response.use_edns(edns=True, payload=4096, options=[ecso])
ca404e94
RG
329 rrset = dns.rrset.from_text(name,
330 3600,
331 dns.rdataclass.IN,
332 dns.rdatatype.A,
333 '127.0.0.1')
334 response.answer.append(rrset)
ff0902ec 335 expectedResponse = dns.message.make_response(query)
ca404e94
RG
336 expectedResponse.answer.append(rrset)
337
6ca2e796
RG
338 for method in ("sendUDPQuery", "sendTCPQuery"):
339 sender = getattr(self, method)
340 (receivedQuery, receivedResponse) = sender(query, response)
341 self.assertTrue(receivedQuery)
342 self.assertTrue(receivedResponse)
343 receivedQuery.id = expectedQuery.id
344 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
345 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
346
347 def testWithEDNSNoECS(self):
348 """
617dfe22
RG
349 ECS Override: Existing EDNS without ECS
350
ca404e94
RG
351 Send a query with EDNS but no ECS value.
352 Check that the query received by the responder
353 has a valid ECS value and that the response
354 received from dnsdist contains an EDNS pseudo-RR.
355 """
ff0902ec 356 name = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
ca404e94
RG
357 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
358 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
359 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
360 response = dns.message.make_response(expectedQuery)
ff0902ec 361 response.use_edns(edns=True, payload=4096, options=[ecso])
ca404e94
RG
362 rrset = dns.rrset.from_text(name,
363 3600,
364 dns.rdataclass.IN,
365 dns.rdatatype.A,
366 '127.0.0.1')
367 response.answer.append(rrset)
d70d5ad3 368 expectedResponse = dns.message.make_response(query, our_payload=4096)
ca404e94
RG
369 expectedResponse.answer.append(rrset)
370
6ca2e796
RG
371 for method in ("sendUDPQuery", "sendTCPQuery"):
372 sender = getattr(self, method)
373 (receivedQuery, receivedResponse) = sender(query, response)
374 self.assertTrue(receivedQuery)
375 self.assertTrue(receivedResponse)
376 receivedQuery.id = expectedQuery.id
377 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
378 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ca404e94 379
7b146b98 380 def testWithEDNSShorterInitialECS(self):
ca404e94 381 """
617dfe22
RG
382 ECS Override: Existing EDNS with ECS (short)
383
ca404e94
RG
384 Send a query with EDNS and a crafted ECS value.
385 Check that the query received by the responder
386 has an overwritten ECS value (not the initial one)
387 and that the response received from dnsdist contains
388 an EDNS pseudo-RR.
ff0902ec 389 The initial ECS value is shorter than the one it will be
7b146b98 390 replaced with.
ca404e94 391 """
ff0902ec 392 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
7b146b98
RG
393 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
394 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
395 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
396 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
397 response = dns.message.make_response(query)
ff0902ec 398 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
7b146b98
RG
399 rrset = dns.rrset.from_text(name,
400 3600,
401 dns.rdataclass.IN,
402 dns.rdatatype.A,
403 '127.0.0.1')
404 response.answer.append(rrset)
405
6ca2e796
RG
406 for method in ("sendUDPQuery", "sendTCPQuery"):
407 sender = getattr(self, method)
408 (receivedQuery, receivedResponse) = sender(query, response)
409 self.assertTrue(receivedQuery)
410 self.assertTrue(receivedResponse)
411 receivedQuery.id = expectedQuery.id
412 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
413 self.checkResponseEDNSWithECS(response, receivedResponse)
7b146b98
RG
414
415 def testWithEDNSLongerInitialECS(self):
416 """
617dfe22
RG
417 ECS Override: Existing EDNS with ECS (long)
418
7b146b98
RG
419 Send a query with EDNS and a crafted ECS value.
420 Check that the query received by the responder
421 has an overwritten ECS value (not the initial one)
422 and that the response received from dnsdist contains
423 an EDNS pseudo-RR.
424 The initial ECS value is longer than the one it will
425 replaced with.
426 """
ff0902ec 427 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
7b146b98
RG
428 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
429 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
430 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
431 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
432 response = dns.message.make_response(query)
ff0902ec 433 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
7b146b98
RG
434 rrset = dns.rrset.from_text(name,
435 3600,
436 dns.rdataclass.IN,
437 dns.rdatatype.A,
438 '127.0.0.1')
439 response.answer.append(rrset)
440
6ca2e796
RG
441 for method in ("sendUDPQuery", "sendTCPQuery"):
442 sender = getattr(self, method)
443 (receivedQuery, receivedResponse) = sender(query, response)
444 self.assertTrue(receivedQuery)
445 self.assertTrue(receivedResponse)
446 receivedQuery.id = expectedQuery.id
447 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
448 self.checkResponseEDNSWithECS(response, receivedResponse)
7b146b98
RG
449
450 def testWithEDNSSameSizeInitialECS(self):
451 """
617dfe22
RG
452 ECS Override: Existing EDNS with ECS (same)
453
7b146b98
RG
454 Send a query with EDNS and a crafted ECS value.
455 Check that the query received by the responder
456 has an overwritten ECS value (not the initial one)
457 and that the response received from dnsdist contains
458 an EDNS pseudo-RR.
459 The initial ECS value is exactly the same size as
460 the one it will replaced with.
461 """
ff0902ec
RG
462 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
463 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
464 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
465 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
466 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
467 response = dns.message.make_response(query)
468 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
469 rrset = dns.rrset.from_text(name,
470 3600,
471 dns.rdataclass.IN,
472 dns.rdatatype.A,
473 '127.0.0.1')
474 response.answer.append(rrset)
475
6ca2e796
RG
476 for method in ("sendUDPQuery", "sendTCPQuery"):
477 sender = getattr(self, method)
478 (receivedQuery, receivedResponse) = sender(query, response)
479 self.assertTrue(receivedQuery)
480 self.assertTrue(receivedResponse)
481 receivedQuery.id = expectedQuery.id
482 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
483 self.checkResponseEDNSWithECS(response, receivedResponse)
ff0902ec 484
5df86a8a 485class TestECSDisabledByRuleOrLua(DNSDistTest):
ff0902ec
RG
486 """
487 dnsdist is configured to add the EDNS0 Client Subnet
488 option, but we disable it via DisableECSAction()
489 or Lua.
490 """
491
492 _config_template = """
493 setECSOverride(false)
494 setECSSourcePrefixV4(16)
495 setECSSourcePrefixV6(16)
496 newServer{address="127.0.0.1:%s", useClientSubnet=true}
497 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
498 function disableECSViaLua(dq)
499 dq.useECS = false
500 return DNSAction.None, ""
501 end
a2ff35e3 502 addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
ff0902ec
RG
503 """
504
505 def testWithECSNotDisabled(self):
506 """
507 ECS Disable: ECS enabled in the backend
508 """
509 name = 'notdisabled.ecsrules.tests.powerdns.com.'
510 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
511 query = dns.message.make_query(name, 'A', 'IN')
512 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
513 response = dns.message.make_response(expectedQuery)
514 expectedResponse = dns.message.make_response(query)
515 rrset = dns.rrset.from_text(name,
516 3600,
517 dns.rdataclass.IN,
518 dns.rdatatype.AAAA,
519 '::1')
520 response.answer.append(rrset)
521 expectedResponse.answer.append(rrset)
522
6ca2e796
RG
523 for method in ("sendUDPQuery", "sendTCPQuery"):
524 sender = getattr(self, method)
525 (receivedQuery, receivedResponse) = sender(query, response)
526 self.assertTrue(receivedQuery)
527 self.assertTrue(receivedResponse)
528 receivedQuery.id = expectedQuery.id
529 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
530 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff0902ec
RG
531
532 def testWithECSDisabledViaRule(self):
533 """
534 ECS Disable: ECS enabled in the backend, but disabled by a rule
535 """
536 name = 'disabled.ecsrules.tests.powerdns.com.'
537 query = dns.message.make_query(name, 'A', 'IN')
538 response = dns.message.make_response(query)
539 rrset = dns.rrset.from_text(name,
540 3600,
541 dns.rdataclass.IN,
542 dns.rdatatype.A,
543 '127.0.0.1')
544 response.answer.append(rrset)
545
6ca2e796
RG
546 for method in ("sendUDPQuery", "sendTCPQuery"):
547 sender = getattr(self, method)
548 (receivedQuery, receivedResponse) = sender(query, response)
549 self.assertTrue(receivedQuery)
550 self.assertTrue(receivedResponse)
551 receivedQuery.id = query.id
552 self.checkQueryNoEDNS(query, receivedQuery)
553 self.checkResponseNoEDNS(response, receivedResponse)
ff0902ec
RG
554
555 def testWithECSDisabledViaLua(self):
556 """
557 ECS Disable: ECS enabled in the backend, but disabled via Lua
558 """
559 name = 'disabledvialua.ecsrules.tests.powerdns.com.'
560 query = dns.message.make_query(name, 'A', 'IN')
561 response = dns.message.make_response(query)
562 rrset = dns.rrset.from_text(name,
563 3600,
564 dns.rdataclass.IN,
565 dns.rdatatype.A,
566 '127.0.0.1')
567 response.answer.append(rrset)
568
6ca2e796
RG
569 for method in ("sendUDPQuery", "sendTCPQuery"):
570 sender = getattr(self, method)
571 (receivedQuery, receivedResponse) = sender(query, response)
572 self.assertTrue(receivedQuery)
573 self.assertTrue(receivedResponse)
574 receivedQuery.id = query.id
575 self.checkQueryNoEDNS(query, receivedQuery)
576 self.checkResponseNoEDNS(response, receivedResponse)
ff0902ec 577
5df86a8a 578class TestECSOverrideSetByRuleOrLua(DNSDistTest):
ff0902ec
RG
579 """
580 dnsdist is configured to set the EDNS0 Client Subnet
581 option without overriding an existing one, but we
582 force the overriding via ECSOverrideAction() or Lua.
583 """
584
585 _config_template = """
586 setECSOverride(false)
587 setECSSourcePrefixV4(24)
588 setECSSourcePrefixV6(56)
589 newServer{address="127.0.0.1:%s", useClientSubnet=true}
590 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
591 function overrideECSViaLua(dq)
592 dq.ecsOverride = true
593 return DNSAction.None, ""
594 end
a2ff35e3 595 addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
ff0902ec
RG
596 """
597
598 def testWithECSOverrideNotSet(self):
599 """
600 ECS Override: not set via Lua or a rule
601 """
602 name = 'notoverridden.ecsrules.tests.powerdns.com.'
603 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
604 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
605 response = dns.message.make_response(query)
606 response.use_edns(edns=True, payload=4096, options=[ecso])
607 rrset = dns.rrset.from_text(name,
608 3600,
609 dns.rdataclass.IN,
610 dns.rdatatype.A,
611 '127.0.0.1')
612 response.answer.append(rrset)
613
6ca2e796
RG
614 for method in ("sendUDPQuery", "sendTCPQuery"):
615 sender = getattr(self, method)
616 (receivedQuery, receivedResponse) = sender(query, response)
617 self.assertTrue(receivedQuery)
618 self.assertTrue(receivedResponse)
619 receivedQuery.id = query.id
620 self.checkQueryEDNSWithECS(query, receivedQuery)
621 self.checkResponseEDNSWithECS(response, receivedResponse)
ff0902ec
RG
622
623 def testWithECSOverrideSetViaRule(self):
624 """
625 ECS Override: set with a rule
626 """
627 name = 'overridden.ecsrules.tests.powerdns.com.'
628 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
629 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
630 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
631 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
632 response = dns.message.make_response(query)
633 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
634 rrset = dns.rrset.from_text(name,
635 3600,
636 dns.rdataclass.IN,
637 dns.rdatatype.A,
638 '127.0.0.1')
639 response.answer.append(rrset)
640
6ca2e796
RG
641 for method in ("sendUDPQuery", "sendTCPQuery"):
642 sender = getattr(self, method)
643 (receivedQuery, receivedResponse) = sender(query, response)
644 self.assertTrue(receivedQuery)
645 self.assertTrue(receivedResponse)
646 receivedQuery.id = expectedQuery.id
647 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
648 self.checkResponseEDNSWithECS(response, receivedResponse)
ff0902ec
RG
649
650 def testWithECSOverrideSetViaLua(self):
651 """
652 ECS Override: set via Lua
653 """
654 name = 'overriddenvialua.ecsrules.tests.powerdns.com.'
7b146b98 655 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
ca404e94
RG
656 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
657 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
658 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
659 response = dns.message.make_response(query)
ff0902ec 660 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
ca404e94
RG
661 rrset = dns.rrset.from_text(name,
662 3600,
663 dns.rdataclass.IN,
664 dns.rdatatype.A,
665 '127.0.0.1')
666 response.answer.append(rrset)
667
6ca2e796
RG
668 for method in ("sendUDPQuery", "sendTCPQuery"):
669 sender = getattr(self, method)
670 (receivedQuery, receivedResponse) = sender(query, response)
671 self.assertTrue(receivedQuery)
672 self.assertTrue(receivedResponse)
673 receivedQuery.id = expectedQuery.id
674 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
675 self.checkResponseEDNSWithECS(response, receivedResponse)
ff0902ec 676
5df86a8a 677class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
ff0902ec
RG
678 """
679 dnsdist is configured to set the EDNS0 Client Subnet
680 option with a prefix length of 24 for IPv4 and 56 for IPv6,
681 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
682 """
683
684 _config_template = """
685 setECSOverride(false)
686 setECSSourcePrefixV4(24)
687 setECSSourcePrefixV6(56)
688 newServer{address="127.0.0.1:%s", useClientSubnet=true}
689 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
690 function overrideECSPrefixLengthViaLua(dq)
691 dq.ecsPrefixLength = 32
692 return DNSAction.None, ""
693 end
a2ff35e3 694 addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
ff0902ec
RG
695 """
696
697 def testWithECSPrefixLengthNotOverridden(self):
698 """
699 ECS Prefix Length: not overridden via Lua or a rule
700 """
701 name = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
702 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
703 query = dns.message.make_query(name, 'A', 'IN')
704 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
705 response = dns.message.make_response(query)
706 response.use_edns(edns=True, payload=4096, options=[ecso])
707 rrset = dns.rrset.from_text(name,
708 3600,
709 dns.rdataclass.IN,
710 dns.rdatatype.A,
711 '127.0.0.1')
712 response.answer.append(rrset)
713 expectedResponse = dns.message.make_response(query)
714 expectedResponse.answer.append(rrset)
715
6ca2e796
RG
716 for method in ("sendUDPQuery", "sendTCPQuery"):
717 sender = getattr(self, method)
718 (receivedQuery, receivedResponse) = sender(query, response)
719 self.assertTrue(receivedQuery)
720 self.assertTrue(receivedResponse)
721 receivedQuery.id = expectedQuery.id
722 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
723 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff0902ec
RG
724
725 def testWithECSPrefixLengthOverriddenViaRule(self):
726 """
727 ECS Prefix Length: overridden with a rule
728 """
729 name = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
730 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
731 query = dns.message.make_query(name, 'A', 'IN')
732 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
733 response = dns.message.make_response(expectedQuery)
734 rrset = dns.rrset.from_text(name,
735 3600,
736 dns.rdataclass.IN,
737 dns.rdatatype.A,
738 '127.0.0.1')
739 response.answer.append(rrset)
740 expectedResponse = dns.message.make_response(query)
741 expectedResponse.answer.append(rrset)
742
6ca2e796
RG
743 for method in ("sendUDPQuery", "sendTCPQuery"):
744 sender = getattr(self, method)
745 (receivedQuery, receivedResponse) = sender(query, response)
746 self.assertTrue(receivedQuery)
747 self.assertTrue(receivedResponse)
748 receivedQuery.id = expectedQuery.id
749 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
750 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff0902ec
RG
751
752 def testWithECSPrefixLengthOverriddenViaLua(self):
753 """
754 ECS Prefix Length: overridden via Lua
755 """
756 name = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
757 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
758 query = dns.message.make_query(name, 'A', 'IN')
759 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
760 response = dns.message.make_response(expectedQuery)
bd14f087
RG
761 rrset = dns.rrset.from_text(name,
762 3600,
763 dns.rdataclass.IN,
764 dns.rdatatype.A,
765 '127.0.0.1')
766 response.answer.append(rrset)
767 expectedResponse = dns.message.make_response(query)
768 expectedResponse.answer.append(rrset)
769
6ca2e796
RG
770 for method in ("sendUDPQuery", "sendTCPQuery"):
771 sender = getattr(self, method)
772 (receivedQuery, receivedResponse) = sender(query, response)
773 self.assertTrue(receivedQuery)
774 self.assertTrue(receivedResponse)
775 receivedQuery.id = expectedQuery.id
776 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
777 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
bd14f087
RG
778
779class TestECSPrefixSetByRule(DNSDistTest):
780 """
781 dnsdist is configured to set the EDNS0 Client Subnet
782 option for incoming queries to the actual source IP,
783 but we override it for some queries via SetECSAction().
784 """
785
786 _config_template = """
787 setECSOverride(false)
788 setECSSourcePrefixV4(32)
789 setECSSourcePrefixV6(128)
790 newServer{address="127.0.0.1:%s", useClientSubnet=true}
791 addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
792 """
793
794 def testWithRegularECS(self):
795 """
796 ECS Prefix: not set
797 """
798 name = 'notsetecsaction.ecsrules.tests.powerdns.com.'
799 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
800 query = dns.message.make_query(name, 'A', 'IN')
801 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
802 response = dns.message.make_response(query)
803 response.use_edns(edns=True, payload=4096, options=[ecso])
804 rrset = dns.rrset.from_text(name,
805 3600,
806 dns.rdataclass.IN,
807 dns.rdatatype.A,
808 '127.0.0.1')
809 response.answer.append(rrset)
810 expectedResponse = dns.message.make_response(query)
811 expectedResponse.answer.append(rrset)
812
6ca2e796
RG
813 for method in ("sendUDPQuery", "sendTCPQuery"):
814 sender = getattr(self, method)
815 (receivedQuery, receivedResponse) = sender(query, response)
816 self.assertTrue(receivedQuery)
817 self.assertTrue(receivedResponse)
818 receivedQuery.id = expectedQuery.id
819 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
820 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
bd14f087
RG
821
822 def testWithECSSetByRule(self):
823 """
824 ECS Prefix: set with SetECSAction
825 """
826 name = 'setecsaction.ecsrules.tests.powerdns.com.'
827 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
828 query = dns.message.make_query(name, 'A', 'IN')
829 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
830 response = dns.message.make_response(expectedQuery)
ff0902ec
RG
831 rrset = dns.rrset.from_text(name,
832 3600,
833 dns.rdataclass.IN,
834 dns.rdatatype.A,
835 '127.0.0.1')
836 response.answer.append(rrset)
837 expectedResponse = dns.message.make_response(query)
838 expectedResponse.answer.append(rrset)
839
6ca2e796
RG
840 for method in ("sendUDPQuery", "sendTCPQuery"):
841 sender = getattr(self, method)
842 (receivedQuery, receivedResponse) = sender(query, response)
843 self.assertTrue(receivedQuery)
844 self.assertTrue(receivedResponse)
845 receivedQuery.id = expectedQuery.id
846 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
847 self.checkResponseNoEDNS(expectedResponse, receivedResponse)