]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
Merge pull request #15791 from miodvallat/udon
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7 import clientsubnetoption
8 import unittest
9 from recursortests import RecursorTest, have_ipv6
10 from twisted.internet.protocol import DatagramProtocol
11 from twisted.internet import reactor
12
# TXT payload the test responder returns when the query it received carried no ECS option
emptyECSText = 'No ECS received'
# name for which the test responder echoes the received ECS source as a TXT record
nameECS = 'ecs-echo.example.'
# name for which the test responder answers with a fixed 192.0.42.42/32 ECS option
nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'
# TTL used for the records generated by the test responder
ttlECS = 60
# set to True once the IPv4 (resp. IPv6) responder has been registered with the reactor
ecsReactorRunning = False
ecsReactorv6Running = False
19
class ECSTest(RecursorTest):
    """Shared base of the ECS tests.

    Provides the common recursor settings, two assertion helpers built on
    top of sendUDPQuery(), and the start-up of the UDP echo responders.
    """

    _config_template_default = """
daemon=no
trace=yes
dont-query=
local-address=127.0.0.1
packetcache-ttl=15
packetcache-servfail-ttl=15
max-cache-ttl=600
threads=2
loglevel=9
disable-syslog=yes
log-common-errors=yes
statistics-interval=0
ecs-add-for=0.0.0.0/0
"""

    def sendECSQuery(self, query, expected, expectedFirstTTL=None, scopeZeroResponse=None):
        """Send `query` twice over UDP, one second apart, and check both answers.

        Both answers must be NOERROR, contain `expected` and use the same
        EDNS version as the query. The TTL of the first RRset of the second
        answer must be lower than the reference TTL.

        expectedFirstTTL: if given, the first answer's TTL must equal it or
            be one less; otherwise the first answer's TTL is the reference.
        scopeZeroResponse: if not None, the first answer must carry exactly
            one EDNS option, of type 8, whose scope is zero (True) or
            non-zero (False).
        """
        firstReply = self.sendUDPQuery(query)

        self.assertRcodeEqual(firstReply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(firstReply, expected)
        # only the first RRset of the answer section is looked at here
        if expectedFirstTTL is None:
            expectedFirstTTL = firstReply.answer[0].ttl
        else:
            self.assertIn(firstReply.answer[0].ttl, (expectedFirstTTL, expectedFirstTTL - 1))
        self.assertEqual(firstReply.edns, query.edns)

        if scopeZeroResponse is not None:
            self.assertEqual(firstReply.edns, 0)
            # the option count and type are checked identically in both cases,
            # only the expectation on the scope differs
            self.assertEqual(len(firstReply.options), 1)
            self.assertEqual(firstReply.options[0].otype, 8)
            if scopeZeroResponse:
                self.assertEqual(firstReply.options[0].scope, 0)
            else:
                self.assertNotEqual(firstReply.options[0].scope, 0)

        # after a one second pause the TTL must have gone down,
        # which indicates the second answer is a cache hit
        time.sleep(1)

        secondReply = self.sendUDPQuery(query)

        self.assertRcodeEqual(secondReply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(secondReply, expected)
        self.assertLess(secondReply.answer[0].ttl, expectedFirstTTL)
        self.assertEqual(secondReply.edns, query.edns)

    def checkECSQueryHit(self, query, expected):
        """Send `query` once and check the answer has a TTL below ttlECS."""
        reply = self.sendUDPQuery(query)

        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, expected)
        # only the first RRset of the answer section is looked at here
        self.assertLess(reply.answer[0].ttl, ttlECS)

    @classmethod
    def startResponders(cls):
        """Register the UDP echo responders and start the reactor thread.

        The module-level flags make sure each responder is registered only
        once, and the reactor thread is only started if the reactor is not
        already running.
        """
        global ecsReactorRunning, ecsReactorv6Running
        print("Launching responders..")

        port = 53
        address = cls._PREFIX + '.21'

        if not ecsReactorRunning:
            reactor.listenUDP(port, UDPECSResponder(), interface=address)
            ecsReactorRunning = True

        if not ecsReactorv6Running and have_ipv6():
            reactor.listenUDP(53000, UDPECSResponder(), interface='::1')
            ecsReactorv6Running = True

        if reactor.running:
            return

        # reactor.run is given False as its only argument
        cls._UDPResponder = threading.Thread(target=reactor.run, name='UDP Responder', args=(False,))
        cls._UDPResponder.daemon = True
        cls._UDPResponder.start()
100
class NoECSTest(ECSTest):
    """Empty edns-subnet-allow-list: every test expects the responder to
    report that it received no ECS option."""

    _confdir = 'NoECS'

    _config_template = """edns-subnet-allow-list=
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying a 192.0.2.1/32 ECS option."""
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(request, wanted)

    def testNoECS(self):
        """Query without any ECS option."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option."""
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(request, wanted)
125
class IncomingNoECSTest(ECSTest):
    """Empty edns-subnet-allow-list with use-incoming-edns-subnet=yes: the
    responder must see no ECS, and queries that carried an ECS option must
    get an answer with a scope of zero."""

    _confdir = 'IncomingNoECS'

    _config_template = """edns-subnet-allow-list=
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying a 192.0.2.1/32 ECS option."""
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        self.sendECSQuery(request, wanted, scopeZeroResponse=True)

    def testNoECS(self):
        """Query without any ECS option."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option."""
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        self.sendECSQuery(request, wanted, scopeZeroResponse=True)
153
class ECSByNameTest(ECSTest):
    """ecs-echo.example. is in edns-subnet-allow-list and incoming ECS is not
    used: every test expects the responder to see 127.0.0.0/24."""

    _confdir = 'ECSByName'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying an ECS option, then a second one from another range."""
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        firstSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        firstRequest = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[firstSubnet], payload=512)
        self.sendECSQuery(firstRequest, wanted)

        # the incoming ECS is not used, so a query announcing a different
        # ECS range must still be a hit
        otherSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        otherRequest = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[otherSubnet], payload=512)
        self.checkECSQueryHit(otherRequest, wanted)

    def testNoECS(self):
        """Query without any ECS option."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option."""
        # use-incoming-edns-subnet is not set, so the request
        # for no ECS is ignored
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(request, wanted)
184
class ECSByNameLargerTest(ECSTest):
    """Allow-list by name with ecs-ipv4-bits=32: every test expects the
    responder to see 127.0.0.1/32."""

    _confdir = 'ECSByNameLarger'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying an ECS option, then a second one from another range."""
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        firstSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        firstRequest = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[firstSubnet], payload=512)
        self.sendECSQuery(firstRequest, wanted)

        # a query in a different range is expected to be a miss
        otherSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        otherRequest = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[otherSubnet], payload=512)
        self.sendECSQuery(otherRequest, wanted)

    def testNoECS(self):
        """Query without any ECS option."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option."""
        # use-incoming-edns-subnet is not set, so the request
        # for no ECS is ignored
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(request, wanted)
218
class ECSByNameSmallerTest(ECSTest):
    """Allow-list by name with ecs-ipv4-bits=16: every test expects the
    responder to see 127.0.0.0/16."""

    _confdir = 'ECSByNameSmaller'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying a 192.0.2.1/32 ECS option."""
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.sendECSQuery(request, wanted)

    def testNoECS(self):
        """Query without any ECS option."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option."""
        # use-incoming-edns-subnet is not set, so the request
        # for no ECS is ignored
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.sendECSQuery(request, wanted)
245
class IncomingECSByNameTest(ECSTest):
    """Allow-list by name with use-incoming-edns-subnet=yes and
    ecs-scope-zero-address=2001:db8::42."""

    _confdir = 'IncomingECSByName'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=2001:db8::42
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Incoming ECS is expected to reach the responder truncated to /24."""
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        firstSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        firstRequest = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[firstSubnet], payload=512)
        self.sendECSQuery(firstRequest, wanted, ttlECS)

        # a query from the same ECS range must be a hit
        sameRangeSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        sameRangeRequest = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[sameRangeSubnet], payload=512)
        self.checkECSQueryHit(sameRangeRequest, wanted)

        # a query from a different ECS range must be a miss
        otherWanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
        otherSubnet = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
        otherRequest = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[otherSubnet], payload=512)
        self.sendECSQuery(otherRequest, otherWanted)

    def testNoECS(self):
        """Query without any ECS option: 127.0.0.0/24 is expected."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(request, wanted, ttlECS)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option: 2001:db8::42/128 is expected."""
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")

        self.sendECSQuery(request, wanted, ttlECS)
285
class IncomingECSByNameLargerTest(ECSTest):
    """Allow-list by name with use-incoming-edns-subnet=yes,
    ecs-ipv4-bits=32 and ecs-scope-zero-address=192.168.0.1."""

    _confdir = 'IncomingECSByNameLarger'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying 192.0.2.1/32: the full /32 is expected."""
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')

        self.sendECSQuery(request, wanted, ttlECS)

    def testNoECS(self):
        """Query without any ECS option: 127.0.0.1/32 is expected."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        self.sendECSQuery(request, wanted, ttlECS)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option: 192.168.0.1/32 is expected."""
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')

        self.sendECSQuery(request, wanted, ttlECS)
317
class IncomingECSByNameSmallerTest(ECSTest):
    """Allow-list by name with use-incoming-edns-subnet=yes,
    ecs-ipv4-bits=16 and ecs-scope-zero-address=192.168.0.1."""

    _confdir = 'IncomingECSByNameSmaller'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying 192.0.2.1/32: 192.0.0.0/16 is expected."""
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
        self.sendECSQuery(request, wanted)

    def testNoECS(self):
        """Query without any ECS option: 127.0.0.0/16 is expected."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option: 192.168.0.1/32 is expected."""
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')

        self.sendECSQuery(request, wanted, ttlECS)
347
@unittest.skipIf(not have_ipv6(), "No IPv6")
class IncomingECSByNameV6Test(ECSTest):
    """Allow-list by name with use-incoming-edns-subnet=yes, forwarding to
    the responder listening on [::1]:53000 with query-local-address=::1.

    Skipped when have_ipv6() reports that IPv6 is not available.
    """

    _confdir = 'IncomingECSByNameV6'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv6-bits=128
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
query-local-address=::1
forward-zones=ecs-echo.example=[::1]:53000
    """

    def testSendECS(self):
        """Query carrying 2001:db8::1/128: the full /128 is expected."""
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
        ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)

    def testNoECS(self):
        """Query without any ECS option: 127.0.0.0/24 is expected."""
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        query = dns.message.make_query(nameECS, 'TXT')
        # no extra query is sent before sendECSQuery(): its first-TTL check
        # is meant to run against the first answer for this query
        self.sendECSQuery(query, expected, ttlECS)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option: ::1/128 is expected."""
        # we should get ::1/128 because ecs-scope-zero-addr is unset and query-local-address is set to ::1
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)
381
class ECSNameMismatchTest(ECSTest):
    """edns-subnet-allow-list holds a name other than the queried one: every
    test expects the responder to report that it received no ECS option."""

    _confdir = 'ECSNameMismatch'

    _config_template = """edns-subnet-allow-list=not-the-right-name.example.
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying a 192.0.2.1/32 ECS option."""
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(request, wanted)

    def testNoECS(self):
        """Query without any ECS option."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option."""
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        self.sendECSQuery(request, wanted)
406
class ECSByIPTest(ECSTest):
    """The responder's address is in edns-subnet-allow-list and incoming ECS
    is not used: every test expects the responder to see 127.0.0.0/24."""

    _confdir = 'ECSByIP'

    _config_template = """edns-subnet-allow-list=%s.21
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying a 192.0.2.1/32 ECS option."""
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(request, wanted)

    def testNoECS(self):
        """Query without any ECS option."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option."""
        # use-incoming-edns-subnet is not set, so the request
        # for no ECS is ignored
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(request, wanted)
432
class IncomingECSByIPTest(ECSTest):
    """The responder's address is in edns-subnet-allow-list, with
    use-incoming-edns-subnet=yes and ecs-scope-zero-address=::1."""

    _confdir = 'IncomingECSByIP'

    _config_template = """edns-subnet-allow-list=%s.21
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=::1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
    """ % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying 192.0.2.1/32: 192.0.2.0/24 is expected."""
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')

        self.sendECSQuery(request, wanted)

    def testNoECS(self):
        """Query without any ECS option: 127.0.0.0/24 is expected."""
        request = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(request, wanted)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option: ::1/128 is expected."""
        # ecs-scope-zero-addr is set to ::1, so that is what we will get
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')

        self.sendECSQuery(request, wanted, ttlECS)

    def testSendECSInvalidScope(self):
        """Query the name for which the responder returns a /32 scope."""
        # the recursor must not cache with a scope more specific
        # than the source it sent
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')

        self.sendECSQuery(request, wanted)
472
class ECSIPMismatchTest(ECSTest):
    """edns-subnet-allow-list holds 192.0.2.1 while queries are forwarded to
    PREFIX.21: every test expects the responder to report that it received
    no ECS option."""

    _confdir = 'ECSIPMismatch'

    _config_template = """edns-subnet-allow-list=192.0.2.1
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query carrying a 192.0.2.1/32 ECS option."""
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

    def testNoECS(self):
        """Query without any ECS option."""
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        query = dns.message.make_query(nameECS, 'TXT')
        # sendECSQuery() already sends the query itself; no separate
        # query whose result would go unused is needed beforehand
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        """Query carrying a 0.0.0.0/0 ECS option."""
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
498
class ECSWithProxyProtocolRecursorTest(ECSTest):
    """ECS combined with the proxy protocol: queries are sent with a proxy
    protocol source of 2001:db8::1 and the responder is expected to see
    2001:db8::/56."""

    _confdir = 'ECSWithProxyProtocolRecursor'
    _config_template = """
ecs-add-for=2001:db8::1/128
edns-subnet-allow-list=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
proxy-protocol-from=127.0.0.1/32
allow-from=2001:db8::1/128
""" % (os.environ['PREFIX'])

    def testProxyProtocolPlusECS(self):
        """Run the same query over UDP, then TCP, with a proxy protocol header."""
        qname = nameECS
        wanted = dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'TXT', '2001:db8::/56')
        request = dns.message.make_query(qname, 'TXT', use_edns=True)

        senders = (self.sendUDPQueryWithProxyProtocol, self.sendTCPQueryWithProxyProtocol)
        for send in senders:
            reply = send(request, True, '2001:db8::1', '2001:db8::2', 0, 65535)
            self.assertRcodeEqual(reply, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(reply, wanted)
519
class TooLargeToAddZeroScopeTest(RecursorTest):
    """Checks the ECS option of an answer made of a 447-character TXT record
    generated from Lua, for a query advertising a 512-byte payload: no option
    is expected over UDP, a scope-zero option is expected over TCP."""

    _confdir = 'TooLargeToAddZeroScope'
    _config_template = """
use-incoming-edns-subnet=yes
dnssec=validate
"""
    _lua_dns_script_file = """
    function preresolve(dq)
      if dq.qname == newDN('toolarge.ecs.') then
        dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
        return true
      end
      return false
    end
    """ % ('A'*447)

    _roothints = None

    @classmethod
    def generateRecursorConfig(cls, confdir):
        """Delegate to the parent implementation unchanged."""
        super(TooLargeToAddZeroScopeTest, cls).generateRecursorConfig(confdir)

    def testTooLarge(self):
        """Query toolarge.ecs. with a 192.0.2.1/24 ECS option over UDP and TCP."""
        qname = 'toolarge.ecs.'
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        request = dns.message.make_query(qname, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)

        # over UDP the packet is already too large,
        # so no ECS option should have been added
        udpReply = self.sendUDPQuery(request, timeout=5.0)
        self.assertRcodeEqual(udpReply, dns.rcode.NOERROR)
        self.assertEqual(len(udpReply.answer), 1)
        self.assertEqual(udpReply.edns, 0)
        self.assertEqual(len(udpReply.options), 0)

        # over TCP a single option of type 8 with a zero scope is expected
        tcpReply = self.sendTCPQuery(request, timeout=5.0)
        self.assertRcodeEqual(tcpReply, dns.rcode.NOERROR)
        self.assertEqual(len(tcpReply.answer), 1)
        self.assertEqual(tcpReply.edns, 0)
        self.assertEqual(len(tcpReply.options), 1)
        tcpOption = tcpReply.options[0]
        self.assertEqual(tcpOption.otype, 8)
        self.assertEqual(tcpOption.scope, 0)
563
class UDPECSResponder(DatagramProtocol):
    """UDP responder used by the ECS tests.

    For TXT queries on nameECS or nameECSInvalidScope it answers with the
    ECS source it received (or emptyECSText when there was none); for NS
    queries on nameECS it answers with ns1.ecs-echo.example. plus an A
    record in the additional section. All answers have the AA flag set.
    """

    @staticmethod
    def ipToStr(option):
        """Return the address held by an ECS option in textual form.

        option: object exposing `family` and an integer `ip`.
        Raises ValueError when the family is neither IPv4 nor IPv6.
        """
        if option.family == clientsubnetoption.FAMILY_IPV4:
            return socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
        if option.family == clientsubnetoption.FAMILY_IPV6:
            # the 128-bit integer is packed as two 64-bit halves
            return socket.inet_ntop(socket.AF_INET6,
                                    struct.pack('!QQ',
                                                option.ip >> 64,
                                                option.ip & (2 ** 64 - 1)))
        # previously fell through to `return ip` with `ip` never assigned
        raise ValueError('unsupported ECS address family: %r' % (option.family,))

    def datagramReceived(self, datagram, address):
        """Parse `datagram` as a DNS query and write the answer back to `address`."""
        request = dns.message.from_wire(datagram)
        if not request.question:
            # nothing to answer; avoids an IndexError on question[0] below
            return

        response = dns.message.make_response(request)
        response.flags |= dns.flags.AA
        ecso = None

        question = request.question[0]
        echoName = dns.name.from_text(nameECS)
        invalidScopeName = dns.name.from_text(nameECSInvalidScope)

        if (question.name == echoName or question.name == invalidScopeName) and question.rdtype == dns.rdatatype.TXT:

            text = emptyECSText
            for option in request.options:
                if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
                    text = self.ipToStr(option) + '/' + str(option.mask)

                    # Send a scope more specific than the received source for nameECSInvalidScope
                    if question.name == invalidScopeName:
                        ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
                    else:
                        ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)

            answer = dns.rrset.from_text(question.name, ttlECS, dns.rdataclass.IN, 'TXT', text)
            response.answer.append(answer)

        elif question.name == echoName and question.rdtype == dns.rdatatype.NS:
            answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
            response.answer.append(answer)
            additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
            response.additional.append(additional)

        # only add EDNS to the response when an ECS option was built above
        if ecso:
            response.use_edns(options = [ecso])

        self.transport.write(response.to_wire(), address)