]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
Merge pull request #14237 from romeroalx/fix-docs-pip-pinning
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7 import clientsubnetoption
8 import unittest
9 from recursortests import RecursorTest, have_ipv6
10 from twisted.internet.protocol import DatagramProtocol
11 from twisted.internet import reactor
12
13 emptyECSText = 'No ECS received'
14 nameECS = 'ecs-echo.example.'
15 nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'
16 ttlECS = 60
17 ecsReactorRunning = False
18 ecsReactorv6Running = False
19
20 class ECSTest(RecursorTest):
21 _config_template_default = """
22 daemon=no
23 trace=yes
24 dont-query=
25 local-address=127.0.0.1
26 packetcache-ttl=15
27 packetcache-servfail-ttl=15
28 max-cache-ttl=600
29 threads=2
30 loglevel=9
31 disable-syslog=yes
32 log-common-errors=yes
33 statistics-interval=0
34 ecs-add-for=0.0.0.0/0
35 """
36
37 def sendECSQuery(self, query, expected, expectedFirstTTL=None, scopeZeroResponse=None):
38 res = self.sendUDPQuery(query)
39
40 self.assertRcodeEqual(res, dns.rcode.NOERROR)
41 self.assertRRsetInAnswer(res, expected)
42 # this will break if you are not looking for the first RR, sorry!
43 if expectedFirstTTL is not None:
44 self.assertTrue(res.answer[0].ttl == expectedFirstTTL or res.answer[0].ttl == expectedFirstTTL - 1)
45 else:
46 expectedFirstTTL = res.answer[0].ttl
47 self.assertEqual(res.edns, query.edns)
48
49 if scopeZeroResponse is not None:
50 self.assertEqual(res.edns, 0)
51 if scopeZeroResponse:
52 self.assertEqual(len(res.options), 1)
53 self.assertEqual(res.options[0].otype, 8)
54 self.assertEqual(res.options[0].scope, 0)
55 else:
56 self.assertEqual(len(res.options), 1)
57 self.assertEqual(res.options[0].otype, 8)
58 self.assertNotEqual(res.options[0].scope, 0)
59
60 # wait one second, check that the TTL has been
61 # decreased indicating a cache hit
62 time.sleep(1)
63
64 res = self.sendUDPQuery(query)
65
66 self.assertRcodeEqual(res, dns.rcode.NOERROR)
67 self.assertRRsetInAnswer(res, expected)
68 self.assertLess(res.answer[0].ttl, expectedFirstTTL)
69 self.assertEqual(res.edns, query.edns)
70
71 def checkECSQueryHit(self, query, expected):
72 res = self.sendUDPQuery(query)
73
74 self.assertRcodeEqual(res, dns.rcode.NOERROR)
75 self.assertRRsetInAnswer(res, expected)
76 # this will break if you are not looking for the first RR, sorry!
77 self.assertLess(res.answer[0].ttl, ttlECS)
78
79 @classmethod
80 def startResponders(cls):
81 global ecsReactorRunning
82 global ecsReactorv6Running
83 print("Launching responders..")
84
85 address = cls._PREFIX + '.21'
86 port = 53
87
88 if not ecsReactorRunning:
89 reactor.listenUDP(port, UDPECSResponder(), interface=address)
90 ecsReactorRunning = True
91
92 if not ecsReactorv6Running and have_ipv6():
93 reactor.listenUDP(53000, UDPECSResponder(), interface='::1')
94 ecsReactorv6Running = True
95
96 if not reactor.running:
97 cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
98 cls._UDPResponder.daemon = True
99 cls._UDPResponder.start()
100
101 @classmethod
102 def setUpClass(cls):
103 cls.setUpSockets()
104
105 cls.startResponders()
106
107 confdir = os.path.join('configs', cls._confdir)
108 cls.createConfigDir(confdir)
109
110 cls.generateRecursorConfig(confdir)
111 cls.startRecursor(confdir, cls._recursorPort)
112
113 print("Launching tests..")
114
115 @classmethod
116 def tearDownClass(cls):
117 cls.tearDownRecursor()
118
119 class testNoECS(ECSTest):
120 _confdir = 'NoECS'
121
122 _config_template = """edns-subnet-allow-list=
123 forward-zones=ecs-echo.example=%s.21
124 """ % (os.environ['PREFIX'])
125
126 def testSendECS(self):
127 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
128 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
129 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
130 self.sendECSQuery(query, expected)
131
132 def testNoECS(self):
133 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
134 query = dns.message.make_query(nameECS, 'TXT')
135 self.sendECSQuery(query, expected)
136
137 def testRequireNoECS(self):
138 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
139
140 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
141 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
142 self.sendECSQuery(query, expected)
143
144 class testIncomingNoECS(ECSTest):
145 _confdir = 'IncomingNoECS'
146
147 _config_template = """edns-subnet-allow-list=
148 use-incoming-edns-subnet=yes
149 forward-zones=ecs-echo.example=%s.21
150 """ % (os.environ['PREFIX'])
151
152 def testSendECS(self):
153 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
154
155 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
156 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
157 self.sendECSQuery(query, expected, scopeZeroResponse=True)
158
159 def testNoECS(self):
160 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
161
162 query = dns.message.make_query(nameECS, 'TXT')
163 self.sendECSQuery(query, expected)
164
165 def testRequireNoECS(self):
166 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
167
168 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
169 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
170 self.sendECSQuery(query, expected, scopeZeroResponse=True)
171
172 class testECSByName(ECSTest):
173 _confdir = 'ECSByName'
174
175 _config_template = """edns-subnet-allow-list=ecs-echo.example.
176 forward-zones=ecs-echo.example=%s.21
177 """ % (os.environ['PREFIX'])
178
179 def testSendECS(self):
180 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
181 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
182 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
183 self.sendECSQuery(query, expected)
184
185 # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
186 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
187 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
188 self.checkECSQueryHit(query, expected)
189
190 def testNoECS(self):
191 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
192 query = dns.message.make_query(nameECS, 'TXT')
193 self.sendECSQuery(query, expected)
194
195 def testRequireNoECS(self):
196 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
197
198 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
199 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
200 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
201 self.sendECSQuery(query, expected)
202
203 class testECSByNameLarger(ECSTest):
204 _confdir = 'ECSByNameLarger'
205
206 _config_template = """edns-subnet-allow-list=ecs-echo.example.
207 ecs-ipv4-bits=32
208 forward-zones=ecs-echo.example=%s.21
209 ecs-ipv4-cache-bits=32
210 ecs-ipv6-cache-bits=128
211 """ % (os.environ['PREFIX'])
212
213 def testSendECS(self):
214 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
215 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
216 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
217 self.sendECSQuery(query, expected)
218
219 # check that a query in a different range is a miss
220 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
221 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
222 self.sendECSQuery(query, expected)
223
224 def testNoECS(self):
225 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
226 query = dns.message.make_query(nameECS, 'TXT')
227 self.sendECSQuery(query, expected)
228
229 def testRequireNoECS(self):
230 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
231
232 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
233 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
234 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
235 self.sendECSQuery(query, expected)
236
237 class testECSByNameSmaller(ECSTest):
238 _confdir = 'ECSByNameLarger'
239
240 _config_template = """edns-subnet-allow-list=ecs-echo.example.
241 ecs-ipv4-bits=16
242 forward-zones=ecs-echo.example=%s.21
243 """ % (os.environ['PREFIX'])
244
245 def testSendECS(self):
246 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
247 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
248 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
249 self.sendECSQuery(query, expected)
250
251 def testNoECS(self):
252 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
253 query = dns.message.make_query(nameECS, 'TXT')
254 self.sendECSQuery(query, expected)
255
256 def testRequireNoECS(self):
257 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
258
259 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
260 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
261 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
262 self.sendECSQuery(query, expected)
263
264 class testIncomingECSByName(ECSTest):
265 _confdir = 'ECSIncomingByName'
266
267 _config_template = """edns-subnet-allow-list=ecs-echo.example.
268 use-incoming-edns-subnet=yes
269 forward-zones=ecs-echo.example=%s.21
270 ecs-scope-zero-address=2001:db8::42
271 ecs-ipv4-cache-bits=32
272 ecs-ipv6-cache-bits=128
273 """ % (os.environ['PREFIX'])
274
275 def testSendECS(self):
276 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
277 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
278 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
279 self.sendECSQuery(query, expected, ttlECS)
280
281 # check that a query in the same ECS range is a hit
282 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
283 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
284 self.checkECSQueryHit(query, expected)
285
286 # check that a query in a different ECS range is a miss
287 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
288 ecso = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
289 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
290 self.sendECSQuery(query, expected)
291
292 def testNoECS(self):
293 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
294 query = dns.message.make_query(nameECS, 'TXT')
295 self.sendECSQuery(query, expected, ttlECS)
296
297 def testRequireNoECS(self):
298 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")
299
300 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
301 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
302 self.sendECSQuery(query, expected, ttlECS)
303
304 class testIncomingECSByNameLarger(ECSTest):
305 _confdir = 'ECSIncomingByNameLarger'
306
307 _config_template = """edns-subnet-allow-list=ecs-echo.example.
308 use-incoming-edns-subnet=yes
309 ecs-ipv4-bits=32
310 forward-zones=ecs-echo.example=%s.21
311 ecs-scope-zero-address=192.168.0.1
312 ecs-ipv4-cache-bits=32
313 ecs-ipv6-cache-bits=128
314 """ % (os.environ['PREFIX'])
315
316 def testSendECS(self):
317 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
318
319 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
320 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
321 self.sendECSQuery(query, expected, ttlECS)
322
323 def testNoECS(self):
324 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
325
326 query = dns.message.make_query(nameECS, 'TXT')
327 self.sendECSQuery(query, expected, ttlECS)
328
329 def testRequireNoECS(self):
330 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
331
332 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
333 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
334 self.sendECSQuery(query, expected, ttlECS)
335
336 class testIncomingECSByNameSmaller(ECSTest):
337 _confdir = 'ECSIncomingByNameSmaller'
338
339 _config_template = """edns-subnet-allow-list=ecs-echo.example.
340 use-incoming-edns-subnet=yes
341 ecs-ipv4-bits=16
342 forward-zones=ecs-echo.example=%s.21
343 ecs-scope-zero-address=192.168.0.1
344 ecs-ipv4-cache-bits=32
345 ecs-ipv6-cache-bits=128
346 """ % (os.environ['PREFIX'])
347
348 def testSendECS(self):
349 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
350 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
351 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
352 self.sendECSQuery(query, expected)
353
354 def testNoECS(self):
355 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
356 query = dns.message.make_query(nameECS, 'TXT')
357 self.sendECSQuery(query, expected)
358
359 def testRequireNoECS(self):
360 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
361
362 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
363 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
364 self.sendECSQuery(query, expected, ttlECS)
365
366 @unittest.skipIf(not have_ipv6(), "No IPv6")
367 class testIncomingECSByNameV6(ECSTest):
368 _confdir = 'ECSIncomingByNameV6'
369
370 _config_template = """edns-subnet-allow-list=ecs-echo.example.
371 use-incoming-edns-subnet=yes
372 ecs-ipv6-bits=128
373 ecs-ipv4-cache-bits=32
374 ecs-ipv6-cache-bits=128
375 query-local-address=::1
376 forward-zones=ecs-echo.example=[::1]:53000
377 """
378
379 def testSendECS(self):
380 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
381 ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
382 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
383 self.sendECSQuery(query, expected, ttlECS)
384
385 def testNoECS(self):
386 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
387
388 query = dns.message.make_query(nameECS, 'TXT')
389 res = self.sendUDPQuery(query)
390 self.sendECSQuery(query, expected, ttlECS)
391
392 def testRequireNoECS(self):
393 # we should get ::1/128 because ecs-scope-zero-addr is unset and query-local-address is set to ::1
394 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")
395
396 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
397 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
398 self.sendECSQuery(query, expected, ttlECS)
399
400 class testECSNameMismatch(ECSTest):
401 _confdir = 'ECSNameMismatch'
402
403 _config_template = """edns-subnet-allow-list=not-the-right-name.example.
404 forward-zones=ecs-echo.example=%s.21
405 """ % (os.environ['PREFIX'])
406
407 def testSendECS(self):
408 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
409 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
410 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
411 self.sendECSQuery(query, expected)
412
413 def testNoECS(self):
414 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
415 query = dns.message.make_query(nameECS, 'TXT')
416 self.sendECSQuery(query, expected)
417
418 def testRequireNoECS(self):
419 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
420
421 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
422 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
423 self.sendECSQuery(query, expected)
424
425 class testECSByIP(ECSTest):
426 _confdir = 'ECSByIP'
427
428 _config_template = """edns-subnet-allow-list=%s.21
429 forward-zones=ecs-echo.example=%s.21
430 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
431
432 def testSendECS(self):
433 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
434 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
435 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
436 self.sendECSQuery(query, expected)
437
438 def testNoECS(self):
439 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
440 query = dns.message.make_query(nameECS, 'TXT')
441 self.sendECSQuery(query, expected)
442
443 def testRequireNoECS(self):
444 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
445
446 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
447 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
448 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
449 self.sendECSQuery(query, expected)
450
451 class testIncomingECSByIP(ECSTest):
452 _confdir = 'ECSIncomingByIP'
453
454 _config_template = """edns-subnet-allow-list=%s.21
455 use-incoming-edns-subnet=yes
456 forward-zones=ecs-echo.example=%s.21
457 ecs-scope-zero-address=::1
458 ecs-ipv4-cache-bits=32
459 ecs-ipv6-cache-bits=128
460 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
461
462 def testSendECS(self):
463 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
464
465 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
466 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
467 self.sendECSQuery(query, expected)
468
469 def testNoECS(self):
470 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
471 query = dns.message.make_query(nameECS, 'TXT')
472 self.sendECSQuery(query, expected)
473
474 def testRequireNoECS(self):
475 # we will get ::1 because ecs-scope-zero-addr is set to ::1
476 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')
477
478 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
479 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
480 self.sendECSQuery(query, expected, ttlECS)
481
482 def testSendECSInvalidScope(self):
483 # test that the recursor does not cache with a more specific scope than the source it sent
484 expected = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
485
486 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
487 query = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
488
489 self.sendECSQuery(query, expected)
490
491 class testECSIPMismatch(ECSTest):
492 _confdir = 'ECSIPMismatch'
493
494 _config_template = """edns-subnet-allow-list=192.0.2.1
495 forward-zones=ecs-echo.example=%s.21
496 """ % (os.environ['PREFIX'])
497
498 def testSendECS(self):
499 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
500 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
501 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
502 self.sendECSQuery(query, expected)
503
504 def testNoECS(self):
505 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
506 query = dns.message.make_query(nameECS, 'TXT')
507 res = self.sendUDPQuery(query)
508 self.sendECSQuery(query, expected)
509
510 def testRequireNoECS(self):
511 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
512
513 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
514 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
515 self.sendECSQuery(query, expected)
516
517 class testECSWithProxyProtocoldRecursorTest(ECSTest):
518 _confdir = 'ECSWithProxyProtocol'
519 _config_template = """
520 ecs-add-for=2001:db8::1/128
521 edns-subnet-allow-list=ecs-echo.example.
522 forward-zones=ecs-echo.example=%s.21
523 proxy-protocol-from=127.0.0.1/32
524 allow-from=2001:db8::1/128
525 """ % (os.environ['PREFIX'])
526
527 def testProxyProtocolPlusECS(self):
528 qname = nameECS
529 expected = dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'TXT', '2001:db8::/56')
530
531 query = dns.message.make_query(qname, 'TXT', use_edns=True)
532 for method in ("sendUDPQueryWithProxyProtocol", "sendTCPQueryWithProxyProtocol"):
533 sender = getattr(self, method)
534 res = sender(query, True, '2001:db8::1', '2001:db8::2', 0, 65535)
535 self.assertRcodeEqual(res, dns.rcode.NOERROR)
536 self.assertRRsetInAnswer(res, expected)
537
538 class testTooLargeToAddZeroScope(RecursorTest):
539
540 _confdir = 'TooLargeToAddZeroScope'
541 _config_template = """
542 use-incoming-edns-subnet=yes
543 dnssec=validate
544 """
545 _lua_dns_script_file = """
546 function preresolve(dq)
547 if dq.qname == newDN('toolarge.ecs.') then
548 dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
549 return true
550 end
551 return false
552 end
553 """ % ('A'*447)
554
555 _roothints = None
556
557 @classmethod
558 def setUpClass(cls):
559
560 # we don't need all the auth stuff
561 cls.setUpSockets()
562 cls.startResponders()
563
564 confdir = os.path.join('configs', cls._confdir)
565 cls.createConfigDir(confdir)
566
567 cls.generateRecursorConfig(confdir)
568 cls.startRecursor(confdir, cls._recursorPort)
569
570 @classmethod
571 def tearDownClass(cls):
572 cls.tearDownRecursor()
573
574 @classmethod
575 def generateRecursorConfig(cls, confdir):
576 super(testTooLargeToAddZeroScope, cls).generateRecursorConfig(confdir)
577
578 def testTooLarge(self):
579 qname = 'toolarge.ecs.'
580 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
581 query = dns.message.make_query(qname, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
582
583 # should not have an ECS Option since the packet is too large already
584 res = self.sendUDPQuery(query, timeout=5.0)
585 self.assertRcodeEqual(res, dns.rcode.NOERROR)
586 self.assertEqual(len(res.answer), 1)
587 self.assertEqual(res.edns, 0)
588 self.assertEqual(len(res.options), 0)
589
590 res = self.sendTCPQuery(query, timeout=5.0)
591 self.assertRcodeEqual(res, dns.rcode.NOERROR)
592 self.assertEqual(len(res.answer), 1)
593 self.assertEqual(res.edns, 0)
594 self.assertEqual(len(res.options), 1)
595 self.assertEqual(res.options[0].otype, 8)
596 self.assertEqual(res.options[0].scope, 0)
597
598 class UDPECSResponder(DatagramProtocol):
599 @staticmethod
600 def ipToStr(option):
601 if option.family == clientsubnetoption.FAMILY_IPV4:
602 ip = socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
603 elif option.family == clientsubnetoption.FAMILY_IPV6:
604 ip = socket.inet_ntop(socket.AF_INET6,
605 struct.pack('!QQ',
606 option.ip >> 64,
607 option.ip & (2 ** 64 - 1)))
608 return ip
609
610 def datagramReceived(self, datagram, address):
611 request = dns.message.from_wire(datagram)
612
613 response = dns.message.make_response(request)
614 response.flags |= dns.flags.AA
615 ecso = None
616
617 if (request.question[0].name == dns.name.from_text(nameECS) or request.question[0].name == dns.name.from_text(nameECSInvalidScope)) and request.question[0].rdtype == dns.rdatatype.TXT:
618
619 text = emptyECSText
620 for option in request.options:
621 if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
622 text = self.ipToStr(option) + '/' + str(option.mask)
623
624 # Send a scope more specific than the received source for nameECSInvalidScope
625 if request.question[0].name == dns.name.from_text(nameECSInvalidScope):
626 ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
627 else:
628 ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
629
630 answer = dns.rrset.from_text(request.question[0].name, ttlECS, dns.rdataclass.IN, 'TXT', text)
631 response.answer.append(answer)
632
633 elif request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.NS:
634 answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
635 response.answer.append(answer)
636 additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
637 response.additional.append(additional)
638
639 if ecso:
640 response.use_edns(options = [ecso])
641
642 self.transport.write(response.to_wire(), address)