]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
Merge pull request #8141 from rgacogne/dnsdist-ocsp
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7 import clientsubnetoption
8 from recursortests import RecursorTest
9 from twisted.internet.protocol import DatagramProtocol
10 from twisted.internet import reactor
11
12 emptyECSText = 'No ECS received'
13 nameECS = 'ecs-echo.example.'
14 nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'
15 ttlECS = 60
16 ecsReactorRunning = False
17
18 class ECSTest(RecursorTest):
19 _config_template_default = """
20 daemon=no
21 trace=yes
22 dont-query=
23 ecs-add-for=0.0.0.0/0
24 local-address=127.0.0.1
25 packetcache-ttl=0
26 packetcache-servfail-ttl=0
27 max-cache-ttl=600
28 threads=1
29 loglevel=9
30 disable-syslog=yes
31 """
32
33 def sendECSQuery(self, query, expected, expectedFirstTTL=None):
34 res = self.sendUDPQuery(query)
35
36 self.assertRcodeEqual(res, dns.rcode.NOERROR)
37 self.assertRRsetInAnswer(res, expected)
38 # this will break if you are not looking for the first RR, sorry!
39 if expectedFirstTTL is not None:
40 self.assertEqual(res.answer[0].ttl, expectedFirstTTL)
41 else:
42 expectedFirstTTL = res.answer[0].ttl
43
44 # wait one second, check that the TTL has been
45 # decreased indicating a cache hit
46 time.sleep(1)
47
48 res = self.sendUDPQuery(query)
49
50 self.assertRcodeEqual(res, dns.rcode.NOERROR)
51 self.assertRRsetInAnswer(res, expected)
52 self.assertLess(res.answer[0].ttl, expectedFirstTTL)
53
54 def checkECSQueryHit(self, query, expected):
55 res = self.sendUDPQuery(query)
56
57 self.assertRcodeEqual(res, dns.rcode.NOERROR)
58 self.assertRRsetInAnswer(res, expected)
59 # this will break if you are not looking for the first RR, sorry!
60 self.assertLess(res.answer[0].ttl, ttlECS)
61
62 @classmethod
63 def startResponders(cls):
64 global ecsReactorRunning
65 print("Launching responders..")
66
67 address = cls._PREFIX + '.21'
68 port = 53
69
70 if not ecsReactorRunning:
71 reactor.listenUDP(port, UDPECSResponder(), interface=address)
72 ecsReactorRunning = True
73
74 if not reactor.running:
75 cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
76 cls._UDPResponder.setDaemon(True)
77 cls._UDPResponder.start()
78
79 @classmethod
80 def setUpClass(cls):
81 cls.setUpSockets()
82
83 cls.startResponders()
84
85 confdir = os.path.join('configs', cls._confdir)
86 cls.createConfigDir(confdir)
87
88 cls.generateRecursorConfig(confdir)
89 cls.startRecursor(confdir, cls._recursorPort)
90
91 print("Launching tests..")
92
93 @classmethod
94 def tearDownClass(cls):
95 cls.tearDownRecursor()
96
97 class testNoECS(ECSTest):
98 _confdir = 'NoECS'
99
100 _config_template = """edns-subnet-whitelist=
101 forward-zones=ecs-echo.example=%s.21
102 """ % (os.environ['PREFIX'])
103
104 def testSendECS(self):
105 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
106 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
107 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
108 self.sendECSQuery(query, expected)
109
110 def testNoECS(self):
111 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
112 query = dns.message.make_query(nameECS, 'TXT')
113 self.sendECSQuery(query, expected)
114
115 def testRequireNoECS(self):
116 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
117
118 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
119 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
120 self.sendECSQuery(query, expected)
121
122 class testIncomingNoECS(ECSTest):
123 _confdir = 'IncomingNoECS'
124
125 _config_template = """edns-subnet-whitelist=
126 use-incoming-edns-subnet=yes
127 forward-zones=ecs-echo.example=%s.21
128 """ % (os.environ['PREFIX'])
129
130 def testSendECS(self):
131 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
132
133 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
134 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
135 self.sendECSQuery(query, expected)
136
137 def testNoECS(self):
138 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
139
140 query = dns.message.make_query(nameECS, 'TXT')
141 self.sendECSQuery(query, expected)
142
143 def testRequireNoECS(self):
144 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
145
146 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
147 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
148 self.sendECSQuery(query, expected)
149
150 class testECSByName(ECSTest):
151 _confdir = 'ECSByName'
152
153 _config_template = """edns-subnet-whitelist=ecs-echo.example.
154 forward-zones=ecs-echo.example=%s.21
155 """ % (os.environ['PREFIX'])
156
157 def testSendECS(self):
158 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
159 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
160 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
161 self.sendECSQuery(query, expected)
162
163 # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
164 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
165 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
166 self.checkECSQueryHit(query, expected)
167
168 def testNoECS(self):
169 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
170 query = dns.message.make_query(nameECS, 'TXT')
171 self.sendECSQuery(query, expected)
172
173 def testRequireNoECS(self):
174 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
175
176 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
177 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
178 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
179 self.sendECSQuery(query, expected)
180
181 class testECSByNameLarger(ECSTest):
182 _confdir = 'ECSByNameLarger'
183
184 _config_template = """edns-subnet-whitelist=ecs-echo.example.
185 ecs-ipv4-bits=32
186 forward-zones=ecs-echo.example=%s.21
187 ecs-ipv4-cache-bits=32
188 ecs-ipv6-cache-bits=128
189 """ % (os.environ['PREFIX'])
190
191 def testSendECS(self):
192 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
193 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
194 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
195 self.sendECSQuery(query, expected)
196
197 # check that a query in a different range is a miss
198 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
199 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
200 self.sendECSQuery(query, expected)
201
202 def testNoECS(self):
203 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
204 query = dns.message.make_query(nameECS, 'TXT')
205 self.sendECSQuery(query, expected)
206
207 def testRequireNoECS(self):
208 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
209
210 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
211 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
212 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
213 self.sendECSQuery(query, expected)
214
215 class testECSByNameSmaller(ECSTest):
216 _confdir = 'ECSByNameLarger'
217
218 _config_template = """edns-subnet-whitelist=ecs-echo.example.
219 ecs-ipv4-bits=16
220 forward-zones=ecs-echo.example=%s.21
221 """ % (os.environ['PREFIX'])
222
223 def testSendECS(self):
224 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
225 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
226 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
227 self.sendECSQuery(query, expected)
228
229 def testNoECS(self):
230 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
231 query = dns.message.make_query(nameECS, 'TXT')
232 self.sendECSQuery(query, expected)
233
234 def testRequireNoECS(self):
235 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
236
237 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
238 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
239 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
240 self.sendECSQuery(query, expected)
241
242 class testIncomingECSByName(ECSTest):
243 _confdir = 'ECSIncomingByName'
244
245 _config_template = """edns-subnet-whitelist=ecs-echo.example.
246 use-incoming-edns-subnet=yes
247 forward-zones=ecs-echo.example=%s.21
248 ecs-scope-zero-address=2001:db8::42
249 ecs-ipv4-cache-bits=32
250 ecs-ipv6-cache-bits=128
251 """ % (os.environ['PREFIX'])
252
253 def testSendECS(self):
254 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
255 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
256 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
257 self.sendECSQuery(query, expected, ttlECS)
258
259 # check that a query in the same ECS range is a hit
260 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
261 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
262 self.checkECSQueryHit(query, expected)
263
264 # check that a query in a different ECS range is a miss
265 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
266 ecso = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
267 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
268 self.sendECSQuery(query, expected)
269
270 def testNoECS(self):
271 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
272 query = dns.message.make_query(nameECS, 'TXT')
273 self.sendECSQuery(query, expected, ttlECS)
274
275 def testRequireNoECS(self):
276 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")
277
278 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
279 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
280 self.sendECSQuery(query, expected, ttlECS)
281
282 class testIncomingECSByNameLarger(ECSTest):
283 _confdir = 'ECSIncomingByNameLarger'
284
285 _config_template = """edns-subnet-whitelist=ecs-echo.example.
286 use-incoming-edns-subnet=yes
287 ecs-ipv4-bits=32
288 forward-zones=ecs-echo.example=%s.21
289 ecs-scope-zero-address=192.168.0.1
290 ecs-ipv4-cache-bits=32
291 ecs-ipv6-cache-bits=128
292 """ % (os.environ['PREFIX'])
293
294 def testSendECS(self):
295 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
296
297 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
298 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
299 self.sendECSQuery(query, expected, ttlECS)
300
301 def testNoECS(self):
302 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
303
304 query = dns.message.make_query(nameECS, 'TXT')
305 self.sendECSQuery(query, expected, ttlECS)
306
307 def testRequireNoECS(self):
308 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
309
310 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
311 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
312 self.sendECSQuery(query, expected, ttlECS)
313
314 class testIncomingECSByNameSmaller(ECSTest):
315 _confdir = 'ECSIncomingByNameSmaller'
316
317 _config_template = """edns-subnet-whitelist=ecs-echo.example.
318 use-incoming-edns-subnet=yes
319 ecs-ipv4-bits=16
320 forward-zones=ecs-echo.example=%s.21
321 ecs-scope-zero-address=192.168.0.1
322 ecs-ipv4-cache-bits=32
323 ecs-ipv6-cache-bits=128
324 """ % (os.environ['PREFIX'])
325
326 def testSendECS(self):
327 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
328 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
329 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
330 self.sendECSQuery(query, expected)
331
332 def testNoECS(self):
333 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
334 query = dns.message.make_query(nameECS, 'TXT')
335 self.sendECSQuery(query, expected)
336
337 def testRequireNoECS(self):
338 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
339
340 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
341 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
342 self.sendECSQuery(query, expected, ttlECS)
343
344 class testIncomingECSByNameV6(ECSTest):
345 _confdir = 'ECSIncomingByNameV6'
346
347 _config_template = """edns-subnet-whitelist=ecs-echo.example.
348 use-incoming-edns-subnet=yes
349 ecs-ipv6-bits=128
350 ecs-ipv4-cache-bits=32
351 ecs-ipv6-cache-bits=128
352 forward-zones=ecs-echo.example=%s.21
353 query-local-address6=::1
354 """ % (os.environ['PREFIX'])
355
356 def testSendECS(self):
357 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
358 ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
359 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
360 self.sendECSQuery(query, expected, ttlECS)
361
362 def testNoECS(self):
363 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
364
365 query = dns.message.make_query(nameECS, 'TXT')
366 res = self.sendUDPQuery(query)
367 self.sendECSQuery(query, expected, ttlECS)
368
369 def testRequireNoECS(self):
370 # we should get ::1/128 because neither ecs-scope-zero-addr nor query-local-address are set,
371 # but query-local-address6 is set to ::1
372 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")
373
374 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
375 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
376 self.sendECSQuery(query, expected, ttlECS)
377
378 class testECSNameMismatch(ECSTest):
379 _confdir = 'ECSNameMismatch'
380
381 _config_template = """edns-subnet-whitelist=not-the-right-name.example.
382 forward-zones=ecs-echo.example=%s.21
383 """ % (os.environ['PREFIX'])
384
385 def testSendECS(self):
386 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
387 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
388 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
389 self.sendECSQuery(query, expected)
390
391 def testNoECS(self):
392 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
393 query = dns.message.make_query(nameECS, 'TXT')
394 self.sendECSQuery(query, expected)
395
396 def testRequireNoECS(self):
397 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
398
399 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
400 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
401 self.sendECSQuery(query, expected)
402
403 class testECSByIP(ECSTest):
404 _confdir = 'ECSByIP'
405
406 _config_template = """edns-subnet-whitelist=%s.21
407 forward-zones=ecs-echo.example=%s.21
408 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
409
410 def testSendECS(self):
411 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
412 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
413 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
414 self.sendECSQuery(query, expected)
415
416 def testNoECS(self):
417 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
418 query = dns.message.make_query(nameECS, 'TXT')
419 self.sendECSQuery(query, expected)
420
421 def testRequireNoECS(self):
422 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
423
424 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
425 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
426 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
427 self.sendECSQuery(query, expected)
428
429 class testIncomingECSByIP(ECSTest):
430 _confdir = 'ECSIncomingByIP'
431
432 _config_template = """edns-subnet-whitelist=%s.21
433 use-incoming-edns-subnet=yes
434 forward-zones=ecs-echo.example=%s.21
435 ecs-scope-zero-address=::1
436 ecs-ipv4-cache-bits=32
437 ecs-ipv6-cache-bits=128
438 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
439
440 def testSendECS(self):
441 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
442
443 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
444 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
445 self.sendECSQuery(query, expected)
446
447 def testNoECS(self):
448 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
449 query = dns.message.make_query(nameECS, 'TXT')
450 self.sendECSQuery(query, expected)
451
452 def testRequireNoECS(self):
453 # we will get ::1 because ecs-scope-zero-addr is set to ::1
454 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')
455
456 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
457 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
458 self.sendECSQuery(query, expected, ttlECS)
459
460 def testSendECSInvalidScope(self):
461 # test that the recursor does not cache with a more specific scope than the source it sent
462 expected = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
463
464 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
465 query = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
466
467 self.sendECSQuery(query, expected)
468
469 class testECSIPMismatch(ECSTest):
470 _confdir = 'ECSIPMismatch'
471
472 _config_template = """edns-subnet-whitelist=192.0.2.1
473 forward-zones=ecs-echo.example=%s.21
474 """ % (os.environ['PREFIX'])
475
476 def testSendECS(self):
477 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
478 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
479 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
480 self.sendECSQuery(query, expected)
481
482 def testNoECS(self):
483 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
484 query = dns.message.make_query(nameECS, 'TXT')
485 res = self.sendUDPQuery(query)
486 self.sendECSQuery(query, expected)
487
488 def testRequireNoECS(self):
489 expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
490
491 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
492 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
493 self.sendECSQuery(query, expected)
494
495 class UDPECSResponder(DatagramProtocol):
496 @staticmethod
497 def ipToStr(option):
498 if option.family == clientsubnetoption.FAMILY_IPV4:
499 ip = socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
500 elif option.family == clientsubnetoption.FAMILY_IPV6:
501 ip = socket.inet_ntop(socket.AF_INET6,
502 struct.pack('!QQ',
503 option.ip >> 64,
504 option.ip & (2 ** 64 - 1)))
505 return ip
506
507 def datagramReceived(self, datagram, address):
508 request = dns.message.from_wire(datagram)
509
510 response = dns.message.make_response(request)
511 response.flags |= dns.flags.AA
512 ecso = None
513
514 if (request.question[0].name == dns.name.from_text(nameECS) or request.question[0].name == dns.name.from_text(nameECSInvalidScope)) and request.question[0].rdtype == dns.rdatatype.TXT:
515
516 text = emptyECSText
517 for option in request.options:
518 if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
519 text = self.ipToStr(option) + '/' + str(option.mask)
520
521 # Send a scope more specific than the received source for nameECSInvalidScope
522 if request.question[0].name == dns.name.from_text(nameECSInvalidScope):
523 ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
524 else:
525 ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
526
527 answer = dns.rrset.from_text(request.question[0].name, ttlECS, dns.rdataclass.IN, 'TXT', text)
528 response.answer.append(answer)
529
530 elif request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.NS:
531 answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
532 response.answer.append(answer)
533 additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
534 response.additional.append(additional)
535
536 if ecso:
537 response.options = [ecso]
538
539 self.transport.write(response.to_wire(), address)