git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
Merge pull request #11030 from omoerbeek/rec-incoming-tcp-finalize
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7 import clientsubnetoption
8 import unittest
9 from recursortests import RecursorTest, have_ipv6
10 from twisted.internet.protocol import DatagramProtocol
11 from twisted.internet import reactor
12
# TXT payload used by the test responder when the query carried no ECS option
emptyECSText = 'No ECS received'
# names the test responder answers for
nameECS = 'ecs-echo.example.'
nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'
# TTL given to the records built by the test responder
ttlECS = 60
# set to True once the IPv4 / IPv6 responder has been registered with the reactor
ecsReactorRunning = False
ecsReactorv6Running = False
19
class ECSTest(RecursorTest):
    """Shared fixture for the EDNS Client Subnet (ECS) tests.

    Holds the default recursor configuration, registers the UDP responders
    that echo the received ECS option back in a TXT record, and provides
    helpers that check both the answer and the record-cache behaviour.
    """

    _config_template_default = """
daemon=no
trace=yes
dont-query=
ecs-add-for=0.0.0.0/0
local-address=127.0.0.1
packetcache-ttl=0
packetcache-servfail-ttl=0
max-cache-ttl=600
threads=1
loglevel=9
disable-syslog=yes
"""

    def sendECSQuery(self, query, expected, expectedFirstTTL=None, scopeZeroResponse=None):
        """Send `query` twice over UDP and check the answer and its caching.

        query: the DNS message to send.
        expected: RRset that must be present in the answer section.
        expectedFirstTTL: when given, the first answer RRset of the first
            response must have this TTL (or one second less); when None the
            TTL observed in the first response is used as the reference.
        scopeZeroResponse: when not None, the first response must carry
            exactly one EDNS option of type 8 (ECS); its scope must be zero
            if this is true and non-zero otherwise.

        The second query is sent after one second and its TTL must be lower
        than the reference TTL, indicating a cache hit.
        """
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        # this will break if you are not looking for the first RR, sorry!
        if expectedFirstTTL is not None:
            # allow for one second having elapsed since the record was fetched
            self.assertIn(res.answer[0].ttl, (expectedFirstTTL, expectedFirstTTL - 1))
        else:
            expectedFirstTTL = res.answer[0].ttl
        self.assertEqual(res.edns, query.edns)

        if scopeZeroResponse is not None:
            self.assertEqual(res.edns, 0)
            # both variants expect a single ECS option, only the scope differs
            self.assertEqual(len(res.options), 1)
            self.assertEqual(res.options[0].otype, 8)
            if scopeZeroResponse:
                self.assertEqual(res.options[0].scope, 0)
            else:
                self.assertNotEqual(res.options[0].scope, 0)

        # wait one second, check that the TTL has been
        # decreased indicating a cache hit
        time.sleep(1)

        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        self.assertLess(res.answer[0].ttl, expectedFirstTTL)
        self.assertEqual(res.edns, query.edns)

    def checkECSQueryHit(self, query, expected):
        """Send `query` once and check it is answered with a TTL below ttlECS."""
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        # this will break if you are not looking for the first RR, sorry!
        self.assertLess(res.answer[0].ttl, ttlECS)

    @classmethod
    def startResponders(cls):
        """Register the ECS responders and start the reactor thread if needed.

        The module-level flags make sure each responder is only registered
        once, whatever the number of test classes calling this method.
        """
        global ecsReactorRunning
        global ecsReactorv6Running
        print("Launching responders..")

        address = cls._PREFIX + '.21'
        port = 53

        if not ecsReactorRunning:
            reactor.listenUDP(port, UDPECSResponder(), interface=address)
            ecsReactorRunning = True

        if not ecsReactorv6Running and have_ipv6():
            reactor.listenUDP(53000, UDPECSResponder(), interface='::1')
            ecsReactorv6Running = True

        if not reactor.running:
            cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
            # Thread.setDaemon() is deprecated, set the attribute instead
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()

    @classmethod
    def setUpClass(cls):
        """Set up sockets and responders, then configure and start the recursor."""
        cls.setUpSockets()

        cls.startResponders()

        confdir = os.path.join('configs', cls._confdir)
        cls.createConfigDir(confdir)

        cls.generateRecursorConfig(confdir)
        cls.startRecursor(confdir, cls._recursorPort)

        print("Launching tests..")

    @classmethod
    def tearDownClass(cls):
        """Tear the recursor down."""
        cls.tearDownRecursor()
116
class testNoECS(ECSTest):
    """Empty edns-subnet-allow-list: every answer is expected to be the 'no ECS' text."""

    _confdir = 'NoECS'

    _config_template = """edns-subnet-allow-list=
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        # query carrying a /32 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # query carrying a 0.0.0.0/0 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)
141
class testIncomingNoECS(ECSTest):
    """Empty allow list with use-incoming-edns-subnet enabled.

    The answers are expected to carry the 'no ECS' text; queries that carried
    an ECS option are additionally checked for a scope-zero ECS option in
    the response.
    """

    _confdir = 'IncomingNoECS'

    _config_template = """edns-subnet-allow-list=
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        # query carrying a /32 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted, scopeZeroResponse=True)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # query carrying a 0.0.0.0/0 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted, scopeZeroResponse=True)
169
class testECSByName(ECSTest):
    """Allow list matching the zone name: the responder is expected to echo 127.0.0.0/24."""

    _confdir = 'ECSByName'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        first = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[first], payload=512)
        self.sendECSQuery(question, wanted)

        # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
        second = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[second], payload=512)
        self.checkECSQueryHit(question, wanted)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)
200
class testECSByNameLarger(ECSTest):
    """Allow list by name with ecs-ipv4-bits=32: the responder is expected to echo 127.0.0.1/32."""

    _confdir = 'ECSByNameLarger'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        first = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[first], payload=512)
        self.sendECSQuery(question, wanted)

        # check that a query in a different range is a miss
        second = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[second], payload=512)
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(question, wanted)
234
class testECSByNameSmaller(ECSTest):
    """Allow list by name with ecs-ipv4-bits=16: the responder is expected to echo 127.0.0.0/16."""

    # was 'ECSByNameLarger' (copy/paste), which made this class share its
    # configuration directory with testECSByNameLarger
    _confdir = 'ECSByNameSmaller'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        # query carrying a /32 ECS option
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

    def testNoECS(self):
        # plain query without any EDNS option
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')

        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
261
class testIncomingECSByName(ECSTest):
    """Allow list by name with use-incoming-edns-subnet enabled.

    The echoed subnet is expected to follow the ECS option of the incoming
    query, and ecs-scope-zero-address to be used for a 0.0.0.0/0 option.
    """

    _confdir = 'ECSIncomingByName'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=2001:db8::42
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)

        # check that a query in the same ECS range is a hit
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.checkECSQueryHit(question, wanted)

        # check that a query in a different ECS range is a miss
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)

    def testRequireNoECS(self):
        # query carrying a 0.0.0.0/0 ECS option; the configured
        # ecs-scope-zero-address is expected back
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::42/128')
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)
301
class testIncomingECSByNameLarger(ECSTest):
    """Incoming ECS, allow list by name, ecs-ipv4-bits=32: /32 subnets are expected back."""

    _confdir = 'ECSIncomingByNameLarger'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        # query carrying a /32 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)

    def testRequireNoECS(self):
        # query carrying a 0.0.0.0/0 ECS option; the configured
        # ecs-scope-zero-address is expected back
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)
333
class testIncomingECSByNameSmaller(ECSTest):
    """Incoming ECS, allow list by name, ecs-ipv4-bits=16: /16 subnets are expected back."""

    _confdir = 'ECSIncomingByNameSmaller'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        # query carrying a /32 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # query carrying a 0.0.0.0/0 ECS option; the configured
        # ecs-scope-zero-address is expected back
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)
363
@unittest.skipIf(not have_ipv6(), "No IPv6")
class testIncomingECSByNameV6(ECSTest):
    """Incoming ECS with the zone forwarded to the IPv6 responder on [::1]:53000."""

    _confdir = 'ECSIncomingByNameV6'

    _config_template = """edns-subnet-allow-list=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv6-bits=128
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
query-local-address=::1
forward-zones=ecs-echo.example=[::1]:53000
"""

    def testSendECS(self):
        # query carrying a /128 IPv6 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)

    def testNoECS(self):
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        # extra query sent before the checked ones; its response is not inspected
        self.sendUDPQuery(question)
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)

    def testRequireNoECS(self):
        # we should get ::1/128 because ecs-scope-zero-addr is unset and query-local-address is set to ::1
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')

        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)
397
class testECSNameMismatch(ECSTest):
    """Allow list naming another zone: every answer is expected to be the 'no ECS' text."""

    _confdir = 'ECSNameMismatch'

    _config_template = """edns-subnet-allow-list=not-the-right-name.example.
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        # query carrying a /32 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # query carrying a 0.0.0.0/0 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)
422
class testECSByIP(ECSTest):
    """Allow list matching the responder address: the responder is expected to echo 127.0.0.0/24."""

    _confdir = 'ECSByIP'

    _config_template = """edns-subnet-allow-list=%s.21
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # query carrying a /32 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)
448
class testIncomingECSByIP(ECSTest):
    """Incoming ECS with an allow list matching the responder address."""

    _confdir = 'ECSIncomingByIP'

    _config_template = """edns-subnet-allow-list=%s.21
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=::1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # query carrying a /32 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # plain query without any EDNS option
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # we will get ::1 because ecs-scope-zero-addr is set to ::1
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')

        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted, expectedFirstTTL=ttlECS)

    def testSendECSInvalidScope(self):
        # test that the recursor does not cache with a more specific scope than the source it sent
        wanted = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')

        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)

        self.sendECSQuery(question, wanted)
488
class testECSIPMismatch(ECSTest):
    """Allow list holding another address: every answer is expected to be the 'no ECS' text."""

    _confdir = 'ECSIPMismatch'

    _config_template = """edns-subnet-allow-list=192.0.2.1
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'],)

    def testSendECS(self):
        # query carrying a /32 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        # extra query sent before the checked ones; its response is not inspected
        self.sendUDPQuery(question)
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # query carrying a 0.0.0.0/0 ECS option
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)
514
class testECSWithProxyProtocoldRecursorTest(ECSTest):
    """ECS combined with the proxy protocol, over both UDP and TCP.

    The queries are sent with 2001:db8::1 as the proxied source and the
    responder is expected to echo 2001:db8::/56.
    """

    _confdir = 'ECSWithProxyProtocol'
    _config_template = """
ecs-add-for=2001:db8::1/128
edns-subnet-allow-list=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
proxy-protocol-from=127.0.0.1/32
allow-from=2001:db8::1/128
""" % (os.environ['PREFIX'],)

    def testProxyProtocolPlusECS(self):
        senderNames = ("sendUDPQueryWithProxyProtocol", "sendTCPQueryWithProxyProtocol")
        wanted = dns.rrset.from_text(nameECS, 0, dns.rdataclass.IN, 'TXT', '2001:db8::/56')
        question = dns.message.make_query(nameECS, 'TXT', use_edns=True)

        for senderName in senderNames:
            send = getattr(self, senderName)
            answer = send(question, True, '2001:db8::1', '2001:db8::2', 0, 65535)
            self.assertRcodeEqual(answer, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(answer, wanted)
535
class testTooLargeToAddZeroScope(RecursorTest):
    """Check the ECS option of responses whose answer is a large TXT record.

    A Lua preresolve hook answers 'toolarge.ecs.' with a 447-character TXT
    record. The UDP response is expected to carry no EDNS option, the TCP
    response a single scope-zero ECS option.
    """

    _confdir = 'TooLargeToAddZeroScope'
    _config_template_default = """
use-incoming-edns-subnet=yes
dnssec=validate
daemon=no
trace=yes
packetcache-ttl=0
packetcache-servfail-ttl=0
max-cache-ttl=15
threads=1
loglevel=9
disable-syslog=yes
log-common-errors=yes
"""
    _config_template = """
"""
    _lua_dns_script_file = """
function preresolve(dq)
if dq.qname == newDN('toolarge.ecs.') then
dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
return true
end
return false
end
""" % ('A'*447)

    _roothints = None

    @classmethod
    def setUpClass(cls):
        """Start the sockets, responders and recursor."""
        # we don't need all the auth stuff
        cls.setUpSockets()
        cls.startResponders()

        configPath = os.path.join('configs', cls._confdir)
        cls.createConfigDir(configPath)
        cls.generateRecursorConfig(configPath)

        cls.startRecursor(configPath, cls._recursorPort)

    @classmethod
    def tearDownClass(cls):
        """Tear the recursor down."""
        cls.tearDownRecursor()

    @classmethod
    def generateRecursorConfig(cls, confdir):
        """Delegate to the parent implementation."""
        super(testTooLargeToAddZeroScope, cls).generateRecursorConfig(confdir)

    def testTooLarge(self):
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
        question = dns.message.make_query('toolarge.ecs.', 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)

        # should not have an ECS Option since the packet is too large already
        overUDP = self.sendUDPQuery(question, timeout=5.0)
        self.assertRcodeEqual(overUDP, dns.rcode.NOERROR)
        self.assertEqual(len(overUDP.answer), 1)
        self.assertEqual(overUDP.edns, 0)
        self.assertEqual(len(overUDP.options), 0)

        # over TCP a single scope-zero ECS option (type 8) is expected
        overTCP = self.sendTCPQuery(question, timeout=5.0)
        self.assertRcodeEqual(overTCP, dns.rcode.NOERROR)
        self.assertEqual(len(overTCP.answer), 1)
        self.assertEqual(overTCP.edns, 0)
        self.assertEqual(len(overTCP.options), 1)
        firstOption = overTCP.options[0]
        self.assertEqual(firstOption.otype, 8)
        self.assertEqual(firstOption.scope, 0)
606
class UDPECSResponder(DatagramProtocol):
    """UDP responder echoing the ECS option of a query back as a TXT record.

    TXT queries for nameECS or nameECSInvalidScope are answered with the
    received subnet as 'address/mask' (or emptyECSText when no ECS option
    was present). NS queries for nameECS get a fixed NS record plus glue.
    """

    @staticmethod
    def ipToStr(option):
        """Return the address held by an ECS option in presentation format.

        Raises ValueError when the option's family is neither IPv4 nor IPv6,
        instead of failing on an unbound local variable.
        """
        if option.family == clientsubnetoption.FAMILY_IPV4:
            return socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
        if option.family == clientsubnetoption.FAMILY_IPV6:
            # the address is a 128-bit integer, packed as two 64-bit halves
            return socket.inet_ntop(socket.AF_INET6,
                                    struct.pack('!QQ',
                                                option.ip >> 64,
                                                option.ip & (2 ** 64 - 1)))
        raise ValueError('unsupported ECS address family: %r' % (option.family,))

    def datagramReceived(self, datagram, address):
        """Parse the incoming query and send the matching response to `address`."""
        request = dns.message.from_wire(datagram)

        response = dns.message.make_response(request)
        response.flags |= dns.flags.AA
        ecso = None

        question = request.question[0]
        echoName = dns.name.from_text(nameECS)
        invalidScopeName = dns.name.from_text(nameECSInvalidScope)

        if (question.name == echoName or question.name == invalidScopeName) and question.rdtype == dns.rdatatype.TXT:

            text = emptyECSText
            for option in request.options:
                if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
                    text = self.ipToStr(option) + '/' + str(option.mask)

                    # Send a scope more specific than the received source for nameECSInvalidScope
                    if question.name == invalidScopeName:
                        ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
                    else:
                        ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)

            answer = dns.rrset.from_text(question.name, ttlECS, dns.rdataclass.IN, 'TXT', text)
            response.answer.append(answer)

        elif question.name == echoName and question.rdtype == dns.rdatatype.NS:
            answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
            response.answer.append(answer)
            additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
            response.additional.append(additional)

        # only add EDNS to the response when an ECS option has to be returned
        if ecso:
            response.use_edns(options = [ecso])

        self.transport.write(response.to_wire(), address)
649 response.use_edns(options = [ecso])
650
651 self.transport.write(response.to_wire(), address)