]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
Merge pull request #7323 from rgacogne/dnsdist-check-timeout
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7 import clientsubnetoption
8 from recursortests import RecursorTest
9 from twisted.internet.protocol import DatagramProtocol
10 from twisted.internet import reactor
11
12 emptyECSText = 'No ECS received'
13 nameECS = 'ecs-echo.example.'
14 nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'
15 ttlECS = 60
16 ecsReactorRunning = False
17
18 class ECSTest(RecursorTest):
19 _config_template_default = """
20 daemon=no
21 trace=yes
22 dont-query=
23 ecs-add-for=0.0.0.0/0
24 local-address=127.0.0.1
25 packetcache-ttl=0
26 packetcache-servfail-ttl=0
27 max-cache-ttl=600
28 threads=1
29 loglevel=9
30 disable-syslog=yes
31 """
32
33 def sendECSQuery(self, query, expected, expectedFirstTTL=None):
34 res = self.sendUDPQuery(query)
35
36 self.assertRcodeEqual(res, dns.rcode.NOERROR)
37 self.assertRRsetInAnswer(res, expected)
38 # this will break if you are not looking for the first RR, sorry!
39 if expectedFirstTTL is not None:
40 self.assertEqual(res.answer[0].ttl, expectedFirstTTL)
41 else:
42 expectedFirstTTL = res.answer[0].ttl
43
44 # wait one second, check that the TTL has been
45 # decreased indicating a cache hit
46 time.sleep(1)
47
48 res = self.sendUDPQuery(query)
49
50 self.assertRcodeEqual(res, dns.rcode.NOERROR)
51 self.assertRRsetInAnswer(res, expected)
52 self.assertLess(res.answer[0].ttl, expectedFirstTTL)
53
54 def checkECSQueryHit(self, query, expected):
55 res = self.sendUDPQuery(query)
56
57 self.assertRcodeEqual(res, dns.rcode.NOERROR)
58 self.assertRRsetInAnswer(res, expected)
59 # this will break if you are not looking for the first RR, sorry!
60 self.assertLess(res.answer[0].ttl, ttlECS)
61
62 @classmethod
63 def startResponders(cls):
64 global ecsReactorRunning
65 print("Launching responders..")
66
67 address = cls._PREFIX + '.21'
68 port = 53
69
70 if not ecsReactorRunning:
71 reactor.listenUDP(port, UDPECSResponder(), interface=address)
72 ecsReactorRunning = True
73
74 if not reactor.running:
75 cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
76 cls._UDPResponder.setDaemon(True)
77 cls._UDPResponder.start()
78
79 @classmethod
80 def setUpClass(cls):
81 cls.setUpSockets()
82
83 cls.startResponders()
84
85 confdir = os.path.join('configs', cls._confdir)
86 cls.createConfigDir(confdir)
87
88 cls.generateRecursorConfig(confdir)
89 cls.startRecursor(confdir, cls._recursorPort)
90
91 print("Launching tests..")
92
93 @classmethod
94 def tearDownClass(cls):
95 cls.tearDownRecursor()
96
97 class testNoECS(ECSTest):
98 _confdir = 'NoECS'
99
100 _config_template = """edns-subnet-whitelist=
101 forward-zones=ecs-echo.example=%s.21
102 """ % (os.environ['PREFIX'])
103
104 def testSendECS(self):
105 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
106 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
107 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
108 self.sendECSQuery(query, expected)
109
110 def testNoECS(self):
111 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
112 query = dns.message.make_query(nameECS, 'TXT')
113 self.sendECSQuery(query, expected)
114
115 def testRequireNoECS(self):
116 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
117
118 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
119 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
120 self.sendECSQuery(query, expected)
121
122 class testIncomingNoECS(ECSTest):
123 _confdir = 'IncomingNoECS'
124
125 _config_template = """edns-subnet-whitelist=
126 use-incoming-edns-subnet=yes
127 forward-zones=ecs-echo.example=%s.21
128 """ % (os.environ['PREFIX'])
129
130 def testSendECS(self):
131 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
132
133 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
134 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
135 self.sendECSQuery(query, expected)
136
137 def testNoECS(self):
138 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
139
140 query = dns.message.make_query(nameECS, 'TXT')
141 self.sendECSQuery(query, expected)
142
143 def testRequireNoECS(self):
144 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
145
146 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
147 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
148 self.sendECSQuery(query, expected)
149
150 class testECSByName(ECSTest):
151 _confdir = 'ECSByName'
152
153 _config_template = """edns-subnet-whitelist=ecs-echo.example.
154 forward-zones=ecs-echo.example=%s.21
155 """ % (os.environ['PREFIX'])
156
157 def testSendECS(self):
158 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
159 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
160 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
161 self.sendECSQuery(query, expected)
162
163 # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
164 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
165 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
166 self.checkECSQueryHit(query, expected)
167
168 def testNoECS(self):
169 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
170 query = dns.message.make_query(nameECS, 'TXT')
171 self.sendECSQuery(query, expected)
172
173 def testRequireNoECS(self):
174 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
175
176 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
177 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
178 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
179 self.sendECSQuery(query, expected)
180
181 class testECSByNameLarger(ECSTest):
182 _confdir = 'ECSByNameLarger'
183
184 _config_template = """edns-subnet-whitelist=ecs-echo.example.
185 ecs-ipv4-bits=32
186 forward-zones=ecs-echo.example=%s.21
187 """ % (os.environ['PREFIX'])
188
189 def testSendECS(self):
190 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
191 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
192 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
193 self.sendECSQuery(query, expected)
194
195 # check that a query in a different range is a miss
196 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
197 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
198 self.sendECSQuery(query, expected)
199
200 def testNoECS(self):
201 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
202 query = dns.message.make_query(nameECS, 'TXT')
203 self.sendECSQuery(query, expected)
204
205 def testRequireNoECS(self):
206 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
207
208 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
209 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
210 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
211 self.sendECSQuery(query, expected)
212
213 class testECSByNameSmaller(ECSTest):
214 _confdir = 'ECSByNameLarger'
215
216 _config_template = """edns-subnet-whitelist=ecs-echo.example.
217 ecs-ipv4-bits=16
218 forward-zones=ecs-echo.example=%s.21
219 """ % (os.environ['PREFIX'])
220
221 def testSendECS(self):
222 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
223 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
224 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
225 self.sendECSQuery(query, expected)
226
227 def testNoECS(self):
228 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
229 query = dns.message.make_query(nameECS, 'TXT')
230 self.sendECSQuery(query, expected)
231
232 def testRequireNoECS(self):
233 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
234
235 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
236 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
237 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
238 self.sendECSQuery(query, expected)
239
240 class testIncomingECSByName(ECSTest):
241 _confdir = 'ECSIncomingByName'
242
243 _config_template = """edns-subnet-whitelist=ecs-echo.example.
244 use-incoming-edns-subnet=yes
245 forward-zones=ecs-echo.example=%s.21
246 ecs-scope-zero-address=2001:db8::42
247 """ % (os.environ['PREFIX'])
248
249 def testSendECS(self):
250 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
251 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
252 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
253 self.sendECSQuery(query, expected, ttlECS)
254
255 # check that a query in the same ECS range is a hit
256 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
257 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
258 self.checkECSQueryHit(query, expected)
259
260 # check that a query in a different ECS range is a miss
261 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
262 ecso = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
263 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
264 self.sendECSQuery(query, expected)
265
266 def testNoECS(self):
267 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
268 query = dns.message.make_query(nameECS, 'TXT')
269 self.sendECSQuery(query, expected, ttlECS)
270
271 def testRequireNoECS(self):
272 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")
273
274 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
275 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
276 self.sendECSQuery(query, expected, ttlECS)
277
278 class testIncomingECSByNameLarger(ECSTest):
279 _confdir = 'ECSIncomingByNameLarger'
280
281 _config_template = """edns-subnet-whitelist=ecs-echo.example.
282 use-incoming-edns-subnet=yes
283 ecs-ipv4-bits=32
284 forward-zones=ecs-echo.example=%s.21
285 ecs-scope-zero-address=192.168.0.1
286 """ % (os.environ['PREFIX'])
287
288 def testSendECS(self):
289 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
290
291 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
292 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
293 self.sendECSQuery(query, expected, ttlECS)
294
295 def testNoECS(self):
296 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
297
298 query = dns.message.make_query(nameECS, 'TXT')
299 self.sendECSQuery(query, expected, ttlECS)
300
301 def testRequireNoECS(self):
302 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
303
304 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
305 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
306 self.sendECSQuery(query, expected, ttlECS)
307
308 class testIncomingECSByNameSmaller(ECSTest):
309 _confdir = 'ECSIncomingByNameSmaller'
310
311 _config_template = """edns-subnet-whitelist=ecs-echo.example.
312 use-incoming-edns-subnet=yes
313 ecs-ipv4-bits=16
314 forward-zones=ecs-echo.example=%s.21
315 ecs-scope-zero-address=192.168.0.1
316 """ % (os.environ['PREFIX'])
317
318 def testSendECS(self):
319 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
320 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
321 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
322 self.sendECSQuery(query, expected)
323
324 def testNoECS(self):
325 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
326 query = dns.message.make_query(nameECS, 'TXT')
327 self.sendECSQuery(query, expected)
328
329 def testRequireNoECS(self):
330 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
331
332 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
333 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
334 self.sendECSQuery(query, expected, ttlECS)
335
336 class testIncomingECSByNameV6(ECSTest):
337 _confdir = 'ECSIncomingByNameV6'
338
339 _config_template = """edns-subnet-whitelist=ecs-echo.example.
340 use-incoming-edns-subnet=yes
341 ecs-ipv6-bits=128
342 forward-zones=ecs-echo.example=%s.21
343 query-local-address6=::1
344 """ % (os.environ['PREFIX'])
345
346 def testSendECS(self):
347 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
348 ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
349 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
350 self.sendECSQuery(query, expected, ttlECS)
351
352 def testNoECS(self):
353 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
354
355 query = dns.message.make_query(nameECS, 'TXT')
356 res = self.sendUDPQuery(query)
357 self.sendECSQuery(query, expected, ttlECS)
358
359 def testRequireNoECS(self):
360 # we should get ::1/128 because neither ecs-scope-zero-addr nor query-local-address are set,
361 # but query-local-address6 is set to ::1
362 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")
363
364 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
365 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
366 self.sendECSQuery(query, expected, ttlECS)
367
368 class testECSNameMismatch(ECSTest):
369 _confdir = 'ECSNameMismatch'
370
371 _config_template = """edns-subnet-whitelist=not-the-right-name.example.
372 forward-zones=ecs-echo.example=%s.21
373 """ % (os.environ['PREFIX'])
374
375 def testSendECS(self):
376 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
377 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
378 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
379 self.sendECSQuery(query, expected)
380
381 def testNoECS(self):
382 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
383 query = dns.message.make_query(nameECS, 'TXT')
384 self.sendECSQuery(query, expected)
385
386 def testRequireNoECS(self):
387 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
388
389 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
390 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
391 self.sendECSQuery(query, expected)
392
393 class testECSByIP(ECSTest):
394 _confdir = 'ECSByIP'
395
396 _config_template = """edns-subnet-whitelist=%s.21
397 forward-zones=ecs-echo.example=%s.21
398 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
399
400 def testSendECS(self):
401 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
402 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
403 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
404 self.sendECSQuery(query, expected)
405
406 def testNoECS(self):
407 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
408 query = dns.message.make_query(nameECS, 'TXT')
409 self.sendECSQuery(query, expected)
410
411 def testRequireNoECS(self):
412 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
413
414 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
415 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
416 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
417 self.sendECSQuery(query, expected)
418
419 class testIncomingECSByIP(ECSTest):
420 _confdir = 'ECSIncomingByIP'
421
422 _config_template = """edns-subnet-whitelist=%s.21
423 use-incoming-edns-subnet=yes
424 forward-zones=ecs-echo.example=%s.21
425 ecs-scope-zero-address=::1
426 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
427
428 def testSendECS(self):
429 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
430
431 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
432 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
433 self.sendECSQuery(query, expected)
434
435 def testNoECS(self):
436 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
437 query = dns.message.make_query(nameECS, 'TXT')
438 self.sendECSQuery(query, expected)
439
440 def testRequireNoECS(self):
441 # we will get ::1 because ecs-scope-zero-addr is set to ::1
442 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')
443
444 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
445 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
446 self.sendECSQuery(query, expected, ttlECS)
447
448 def testSendECSInvalidScope(self):
449 # test that the recursor does not cache with a more specific scope than the source it sent
450 expected = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
451
452 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
453 query = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
454
455 self.sendECSQuery(query, expected)
456
457 class testECSIPMismatch(ECSTest):
458 _confdir = 'ECSIPMismatch'
459
460 _config_template = """edns-subnet-whitelist=192.0.2.1
461 forward-zones=ecs-echo.example=%s.21
462 """ % (os.environ['PREFIX'])
463
464 def testSendECS(self):
465 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
466 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
467 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
468 self.sendECSQuery(query, expected)
469
470 def testNoECS(self):
471 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
472 query = dns.message.make_query(nameECS, 'TXT')
473 res = self.sendUDPQuery(query)
474 self.sendECSQuery(query, expected)
475
476 def testRequireNoECS(self):
477 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
478
479 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
480 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
481 self.sendECSQuery(query, expected)
482
483 class UDPECSResponder(DatagramProtocol):
484 @staticmethod
485 def ipToStr(option):
486 if option.family == clientsubnetoption.FAMILY_IPV4:
487 ip = socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
488 elif option.family == clientsubnetoption.FAMILY_IPV6:
489 ip = socket.inet_ntop(socket.AF_INET6,
490 struct.pack('!QQ',
491 option.ip >> 64,
492 option.ip & (2 ** 64 - 1)))
493 return ip
494
495 def datagramReceived(self, datagram, address):
496 request = dns.message.from_wire(datagram)
497
498 response = dns.message.make_response(request)
499 response.flags |= dns.flags.AA
500 ecso = None
501
502 if (request.question[0].name == dns.name.from_text(nameECS) or request.question[0].name == dns.name.from_text(nameECSInvalidScope)) and request.question[0].rdtype == dns.rdatatype.TXT:
503
504 text = emptyECSText
505 for option in request.options:
506 if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
507 text = self.ipToStr(option) + '/' + str(option.mask)
508
509 # Send a scope more specific than the received source for nameECSInvalidScope
510 if request.question[0].name == dns.name.from_text(nameECSInvalidScope):
511 ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
512 else:
513 ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
514
515 answer = dns.rrset.from_text(request.question[0].name, ttlECS, dns.rdataclass.IN, 'TXT', text)
516 response.answer.append(answer)
517
518 elif request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.NS:
519 answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
520 response.answer.append(answer)
521 additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
522 response.additional.append(additional)
523
524 if ecso:
525 response.options = [ecso]
526
527 self.transport.write(response.to_wire(), address)