]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
tests: stop using deprecated unittest names
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7 import clientsubnetoption
8 import unittest
9 from recursortests import RecursorTest, have_ipv6
10 from twisted.internet.protocol import DatagramProtocol
11 from twisted.internet import reactor
12
13 emptyECSText = 'No ECS received'
14 nameECS = 'ecs-echo.example.'
15 nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'
16 ttlECS = 60
17 ecsReactorRunning = False
18 ecsReactorv6Running = False
19
20 class ECSTest(RecursorTest):
21 _config_template_default = """
22 daemon=no
23 trace=yes
24 dont-query=
25 ecs-add-for=0.0.0.0/0
26 local-address=127.0.0.1
27 packetcache-ttl=0
28 packetcache-servfail-ttl=0
29 max-cache-ttl=600
30 threads=1
31 loglevel=9
32 disable-syslog=yes
33 """
34
35 def sendECSQuery(self, query, expected, expectedFirstTTL=None, scopeZeroResponse=None):
36 res = self.sendUDPQuery(query)
37
38 self.assertRcodeEqual(res, dns.rcode.NOERROR)
39 self.assertRRsetInAnswer(res, expected)
40 # this will break if you are not looking for the first RR, sorry!
41 if expectedFirstTTL is not None:
42 self.assertEqual(res.answer[0].ttl, expectedFirstTTL)
43 else:
44 expectedFirstTTL = res.answer[0].ttl
45 self.assertEqual(res.edns, query.edns)
46
47 if scopeZeroResponse is not None:
48 self.assertEqual(res.edns, 0)
49 if scopeZeroResponse:
50 self.assertEqual(len(res.options), 1)
51 self.assertEqual(res.options[0].otype, 8)
52 self.assertEqual(res.options[0].scope, 0)
53 else:
54 self.assertEqual(len(res.options), 1)
55 self.assertEqual(res.options[0].otype, 8)
56 self.assertNotEqual(res.options[0].scope, 0)
57
58 # wait one second, check that the TTL has been
59 # decreased indicating a cache hit
60 time.sleep(1)
61
62 res = self.sendUDPQuery(query)
63
64 self.assertRcodeEqual(res, dns.rcode.NOERROR)
65 self.assertRRsetInAnswer(res, expected)
66 self.assertLess(res.answer[0].ttl, expectedFirstTTL)
67 self.assertEqual(res.edns, query.edns)
68
69 def checkECSQueryHit(self, query, expected):
70 res = self.sendUDPQuery(query)
71
72 self.assertRcodeEqual(res, dns.rcode.NOERROR)
73 self.assertRRsetInAnswer(res, expected)
74 # this will break if you are not looking for the first RR, sorry!
75 self.assertLess(res.answer[0].ttl, ttlECS)
76
77 @classmethod
78 def startResponders(cls):
79 global ecsReactorRunning
80 global ecsReactorv6Running
81 print("Launching responders..")
82
83 address = cls._PREFIX + '.21'
84 port = 53
85
86 if not ecsReactorRunning:
87 reactor.listenUDP(port, UDPECSResponder(), interface=address)
88 ecsReactorRunning = True
89
90 if not ecsReactorv6Running and have_ipv6():
91 reactor.listenUDP(53000, UDPECSResponder(), interface='::1')
92 ecsReactorv6Running = True
93
94 if not reactor.running:
95 cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
96 cls._UDPResponder.setDaemon(True)
97 cls._UDPResponder.start()
98
99 @classmethod
100 def setUpClass(cls):
101 cls.setUpSockets()
102
103 cls.startResponders()
104
105 confdir = os.path.join('configs', cls._confdir)
106 cls.createConfigDir(confdir)
107
108 cls.generateRecursorConfig(confdir)
109 cls.startRecursor(confdir, cls._recursorPort)
110
111 print("Launching tests..")
112
113 @classmethod
114 def tearDownClass(cls):
115 cls.tearDownRecursor()
116
117 class testNoECS(ECSTest):
118 _confdir = 'NoECS'
119
120 _config_template = """edns-subnet-whitelist=
121 forward-zones=ecs-echo.example=%s.21
122 """ % (os.environ['PREFIX'])
123
124 def testSendECS(self):
125 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
126 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
127 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
128 self.sendECSQuery(query, expected)
129
130 def testNoECS(self):
131 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
132 query = dns.message.make_query(nameECS, 'TXT')
133 self.sendECSQuery(query, expected)
134
135 def testRequireNoECS(self):
136 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
137
138 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
139 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
140 self.sendECSQuery(query, expected)
141
142 class testIncomingNoECS(ECSTest):
143 _confdir = 'IncomingNoECS'
144
145 _config_template = """edns-subnet-whitelist=
146 use-incoming-edns-subnet=yes
147 forward-zones=ecs-echo.example=%s.21
148 """ % (os.environ['PREFIX'])
149
150 def testSendECS(self):
151 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
152
153 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
154 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
155 self.sendECSQuery(query, expected, scopeZeroResponse=True)
156
157 def testNoECS(self):
158 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
159
160 query = dns.message.make_query(nameECS, 'TXT')
161 self.sendECSQuery(query, expected)
162
163 def testRequireNoECS(self):
164 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
165
166 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
167 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
168 self.sendECSQuery(query, expected, scopeZeroResponse=True)
169
170 class testECSByName(ECSTest):
171 _confdir = 'ECSByName'
172
173 _config_template = """edns-subnet-whitelist=ecs-echo.example.
174 forward-zones=ecs-echo.example=%s.21
175 """ % (os.environ['PREFIX'])
176
177 def testSendECS(self):
178 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
179 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
180 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
181 self.sendECSQuery(query, expected)
182
183 # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
184 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
185 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
186 self.checkECSQueryHit(query, expected)
187
188 def testNoECS(self):
189 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
190 query = dns.message.make_query(nameECS, 'TXT')
191 self.sendECSQuery(query, expected)
192
193 def testRequireNoECS(self):
194 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
195
196 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
197 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
198 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
199 self.sendECSQuery(query, expected)
200
201 class testECSByNameLarger(ECSTest):
202 _confdir = 'ECSByNameLarger'
203
204 _config_template = """edns-subnet-whitelist=ecs-echo.example.
205 ecs-ipv4-bits=32
206 forward-zones=ecs-echo.example=%s.21
207 ecs-ipv4-cache-bits=32
208 ecs-ipv6-cache-bits=128
209 """ % (os.environ['PREFIX'])
210
211 def testSendECS(self):
212 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
213 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
214 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
215 self.sendECSQuery(query, expected)
216
217 # check that a query in a different range is a miss
218 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
219 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
220 self.sendECSQuery(query, expected)
221
222 def testNoECS(self):
223 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
224 query = dns.message.make_query(nameECS, 'TXT')
225 self.sendECSQuery(query, expected)
226
227 def testRequireNoECS(self):
228 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
229
230 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
231 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
232 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
233 self.sendECSQuery(query, expected)
234
235 class testECSByNameSmaller(ECSTest):
236 _confdir = 'ECSByNameLarger'
237
238 _config_template = """edns-subnet-whitelist=ecs-echo.example.
239 ecs-ipv4-bits=16
240 forward-zones=ecs-echo.example=%s.21
241 """ % (os.environ['PREFIX'])
242
243 def testSendECS(self):
244 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
245 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
246 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
247 self.sendECSQuery(query, expected)
248
249 def testNoECS(self):
250 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
251 query = dns.message.make_query(nameECS, 'TXT')
252 self.sendECSQuery(query, expected)
253
254 def testRequireNoECS(self):
255 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
256
257 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
258 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
259 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
260 self.sendECSQuery(query, expected)
261
262 class testIncomingECSByName(ECSTest):
263 _confdir = 'ECSIncomingByName'
264
265 _config_template = """edns-subnet-whitelist=ecs-echo.example.
266 use-incoming-edns-subnet=yes
267 forward-zones=ecs-echo.example=%s.21
268 ecs-scope-zero-address=2001:db8::42
269 ecs-ipv4-cache-bits=32
270 ecs-ipv6-cache-bits=128
271 """ % (os.environ['PREFIX'])
272
273 def testSendECS(self):
274 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
275 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
276 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
277 self.sendECSQuery(query, expected, ttlECS)
278
279 # check that a query in the same ECS range is a hit
280 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
281 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
282 self.checkECSQueryHit(query, expected)
283
284 # check that a query in a different ECS range is a miss
285 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
286 ecso = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
287 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
288 self.sendECSQuery(query, expected)
289
290 def testNoECS(self):
291 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
292 query = dns.message.make_query(nameECS, 'TXT')
293 self.sendECSQuery(query, expected, ttlECS)
294
295 def testRequireNoECS(self):
296 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")
297
298 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
299 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
300 self.sendECSQuery(query, expected, ttlECS)
301
302 class testIncomingECSByNameLarger(ECSTest):
303 _confdir = 'ECSIncomingByNameLarger'
304
305 _config_template = """edns-subnet-whitelist=ecs-echo.example.
306 use-incoming-edns-subnet=yes
307 ecs-ipv4-bits=32
308 forward-zones=ecs-echo.example=%s.21
309 ecs-scope-zero-address=192.168.0.1
310 ecs-ipv4-cache-bits=32
311 ecs-ipv6-cache-bits=128
312 """ % (os.environ['PREFIX'])
313
314 def testSendECS(self):
315 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
316
317 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
318 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
319 self.sendECSQuery(query, expected, ttlECS)
320
321 def testNoECS(self):
322 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
323
324 query = dns.message.make_query(nameECS, 'TXT')
325 self.sendECSQuery(query, expected, ttlECS)
326
327 def testRequireNoECS(self):
328 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
329
330 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
331 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
332 self.sendECSQuery(query, expected, ttlECS)
333
334 class testIncomingECSByNameSmaller(ECSTest):
335 _confdir = 'ECSIncomingByNameSmaller'
336
337 _config_template = """edns-subnet-whitelist=ecs-echo.example.
338 use-incoming-edns-subnet=yes
339 ecs-ipv4-bits=16
340 forward-zones=ecs-echo.example=%s.21
341 ecs-scope-zero-address=192.168.0.1
342 ecs-ipv4-cache-bits=32
343 ecs-ipv6-cache-bits=128
344 """ % (os.environ['PREFIX'])
345
346 def testSendECS(self):
347 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
348 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
349 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
350 self.sendECSQuery(query, expected)
351
352 def testNoECS(self):
353 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
354 query = dns.message.make_query(nameECS, 'TXT')
355 self.sendECSQuery(query, expected)
356
357 def testRequireNoECS(self):
358 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
359
360 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
361 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
362 self.sendECSQuery(query, expected, ttlECS)
363
364 @unittest.skipIf(not have_ipv6(), "No IPv6")
365 class testIncomingECSByNameV6(ECSTest):
366 _confdir = 'ECSIncomingByNameV6'
367
368 _config_template = """edns-subnet-whitelist=ecs-echo.example.
369 use-incoming-edns-subnet=yes
370 ecs-ipv6-bits=128
371 ecs-ipv4-cache-bits=32
372 ecs-ipv6-cache-bits=128
373 query-local-address=::1
374 forward-zones=ecs-echo.example=[::1]:53000
375 """
376
377 def testSendECS(self):
378 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
379 ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
380 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
381 self.sendECSQuery(query, expected, ttlECS)
382
383 def testNoECS(self):
384 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
385
386 query = dns.message.make_query(nameECS, 'TXT')
387 res = self.sendUDPQuery(query)
388 self.sendECSQuery(query, expected, ttlECS)
389
390 def testRequireNoECS(self):
391 # we should get ::1/128 because ecs-scope-zero-addr is unset and query-local-address is set to ::1
392 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")
393
394 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
395 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
396 self.sendECSQuery(query, expected, ttlECS)
397
398 class testECSNameMismatch(ECSTest):
399 _confdir = 'ECSNameMismatch'
400
401 _config_template = """edns-subnet-whitelist=not-the-right-name.example.
402 forward-zones=ecs-echo.example=%s.21
403 """ % (os.environ['PREFIX'])
404
405 def testSendECS(self):
406 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
407 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
408 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
409 self.sendECSQuery(query, expected)
410
411 def testNoECS(self):
412 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
413 query = dns.message.make_query(nameECS, 'TXT')
414 self.sendECSQuery(query, expected)
415
416 def testRequireNoECS(self):
417 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
418
419 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
420 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
421 self.sendECSQuery(query, expected)
422
423 class testECSByIP(ECSTest):
424 _confdir = 'ECSByIP'
425
426 _config_template = """edns-subnet-whitelist=%s.21
427 forward-zones=ecs-echo.example=%s.21
428 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
429
430 def testSendECS(self):
431 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
432 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
433 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
434 self.sendECSQuery(query, expected)
435
436 def testNoECS(self):
437 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
438 query = dns.message.make_query(nameECS, 'TXT')
439 self.sendECSQuery(query, expected)
440
441 def testRequireNoECS(self):
442 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
443
444 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
445 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
446 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
447 self.sendECSQuery(query, expected)
448
449 class testIncomingECSByIP(ECSTest):
450 _confdir = 'ECSIncomingByIP'
451
452 _config_template = """edns-subnet-whitelist=%s.21
453 use-incoming-edns-subnet=yes
454 forward-zones=ecs-echo.example=%s.21
455 ecs-scope-zero-address=::1
456 ecs-ipv4-cache-bits=32
457 ecs-ipv6-cache-bits=128
458 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
459
460 def testSendECS(self):
461 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
462
463 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
464 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
465 self.sendECSQuery(query, expected)
466
467 def testNoECS(self):
468 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
469 query = dns.message.make_query(nameECS, 'TXT')
470 self.sendECSQuery(query, expected)
471
472 def testRequireNoECS(self):
473 # we will get ::1 because ecs-scope-zero-addr is set to ::1
474 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')
475
476 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
477 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
478 self.sendECSQuery(query, expected, ttlECS)
479
480 def testSendECSInvalidScope(self):
481 # test that the recursor does not cache with a more specific scope than the source it sent
482 expected = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
483
484 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
485 query = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
486
487 self.sendECSQuery(query, expected)
488
489 class testECSIPMismatch(ECSTest):
490 _confdir = 'ECSIPMismatch'
491
492 _config_template = """edns-subnet-whitelist=192.0.2.1
493 forward-zones=ecs-echo.example=%s.21
494 """ % (os.environ['PREFIX'])
495
496 def testSendECS(self):
497 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
498 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
499 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
500 self.sendECSQuery(query, expected)
501
502 def testNoECS(self):
503 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
504 query = dns.message.make_query(nameECS, 'TXT')
505 res = self.sendUDPQuery(query)
506 self.sendECSQuery(query, expected)
507
508 def testRequireNoECS(self):
509 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
510
511 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
512 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
513 self.sendECSQuery(query, expected)
514
515 class testTooLargeToAddZeroScope(RecursorTest):
516
517 _confdir = 'TooLargeToAddZeroScope'
518 _config_template_default = """
519 use-incoming-edns-subnet=yes
520 dnssec=validate
521 daemon=no
522 trace=yes
523 packetcache-ttl=0
524 packetcache-servfail-ttl=0
525 max-cache-ttl=15
526 threads=1
527 loglevel=9
528 disable-syslog=yes
529 log-common-errors=yes
530 """
531 _config_template = """
532 """
533 _lua_dns_script_file = """
534 function preresolve(dq)
535 if dq.qname == newDN('toolarge.ecs.') then
536 dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
537 return true
538 end
539 return false
540 end
541 """ % ('A'*447)
542
543 _roothints = None
544
545 @classmethod
546 def setUpClass(cls):
547
548 # we don't need all the auth stuff
549 cls.setUpSockets()
550 cls.startResponders()
551
552 confdir = os.path.join('configs', cls._confdir)
553 cls.createConfigDir(confdir)
554
555 cls.generateRecursorConfig(confdir)
556 cls.startRecursor(confdir, cls._recursorPort)
557
558 @classmethod
559 def tearDownClass(cls):
560 cls.tearDownRecursor()
561
562 @classmethod
563 def generateRecursorConfig(cls, confdir):
564 super(testTooLargeToAddZeroScope, cls).generateRecursorConfig(confdir)
565
566 def testTooLarge(self):
567 qname = 'toolarge.ecs.'
568 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
569 query = dns.message.make_query(qname, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
570
571 # should not have an ECS Option since the packet is too large already
572 res = self.sendUDPQuery(query, timeout=5.0)
573 self.assertRcodeEqual(res, dns.rcode.NOERROR)
574 self.assertEqual(len(res.answer), 1)
575 self.assertEqual(res.edns, 0)
576 self.assertEqual(len(res.options), 0)
577
578 res = self.sendTCPQuery(query, timeout=5.0)
579 self.assertRcodeEqual(res, dns.rcode.NOERROR)
580 self.assertEqual(len(res.answer), 1)
581 self.assertEqual(res.edns, 0)
582 self.assertEqual(len(res.options), 1)
583 self.assertEqual(res.options[0].otype, 8)
584 self.assertEqual(res.options[0].scope, 0)
585
586 class UDPECSResponder(DatagramProtocol):
587 @staticmethod
588 def ipToStr(option):
589 if option.family == clientsubnetoption.FAMILY_IPV4:
590 ip = socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
591 elif option.family == clientsubnetoption.FAMILY_IPV6:
592 ip = socket.inet_ntop(socket.AF_INET6,
593 struct.pack('!QQ',
594 option.ip >> 64,
595 option.ip & (2 ** 64 - 1)))
596 return ip
597
598 def datagramReceived(self, datagram, address):
599 request = dns.message.from_wire(datagram)
600
601 response = dns.message.make_response(request)
602 response.flags |= dns.flags.AA
603 ecso = None
604
605 if (request.question[0].name == dns.name.from_text(nameECS) or request.question[0].name == dns.name.from_text(nameECSInvalidScope)) and request.question[0].rdtype == dns.rdatatype.TXT:
606
607 text = emptyECSText
608 for option in request.options:
609 if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
610 text = self.ipToStr(option) + '/' + str(option.mask)
611
612 # Send a scope more specific than the received source for nameECSInvalidScope
613 if request.question[0].name == dns.name.from_text(nameECSInvalidScope):
614 ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
615 else:
616 ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
617
618 answer = dns.rrset.from_text(request.question[0].name, ttlECS, dns.rdataclass.IN, 'TXT', text)
619 response.answer.append(answer)
620
621 elif request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.NS:
622 answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
623 response.answer.append(answer)
624 additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
625 response.additional.append(additional)
626
627 if ecso:
628 response.use_edns(options = [ecso])
629
630 self.transport.write(response.to_wire(), address)