]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #8115 from rgacogne/dnsdist-ecs-before-tsig
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
CommitLineData
ca404e94 1#!/usr/bin/env python
b1bec9f0
RG
2import dns
3import clientsubnetoption
ff73f02b 4import cookiesoption
ca404e94 5from dnsdisttests import DNSDistTest
d83feb68 6from datetime import datetime, timedelta
ca404e94 7
5df86a8a 8class TestEdnsClientSubnetNoOverride(DNSDistTest):
ca404e94 9 """
ec5f5c6b 10 dnsdist is configured to add the EDNS0 Client Subnet
ca404e94
RG
11 option, but only if it's not already present in the
12 original query.
13 """
14
ca404e94
RG
15 _config_template = """
16 truncateTC(true)
ca404e94
RG
17 newServer{address="127.0.0.1:%s", useClientSubnet=true}
18 """
19
ca404e94
RG
20 def testWithoutEDNS(self):
21 """
617dfe22
RG
22 ECS: No existing EDNS
23
ca404e94
RG
24 Send a query without EDNS, check that the query
25 received by the responder has the correct ECS value
26 and that the response received from dnsdist does not
27 have an EDNS pseudo-RR.
28 """
29 name = 'withoutedns.ecs.tests.powerdns.com.'
30 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
31 query = dns.message.make_query(name, 'A', 'IN')
32 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
33 response = dns.message.make_response(expectedQuery)
34 expectedResponse = dns.message.make_response(query)
35 rrset = dns.rrset.from_text(name,
36 3600,
37 dns.rdataclass.IN,
38 dns.rdatatype.A,
39 '127.0.0.1')
40 response.answer.append(rrset)
41 expectedResponse.answer.append(rrset)
42
6ca2e796
RG
43 for method in ("sendUDPQuery", "sendTCPQuery"):
44 sender = getattr(self, method)
45 (receivedQuery, receivedResponse) = sender(query, response)
46 self.assertTrue(receivedQuery)
47 self.assertTrue(receivedResponse)
48 receivedQuery.id = expectedQuery.id
49 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
50 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
51
52 def testWithEDNSNoECS(self):
53 """
617dfe22
RG
54 ECS: Existing EDNS without ECS
55
ca404e94
RG
56 Send a query with EDNS but no ECS value.
57 Check that the query received by the responder
58 has a valid ECS value and that the response
59 received from dnsdist contains an EDNS pseudo-RR.
60 """
61 name = 'withednsnoecs.ecs.tests.powerdns.com.'
62 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
63 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
64 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
65 response = dns.message.make_response(expectedQuery)
66 expectedResponse = dns.message.make_response(query)
67 rrset = dns.rrset.from_text(name,
68 3600,
69 dns.rdataclass.IN,
70 dns.rdatatype.A,
71 '127.0.0.1')
72 response.answer.append(rrset)
73 expectedResponse.answer.append(rrset)
74
6ca2e796
RG
75 for method in ("sendUDPQuery", "sendTCPQuery"):
76 sender = getattr(self, method)
77 (receivedQuery, receivedResponse) = sender(query, response)
78 self.assertTrue(receivedQuery)
79 self.assertTrue(receivedResponse)
80 receivedQuery.id = expectedQuery.id
81 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
82 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ca404e94
RG
83
84 def testWithEDNSECS(self):
85 """
617dfe22
RG
86 ECS: Existing EDNS with ECS
87
ca404e94
RG
88 Send a query with EDNS and a crafted ECS value.
89 Check that the query received by the responder
90 has the initial ECS value (not overwritten)
91 and that the response received from dnsdist contains
92 an EDNS pseudo-RR.
93 """
94 name = 'withednsecs.ecs.tests.powerdns.com.'
95 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
96 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
97 response = dns.message.make_response(query)
98 rrset = dns.rrset.from_text(name,
99 3600,
100 dns.rdataclass.IN,
101 dns.rdatatype.A,
102 '127.0.0.1')
103 response.answer.append(rrset)
104
ff0902ec 105
6ca2e796
RG
106 for method in ("sendUDPQuery", "sendTCPQuery"):
107 sender = getattr(self, method)
108 (receivedQuery, receivedResponse) = sender(query, response)
109 self.assertTrue(receivedQuery)
110 self.assertTrue(receivedResponse)
111 receivedQuery.id = query.id
112 self.checkQueryEDNSWithECS(query, receivedQuery)
113 self.checkResponseEDNSWithoutECS(response, receivedResponse)
ff73f02b
RG
114
115 def testWithoutEDNSResponseWithECS(self):
116 """
117 ECS: No existing EDNS (BE returning ECS)
118
119 Send a query without EDNS, check that the query
120 received by the responder has the correct ECS value
121 and that the response received from dnsdist does not
122 have an EDNS pseudo-RR.
123 This time the response returned by the backend contains
124 an ECS option with scope set.
125 """
126 name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
127 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
128 query = dns.message.make_query(name, 'A', 'IN')
129 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
130 response = dns.message.make_response(expectedQuery)
131 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
132 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
133 expectedResponse = dns.message.make_response(query)
134 rrset = dns.rrset.from_text(name,
135 3600,
136 dns.rdataclass.IN,
137 dns.rdatatype.A,
138 '127.0.0.1')
139 response.answer.append(rrset)
140 expectedResponse.answer.append(rrset)
141
6ca2e796
RG
142 for method in ("sendUDPQuery", "sendTCPQuery"):
143 sender = getattr(self, method)
144 (receivedQuery, receivedResponse) = sender(query, response)
145 self.assertTrue(receivedQuery)
146 self.assertTrue(receivedResponse)
147 receivedQuery.id = expectedQuery.id
148 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
149 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff73f02b
RG
150
151 def testWithEDNSNoECSResponseWithECS(self):
152 """
153 ECS: Existing EDNS without ECS (BE returning only the ECS option)
154
155 Send a query with EDNS but no ECS value.
156 Check that the query received by the responder
157 has a valid ECS value and that the response
158 received from dnsdist contains an EDNS pseudo-RR.
159 This time the response returned by the backend contains
160 an ECS option with scope set.
161 """
162 name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
163 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
164 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
165 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
166 response = dns.message.make_response(expectedQuery)
167 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
168 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
d70d5ad3 169 expectedResponse = dns.message.make_response(query, our_payload=4096)
ff73f02b
RG
170 rrset = dns.rrset.from_text(name,
171 3600,
172 dns.rdataclass.IN,
173 dns.rdatatype.A,
174 '127.0.0.1')
175 response.answer.append(rrset)
176 expectedResponse.answer.append(rrset)
177
6ca2e796
RG
178 for method in ("sendUDPQuery", "sendTCPQuery"):
179 sender = getattr(self, method)
180 (receivedQuery, receivedResponse) = sender(query, response)
181 self.assertTrue(receivedQuery)
182 self.assertTrue(receivedResponse)
183 receivedQuery.id = expectedQuery.id
184 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
185 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ff73f02b
RG
186
187 def testWithEDNSNoECSResponseWithCookiesThenECS(self):
188 """
189 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
190
191 Send a query with EDNS but no ECS value.
192 Check that the query received by the responder
193 has a valid ECS value and that the response
194 received from dnsdist contains an EDNS pseudo-RR.
195 This time the response returned by the backend contains
196 one cookies then one ECS option.
197 """
198 name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
199 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
200 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
201 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
202 response = dns.message.make_response(expectedQuery)
b4f23783 203 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
204 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
205 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
206 expectedResponse = dns.message.make_response(query)
be90d6bd 207 expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
ff73f02b
RG
208 rrset = dns.rrset.from_text(name,
209 3600,
210 dns.rdataclass.IN,
211 dns.rdatatype.A,
212 '127.0.0.1')
213 response.answer.append(rrset)
214 expectedResponse.answer.append(rrset)
ff0902ec 215 expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
ff73f02b 216
6ca2e796
RG
217 for method in ("sendUDPQuery", "sendTCPQuery"):
218 sender = getattr(self, method)
219 (receivedQuery, receivedResponse) = sender(query, response)
220 self.assertTrue(receivedQuery)
221 self.assertTrue(receivedResponse)
222 receivedQuery.id = expectedQuery.id
223 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
224 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
ff73f02b
RG
225
226 def testWithEDNSNoECSResponseWithECSThenCookies(self):
227 """
228 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
229
230 Send a query with EDNS but no ECS value.
231 Check that the query received by the responder
232 has a valid ECS value and that the response
233 received from dnsdist contains an EDNS pseudo-RR.
234 This time the response returned by the backend contains
235 one ECS then one Cookies option.
236 """
237 name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
238 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
239 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
240 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
241 response = dns.message.make_response(expectedQuery)
b4f23783 242 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
243 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
244 response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
d70d5ad3 245 expectedResponse = dns.message.make_response(query, our_payload=4096)
be90d6bd 246 expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse])
ff73f02b
RG
247 rrset = dns.rrset.from_text(name,
248 3600,
249 dns.rdataclass.IN,
250 dns.rdatatype.A,
251 '127.0.0.1')
252 response.answer.append(rrset)
253 expectedResponse.answer.append(rrset)
ff0902ec 254 response.use_edns(edns=True, payload=4096, options=[ecoResponse])
ff73f02b 255
6ca2e796
RG
256 for method in ("sendUDPQuery", "sendTCPQuery"):
257 sender = getattr(self, method)
258 (receivedQuery, receivedResponse) = sender(query, response)
259 self.assertTrue(receivedQuery)
260 self.assertTrue(receivedResponse)
261 receivedQuery.id = expectedQuery.id
262 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
263 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=1)
ff73f02b
RG
264
265 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
266 """
267 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
268
269 Send a query with EDNS but no ECS value.
270 Check that the query received by the responder
271 has a valid ECS value and that the response
272 received from dnsdist contains an EDNS pseudo-RR.
273 This time the response returned by the backend contains
274 one Cookies, one ECS then one Cookies option.
275 """
276 name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
277 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
278 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
279 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
280 response = dns.message.make_response(expectedQuery)
b4f23783 281 ecoResponse = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
ff73f02b
RG
282 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
283 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
d70d5ad3 284 expectedResponse = dns.message.make_response(query, our_payload=4096)
be90d6bd 285 expectedResponse.use_edns(edns=True, payload=4096, options=[ecoResponse, ecoResponse])
ff73f02b
RG
286 rrset = dns.rrset.from_text(name,
287 3600,
288 dns.rdataclass.IN,
289 dns.rdatatype.A,
290 '127.0.0.1')
291 response.answer.append(rrset)
292 expectedResponse.answer.append(rrset)
293
6ca2e796
RG
294 for method in ("sendUDPQuery", "sendTCPQuery"):
295 sender = getattr(self, method)
296 (receivedQuery, receivedResponse) = sender(query, response)
297 self.assertTrue(receivedQuery)
298 self.assertTrue(receivedResponse)
299 receivedQuery.id = expectedQuery.id
300 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
301 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
ca404e94 302
5df86a8a 303class TestEdnsClientSubnetOverride(DNSDistTest):
ca404e94 304 """
ec5f5c6b 305 dnsdist is configured to add the EDNS0 Client Subnet
ca404e94
RG
306 option, overwriting any existing value.
307 """
308
ca404e94
RG
309 _config_template = """
310 truncateTC(true)
ca404e94
RG
311 setECSOverride(true)
312 setECSSourcePrefixV4(24)
313 setECSSourcePrefixV6(56)
314 newServer{address="127.0.0.1:%s", useClientSubnet=true}
315 """
316
ca404e94
RG
317 def testWithoutEDNS(self):
318 """
617dfe22
RG
319 ECS Override: No existing EDNS
320
ca404e94
RG
321 Send a query without EDNS, check that the query
322 received by the responder has the correct ECS value
323 and that the response received from dnsdist does not
324 have an EDNS pseudo-RR.
325 """
ff0902ec 326 name = 'withoutedns.overridden.ecs.tests.powerdns.com.'
ca404e94
RG
327 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
328 query = dns.message.make_query(name, 'A', 'IN')
329 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
330 response = dns.message.make_response(expectedQuery)
ff0902ec 331 response.use_edns(edns=True, payload=4096, options=[ecso])
ca404e94
RG
332 rrset = dns.rrset.from_text(name,
333 3600,
334 dns.rdataclass.IN,
335 dns.rdatatype.A,
336 '127.0.0.1')
337 response.answer.append(rrset)
ff0902ec 338 expectedResponse = dns.message.make_response(query)
ca404e94
RG
339 expectedResponse.answer.append(rrset)
340
6ca2e796
RG
341 for method in ("sendUDPQuery", "sendTCPQuery"):
342 sender = getattr(self, method)
343 (receivedQuery, receivedResponse) = sender(query, response)
344 self.assertTrue(receivedQuery)
345 self.assertTrue(receivedResponse)
346 receivedQuery.id = expectedQuery.id
347 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
348 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ca404e94
RG
349
350 def testWithEDNSNoECS(self):
351 """
617dfe22
RG
352 ECS Override: Existing EDNS without ECS
353
ca404e94
RG
354 Send a query with EDNS but no ECS value.
355 Check that the query received by the responder
356 has a valid ECS value and that the response
357 received from dnsdist contains an EDNS pseudo-RR.
358 """
ff0902ec 359 name = 'withednsnoecs.overridden.ecs.tests.powerdns.com.'
ca404e94
RG
360 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
361 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
362 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
363 response = dns.message.make_response(expectedQuery)
ff0902ec 364 response.use_edns(edns=True, payload=4096, options=[ecso])
ca404e94
RG
365 rrset = dns.rrset.from_text(name,
366 3600,
367 dns.rdataclass.IN,
368 dns.rdatatype.A,
369 '127.0.0.1')
370 response.answer.append(rrset)
d70d5ad3 371 expectedResponse = dns.message.make_response(query, our_payload=4096)
ca404e94
RG
372 expectedResponse.answer.append(rrset)
373
6ca2e796
RG
374 for method in ("sendUDPQuery", "sendTCPQuery"):
375 sender = getattr(self, method)
376 (receivedQuery, receivedResponse) = sender(query, response)
377 self.assertTrue(receivedQuery)
378 self.assertTrue(receivedResponse)
379 receivedQuery.id = expectedQuery.id
380 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
381 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse)
ca404e94 382
7b146b98 383 def testWithEDNSShorterInitialECS(self):
ca404e94 384 """
617dfe22
RG
385 ECS Override: Existing EDNS with ECS (short)
386
ca404e94
RG
387 Send a query with EDNS and a crafted ECS value.
388 Check that the query received by the responder
389 has an overwritten ECS value (not the initial one)
390 and that the response received from dnsdist contains
391 an EDNS pseudo-RR.
ff0902ec 392 The initial ECS value is shorter than the one it will be
7b146b98 393 replaced with.
ca404e94 394 """
ff0902ec 395 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
7b146b98
RG
396 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
397 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
398 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
399 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
400 response = dns.message.make_response(query)
ff0902ec 401 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
7b146b98
RG
402 rrset = dns.rrset.from_text(name,
403 3600,
404 dns.rdataclass.IN,
405 dns.rdatatype.A,
406 '127.0.0.1')
407 response.answer.append(rrset)
408
6ca2e796
RG
409 for method in ("sendUDPQuery", "sendTCPQuery"):
410 sender = getattr(self, method)
411 (receivedQuery, receivedResponse) = sender(query, response)
412 self.assertTrue(receivedQuery)
413 self.assertTrue(receivedResponse)
414 receivedQuery.id = expectedQuery.id
415 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
416 self.checkResponseEDNSWithECS(response, receivedResponse)
7b146b98
RG
417
418 def testWithEDNSLongerInitialECS(self):
419 """
617dfe22
RG
420 ECS Override: Existing EDNS with ECS (long)
421
7b146b98
RG
422 Send a query with EDNS and a crafted ECS value.
423 Check that the query received by the responder
424 has an overwritten ECS value (not the initial one)
425 and that the response received from dnsdist contains
426 an EDNS pseudo-RR.
427 The initial ECS value is longer than the one it will
428 replaced with.
429 """
ff0902ec 430 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
7b146b98
RG
431 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
432 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
433 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
434 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
435 response = dns.message.make_response(query)
ff0902ec 436 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
7b146b98
RG
437 rrset = dns.rrset.from_text(name,
438 3600,
439 dns.rdataclass.IN,
440 dns.rdatatype.A,
441 '127.0.0.1')
442 response.answer.append(rrset)
443
6ca2e796
RG
444 for method in ("sendUDPQuery", "sendTCPQuery"):
445 sender = getattr(self, method)
446 (receivedQuery, receivedResponse) = sender(query, response)
447 self.assertTrue(receivedQuery)
448 self.assertTrue(receivedResponse)
449 receivedQuery.id = expectedQuery.id
450 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
451 self.checkResponseEDNSWithECS(response, receivedResponse)
7b146b98
RG
452
453 def testWithEDNSSameSizeInitialECS(self):
454 """
617dfe22
RG
455 ECS Override: Existing EDNS with ECS (same)
456
7b146b98
RG
457 Send a query with EDNS and a crafted ECS value.
458 Check that the query received by the responder
459 has an overwritten ECS value (not the initial one)
460 and that the response received from dnsdist contains
461 an EDNS pseudo-RR.
462 The initial ECS value is exactly the same size as
463 the one it will replaced with.
464 """
ff0902ec
RG
465 name = 'withednsecs.overridden.ecs.tests.powerdns.com.'
466 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
467 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
468 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
469 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
470 response = dns.message.make_response(query)
471 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
472 rrset = dns.rrset.from_text(name,
473 3600,
474 dns.rdataclass.IN,
475 dns.rdatatype.A,
476 '127.0.0.1')
477 response.answer.append(rrset)
478
6ca2e796
RG
479 for method in ("sendUDPQuery", "sendTCPQuery"):
480 sender = getattr(self, method)
481 (receivedQuery, receivedResponse) = sender(query, response)
482 self.assertTrue(receivedQuery)
483 self.assertTrue(receivedResponse)
484 receivedQuery.id = expectedQuery.id
485 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
486 self.checkResponseEDNSWithECS(response, receivedResponse)
ff0902ec 487
be90d6bd
RG
488 def testWithECSFollowedByAnother(self):
489 """
490 ECS: Existing EDNS with ECS, followed by another record
491
492 Send a query with EDNS and an existing ECS value.
493 The OPT record is not the last one in the query
494 and is followed by another one.
495 Check that the query received by the responder
496 has a valid ECS value and that the response
497 received from dnsdist contains an EDNS pseudo-RR.
498 """
499 name = 'withecs-followedbyanother.ecs.tests.powerdns.com.'
500 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
501 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
502 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
503 rrset = dns.rrset.from_text(name,
504 3600,
505 dns.rdataclass.IN,
506 dns.rdatatype.A,
507 '127.0.0.1')
508
509 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
510 # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
511 # it while parsing the message in the receiver :-/
512 query.additional.append(rrset)
513 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
514 expectedQuery.additional.append(rrset)
515
516 response = dns.message.make_response(expectedQuery)
517 response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
518 expectedResponse = dns.message.make_response(query)
519 expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
520 response.answer.append(rrset)
521 response.additional.append(rrset)
522 expectedResponse.answer.append(rrset)
f8a01c44
RG
523 expectedResponse.additional.append(rrset)
524
525 for method in ("sendUDPQuery", "sendTCPQuery"):
526 sender = getattr(self, method)
527 (receivedQuery, receivedResponse) = sender(query, response)
528 self.assertTrue(receivedQuery)
529 self.assertTrue(receivedResponse)
530 receivedQuery.id = expectedQuery.id
531 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
532 self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
533
534 def testWithAnswerThenECS(self):
535 """
536 ECS: Record in answer followed by an existing EDNS with ECS
537
538 Send a query with a record in the answer section, EDNS and an existing ECS value.
539 Check that the query received by the responder
540 has a valid ECS value and that the response
541 received from dnsdist contains an EDNS pseudo-RR.
542 """
543 name = 'record-in-an-withecs.ecs.tests.powerdns.com.'
544 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
545 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
546 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
547 rrset = dns.rrset.from_text(name,
548 3600,
549 dns.rdataclass.IN,
550 dns.rdatatype.A,
551 '127.0.0.1')
552
553 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
554 query.answer.append(rrset)
555 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
556 expectedQuery.answer.append(rrset)
557
558 response = dns.message.make_response(expectedQuery)
559 response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
560 expectedResponse = dns.message.make_response(query)
561 expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
562 response.answer.append(rrset)
563 response.additional.append(rrset)
564 expectedResponse.answer.append(rrset)
565 expectedResponse.additional.append(rrset)
566
567 for method in ("sendUDPQuery", "sendTCPQuery"):
568 sender = getattr(self, method)
569 (receivedQuery, receivedResponse) = sender(query, response)
570 self.assertTrue(receivedQuery)
571 self.assertTrue(receivedResponse)
572 receivedQuery.id = expectedQuery.id
573 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
574 self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
575
576 def testWithAuthThenECS(self):
577 """
578 ECS: Record in authority followed by an existing EDNS with ECS
579
580 Send a query with a record in the authority section, EDNS and an existing ECS value.
581 Check that the query received by the responder
582 has a valid ECS value and that the response
583 received from dnsdist contains an EDNS pseudo-RR.
584 """
585 name = 'record-in-an-withecs.ecs.tests.powerdns.com.'
586 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
587 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
588 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
589 rrset = dns.rrset.from_text(name,
590 3600,
591 dns.rdataclass.IN,
592 dns.rdatatype.A,
593 '127.0.0.1')
594
595 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,ecso,eco])
596 query.authority.append(rrset)
597 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,eco,rewrittenEcso])
598 expectedQuery.authority.append(rrset)
599
600 response = dns.message.make_response(expectedQuery)
601 response.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
602 expectedResponse = dns.message.make_response(query)
603 expectedResponse.use_edns(edns=True, payload=4096, options=[eco, ecso, eco])
604 response.answer.append(rrset)
605 response.additional.append(rrset)
606 expectedResponse.answer.append(rrset)
be90d6bd
RG
607 expectedResponse.additional.append(rrset)
608
609 for method in ("sendUDPQuery", "sendTCPQuery"):
610 sender = getattr(self, method)
611 (receivedQuery, receivedResponse) = sender(query, response)
612 self.assertTrue(receivedQuery)
613 self.assertTrue(receivedResponse)
614 receivedQuery.id = expectedQuery.id
615 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 2)
616 self.checkResponseEDNSWithECS(expectedResponse, receivedResponse, 2)
617
618 def testWithEDNSNoECSFollowedByAnother(self):
619 """
620 ECS: Existing EDNS without ECS, followed by another record
621
622 Send a query with EDNS but no ECS value.
623 The OPT record is not the last one in the query
624 and is followed by another one.
625 Check that the query received by the responder
626 has a valid ECS value and that the response
627 received from dnsdist contains an EDNS pseudo-RR.
628 """
629 name = 'withedns-no-ecs-followedbyanother.ecs.tests.powerdns.com.'
630 eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
631 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
632 rrset = dns.rrset.from_text(name,
633 3600,
634 dns.rdataclass.IN,
635 dns.rdatatype.A,
636 '127.0.0.1')
637
638 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
639 # I would have loved to use a TSIG here but I can't find how to make dnspython ignore
640 # it while parsing the message in the receiver :-/
641 query.additional.append(rrset)
642 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco,rewrittenEcso])
643 expectedQuery.additional.append(rrset)
644
645 response = dns.message.make_response(expectedQuery)
646 response.use_edns(edns=True, payload=4096, options=[eco, rewrittenEcso, eco])
647 expectedResponse = dns.message.make_response(query)
648 expectedResponse.use_edns(edns=True, payload=4096, options=[eco, eco])
649 response.answer.append(rrset)
650 response.additional.append(rrset)
651 expectedResponse.answer.append(rrset)
652 expectedResponse.additional.append(rrset)
653
654 for method in ("sendUDPQuery", "sendTCPQuery"):
655 sender = getattr(self, method)
656 (receivedQuery, receivedResponse) = sender(query, response)
657 self.assertTrue(receivedQuery)
658 self.assertTrue(receivedResponse)
659 receivedQuery.id = expectedQuery.id
660 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
661 self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, 2)
662
5df86a8a 663class TestECSDisabledByRuleOrLua(DNSDistTest):
ff0902ec
RG
664 """
665 dnsdist is configured to add the EDNS0 Client Subnet
666 option, but we disable it via DisableECSAction()
667 or Lua.
668 """
669
670 _config_template = """
671 setECSOverride(false)
672 setECSSourcePrefixV4(16)
673 setECSSourcePrefixV6(16)
674 newServer{address="127.0.0.1:%s", useClientSubnet=true}
675 addAction(makeRule("disabled.ecsrules.tests.powerdns.com."), DisableECSAction())
676 function disableECSViaLua(dq)
677 dq.useECS = false
678 return DNSAction.None, ""
679 end
a2ff35e3 680 addAction("disabledvialua.ecsrules.tests.powerdns.com.", LuaAction(disableECSViaLua))
ff0902ec
RG
681 """
682
683 def testWithECSNotDisabled(self):
684 """
685 ECS Disable: ECS enabled in the backend
686 """
687 name = 'notdisabled.ecsrules.tests.powerdns.com.'
688 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 16)
689 query = dns.message.make_query(name, 'A', 'IN')
690 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
691 response = dns.message.make_response(expectedQuery)
692 expectedResponse = dns.message.make_response(query)
693 rrset = dns.rrset.from_text(name,
694 3600,
695 dns.rdataclass.IN,
696 dns.rdatatype.AAAA,
697 '::1')
698 response.answer.append(rrset)
699 expectedResponse.answer.append(rrset)
700
6ca2e796
RG
701 for method in ("sendUDPQuery", "sendTCPQuery"):
702 sender = getattr(self, method)
703 (receivedQuery, receivedResponse) = sender(query, response)
704 self.assertTrue(receivedQuery)
705 self.assertTrue(receivedResponse)
706 receivedQuery.id = expectedQuery.id
707 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
708 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff0902ec
RG
709
710 def testWithECSDisabledViaRule(self):
711 """
712 ECS Disable: ECS enabled in the backend, but disabled by a rule
713 """
714 name = 'disabled.ecsrules.tests.powerdns.com.'
715 query = dns.message.make_query(name, 'A', 'IN')
716 response = dns.message.make_response(query)
717 rrset = dns.rrset.from_text(name,
718 3600,
719 dns.rdataclass.IN,
720 dns.rdatatype.A,
721 '127.0.0.1')
722 response.answer.append(rrset)
723
6ca2e796
RG
724 for method in ("sendUDPQuery", "sendTCPQuery"):
725 sender = getattr(self, method)
726 (receivedQuery, receivedResponse) = sender(query, response)
727 self.assertTrue(receivedQuery)
728 self.assertTrue(receivedResponse)
729 receivedQuery.id = query.id
730 self.checkQueryNoEDNS(query, receivedQuery)
731 self.checkResponseNoEDNS(response, receivedResponse)
ff0902ec
RG
732
733 def testWithECSDisabledViaLua(self):
734 """
735 ECS Disable: ECS enabled in the backend, but disabled via Lua
736 """
737 name = 'disabledvialua.ecsrules.tests.powerdns.com.'
738 query = dns.message.make_query(name, 'A', 'IN')
739 response = dns.message.make_response(query)
740 rrset = dns.rrset.from_text(name,
741 3600,
742 dns.rdataclass.IN,
743 dns.rdatatype.A,
744 '127.0.0.1')
745 response.answer.append(rrset)
746
6ca2e796
RG
747 for method in ("sendUDPQuery", "sendTCPQuery"):
748 sender = getattr(self, method)
749 (receivedQuery, receivedResponse) = sender(query, response)
750 self.assertTrue(receivedQuery)
751 self.assertTrue(receivedResponse)
752 receivedQuery.id = query.id
753 self.checkQueryNoEDNS(query, receivedQuery)
754 self.checkResponseNoEDNS(response, receivedResponse)
ff0902ec 755
5df86a8a 756class TestECSOverrideSetByRuleOrLua(DNSDistTest):
ff0902ec
RG
757 """
758 dnsdist is configured to set the EDNS0 Client Subnet
759 option without overriding an existing one, but we
760 force the overriding via ECSOverrideAction() or Lua.
761 """
762
763 _config_template = """
764 setECSOverride(false)
765 setECSSourcePrefixV4(24)
766 setECSSourcePrefixV6(56)
767 newServer{address="127.0.0.1:%s", useClientSubnet=true}
768 addAction(makeRule("overridden.ecsrules.tests.powerdns.com."), ECSOverrideAction(true))
769 function overrideECSViaLua(dq)
770 dq.ecsOverride = true
771 return DNSAction.None, ""
772 end
a2ff35e3 773 addAction("overriddenvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSViaLua))
ff0902ec
RG
774 """
775
776 def testWithECSOverrideNotSet(self):
777 """
778 ECS Override: not set via Lua or a rule
779 """
780 name = 'notoverridden.ecsrules.tests.powerdns.com.'
781 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
782 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
783 response = dns.message.make_response(query)
784 response.use_edns(edns=True, payload=4096, options=[ecso])
785 rrset = dns.rrset.from_text(name,
786 3600,
787 dns.rdataclass.IN,
788 dns.rdatatype.A,
789 '127.0.0.1')
790 response.answer.append(rrset)
791
6ca2e796
RG
792 for method in ("sendUDPQuery", "sendTCPQuery"):
793 sender = getattr(self, method)
794 (receivedQuery, receivedResponse) = sender(query, response)
795 self.assertTrue(receivedQuery)
796 self.assertTrue(receivedResponse)
797 receivedQuery.id = query.id
798 self.checkQueryEDNSWithECS(query, receivedQuery)
799 self.checkResponseEDNSWithECS(response, receivedResponse)
ff0902ec
RG
800
801 def testWithECSOverrideSetViaRule(self):
802 """
803 ECS Override: set with a rule
804 """
805 name = 'overridden.ecsrules.tests.powerdns.com.'
806 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
807 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
808 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
809 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
810 response = dns.message.make_response(query)
811 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
812 rrset = dns.rrset.from_text(name,
813 3600,
814 dns.rdataclass.IN,
815 dns.rdatatype.A,
816 '127.0.0.1')
817 response.answer.append(rrset)
818
6ca2e796
RG
819 for method in ("sendUDPQuery", "sendTCPQuery"):
820 sender = getattr(self, method)
821 (receivedQuery, receivedResponse) = sender(query, response)
822 self.assertTrue(receivedQuery)
823 self.assertTrue(receivedResponse)
824 receivedQuery.id = expectedQuery.id
825 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
826 self.checkResponseEDNSWithECS(response, receivedResponse)
ff0902ec
RG
827
828 def testWithECSOverrideSetViaLua(self):
829 """
830 ECS Override: set via Lua
831 """
832 name = 'overriddenvialua.ecsrules.tests.powerdns.com.'
7b146b98 833 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
ca404e94
RG
834 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
835 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
836 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
837 response = dns.message.make_response(query)
ff0902ec 838 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
ca404e94
RG
839 rrset = dns.rrset.from_text(name,
840 3600,
841 dns.rdataclass.IN,
842 dns.rdatatype.A,
843 '127.0.0.1')
844 response.answer.append(rrset)
845
6ca2e796
RG
846 for method in ("sendUDPQuery", "sendTCPQuery"):
847 sender = getattr(self, method)
848 (receivedQuery, receivedResponse) = sender(query, response)
849 self.assertTrue(receivedQuery)
850 self.assertTrue(receivedResponse)
851 receivedQuery.id = expectedQuery.id
852 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
853 self.checkResponseEDNSWithECS(response, receivedResponse)
ff0902ec 854
5df86a8a 855class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
ff0902ec
RG
856 """
857 dnsdist is configured to set the EDNS0 Client Subnet
858 option with a prefix length of 24 for IPv4 and 56 for IPv6,
859 but we override that to 32 and 128 via ECSPrefixLengthAction() or Lua.
860 """
861
862 _config_template = """
863 setECSOverride(false)
864 setECSSourcePrefixV4(24)
865 setECSSourcePrefixV6(56)
866 newServer{address="127.0.0.1:%s", useClientSubnet=true}
867 addAction(makeRule("overriddenprefixlength.ecsrules.tests.powerdns.com."), ECSPrefixLengthAction(32, 128))
868 function overrideECSPrefixLengthViaLua(dq)
869 dq.ecsPrefixLength = 32
870 return DNSAction.None, ""
871 end
a2ff35e3 872 addAction("overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.", LuaAction(overrideECSPrefixLengthViaLua))
ff0902ec
RG
873 """
874
875 def testWithECSPrefixLengthNotOverridden(self):
876 """
877 ECS Prefix Length: not overridden via Lua or a rule
878 """
879 name = 'notoverriddenprefixlength.ecsrules.tests.powerdns.com.'
880 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
881 query = dns.message.make_query(name, 'A', 'IN')
882 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
883 response = dns.message.make_response(query)
884 response.use_edns(edns=True, payload=4096, options=[ecso])
885 rrset = dns.rrset.from_text(name,
886 3600,
887 dns.rdataclass.IN,
888 dns.rdatatype.A,
889 '127.0.0.1')
890 response.answer.append(rrset)
891 expectedResponse = dns.message.make_response(query)
892 expectedResponse.answer.append(rrset)
893
6ca2e796
RG
894 for method in ("sendUDPQuery", "sendTCPQuery"):
895 sender = getattr(self, method)
896 (receivedQuery, receivedResponse) = sender(query, response)
897 self.assertTrue(receivedQuery)
898 self.assertTrue(receivedResponse)
899 receivedQuery.id = expectedQuery.id
900 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
901 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff0902ec
RG
902
903 def testWithECSPrefixLengthOverriddenViaRule(self):
904 """
905 ECS Prefix Length: overridden with a rule
906 """
907 name = 'overriddenprefixlength.ecsrules.tests.powerdns.com.'
908 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
909 query = dns.message.make_query(name, 'A', 'IN')
910 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
911 response = dns.message.make_response(expectedQuery)
912 rrset = dns.rrset.from_text(name,
913 3600,
914 dns.rdataclass.IN,
915 dns.rdatatype.A,
916 '127.0.0.1')
917 response.answer.append(rrset)
918 expectedResponse = dns.message.make_response(query)
919 expectedResponse.answer.append(rrset)
920
6ca2e796
RG
921 for method in ("sendUDPQuery", "sendTCPQuery"):
922 sender = getattr(self, method)
923 (receivedQuery, receivedResponse) = sender(query, response)
924 self.assertTrue(receivedQuery)
925 self.assertTrue(receivedResponse)
926 receivedQuery.id = expectedQuery.id
927 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
928 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
ff0902ec
RG
929
930 def testWithECSPrefixLengthOverriddenViaLua(self):
931 """
932 ECS Prefix Length: overridden via Lua
933 """
934 name = 'overriddenprefixlengthvialua.ecsrules.tests.powerdns.com.'
935 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
936 query = dns.message.make_query(name, 'A', 'IN')
937 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
938 response = dns.message.make_response(expectedQuery)
bd14f087
RG
939 rrset = dns.rrset.from_text(name,
940 3600,
941 dns.rdataclass.IN,
942 dns.rdatatype.A,
943 '127.0.0.1')
944 response.answer.append(rrset)
945 expectedResponse = dns.message.make_response(query)
946 expectedResponse.answer.append(rrset)
947
6ca2e796
RG
948 for method in ("sendUDPQuery", "sendTCPQuery"):
949 sender = getattr(self, method)
950 (receivedQuery, receivedResponse) = sender(query, response)
951 self.assertTrue(receivedQuery)
952 self.assertTrue(receivedResponse)
953 receivedQuery.id = expectedQuery.id
954 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
955 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
bd14f087
RG
956
957class TestECSPrefixSetByRule(DNSDistTest):
958 """
959 dnsdist is configured to set the EDNS0 Client Subnet
960 option for incoming queries to the actual source IP,
961 but we override it for some queries via SetECSAction().
962 """
963
964 _config_template = """
965 setECSOverride(false)
966 setECSSourcePrefixV4(32)
967 setECSSourcePrefixV6(128)
968 newServer{address="127.0.0.1:%s", useClientSubnet=true}
969 addAction(makeRule("setecsaction.ecsrules.tests.powerdns.com."), SetECSAction("192.0.2.1/32"))
970 """
971
972 def testWithRegularECS(self):
973 """
974 ECS Prefix: not set
975 """
976 name = 'notsetecsaction.ecsrules.tests.powerdns.com.'
977 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
978 query = dns.message.make_query(name, 'A', 'IN')
979 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
980 response = dns.message.make_response(query)
981 response.use_edns(edns=True, payload=4096, options=[ecso])
982 rrset = dns.rrset.from_text(name,
983 3600,
984 dns.rdataclass.IN,
985 dns.rdatatype.A,
986 '127.0.0.1')
987 response.answer.append(rrset)
988 expectedResponse = dns.message.make_response(query)
989 expectedResponse.answer.append(rrset)
990
6ca2e796
RG
991 for method in ("sendUDPQuery", "sendTCPQuery"):
992 sender = getattr(self, method)
993 (receivedQuery, receivedResponse) = sender(query, response)
994 self.assertTrue(receivedQuery)
995 self.assertTrue(receivedResponse)
996 receivedQuery.id = expectedQuery.id
997 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
998 self.checkResponseNoEDNS(expectedResponse, receivedResponse)
bd14f087
RG
999
1000 def testWithECSSetByRule(self):
1001 """
1002 ECS Prefix: set with SetECSAction
1003 """
1004 name = 'setecsaction.ecsrules.tests.powerdns.com.'
1005 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
1006 query = dns.message.make_query(name, 'A', 'IN')
1007 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
1008 response = dns.message.make_response(expectedQuery)
ff0902ec
RG
1009 rrset = dns.rrset.from_text(name,
1010 3600,
1011 dns.rdataclass.IN,
1012 dns.rdatatype.A,
1013 '127.0.0.1')
1014 response.answer.append(rrset)
1015 expectedResponse = dns.message.make_response(query)
1016 expectedResponse.answer.append(rrset)
1017
6ca2e796
RG
1018 for method in ("sendUDPQuery", "sendTCPQuery"):
1019 sender = getattr(self, method)
1020 (receivedQuery, receivedResponse) = sender(query, response)
1021 self.assertTrue(receivedQuery)
1022 self.assertTrue(receivedResponse)
1023 receivedQuery.id = expectedQuery.id
1024 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
1025 self.checkResponseNoEDNS(expectedResponse, receivedResponse)