]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
Merge pull request #9106 from omoerbeek/release-cycles
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7 import clientsubnetoption
8 from recursortests import RecursorTest
9 from twisted.internet.protocol import DatagramProtocol
10 from twisted.internet import reactor
11
# TTL, in seconds, placed on every record the test responder hands out.
ttlECS = 60

# Name for which the responder echoes back the client subnet it received.
nameECS = 'ecs-echo.example.'
# Name for which the responder answers with a scope more specific than
# the source prefix it was sent.
nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'

# TXT payload returned by the responder when the query carried no ECS option.
emptyECSText = 'No ECS received'

# Flipped to True once the UDP responder has been registered with the
# twisted reactor, so it is only registered once per process.
ecsReactorRunning = False
17
class ECSTest(RecursorTest):
    """Shared fixture for the EDNS Client Subnet (ECS) recursor tests.

    Subclasses provide ``_confdir`` and ``_config_template``; this class
    supplies the common recursor settings, the query helpers and the
    class-level set-up that launches the UDP responder and the recursor.
    """

    _config_template_default = """
daemon=no
trace=yes
dont-query=
ecs-add-for=0.0.0.0/0
local-address=127.0.0.1
packetcache-ttl=0
packetcache-servfail-ttl=0
max-cache-ttl=600
threads=1
loglevel=9
disable-syslog=yes
"""

    def sendECSQuery(self, query, expected, expectedFirstTTL=None):
        """Send ``query`` twice, one second apart, and check both answers.

        Both responses must be NOERROR and contain ``expected``.  When
        ``expectedFirstTTL`` is given, the first answer's TTL must equal it;
        otherwise the TTL of the first answer is simply recorded.  The second
        answer's TTL must be strictly lower than that value, which indicates
        the record was served from the cache.
        """
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        # this will break if you are not looking for the first RR, sorry!
        if expectedFirstTTL is not None:
            self.assertEqual(res.answer[0].ttl, expectedFirstTTL)
        else:
            expectedFirstTTL = res.answer[0].ttl

        # wait one second, check that the TTL has been
        # decreased indicating a cache hit
        time.sleep(1)

        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        self.assertLess(res.answer[0].ttl, expectedFirstTTL)

    def checkECSQueryHit(self, query, expected):
        """Send ``query`` once and check that the answer is a cache hit.

        The response must be NOERROR, contain ``expected`` and carry a first
        answer TTL strictly below ``ttlECS``.
        """
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        # this will break if you are not looking for the first RR, sorry!
        self.assertLess(res.answer[0].ttl, ttlECS)

    @classmethod
    def startResponders(cls):
        """Register the ECS responder on ``<_PREFIX>.21:53`` and run the reactor.

        The responder is registered only once per process (tracked by the
        module-level ``ecsReactorRunning`` flag) and the reactor thread is
        only started when the reactor is not already running.
        """
        global ecsReactorRunning
        print("Launching responders..")

        address = cls._PREFIX + '.21'
        port = 53

        if not ecsReactorRunning:
            reactor.listenUDP(port, UDPECSResponder(), interface=address)
            ecsReactorRunning = True

        if not reactor.running:
            # False is passed positionally to reactor.run; in twisted this is
            # installSignalHandlers, which must be off outside the main thread.
            cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
            # Thread.setDaemon() is deprecated; set the attribute instead.
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()

    @classmethod
    def setUpClass(cls):
        """Open the sockets, start the responder, then configure and start the recursor."""
        cls.setUpSockets()

        cls.startResponders()

        confdir = os.path.join('configs', cls._confdir)
        cls.createConfigDir(confdir)

        cls.generateRecursorConfig(confdir)
        cls.startRecursor(confdir, cls._recursorPort)

        print("Launching tests..")

    @classmethod
    def tearDownClass(cls):
        """Stop the recursor started by setUpClass."""
        cls.tearDownRecursor()
96
class testNoECS(ECSTest):
    """Empty edns-subnet-whitelist: the responder must never receive ECS."""

    _confdir = 'NoECS'

    _config_template = """edns-subnet-whitelist=
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # The client supplies a /32 source, yet the responder's
        # "no ECS" text is the expected answer.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)
121
class testIncomingNoECS(ECSTest):
    """Incoming ECS is honoured, but the whitelist is empty: no ECS goes out."""

    _confdir = 'IncomingNoECS'

    _config_template = """edns-subnet-whitelist=
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # A /32 source is supplied by the client; the "no ECS" text is expected.
        edns = dict(use_edns=True,
                    options=[clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)],
                    payload=512)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', **edns)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # Query without EDNS at all.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source is supplied by the client.
        edns = dict(use_edns=True,
                    options=[clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)],
                    payload=512)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', **edns)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)
149
class testECSByName(ECSTest):
    """ECS whitelisted by zone name; the echoed subnet is 127.0.0.0/24."""

    _confdir = 'ECSByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        firstSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        firstQuestion = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[firstSubnet], payload=512)
        self.sendECSQuery(firstQuestion, wanted)

        # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
        secondSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        secondQuestion = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[secondSubnet], payload=512)
        self.checkECSQueryHit(secondQuestion, wanted)

    def testNoECS(self):
        # Query without EDNS.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)
180
class testECSByNameLarger(ECSTest):
    """ECS whitelisted by name with ecs-ipv4-bits=32: echo is 127.0.0.1/32."""

    _confdir = 'ECSByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % os.environ['PREFIX']

    def testSendECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        # The same answer is expected for both client-supplied sources.
        # check that a query in a different range is a miss
        for clientAddress in ('192.0.2.1', '192.0.2.2'):
            subnet = clientsubnetoption.ClientSubnetOption(clientAddress, 32)
            question = dns.message.make_query(
                nameECS, 'TXT', 'IN',
                use_edns=True, options=[subnet], payload=512)
            self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # Query without EDNS.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(question, wanted)
214
class testECSByNameSmaller(ECSTest):
    """ECS whitelisted by name with ecs-ipv4-bits=16: echo is 127.0.0.0/16."""

    # Own configuration directory: this used to be 'ECSByNameLarger', which
    # made this class share (and overwrite) the directory that
    # testECSByNameLarger builds under 'configs'.
    _confdir = 'ECSByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client-supplied /32 source is not what reaches the responder.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

    def testNoECS(self):
        # Query without EDNS.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')

        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
241
class testIncomingECSByName(ECSTest):
    """ECS whitelisted by name, with the client's incoming ECS honoured."""

    _confdir = 'ECSIncomingByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=2001:db8::42
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % os.environ['PREFIX']

    def testSendECS(self):
        def ecsQuestion(clientAddress):
            # TXT query for nameECS carrying a /32 client subnet.
            subnet = clientsubnetoption.ClientSubnetOption(clientAddress, 32)
            return dns.message.make_query(
                nameECS, 'TXT', 'IN',
                use_edns=True, options=[subnet], payload=512)

        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        self.sendECSQuery(ecsQuestion('192.0.2.1'), wanted, ttlECS)

        # check that a query in the same ECS range is a hit
        self.checkECSQueryHit(ecsQuestion('192.0.2.2'), wanted)

        # check that a query in a different ECS range is a miss
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
        self.sendECSQuery(ecsQuestion('192.1.2.2'), wanted)

    def testNoECS(self):
        # Query without EDNS; the first answer must carry the full TTL.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted, ttlECS)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source from the client; the expected echo matches the
        # configured ecs-scope-zero-address.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")
        self.sendECSQuery(question, wanted, ttlECS)
281
class testIncomingECSByNameLarger(ECSTest):
    """Incoming ECS honoured, whitelisted by name, with ecs-ipv4-bits=32."""

    _confdir = 'ECSIncomingByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % os.environ['PREFIX']

    def testSendECS(self):
        # The full /32 supplied by the client is the expected echo.
        edns = dict(use_edns=True,
                    options=[clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)],
                    payload=512)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', **edns)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
        self.sendECSQuery(question, wanted, ttlECS)

    def testNoECS(self):
        # Query without EDNS.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(question, wanted, ttlECS)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source from the client; the expected echo matches the
        # configured ecs-scope-zero-address.
        edns = dict(use_edns=True,
                    options=[clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)],
                    payload=512)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', **edns)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        self.sendECSQuery(question, wanted, ttlECS)
313
class testIncomingECSByNameSmaller(ECSTest):
    """Incoming ECS honoured, whitelisted by name, with ecs-ipv4-bits=16."""

    _confdir = 'ECSIncomingByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % os.environ['PREFIX']

    def testSendECS(self):
        # The client's /32 source is expected back as a /16.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # Query without EDNS.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source from the client; the expected echo matches the
        # configured ecs-scope-zero-address.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        self.sendECSQuery(question, wanted, ttlECS)
343
class testIncomingECSByNameV6(ECSTest):
    """Incoming ECS honoured, whitelisted by name, with an IPv6 client subnet."""

    _confdir = 'ECSIncomingByNameV6'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv6-bits=128
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
forward-zones=ecs-echo.example=%s.21
query-local-address=::1
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        # The full /128 supplied by the client is the expected echo.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
        ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)

    def testNoECS(self):
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        # No priming query here: sendECSQuery asserts that the first answer
        # carries the full ttlECS, so an earlier query for the same name
        # could make that assertion depend on timing.
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected, ttlECS)

    def testRequireNoECS(self):
        # we should get ::1/128 because ecs-scope-zero-address is unset and query-local-address is set to ::1
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)
376
class testECSNameMismatch(ECSTest):
    """Whitelist names a different zone: the responder must not receive ECS."""

    _confdir = 'ECSNameMismatch'

    _config_template = """edns-subnet-whitelist=not-the-right-name.example.
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # A /32 source is supplied by the client; the "no ECS" text is expected.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # Query without EDNS.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source is supplied by the client.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(question, wanted)
401
class testECSByIP(ECSTest):
    """ECS whitelisted by the responder's IP; the echoed subnet is 127.0.0.0/24."""

    _confdir = 'ECSByIP'

    _config_template = """edns-subnet-whitelist=%s.21
forward-zones=ecs-echo.example=%s.21
""" % ((os.environ['PREFIX'],) * 2)

    def testSendECS(self):
        # The client-supplied /32 source is not what reaches the responder.
        edns = dict(use_edns=True,
                    options=[clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)],
                    payload=512)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', **edns)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # Query without EDNS.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        edns = dict(use_edns=True,
                    options=[clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)],
                    payload=512)
        question = dns.message.make_query(nameECS, 'TXT', 'IN', **edns)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)
427
class testIncomingECSByIP(ECSTest):
    """ECS whitelisted by the responder's IP, with incoming ECS honoured."""

    _confdir = 'ECSIncomingByIP'

    _config_template = """edns-subnet-whitelist=%s.21
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=::1
ecs-ipv4-cache-bits=32
ecs-ipv6-cache-bits=128
""" % ((os.environ['PREFIX'],) * 2)

    def testSendECS(self):
        # The client's /32 source is expected back as a /24.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        self.sendECSQuery(question, wanted)

    def testNoECS(self):
        # Query without EDNS.
        question = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        # we will get ::1 because ecs-scope-zero-addr is set to ::1
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')
        self.sendECSQuery(question, wanted, ttlECS)

    def testSendECSInvalidScope(self):
        # test that the recursor does not cache with a more specific scope than the source it sent
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECSInvalidScope, 'TXT', 'IN',
            use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(
            nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')

        self.sendECSQuery(question, wanted)
467
class testECSIPMismatch(ECSTest):
    """Whitelist holds an IP other than the responder's: no ECS is expected."""

    _confdir = 'ECSIPMismatch'

    _config_template = """edns-subnet-whitelist=192.0.2.1
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        # A /32 source is supplied by the client; the "no ECS" text is expected.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

    def testNoECS(self):
        # Query without EDNS.  The extra, unchecked sendUDPQuery() call that
        # used to precede sendECSQuery() was a redundant round trip whose
        # result was never used, so it has been dropped.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source is supplied by the client.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
493
class UDPECSResponder(DatagramProtocol):
    """UDP responder that echoes the received ECS option back as TXT data.

    It answers TXT queries for ``nameECS`` and ``nameECSInvalidScope`` and NS
    queries for ``nameECS``; any other question gets an empty authoritative
    response.
    """

    @staticmethod
    def ipToStr(option):
        """Return the address held in an ECS option as text.

        ``option.ip`` is packed as one 32-bit integer for FAMILY_IPV4 and as
        two 64-bit halves for FAMILY_IPV6 before being given to inet_ntop.

        Raises:
            ValueError: if ``option.family`` is neither of those families
                (previously this fell through to an UnboundLocalError).
        """
        if option.family == clientsubnetoption.FAMILY_IPV4:
            return socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
        if option.family == clientsubnetoption.FAMILY_IPV6:
            return socket.inet_ntop(socket.AF_INET6,
                                    struct.pack('!QQ',
                                                option.ip >> 64,
                                                option.ip & (2 ** 64 - 1)))
        raise ValueError('unsupported ECS address family: %r' % (option.family,))

    def datagramReceived(self, datagram, address):
        """Parse one DNS query and write the matching response to ``address``."""
        request = dns.message.from_wire(datagram)

        response = dns.message.make_response(request)
        response.flags |= dns.flags.AA
        ecso = None

        # Only the first question is looked at; resolve the names once.
        qname = request.question[0].name
        qtype = request.question[0].rdtype
        echoName = dns.name.from_text(nameECS)
        invalidScopeName = dns.name.from_text(nameECSInvalidScope)

        if (qname == echoName or qname == invalidScopeName) and qtype == dns.rdatatype.TXT:

            text = emptyECSText
            for option in request.options:
                if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
                    text = self.ipToStr(option) + '/' + str(option.mask)

                    # Send a scope more specific than the received source for nameECSInvalidScope
                    if qname == invalidScopeName:
                        ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
                    else:
                        # Echo the source back with the scope set to the source mask.
                        ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)

            answer = dns.rrset.from_text(qname, ttlECS, dns.rdataclass.IN, 'TXT', text)
            response.answer.append(answer)

        elif qname == echoName and qtype == dns.rdatatype.NS:
            answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
            response.answer.append(answer)
            # Glue record pointing at this responder's own address.
            additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
            response.additional.append(additional)

        if ecso:
            response.options = [ecso]

        self.transport.write(response.to_wire(), address)