]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_EdnsClientSubnet.py
dnsdist tests: make py3k compatible and pick py3k if available
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
CommitLineData
ca404e94 1#!/usr/bin/env python
b1bec9f0
RG
2import dns
3import clientsubnetoption
ff73f02b 4import cookiesoption
ca404e94 5from dnsdisttests import DNSDistTest
d83feb68 6from datetime import datetime, timedelta
ca404e94 7
5df86a8a 8class TestEdnsClientSubnetNoOverride(DNSDistTest):
ca404e94 9 """
ec5f5c6b 10 dnsdist is configured to add the EDNS0 Client Subnet
ca404e94
RG
11 option, but only if it's not already present in the
12 original query.
13 """
14
ca404e94
RG
15 _config_template = """
16 truncateTC(true)
ca404e94
RG
17 newServer{address="127.0.0.1:%s", useClientSubnet=true}
18 """
19
ca404e94
RG
20 def testWithoutEDNS(self):
21 """
617dfe22
RG
22 ECS: No existing EDNS
23
ca404e94
RG
24 Send a query without EDNS, check that the query
25 received by the responder has the correct ECS value
26 and that the response received from dnsdist does not
27 have an EDNS pseudo-RR.
28 """
29 name = 'withoutedns.ecs.tests.powerdns.com.'
30 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
31 query = dns.message.make_query(name, 'A', 'IN')
32 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
33 response = dns.message.make_response(expectedQuery)
34 expectedResponse = dns.message.make_response(query)
35 rrset = dns.rrset.from_text(name,
36 3600,
37 dns.rdataclass.IN,
38 dns.rdatatype.A,
39 '127.0.0.1')
40 response.answer.append(rrset)
41 expectedResponse.answer.append(rrset)
42
43 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
44 self.assertTrue(receivedQuery)
45 self.assertTrue(receivedResponse)
46 receivedQuery.id = expectedQuery.id
ff0902ec
RG
47 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
48 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
49
50 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
51 self.assertTrue(receivedQuery)
52 self.assertTrue(receivedResponse)
53 receivedQuery.id = expectedQuery.id
ff0902ec
RG
54 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
55 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
56
57 def testWithEDNSNoECS(self):
58 """
617dfe22
RG
59 ECS: Existing EDNS without ECS
60
ca404e94
RG
61 Send a query with EDNS but no ECS value.
62 Check that the query received by the responder
63 has a valid ECS value and that the response
64 received from dnsdist contains an EDNS pseudo-RR.
65 """
66 name = 'withednsnoecs.ecs.tests.powerdns.com.'
67 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
68 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
69 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
70 response = dns.message.make_response(expectedQuery)
71 expectedResponse = dns.message.make_response(query)
72 rrset = dns.rrset.from_text(name,
73 3600,
74 dns.rdataclass.IN,
75 dns.rdatatype.A,
76 '127.0.0.1')
77 response.answer.append(rrset)
78 expectedResponse.answer.append(rrset)
79
80 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
81 self.assertTrue(receivedQuery)
82 self.assertTrue(receivedResponse)
83 receivedQuery.id = expectedQuery.id
ff0902ec
RG
84 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
85 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ca404e94
RG
86
87 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
88 self.assertTrue(receivedQuery)
89 self.assertTrue(receivedResponse)
90 receivedQuery.id = expectedQuery.id
ff0902ec
RG
91 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
92 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ca404e94
RG
93
94 def testWithEDNSECS(self):
95 """
617dfe22
RG
96 ECS: Existing EDNS with ECS
97
ca404e94
RG
98 Send a query with EDNS and a crafted ECS value.
99 Check that the query received by the responder
100 has the initial ECS value (not overwritten)
101 and that the response received from dnsdist contains
102 an EDNS pseudo-RR.
103 """
104 name = 'withednsecs.ecs.tests.powerdns.com.'
105 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
106 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
107 response = dns.message.make_response(query)
108 rrset = dns.rrset.from_text(name,
109 3600,
110 dns.rdataclass.IN,
111 dns.rdatatype.A,
112 '127.0.0.1')
113 response.answer.append(rrset)
114
ff0902ec 115
ca404e94
RG
116 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
117 self.assertTrue(receivedQuery)
118 self.assertTrue(receivedResponse)
119 receivedQuery.id = query.id
ff0902ec
RG
120 self.checkQueryEDNSWithECS(query, receivedQuery)
121 self.checkResponseEDNSWithoutECS(response, receivedResponse)
ca404e94
RG
122
123 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
124 self.assertTrue(receivedQuery)
125 self.assertTrue(receivedResponse)
126 receivedQuery.id = query.id
ff0902ec
RG
127 self.checkQueryEDNSWithECS(query, receivedQuery)
128 self.checkResponseEDNSWithoutECS(response, receivedResponse)
ff73f02b
RG
129
130 def testWithoutEDNSResponseWithECS(self):
131 """
132 ECS: No existing EDNS (BE returning ECS)
133
134 Send a query without EDNS, check that the query
135 received by the responder has the correct ECS value
136 and that the response received from dnsdist does not
137 have an EDNS pseudo-RR.
138 This time the response returned by the backend contains
139 an ECS option with scope set.
140 """
141 name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
142 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
143 query = dns.message.make_query(name, 'A', 'IN')
144 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
145 response = dns.message.make_response(expectedQuery)
146 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
147 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
148 expectedResponse = dns.message.make_response(query)
149 rrset = dns.rrset.from_text(name,
150 3600,
151 dns.rdataclass.IN,
152 dns.rdatatype.A,
153 '127.0.0.1')
154 response.answer.append(rrset)
155 expectedResponse.answer.append(rrset)
156
157 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
158 self.assertTrue(receivedQuery)
159 self.assertTrue(receivedResponse)
160 receivedQuery.id = expectedQuery.id
ff0902ec
RG
161 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
162 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff73f02b
RG
163
164 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
165 self.assertTrue(receivedQuery)
166 self.assertTrue(receivedResponse)
167 receivedQuery.id = expectedQuery.id
ff0902ec
RG
168 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
169 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff73f02b
RG
170
171 def testWithEDNSNoECSResponseWithECS(self):
172 """
173 ECS: Existing EDNS without ECS (BE returning only the ECS option)
174
175 Send a query with EDNS but no ECS value.
176 Check that the query received by the responder
177 has a valid ECS value and that the response
178 received from dnsdist contains an EDNS pseudo-RR.
179 This time the response returned by the backend contains
180 an ECS option with scope set.
181 """
182 name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
183 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
184 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
185 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
186 response = dns.message.make_response(expectedQuery)
187 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
188 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
189 expectedResponse = dns.message.make_response(query)
190 rrset = dns.rrset.from_text(name,
191 3600,
192 dns.rdataclass.IN,
193 dns.rdatatype.A,
194 '127.0.0.1')
195 response.answer.append(rrset)
196 expectedResponse.answer.append(rrset)
197
198 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
199 self.assertTrue(receivedQuery)
200 self.assertTrue(receivedResponse)
201 receivedQuery.id = expectedQuery.id
ff0902ec
RG
202 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
203 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ff73f02b
RG
204
205 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
206 self.assertTrue(receivedQuery)
207 self.assertTrue(receivedResponse)
208 receivedQuery.id = expectedQuery.id
ff0902ec
RG
209 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
210 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ff73f02b
RG
211
212 def testWithEDNSNoECSResponseWithCookiesThenECS(self):
213 """
214 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
215
216 Send a query with EDNS but no ECS value.
217 Check that the query received by the responder
218 has a valid ECS value and that the response
219 received from dnsdist contains an EDNS pseudo-RR.
220 This time the response returned by the backend contains
221 one cookies then one ECS option.
222 """
223 name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
224 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
225 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
226 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
227 response = dns.message.make_response(expectedQuery)
b4f23783 228 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
229 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
230 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
231 expectedResponse = dns.message.make_response(query)
232 rrset = dns.rrset.from_text(name,
233 3600,
234 dns.rdataclass.IN,
235 dns.rdatatype.A,
236 '127.0.0.1')
237 response.answer.append(rrset)
238 expectedResponse.answer.append(rrset)
ff0902ec 239 expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
ff73f02b
RG
240
241 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
242 self.assertTrue(receivedQuery)
243 self.assertTrue(receivedResponse)
244 receivedQuery.id = expectedQuery.id
ff0902ec
RG
245 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
246 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
ff73f02b
RG
247
248 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
249 self.assertTrue(receivedQuery)
250 self.assertTrue(receivedResponse)
251 receivedQuery.id = expectedQuery.id
ff0902ec
RG
252 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
253 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
ff73f02b
RG
254
255 def testWithEDNSNoECSResponseWithECSThenCookies(self):
256 """
257 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
258
259 Send a query with EDNS but no ECS value.
260 Check that the query received by the responder
261 has a valid ECS value and that the response
262 received from dnsdist contains an EDNS pseudo-RR.
263 This time the response returned by the backend contains
264 one ECS then one Cookies option.
265 """
266 name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
267 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
268 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
269 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
270 response = dns.message.make_response(expectedQuery)
b4f23783 271 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
272 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
273 response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
274 expectedResponse = dns.message.make_response(query)
275 rrset = dns.rrset.from_text(name,
276 3600,
277 dns.rdataclass.IN,
278 dns.rdatatype.A,
279 '127.0.0.1')
280 response.answer.append(rrset)
281 expectedResponse.answer.append(rrset)
ff0902ec 282 response.use_edns(edns=True, payload=4096, options=[ecoResponse])
ff73f02b
RG
283
284 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
285 self.assertTrue(receivedQuery)
286 self.assertTrue(receivedResponse)
287 receivedQuery.id = expectedQuery.id
ff0902ec
RG
288 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
289 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
ff73f02b
RG
290
291 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
292 self.assertTrue(receivedQuery)
293 self.assertTrue(receivedResponse)
294 receivedQuery.id = expectedQuery.id
ff0902ec
RG
295 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
296 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
ff73f02b
RG
297
298 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
299 """
300 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
301
302 Send a query with EDNS but no ECS value.
303 Check that the query received by the responder
304 has a valid ECS value and that the response
305 received from dnsdist contains an EDNS pseudo-RR.
306 This time the response returned by the backend contains
307 one Cookies, one ECS then one Cookies option.
308 """
309 name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
310 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
311 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
312 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
313 response = dns.message.make_response(expectedQuery)
b4f23783 314 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
315 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
316 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
317 expectedResponse = dns.message.make_response(query)
318 rrset = dns.rrset.from_text(name,
319 3600,
320 dns.rdataclass.IN,
321 dns.rdatatype.A,
322 '127.0.0.1')
323 response.answer.append(rrset)
324 expectedResponse.answer.append(rrset)
325
326 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
327 self.assertTrue(receivedQuery)
328 self.assertTrue(receivedResponse)
329 receivedQuery.id = expectedQuery.id
ff0902ec
RG
330 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
331 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
ff73f02b
RG
332
333 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
334 self.assertTrue(receivedQuery)
335 self.assertTrue(receivedResponse)
336 receivedQuery.id = expectedQuery.id
ff0902ec
RG
337 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
338 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
ff73f02b 339
ca404e94 340
5df86a8a 341class TestEdnsClientSubnetOverride(DNSDistTest):
ca404e94 342 """
ec5f5c6b 343 dnsdist is configured to add the EDNS0 Client Subnet
ca404e94
RG
344 option, overwriting any existing value.
345 """
346
ca404e94
RG
347 _config_template = """
348 truncateTC(true)
ca404e94
RG
349 setECSOverride(true)
350 setECSSourcePrefixV4(24)
351 setECSSourcePrefixV6(56)
352 newServer{address="127.0.0.1:%s", useClientSubnet=true}
353 """
354
ca404e94
RG
355 def testWithoutEDNS(self):
356 """
617dfe22
RG
357 ECS Override: No existing EDNS
358
ca404e94
RG
359 Send a query without EDNS, check that the query
360 received by the responder has the correct ECS value
361 and that the response received from dnsdist does not
362 have an EDNS pseudo-RR.
363 """
ff0902ec 364 name = 'withoutedns.overridden.ecs.tests.powerdns.com.'
ca404e94
RG
365 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
366 query = dns.message.make_query(name, 'A', 'IN')
367 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
368 response = dns.message.make_response(expectedQuery)
ff0902ec 369 response.use_edns(edns=True, payload=4096, options=[ecso])
ca404e94
RG
370 rrset = dns.rrset.from_text(name,
371 3600,
372 dns.rdataclass.IN,
373 dns.rdatatype.A,
374 '127.0.0.1')
375 response.answer.append(rrset)
ff0902ec 376 expectedResponse = dns.message.make_response(query)
ca404e94
RG
377 expectedResponse.answer.append(rrset)
378
379 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
380 self.assertTrue(receivedQuery)
381 self.assertTrue(receivedResponse)
382 receivedQuery.id = expectedQuery.id
ff0902ec
RG
383 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
384 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
385
386 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
387 self.assertTrue(receivedQuery)
388 self.assertTrue(receivedResponse)
389 receivedQuery.id = expectedQuery.id
ff0902ec
RG
390 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
391 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
392
393 def testWithEDNSNoECS(self):
394 """
617dfe22
RG
395 ECS Override: Existing EDNS without ECS
396
ca404e94
RG
397 Send a query with EDNS but no ECS value.
398 Check that the query received by the responder
399 has a valid ECS value and that the response
400 received from dnsdist contains an EDNS pseudo-RR.
401 """
ff0902ec 402 name = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
ca404e94
RG
403 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
404 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
405 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
406 response = dns.message.make_response(expectedQuery)
ff0902ec 407 response.use_edns(edns=True, payload=4096, options=[ecso])
ca404e94
RG
408 rrset = dns.rrset.from_text(name,
409 3600,
410 dns.rdataclass.IN,
411 dns.rdatatype.A,
412 '127.0.0.1')
413 response.answer.append(rrset)
ff0902ec 414 expectedResponse = dns.message.make_response(query)
ca404e94
RG
415 expectedResponse.answer.append(rrset)
416
417 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
418 self.assertTrue(receivedQuery)
419 self.assertTrue(receivedResponse)
420 receivedQuery.id = expectedQuery.id
ff0902ec
RG
421 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
422 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ca404e94
RG
423
424 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
425 self.assertTrue(receivedQuery)
426 self.assertTrue(receivedResponse)
427 receivedQuery.id = expectedQuery.id
ff0902ec
RG
428 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
429 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ca404e94 430
7b146b98 431 def testWithEDNSShorterInitialECS(self):
ca404e94 432 """
617dfe22
RG
433 ECS Override: Existing EDNS with ECS (short)
434
ca404e94
RG
435 Send a query with EDNS and a crafted ECS value.
436 Check that the query received by the responder
437 has an overwritten ECS value (not the initial one)
438 and that the response received from dnsdist contains
439 an EDNS pseudo-RR.
ff0902ec 440 The initial ECS value is shorter than the one it will be
7b146b98 441 replaced with.
ca404e94 442 """
ff0902ec 443 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
7b146b98
RG
444 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
445 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
446 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
447 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
448 response = dns.message.make_response(query)
ff0902ec 449 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
7b146b98
RG
450 rrset = dns.rrset.from_text(name,
451 3600,
452 dns.rdataclass.IN,
453 dns.rdatatype.A,
454 '127.0.0.1')
455 response.answer.append(rrset)
456
457 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
458 self.assertTrue(receivedQuery)
459 self.assertTrue(receivedResponse)
460 receivedQuery.id = expectedQuery.id
ff0902ec
RG
461 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
462 self.checkResponseEDNSWithECS(response, receivedResponse)
7b146b98
RG
463
464 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
465 self.assertTrue(receivedQuery)
466 self.assertTrue(receivedResponse)
467 receivedQuery.id = expectedQuery.id
ff0902ec
RG
468 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
469 self.checkResponseEDNSWithECS(response, receivedResponse)
7b146b98
RG
470
471 def testWithEDNSLongerInitialECS(self):
472 """
617dfe22
RG
473 ECS Override: Existing EDNS with ECS (long)
474
7b146b98
RG
475 Send a query with EDNS and a crafted ECS value.
476 Check that the query received by the responder
477 has an overwritten ECS value (not the initial one)
478 and that the response received from dnsdist contains
479 an EDNS pseudo-RR.
480 The initial ECS value is longer than the one it will
481 replaced with.
482 """
ff0902ec 483 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
7b146b98
RG
484 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
485 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
486 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
487 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
488 response = dns.message.make_response(query)
ff0902ec 489 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
7b146b98
RG
490 rrset = dns.rrset.from_text(name,
491 3600,
492 dns.rdataclass.IN,
493 dns.rdatatype.A,
494 '127.0.0.1')
495 response.answer.append(rrset)
496
497 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
498 self.assertTrue(receivedQuery)
499 self.assertTrue(receivedResponse)
500 receivedQuery.id = expectedQuery.id
ff0902ec
RG
501 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
502 self.checkResponseEDNSWithECS(response, receivedResponse)
7b146b98
RG
503
504 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
505 self.assertTrue(receivedQuery)
506 self.assertTrue(receivedResponse)
507 receivedQuery.id = expectedQuery.id
ff0902ec
RG
508 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
509 self.checkResponseEDNSWithECS(response, receivedResponse)
7b146b98
RG
510
511 def testWithEDNSSameSizeInitialECS(self):
512 """
617dfe22
RG
513 ECS Override: Existing EDNS with ECS (same)
514
7b146b98
RG
515 Send a query with EDNS and a crafted ECS value.
516 Check that the query received by the responder
517 has an overwritten ECS value (not the initial one)
518 and that the response received from dnsdist contains
519 an EDNS pseudo-RR.
520 The initial ECS value is exactly the same size as
521 the one it will replaced with.
522 """
ff0902ec
RG
523 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
524 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
525 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
526 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
527 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
528 response = dns.message.make_response(query)
529 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
530 rrset = dns.rrset.from_text(name,
531 3600,
532 dns.rdataclass.IN,
533 dns.rdatatype.A,
534 '127.0.0.1')
535 response.answer.append(rrset)
536
537 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
538 self.assertTrue(receivedQuery)
539 self.assertTrue(receivedResponse)
540 receivedQuery.id = expectedQuery.id
541 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
542 self.checkResponseEDNSWithECS(response, receivedResponse)
543
544 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
545 self.assertTrue(receivedQuery)
546 self.assertTrue(receivedResponse)
547 receivedQuery.id = expectedQuery.id
548 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
549 self.checkResponseEDNSWithECS(response, receivedResponse)
550
5df86a8a 551class TestECSDisabledByRuleOrLua(DNSDistTest):
ff0902ec
RG
552 """
553 dnsdist is configured to add the EDNS0 Client Subnet
554 option, but we disable it via DisableECSAction()
555 or Lua.
556 """
557
558 _config_template = """
559 setECSOverride(false)
560 setECSSourcePrefixV4(16)
561 setECSSourcePrefixV6(16)
562 newServer{address="127.0.0.1:%s", useClientSubnet=true}
563 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
564 function disableECSViaLua(dq)
565 dq.useECS = false
566 return DNSAction.None, ""
567 end
a2ff35e3 568 addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
ff0902ec
RG
569 """
570
571 def testWithECSNotDisabled(self):
572 """
573 ECS Disable: ECS enabled in the backend
574 """
575 name = 'notdisabled.ecsrules.tests.powerdns.com.'
576 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
577 query = dns.message.make_query(name, 'A', 'IN')
578 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
579 response = dns.message.make_response(expectedQuery)
580 expectedResponse = dns.message.make_response(query)
581 rrset = dns.rrset.from_text(name,
582 3600,
583 dns.rdataclass.IN,
584 dns.rdatatype.AAAA,
585 '::1')
586 response.answer.append(rrset)
587 expectedResponse.answer.append(rrset)
588
589 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
590 self.assertTrue(receivedQuery)
591 self.assertTrue(receivedResponse)
592 receivedQuery.id = expectedQuery.id
593 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
594 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
595
596 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
597 self.assertTrue(receivedQuery)
598 self.assertTrue(receivedResponse)
599 receivedQuery.id = expectedQuery.id
600 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
601 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
602
603 def testWithECSDisabledViaRule(self):
604 """
605 ECS Disable: ECS enabled in the backend, but disabled by a rule
606 """
607 name = 'disabled.ecsrules.tests.powerdns.com.'
608 query = dns.message.make_query(name, 'A', 'IN')
609 response = dns.message.make_response(query)
610 rrset = dns.rrset.from_text(name,
611 3600,
612 dns.rdataclass.IN,
613 dns.rdatatype.A,
614 '127.0.0.1')
615 response.answer.append(rrset)
616
617 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
618 self.assertTrue(receivedQuery)
619 self.assertTrue(receivedResponse)
620 receivedQuery.id = query.id
621 self.checkQueryNoEDNS(query, receivedQuery)
622 self.checkResponseNoEDNS(response, receivedResponse)
623
624 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
625 self.assertTrue(receivedQuery)
626 self.assertTrue(receivedResponse)
627 receivedQuery.id = query.id
628 self.checkQueryNoEDNS(query, receivedQuery)
629 self.checkResponseNoEDNS(response, receivedResponse)
630
631 def testWithECSDisabledViaLua(self):
632 """
633 ECS Disable: ECS enabled in the backend, but disabled via Lua
634 """
635 name = 'disabledvialua.ecsrules.tests.powerdns.com.'
636 query = dns.message.make_query(name, 'A', 'IN')
637 response = dns.message.make_response(query)
638 rrset = dns.rrset.from_text(name,
639 3600,
640 dns.rdataclass.IN,
641 dns.rdatatype.A,
642 '127.0.0.1')
643 response.answer.append(rrset)
644
645 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
646 self.assertTrue(receivedQuery)
647 self.assertTrue(receivedResponse)
648 receivedQuery.id = query.id
649 self.checkQueryNoEDNS(query, receivedQuery)
650 self.checkResponseNoEDNS(response, receivedResponse)
651
652 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
653 self.assertTrue(receivedQuery)
654 self.assertTrue(receivedResponse)
655 receivedQuery.id = query.id
656 self.checkQueryNoEDNS(query, receivedQuery)
657 self.checkResponseNoEDNS(response, receivedResponse)
658
5df86a8a 659class TestECSOverrideSetByRuleOrLua(DNSDistTest):
ff0902ec
RG
660 """
661 dnsdist is configured to set the EDNS0 Client Subnet
662 option without overriding an existing one, but we
663 force the overriding via ECSOverrideAction() or Lua.
664 """
665
666 _config_template = """
667 setECSOverride(false)
668 setECSSourcePrefixV4(24)
669 setECSSourcePrefixV6(56)
670 newServer{address="127.0.0.1:%s", useClientSubnet=true}
671 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
672 function overrideECSViaLua(dq)
673 dq.ecsOverride = true
674 return DNSAction.None, ""
675 end
a2ff35e3 676 addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
ff0902ec
RG
677 """
678
679 def testWithECSOverrideNotSet(self):
680 """
681 ECS Override: not set via Lua or a rule
682 """
683 name = 'notoverridden.ecsrules.tests.powerdns.com.'
684 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
685 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
686 response = dns.message.make_response(query)
687 response.use_edns(edns=True, payload=4096, options=[ecso])
688 rrset = dns.rrset.from_text(name,
689 3600,
690 dns.rdataclass.IN,
691 dns.rdatatype.A,
692 '127.0.0.1')
693 response.answer.append(rrset)
694
695 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
696 self.assertTrue(receivedQuery)
697 self.assertTrue(receivedResponse)
698 receivedQuery.id = query.id
699 self.checkQueryEDNSWithECS(query, receivedQuery)
700 self.checkResponseEDNSWithECS(response, receivedResponse)
701
702 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
703 self.assertTrue(receivedQuery)
704 self.assertTrue(receivedResponse)
705 receivedQuery.id = query.id
706 self.checkQueryEDNSWithECS(query, receivedQuery)
707 self.checkResponseEDNSWithECS(response, receivedResponse)
708
709 def testWithECSOverrideSetViaRule(self):
710 """
711 ECS Override: set with a rule
712 """
713 name = 'overridden.ecsrules.tests.powerdns.com.'
714 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
715 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
716 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
717 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
718 response = dns.message.make_response(query)
719 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
720 rrset = dns.rrset.from_text(name,
721 3600,
722 dns.rdataclass.IN,
723 dns.rdatatype.A,
724 '127.0.0.1')
725 response.answer.append(rrset)
726
727 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
728 self.assertTrue(receivedQuery)
729 self.assertTrue(receivedResponse)
730 receivedQuery.id = expectedQuery.id
731 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
732 self.checkResponseEDNSWithECS(response, receivedResponse)
733
734 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
735 self.assertTrue(receivedQuery)
736 self.assertTrue(receivedResponse)
737 receivedQuery.id = expectedQuery.id
738 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
739 self.checkResponseEDNSWithECS(response, receivedResponse)
740
741 def testWithECSOverrideSetViaLua(self):
742 """
743 ECS Override: set via Lua
744 """
745 name = 'overriddenvialua.ecsrules.tests.powerdns.com.'
7b146b98 746 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
ca404e94
RG
747 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
748 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
749 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
750 response = dns.message.make_response(query)
ff0902ec 751 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
ca404e94
RG
752 rrset = dns.rrset.from_text(name,
753 3600,
754 dns.rdataclass.IN,
755 dns.rdatatype.A,
756 '127.0.0.1')
757 response.answer.append(rrset)
758
759 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
760 self.assertTrue(receivedQuery)
761 self.assertTrue(receivedResponse)
762 receivedQuery.id = expectedQuery.id
ff0902ec
RG
763 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
764 self.checkResponseEDNSWithECS(response, receivedResponse)
765
766 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
767 self.assertTrue(receivedQuery)
768 self.assertTrue(receivedResponse)
769 receivedQuery.id = expectedQuery.id
770 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
771 self.checkResponseEDNSWithECS(response, receivedResponse)
772
5df86a8a 773class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
ff0902ec
RG
774 """
775 dnsdist is configured to set the EDNS0 Client Subnet
776 option with a prefix length of 24 for IPv4 and 56 for IPv6,
777 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
778 """
779
780 _config_template = """
781 setECSOverride(false)
782 setECSSourcePrefixV4(24)
783 setECSSourcePrefixV6(56)
784 newServer{address="127.0.0.1:%s", useClientSubnet=true}
785 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
786 function overrideECSPrefixLengthViaLua(dq)
787 dq.ecsPrefixLength = 32
788 return DNSAction.None, ""
789 end
a2ff35e3 790 addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
ff0902ec
RG
791 """
792
793 def testWithECSPrefixLengthNotOverridden(self):
794 """
795 ECS Prefix Length: not overridden via Lua or a rule
796 """
797 name = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
798 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
799 query = dns.message.make_query(name, 'A', 'IN')
800 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
801 response = dns.message.make_response(query)
802 response.use_edns(edns=True, payload=4096, options=[ecso])
803 rrset = dns.rrset.from_text(name,
804 3600,
805 dns.rdataclass.IN,
806 dns.rdatatype.A,
807 '127.0.0.1')
808 response.answer.append(rrset)
809 expectedResponse = dns.message.make_response(query)
810 expectedResponse.answer.append(rrset)
811
812 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
813 self.assertTrue(receivedQuery)
814 self.assertTrue(receivedResponse)
815 receivedQuery.id = expectedQuery.id
816 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
817 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
818
819 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
820 self.assertTrue(receivedQuery)
821 self.assertTrue(receivedResponse)
822 receivedQuery.id = expectedQuery.id
823 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
824 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
825
826 def testWithECSPrefixLengthOverriddenViaRule(self):
827 """
828 ECS Prefix Length: overridden with a rule
829 """
830 name = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
831 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
832 query = dns.message.make_query(name, 'A', 'IN')
833 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
834 response = dns.message.make_response(expectedQuery)
835 rrset = dns.rrset.from_text(name,
836 3600,
837 dns.rdataclass.IN,
838 dns.rdatatype.A,
839 '127.0.0.1')
840 response.answer.append(rrset)
841 expectedResponse = dns.message.make_response(query)
842 expectedResponse.answer.append(rrset)
843
844 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
845 self.assertTrue(receivedQuery)
846 self.assertTrue(receivedResponse)
847 receivedQuery.id = expectedQuery.id
848 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
849 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
850
851 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
852 self.assertTrue(receivedQuery)
853 self.assertTrue(receivedResponse)
854 receivedQuery.id = expectedQuery.id
855 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
856 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
857
858 def testWithECSPrefixLengthOverriddenViaLua(self):
859 """
860 ECS Prefix Length: overridden via Lua
861 """
862 name = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
863 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
864 query = dns.message.make_query(name, 'A', 'IN')
865 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
866 response = dns.message.make_response(expectedQuery)
867 rrset = dns.rrset.from_text(name,
868 3600,
869 dns.rdataclass.IN,
870 dns.rdatatype.A,
871 '127.0.0.1')
872 response.answer.append(rrset)
873 expectedResponse = dns.message.make_response(query)
874 expectedResponse.answer.append(rrset)
875
876 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
877 self.assertTrue(receivedQuery)
878 self.assertTrue(receivedResponse)
879 receivedQuery.id = expectedQuery.id
880 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
881 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
882
883 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
884 self.assertTrue(receivedQuery)
885 self.assertTrue(receivedResponse)
886 receivedQuery.id = expectedQuery.id
ff0902ec
RG
887 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
888 self.checkResponseNoEDNS(expectedResponse, receivedResponse)