]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EdnsClientSubnet.py
Merge pull request #3953 from pieterlexis/sync-debian-build
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EdnsClientSubnet.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6
7 class TestEdnsClientSubnetNoOverride(DNSDistTest):
8 """
9 dnsdist is configured to add the EDNS0 Client Subnet
10 option, but only if it's not already present in the
11 original query.
12 """
13
14 _config_template = """
15 truncateTC(true)
16 newServer{address="127.0.0.1:%s", useClientSubnet=true}
17 """
18
19 def testWithoutEDNS(self):
20 """
21 ECS: No existing EDNS
22
23 Send a query without EDNS, check that the query
24 received by the responder has the correct ECS value
25 and that the response received from dnsdist does not
26 have an EDNS pseudo-RR.
27 """
28 name = 'withoutedns.ecs.tests.powerdns.com.'
29 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
30 query = dns.message.make_query(name, 'A', 'IN')
31 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
32 response = dns.message.make_response(expectedQuery)
33 expectedResponse = dns.message.make_response(query)
34 rrset = dns.rrset.from_text(name,
35 3600,
36 dns.rdataclass.IN,
37 dns.rdatatype.A,
38 '127.0.0.1')
39 response.answer.append(rrset)
40 expectedResponse.answer.append(rrset)
41
42 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
43 self.assertTrue(receivedQuery)
44 self.assertTrue(receivedResponse)
45 receivedQuery.id = expectedQuery.id
46 self.assertEquals(expectedQuery, receivedQuery)
47 self.assertEquals(expectedResponse, receivedResponse)
48 self.assertEquals(receivedResponse.edns, -1)
49 self.assertEquals(len(receivedResponse.options), 0)
50
51 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
52 self.assertTrue(receivedQuery)
53 self.assertTrue(receivedResponse)
54 receivedQuery.id = expectedQuery.id
55 self.assertEquals(expectedQuery, receivedQuery)
56 self.assertEquals(expectedResponse, receivedResponse)
57 self.assertEquals(receivedResponse.edns, -1)
58 self.assertEquals(len(receivedResponse.options), 0)
59
60 def testWithEDNSNoECS(self):
61 """
62 ECS: Existing EDNS without ECS
63
64 Send a query with EDNS but no ECS value.
65 Check that the query received by the responder
66 has a valid ECS value and that the response
67 received from dnsdist contains an EDNS pseudo-RR.
68 """
69 name = 'withednsnoecs.ecs.tests.powerdns.com.'
70 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
71 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
72 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
73 response = dns.message.make_response(expectedQuery)
74 expectedResponse = dns.message.make_response(query)
75 rrset = dns.rrset.from_text(name,
76 3600,
77 dns.rdataclass.IN,
78 dns.rdatatype.A,
79 '127.0.0.1')
80 response.answer.append(rrset)
81 expectedResponse.answer.append(rrset)
82
83 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
84 self.assertTrue(receivedQuery)
85 self.assertTrue(receivedResponse)
86 receivedQuery.id = expectedQuery.id
87 self.assertEquals(expectedQuery, receivedQuery)
88 self.assertEquals(expectedResponse, receivedResponse)
89 self.assertEquals(receivedResponse.edns, 0)
90 self.assertEquals(len(receivedResponse.options), 0)
91
92 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
93 self.assertTrue(receivedQuery)
94 self.assertTrue(receivedResponse)
95 receivedQuery.id = expectedQuery.id
96 self.assertEquals(expectedQuery, receivedQuery)
97 self.assertEquals(expectedResponse, receivedResponse)
98 self.assertEquals(receivedResponse.edns, 0)
99 self.assertEquals(len(receivedResponse.options), 0)
100
101 def testWithEDNSECS(self):
102 """
103 ECS: Existing EDNS with ECS
104
105 Send a query with EDNS and a crafted ECS value.
106 Check that the query received by the responder
107 has the initial ECS value (not overwritten)
108 and that the response received from dnsdist contains
109 an EDNS pseudo-RR.
110 """
111 name = 'withednsecs.ecs.tests.powerdns.com.'
112 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24)
113 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
114 response = dns.message.make_response(query)
115 rrset = dns.rrset.from_text(name,
116 3600,
117 dns.rdataclass.IN,
118 dns.rdatatype.A,
119 '127.0.0.1')
120 response.answer.append(rrset)
121
122 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
123 self.assertTrue(receivedQuery)
124 self.assertTrue(receivedResponse)
125 receivedQuery.id = query.id
126 self.assertEquals(query, receivedQuery)
127 self.assertEquals(response, receivedResponse)
128 self.assertEquals(receivedResponse.edns, 0)
129 self.assertEquals(len(receivedResponse.options), 0)
130
131 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
132 self.assertTrue(receivedQuery)
133 self.assertTrue(receivedResponse)
134 receivedQuery.id = query.id
135 self.assertEquals(query, receivedQuery)
136 self.assertEquals(response, receivedResponse)
137 self.assertEquals(receivedResponse.edns, 0)
138 self.assertEquals(len(receivedResponse.options), 0)
139
140 def testWithoutEDNSResponseWithECS(self):
141 """
142 ECS: No existing EDNS (BE returning ECS)
143
144 Send a query without EDNS, check that the query
145 received by the responder has the correct ECS value
146 and that the response received from dnsdist does not
147 have an EDNS pseudo-RR.
148 This time the response returned by the backend contains
149 an ECS option with scope set.
150 """
151 name = 'withoutedns.bereturnsecs.ecs.tests.powerdns.com.'
152 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
153 query = dns.message.make_query(name, 'A', 'IN')
154 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
155 response = dns.message.make_response(expectedQuery)
156 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
157 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
158 expectedResponse = dns.message.make_response(query)
159 rrset = dns.rrset.from_text(name,
160 3600,
161 dns.rdataclass.IN,
162 dns.rdatatype.A,
163 '127.0.0.1')
164 response.answer.append(rrset)
165 expectedResponse.answer.append(rrset)
166
167 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
168 self.assertTrue(receivedQuery)
169 self.assertTrue(receivedResponse)
170 receivedQuery.id = expectedQuery.id
171 self.assertEquals(expectedQuery, receivedQuery)
172 self.assertEquals(expectedResponse, receivedResponse)
173 self.assertEquals(receivedResponse.edns, -1)
174 self.assertEquals(len(receivedResponse.options), 0)
175
176 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
177 self.assertTrue(receivedQuery)
178 self.assertTrue(receivedResponse)
179 receivedQuery.id = expectedQuery.id
180 self.assertEquals(expectedQuery, receivedQuery)
181 self.assertEquals(expectedResponse, receivedResponse)
182 self.assertEquals(receivedResponse.edns, -1)
183 self.assertEquals(len(receivedResponse.options), 0)
184
185 def testWithEDNSNoECSResponseWithECS(self):
186 """
187 ECS: Existing EDNS without ECS (BE returning only the ECS option)
188
189 Send a query with EDNS but no ECS value.
190 Check that the query received by the responder
191 has a valid ECS value and that the response
192 received from dnsdist contains an EDNS pseudo-RR.
193 This time the response returned by the backend contains
194 an ECS option with scope set.
195 """
196 name = 'withednsnoecs.bereturnsecs.ecs.tests.powerdns.com.'
197 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
198 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
199 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
200 response = dns.message.make_response(expectedQuery)
201 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
202 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
203 expectedResponse = dns.message.make_response(query)
204 rrset = dns.rrset.from_text(name,
205 3600,
206 dns.rdataclass.IN,
207 dns.rdatatype.A,
208 '127.0.0.1')
209 response.answer.append(rrset)
210 expectedResponse.answer.append(rrset)
211
212 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
213 self.assertTrue(receivedQuery)
214 self.assertTrue(receivedResponse)
215 receivedQuery.id = expectedQuery.id
216 self.assertEquals(expectedQuery, receivedQuery)
217 self.assertEquals(expectedResponse, receivedResponse)
218 self.assertEquals(receivedResponse.edns, 0)
219 self.assertEquals(len(receivedResponse.options), 0)
220
221 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
222 self.assertTrue(receivedQuery)
223 self.assertTrue(receivedResponse)
224 receivedQuery.id = expectedQuery.id
225 self.assertEquals(expectedQuery, receivedQuery)
226 self.assertEquals(expectedResponse, receivedResponse)
227 self.assertEquals(receivedResponse.edns, 0)
228 self.assertEquals(len(receivedResponse.options), 0)
229
230 def testWithEDNSNoECSResponseWithCookiesThenECS(self):
231 """
232 ECS: Existing EDNS without ECS (BE returning Cookies then ECS options)
233
234 Send a query with EDNS but no ECS value.
235 Check that the query received by the responder
236 has a valid ECS value and that the response
237 received from dnsdist contains an EDNS pseudo-RR.
238 This time the response returned by the backend contains
239 one cookies then one ECS option.
240 """
241 name = 'withednsnoecs.bereturnscookiesthenecs.ecs.tests.powerdns.com.'
242 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
243 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
244 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
245 response = dns.message.make_response(expectedQuery)
246 ecoResponse = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
247 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
248 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse])
249 expectedResponse = dns.message.make_response(query)
250 rrset = dns.rrset.from_text(name,
251 3600,
252 dns.rdataclass.IN,
253 dns.rdatatype.A,
254 '127.0.0.1')
255 response.answer.append(rrset)
256 expectedResponse.answer.append(rrset)
257
258 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
259 self.assertTrue(receivedQuery)
260 self.assertTrue(receivedResponse)
261 receivedQuery.id = expectedQuery.id
262 self.assertEquals(expectedQuery, receivedQuery)
263 self.assertEquals(expectedResponse, receivedResponse)
264 self.assertEquals(receivedResponse.edns, 0)
265 self.assertEquals(len(receivedResponse.options), 1)
266
267 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
268 self.assertTrue(receivedQuery)
269 self.assertTrue(receivedResponse)
270 receivedQuery.id = expectedQuery.id
271 self.assertEquals(expectedQuery, receivedQuery)
272 self.assertEquals(expectedResponse, receivedResponse)
273 self.assertEquals(receivedResponse.edns, 0)
274 self.assertEquals(len(receivedResponse.options), 1)
275
276 def testWithEDNSNoECSResponseWithECSThenCookies(self):
277 """
278 ECS: Existing EDNS without ECS (BE returning ECS then Cookies options)
279
280 Send a query with EDNS but no ECS value.
281 Check that the query received by the responder
282 has a valid ECS value and that the response
283 received from dnsdist contains an EDNS pseudo-RR.
284 This time the response returned by the backend contains
285 one ECS then one Cookies option.
286 """
287 name = 'withednsnoecs.bereturnsecsthencookies.ecs.tests.powerdns.com.'
288 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
289 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
290 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
291 response = dns.message.make_response(expectedQuery)
292 ecoResponse = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
293 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
294 response.use_edns(edns=True, payload=4096, options=[ecsoResponse, ecoResponse])
295 expectedResponse = dns.message.make_response(query)
296 rrset = dns.rrset.from_text(name,
297 3600,
298 dns.rdataclass.IN,
299 dns.rdatatype.A,
300 '127.0.0.1')
301 response.answer.append(rrset)
302 expectedResponse.answer.append(rrset)
303
304 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
305 self.assertTrue(receivedQuery)
306 self.assertTrue(receivedResponse)
307 receivedQuery.id = expectedQuery.id
308 self.assertEquals(expectedQuery, receivedQuery)
309 self.assertEquals(expectedResponse, receivedResponse)
310 self.assertEquals(receivedResponse.edns, 0)
311 self.assertEquals(len(receivedResponse.options), 1)
312
313 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
314 self.assertTrue(receivedQuery)
315 self.assertTrue(receivedResponse)
316 receivedQuery.id = expectedQuery.id
317 self.assertEquals(expectedQuery, receivedQuery)
318 self.assertEquals(expectedResponse, receivedResponse)
319 self.assertEquals(receivedResponse.edns, 0)
320 self.assertEquals(len(receivedResponse.options), 1)
321
322 def testWithEDNSNoECSResponseWithCookiesThenECSThenCookies(self):
323 """
324 ECS: Existing EDNS without ECS (BE returning Cookies, ECS then Cookies options)
325
326 Send a query with EDNS but no ECS value.
327 Check that the query received by the responder
328 has a valid ECS value and that the response
329 received from dnsdist contains an EDNS pseudo-RR.
330 This time the response returned by the backend contains
331 one Cookies, one ECS then one Cookies option.
332 """
333 name = 'withednsnoecs.bereturnscookiesecscookies.ecs.tests.powerdns.com.'
334 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
335 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
336 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
337 response = dns.message.make_response(expectedQuery)
338 ecoResponse = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
339 ecsoResponse = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24, scope=24)
340 response.use_edns(edns=True, payload=4096, options=[ecoResponse, ecsoResponse, ecoResponse])
341 expectedResponse = dns.message.make_response(query)
342 rrset = dns.rrset.from_text(name,
343 3600,
344 dns.rdataclass.IN,
345 dns.rdatatype.A,
346 '127.0.0.1')
347 response.answer.append(rrset)
348 expectedResponse.answer.append(rrset)
349
350 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
351 self.assertTrue(receivedQuery)
352 self.assertTrue(receivedResponse)
353 receivedQuery.id = expectedQuery.id
354 self.assertEquals(expectedQuery, receivedQuery)
355 self.assertEquals(expectedResponse, receivedResponse)
356 self.assertEquals(receivedResponse.edns, 0)
357 self.assertEquals(len(receivedResponse.options), 2)
358
359 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
360 self.assertTrue(receivedQuery)
361 self.assertTrue(receivedResponse)
362 receivedQuery.id = expectedQuery.id
363 self.assertEquals(expectedQuery, receivedQuery)
364 self.assertEquals(expectedResponse, receivedResponse)
365 self.assertEquals(receivedResponse.edns, 0)
366 self.assertEquals(len(receivedResponse.options), 2)
367
368
369 class TestEdnsClientSubnetOverride(DNSDistTest):
370 """
371 dnsdist is configured to add the EDNS0 Client Subnet
372 option, overwriting any existing value.
373 """
374
375 _config_template = """
376 truncateTC(true)
377 setECSOverride(true)
378 setECSSourcePrefixV4(24)
379 setECSSourcePrefixV6(56)
380 newServer{address="127.0.0.1:%s", useClientSubnet=true}
381 """
382
383 def testWithoutEDNS(self):
384 """
385 ECS Override: No existing EDNS
386
387 Send a query without EDNS, check that the query
388 received by the responder has the correct ECS value
389 and that the response received from dnsdist does not
390 have an EDNS pseudo-RR.
391 """
392 name = 'withoutedns.overriden.ecs.tests.powerdns.com.'
393 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
394 query = dns.message.make_query(name, 'A', 'IN')
395 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
396 response = dns.message.make_response(expectedQuery)
397 expectedResponse = dns.message.make_response(query)
398 rrset = dns.rrset.from_text(name,
399 3600,
400 dns.rdataclass.IN,
401 dns.rdatatype.A,
402 '127.0.0.1')
403 response.answer.append(rrset)
404 expectedResponse.answer.append(rrset)
405
406 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
407 self.assertTrue(receivedQuery)
408 self.assertTrue(receivedResponse)
409 receivedQuery.id = expectedQuery.id
410 self.assertEquals(expectedQuery, receivedQuery)
411 self.assertEquals(expectedResponse, receivedResponse)
412 self.assertEquals(receivedResponse.edns, -1)
413 self.assertEquals(len(receivedResponse.options), 0)
414
415 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
416 self.assertTrue(receivedQuery)
417 self.assertTrue(receivedResponse)
418 receivedQuery.id = expectedQuery.id
419 self.assertEquals(expectedQuery, receivedQuery)
420 self.assertEquals(expectedResponse, receivedResponse)
421 self.assertEquals(receivedResponse.edns, -1)
422 self.assertEquals(len(receivedResponse.options), 0)
423
424 def testWithEDNSNoECS(self):
425 """
426 ECS Override: Existing EDNS without ECS
427
428 Send a query with EDNS but no ECS value.
429 Check that the query received by the responder
430 has a valid ECS value and that the response
431 received from dnsdist contains an EDNS pseudo-RR.
432 """
433 name = 'withednsnoecs.overriden.ecs.tests.powerdns.com.'
434 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
435 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
436 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
437 response = dns.message.make_response(expectedQuery)
438 expectedResponse = dns.message.make_response(query)
439 rrset = dns.rrset.from_text(name,
440 3600,
441 dns.rdataclass.IN,
442 dns.rdatatype.A,
443 '127.0.0.1')
444 response.answer.append(rrset)
445 expectedResponse.answer.append(rrset)
446
447 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
448 self.assertTrue(receivedQuery)
449 self.assertTrue(receivedResponse)
450 receivedQuery.id = expectedQuery.id
451 self.assertEquals(expectedQuery, receivedQuery)
452 self.assertEquals(expectedResponse, receivedResponse)
453 self.assertEquals(receivedResponse.edns, 0)
454 self.assertEquals(len(receivedResponse.options), 0)
455
456 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
457 self.assertTrue(receivedQuery)
458 self.assertTrue(receivedResponse)
459 receivedQuery.id = expectedQuery.id
460 self.assertEquals(expectedQuery, receivedQuery)
461 self.assertEquals(expectedResponse, receivedResponse)
462 self.assertEquals(receivedResponse.edns, 0)
463 self.assertEquals(len(receivedResponse.options), 0)
464
465 def testWithEDNSShorterInitialECS(self):
466 """
467 ECS Override: Existing EDNS with ECS (short)
468
469 Send a query with EDNS and a crafted ECS value.
470 Check that the query received by the responder
471 has an overwritten ECS value (not the initial one)
472 and that the response received from dnsdist contains
473 an EDNS pseudo-RR.
474 The initial ECS value is shorter than the one it will
475 replaced with.
476 """
477 name = 'withednsecs.overriden.ecs.tests.powerdns.com.'
478 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 8)
479 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
480 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
481 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
482 response = dns.message.make_response(query)
483 rrset = dns.rrset.from_text(name,
484 3600,
485 dns.rdataclass.IN,
486 dns.rdatatype.A,
487 '127.0.0.1')
488 response.answer.append(rrset)
489
490 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
491 self.assertTrue(receivedQuery)
492 self.assertTrue(receivedResponse)
493 receivedQuery.id = expectedQuery.id
494 self.assertEquals(expectedQuery, receivedQuery)
495 self.assertEquals(response, receivedResponse)
496 self.assertEquals(receivedResponse.edns, 0)
497 self.assertEquals(len(receivedResponse.options), 0)
498
499 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
500 self.assertTrue(receivedQuery)
501 self.assertTrue(receivedResponse)
502 receivedQuery.id = expectedQuery.id
503 self.assertEquals(expectedQuery, receivedQuery)
504 self.assertEquals(response, receivedResponse)
505 self.assertEquals(receivedResponse.edns, 0)
506 self.assertEquals(len(receivedResponse.options), 0)
507
508 def testWithEDNSLongerInitialECS(self):
509 """
510 ECS Override: Existing EDNS with ECS (long)
511
512 Send a query with EDNS and a crafted ECS value.
513 Check that the query received by the responder
514 has an overwritten ECS value (not the initial one)
515 and that the response received from dnsdist contains
516 an EDNS pseudo-RR.
517 The initial ECS value is longer than the one it will
518 replaced with.
519 """
520 name = 'withednsecs.overriden.ecs.tests.powerdns.com.'
521 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
522 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
523 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
524 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
525 response = dns.message.make_response(query)
526 rrset = dns.rrset.from_text(name,
527 3600,
528 dns.rdataclass.IN,
529 dns.rdatatype.A,
530 '127.0.0.1')
531 response.answer.append(rrset)
532
533 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
534 self.assertTrue(receivedQuery)
535 self.assertTrue(receivedResponse)
536 receivedQuery.id = expectedQuery.id
537 self.assertEquals(expectedQuery, receivedQuery)
538 self.assertEquals(response, receivedResponse)
539 self.assertEquals(receivedResponse.edns, 0)
540 self.assertEquals(len(receivedResponse.options), 0)
541
542 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
543 self.assertTrue(receivedQuery)
544 self.assertTrue(receivedResponse)
545 receivedQuery.id = expectedQuery.id
546 self.assertEquals(expectedQuery, receivedQuery)
547 self.assertEquals(response, receivedResponse)
548 self.assertEquals(receivedResponse.edns, 0)
549 self.assertEquals(len(receivedResponse.options), 0)
550
551 def testWithEDNSSameSizeInitialECS(self):
552 """
553 ECS Override: Existing EDNS with ECS (same)
554
555 Send a query with EDNS and a crafted ECS value.
556 Check that the query received by the responder
557 has an overwritten ECS value (not the initial one)
558 and that the response received from dnsdist contains
559 an EDNS pseudo-RR.
560 The initial ECS value is exactly the same size as
561 the one it will replaced with.
562 """
563 name = 'withednsecs.overriden.ecs.tests.powerdns.com.'
564 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
565 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
566 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
567 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
568 response = dns.message.make_response(query)
569 rrset = dns.rrset.from_text(name,
570 3600,
571 dns.rdataclass.IN,
572 dns.rdatatype.A,
573 '127.0.0.1')
574 response.answer.append(rrset)
575
576 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
577 self.assertTrue(receivedQuery)
578 self.assertTrue(receivedResponse)
579 receivedQuery.id = expectedQuery.id
580 self.assertEquals(expectedQuery, receivedQuery)
581 self.assertEquals(response, receivedResponse)
582 self.assertEquals(receivedResponse.edns, 0)
583 self.assertEquals(len(receivedResponse.options), 0)
584
585 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
586 self.assertTrue(receivedQuery)
587 self.assertTrue(receivedResponse)
588 receivedQuery.id = expectedQuery.id
589 self.assertEquals(expectedQuery, receivedQuery)
590 self.assertEquals(response, receivedResponse)
591 self.assertEquals(receivedResponse.edns, 0)
592 self.assertEquals(len(receivedResponse.options), 0)