]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
Merge pull request #6127 from cmouse/fix-deps
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7 import clientsubnetoption
8 from recursortests import RecursorTest
9 from twisted.internet.protocol import DatagramProtocol
10 from twisted.internet import reactor
11
# TXT payload the test responder returns when the query it received carried no ECS option
emptyECSText = 'No ECS received'
# name the test responder answers for
nameECS = 'ecs-echo.example.'
# TTL of the records handed out by the test responder
ttlECS = 60
# flipped to True once the ECS responder has been registered with the twisted reactor
ecsReactorRunning = False
16
class ECSTest(RecursorTest):
    """Shared base for the EDNS Client Subnet (ECS) recursor tests.

    Provides the default recursor configuration, the helpers used to check
    answers and cache behaviour, and the class-level set-up that starts the
    UDP responder and the recursor.
    """
    _config_template_default = """
daemon=no
trace=yes
dont-query=
local-address=127.0.0.1
packetcache-ttl=0
packetcache-servfail-ttl=0
max-cache-ttl=600
threads=1
loglevel=9
disable-syslog=yes
"""

    def sendECSQuery(self, query, expected, expectedFirstTTL=None):
        """Send *query* twice and check the answer and that the second one is a cache hit.

        query: the DNS message to send over UDP.
        expected: the RRset that must be present in the answer section.
        expectedFirstTTL: when not None, the TTL the first RRset of the first
            answer must have; otherwise the TTL observed in the first answer
            is used as the reference for the second check.
        """
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        # this will break if you are not looking for the first RR, sorry!
        if expectedFirstTTL is not None:
            self.assertEqual(res.answer[0].ttl, expectedFirstTTL)
        else:
            expectedFirstTTL = res.answer[0].ttl

        # wait one second, check that the TTL has been
        # decreased indicating a cache hit
        time.sleep(1)

        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        self.assertLess(res.answer[0].ttl, expectedFirstTTL)

    def checkECSQueryHit(self, query, expected):
        """Send *query* once and check it was served with a TTL below ttlECS.

        A TTL strictly lower than the one handed out by the responder is
        taken as the sign that the answer came from the cache.
        """
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        # this will break if you are not looking for the first RR, sorry!
        self.assertLess(res.answer[0].ttl, ttlECS)

    @classmethod
    def startResponders(cls):
        """Register the ECS responder with the reactor and start the reactor thread if needed."""
        global ecsReactorRunning
        print("Launching responders..")

        address = cls._PREFIX + '.21'
        port = 53

        # the responder is registered only once per process, whatever the
        # number of test classes calling this method
        if not ecsReactorRunning:
            reactor.listenUDP(port, UDPECSResponder(), interface=address)
            ecsReactorRunning = True

        if not reactor.running:
            # args=(False,) is passed positionally to reactor.run
            cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
            # use the attribute: Thread.setDaemon() is deprecated
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()

    @classmethod
    def setUpClass(cls):
        """Open the sockets, start the responder, then generate the config and start the recursor."""
        cls.setUpSockets()

        cls.startResponders()

        confdir = os.path.join('configs', cls._confdir)
        cls.createConfigDir(confdir)

        cls.generateRecursorConfig(confdir)
        cls.startRecursor(confdir, cls._recursorPort)

        print("Launching tests..")

    @classmethod
    def tearDownClass(cls):
        """Stop the recursor started by setUpClass."""
        cls.tearDownRecursor()
94
class testNoECS(ECSTest):
    """Empty edns-subnet-whitelist: every answer must carry the 'no ECS' text."""
    _confdir = 'NoECS'

    _config_template = """edns-subnet-whitelist=
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # the client supplies a /32 subnet in its query
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted)

    def testRequireNoECS(self):
        # the client sends a 0.0.0.0/0 subnet option
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)
119
class testIncomingNoECS(ECSTest):
    """Empty whitelist with use-incoming-edns-subnet=yes: the 'no ECS' text is still expected."""
    _confdir = 'IncomingNoECS'

    _config_template = """edns-subnet-whitelist=
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # the client supplies a /32 subnet in its query
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted)

    def testRequireNoECS(self):
        # the client sends a 0.0.0.0/0 subnet option
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)
147
class testECSByName(ECSTest):
    """The queried name is whitelisted: the responder is expected to echo 127.0.0.0/24."""
    _confdir = 'ECSByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        firstSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[firstSubnet], payload=512),
            wanted)

        # the incoming ECS is not used, so a query announcing another
        # client subnet must be answered from the cache
        otherSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        self.checkECSQueryHit(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[otherSubnet], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted)

    def testRequireNoECS(self):
        # use-incoming-edns-subnet is not set, so the client's request
        # for no ECS (0.0.0.0/0) is ignored
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)
178
class testECSByNameLarger(ECSTest):
    """Whitelisted name with ecs-ipv4-bits=32: the responder is expected to echo 127.0.0.1/32."""
    _confdir = 'ECSByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        firstSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[firstSubnet], payload=512),
            wanted)

        # repeat the check with a query announcing a different client subnet;
        # the same echoed value is expected
        otherSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[otherSubnet], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted)

    def testRequireNoECS(self):
        # use-incoming-edns-subnet is not set, so the client's request
        # for no ECS (0.0.0.0/0) is ignored
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)
210
class testECSByNameSmaller(ECSTest):
    """Whitelisted name with ecs-ipv4-bits=16: the responder is expected to echo 127.0.0.0/16."""
    # was 'ECSByNameLarger', which made this class share (and overwrite)
    # the configuration directory of testECSByNameLarger
    _confdir = 'ECSByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        # the client supplies a /32 subnet in its query
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

    def testNoECS(self):
        # plain query, no EDNS option at all
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')

        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
237
class testIncomingECSByName(ECSTest):
    """Whitelisted name, incoming ECS honoured, ecs-scope-zero-address set to an IPv6 address."""
    _confdir = 'ECSIncomingByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=2001:db8::42
""" % os.environ['PREFIX']

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        firstSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[firstSubnet], payload=512),
            wanted,
            ttlECS)

        # another address inside the same ECS range must be served from the cache
        sameRange = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        self.checkECSQueryHit(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[sameRange], payload=512),
            wanted)

        # an address in a different ECS range must be a miss
        otherRange = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[otherRange], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted, ttlECS)

    def testRequireNoECS(self):
        # the client sends 0.0.0.0/0; the configured ecs-scope-zero-address is expected back
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted,
            ttlECS)
275
class testIncomingECSByNameLarger(ECSTest):
    """Whitelisted name, incoming ECS honoured, ecs-ipv4-bits=32."""
    _confdir = 'ECSIncomingByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
""" % os.environ['PREFIX']

    def testSendECS(self):
        # the client's /32 is expected to be echoed back unchanged
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted,
            ttlECS)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted, ttlECS)

    def testRequireNoECS(self):
        # the client sends 0.0.0.0/0; the configured ecs-scope-zero-address is expected back
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted,
            ttlECS)
305
class testIncomingECSByNameSmaller(ECSTest):
    """Whitelisted name, incoming ECS honoured, ecs-ipv4-bits=16."""
    _confdir = 'ECSIncomingByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
""" % os.environ['PREFIX']

    def testSendECS(self):
        # the client's /32 is expected to come back truncated to /16
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted)

    def testRequireNoECS(self):
        # the client sends 0.0.0.0/0; the configured ecs-scope-zero-address is expected back
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted,
            ttlECS)
333
class testIncomingECSByNameV6(ECSTest):
    """Whitelisted name, incoming ECS honoured, ecs-ipv6-bits=128 and an IPv6 query-local address."""
    _confdir = 'ECSIncomingByNameV6'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv6-bits=128
forward-zones=ecs-echo.example=%s.21
query-local-address6=::1
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        # the client's IPv6 /128 is expected to be echoed back unchanged
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
        ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)

    def testNoECS(self):
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        # sendECSQuery() issues the first query itself and requires its TTL to
        # be exactly ttlECS; sending an extra query beforehand would already
        # populate the cache and make that check depend on timing
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected, ttlECS)

    def testRequireNoECS(self):
        # we should get ::1/128 because neither ecs-scope-zero-addr nor query-local-address are set,
        # but query-local-address6 is set to ::1
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)
365
class testECSNameMismatch(ECSTest):
    """The whitelist holds a different name: every answer must carry the 'no ECS' text."""
    _confdir = 'ECSNameMismatch'

    _config_template = """edns-subnet-whitelist=not-the-right-name.example.
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # the client supplies a /32 subnet in its query
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted)

    def testRequireNoECS(self):
        # the client sends a 0.0.0.0/0 subnet option
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)
390
class testECSByIP(ECSTest):
    """The responder's address is whitelisted: the responder is expected to echo 127.0.0.0/24."""
    _confdir = 'ECSByIP'

    _config_template = """edns-subnet-whitelist=%s.21
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # the client supplies a /32 subnet in its query
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted)

    def testRequireNoECS(self):
        # use-incoming-edns-subnet is not set, so the client's request
        # for no ECS (0.0.0.0/0) is ignored
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)
416
class testIncomingECSByIP(ECSTest):
    """Responder address whitelisted, incoming ECS honoured, ecs-scope-zero-address set to ::1."""
    _confdir = 'ECSIncomingByIP'

    _config_template = """edns-subnet-whitelist=%s.21
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=::1
""" % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # the client's /32 is expected to come back truncated to /24
        clientSubnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted)

    def testNoECS(self):
        # plain query, no EDNS option at all
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(dns.message.make_query(nameECS, 'TXT'), wanted)

    def testRequireNoECS(self):
        # ecs-scope-zero-addr is set to ::1, so ::1 is what we will get
        clientSubnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')
        self.sendECSQuery(
            dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[clientSubnet], payload=512),
            wanted,
            ttlECS)
445
class testECSIPMismatch(ECSTest):
    """The whitelist holds a different address: every answer must carry the 'no ECS' text."""
    _confdir = 'ECSIPMismatch'

    _config_template = """edns-subnet-whitelist=192.0.2.1
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        # the client supplies a /32 subnet in its query
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

    def testNoECS(self):
        # plain query, no EDNS option at all; sendECSQuery() sends it itself,
        # so no separate query is issued beforehand
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        # the client sends a 0.0.0.0/0 subnet option
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
471
class UDPECSResponder(DatagramProtocol):
    """UDP responder answering for nameECS and echoing the received ECS option.

    TXT queries are answered with the subnet found in the query's ECS option
    (or emptyECSText when there is none); NS queries are answered with a
    fixed name server and its address.
    """

    @staticmethod
    def ipToStr(option):
        """Return the textual form of the address carried by the ECS *option*.

        Raises ValueError when the option's family is neither IPv4 nor IPv6.
        """
        if option.family == clientsubnetoption.FAMILY_IPV4:
            return socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
        if option.family == clientsubnetoption.FAMILY_IPV6:
            # option.ip is handled as one integer: pack it as two 64-bit halves
            return socket.inet_ntop(socket.AF_INET6,
                                    struct.pack('!QQ',
                                                option.ip >> 64,
                                                option.ip & (2 ** 64 - 1)))
        # be explicit instead of failing on an unbound local variable
        raise ValueError('Unsupported ECS address family: %r' % (option.family,))

    def datagramReceived(self, datagram, address):
        """Parse the query in *datagram* and write the response back to *address*."""
        request = dns.message.from_wire(datagram)

        response = dns.message.make_response(request)
        response.flags |= dns.flags.AA
        ecso = None

        question = request.question[0]
        isEchoName = question.name == dns.name.from_text(nameECS)

        if isEchoName and question.rdtype == dns.rdatatype.TXT:
            text = emptyECSText
            for option in request.options:
                if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
                    ip = self.ipToStr(option)
                    text = ip + '/' + str(option.mask)
                    # the received mask is passed twice: as the option's mask and as its third argument
                    ecso = clientsubnetoption.ClientSubnetOption(ip, option.mask, option.mask)

            answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', text)
            response.answer.append(answer)
        elif isEchoName and question.rdtype == dns.rdatatype.NS:
            answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
            response.answer.append(answer)
            # 'cls' does not exist in an instance method; build the address from
            # the PREFIX environment variable, as the forward-zones settings of
            # the test classes in this file do
            additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
            response.additional.append(additional)

        if ecso:
            response.options = [ecso]

        self.transport.write(response.to_wire(), address)