# Source: thirdparty/pdns.git, regression-tests.recursor-dnssec/test_ECS.py (as published on git.ipfire.org)
7 import clientsubnetoption
8 from recursortests
import RecursorTest
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
# TXT payload the tests expect when the answer should carry no client subnet.
emptyECSText = 'No ECS received'

# Query names used throughout this module.
nameECS = 'ecs-echo.example.'
nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'

# Flipped to True once the UDP responder has been registered with the
# reactor, so that it is only registered once per process.
ecsReactorRunning = False
18 class ECSTest(RecursorTest
):
19 _config_template_default
= """
24 local-address=127.0.0.1
26 packetcache-servfail-ttl=0
33 def sendECSQuery(self
, query
, expected
, expectedFirstTTL
=None):
34 res
= self
.sendUDPQuery(query
)
36 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
37 self
.assertRRsetInAnswer(res
, expected
)
38 # this will break if you are not looking for the first RR, sorry!
39 if expectedFirstTTL
is not None:
40 self
.assertEqual(res
.answer
[0].ttl
, expectedFirstTTL
)
42 expectedFirstTTL
= res
.answer
[0].ttl
44 # wait one second, check that the TTL has been
45 # decreased indicating a cache hit
48 res
= self
.sendUDPQuery(query
)
50 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
51 self
.assertRRsetInAnswer(res
, expected
)
52 self
.assertLess(res
.answer
[0].ttl
, expectedFirstTTL
)
54 def checkECSQueryHit(self
, query
, expected
):
55 res
= self
.sendUDPQuery(query
)
57 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
58 self
.assertRRsetInAnswer(res
, expected
)
59 # this will break if you are not looking for the first RR, sorry!
60 self
.assertLess(res
.answer
[0].ttl
, ttlECS
)
63 def startResponders(cls
):
64 global ecsReactorRunning
65 print("Launching responders..")
67 address
= cls
._PREFIX
+ '.21'
70 if not ecsReactorRunning
:
71 reactor
.listenUDP(port
, UDPECSResponder(), interface
=address
)
72 ecsReactorRunning
= True
74 if not reactor
.running
:
75 cls
._UDPResponder
= threading
.Thread(name
='UDP Responder', target
=reactor
.run
, args
=(False,))
76 cls
._UDPResponder
.setDaemon(True)
77 cls
._UDPResponder
.start()
85 confdir
= os
.path
.join('configs', cls
._confdir
)
86 cls
.createConfigDir(confdir
)
88 cls
.generateRecursorConfig(confdir
)
89 cls
.startRecursor(confdir
, cls
._recursorPort
)
91 print("Launching tests..")
94 def tearDownClass(cls
):
95 cls
.tearDownRecursor()
class testNoECS(ECSTest):
    """ECS tests with an empty ``edns-subnet-whitelist``.

    Every query in this class expects the ``emptyECSText`` answer, whether or
    not the client attached a client-subnet option.
    """

    # NOTE(review): unlike the sibling test classes, no ``_confdir`` is set
    # here, so whatever the base classes provide is used -- confirm that this
    # is intended.
    _config_template = """edns-subnet-whitelist=
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # ... then the same question as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source, i.e. asks for no ECS at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted)
class testIncomingNoECS(ECSTest):
    """ECS tests with ``use-incoming-edns-subnet`` on but an empty whitelist.

    Every query in this class expects the ``emptyECSText`` answer.
    """

    _confdir = 'IncomingNoECS'

    _config_template = """edns-subnet-whitelist=
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # ... then the same question as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source, i.e. asks for no ECS at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted)
class testECSByName(ECSTest):
    """ECS tests with the queried zone name listed in ``edns-subnet-whitelist``.

    ``use-incoming-edns-subnet`` is not set, so the client's own option is not
    forwarded; all queries expect the ``127.0.0.0/24`` answer.
    """

    _confdir = 'ECSByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # A client in a different ECS range must still be answered from the
        # cache, because the incoming ECS is not used.
        other_subnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        other = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[other_subnet], payload=512)
        self.checkECSQueryHit(other, wanted)

        # Same question again, this time as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        # The client's request for "no ECS" (0.0.0.0/0) is ignored because
        # use-incoming-edns-subnet is not set.
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted)
class testECSByNameLarger(ECSTest):
    """Whitelist-by-name ECS tests that expect a ``/32`` source in the answer.

    ``use-incoming-edns-subnet`` is not set; all queries expect the
    ``127.0.0.1/32`` answer.
    """

    _confdir = 'ECSByNameLarger'

    # NOTE(review): the /32 source expected below implies a source
    # prefix-length setting (e.g. ecs-ipv4-bits) that this template does not
    # contain -- the otherwise identical ECSByName template expects /24.
    # Confirm the template is complete.
    _config_template = """edns-subnet-whitelist=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # Repeat with a client in a different range; the same answer is
        # expected (no first-TTL check is requested here).
        other_subnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        other = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[other_subnet], payload=512)
        self.sendECSQuery(other, wanted)

        # Same question again, this time as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        # The client's request for "no ECS" (0.0.0.0/0) is ignored because
        # use-incoming-edns-subnet is not set.
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted)
class testECSByNameSmaller(ECSTest):
    """Whitelist-by-name ECS tests that expect a ``/16`` source in the answer.

    ``use-incoming-edns-subnet`` is not set; all queries expect the
    ``127.0.0.0/16`` answer.
    """

    # Was 'ECSByNameLarger', a copy-paste duplicate of the sibling class that
    # made both classes generate their recursor configuration in the same
    # directory.
    _confdir = 'ECSByNameSmaller'

    # NOTE(review): the /16 source expected below implies a source
    # prefix-length setting (e.g. ecs-ipv4-bits) that this template does not
    # contain -- confirm the template is complete.
    _config_template = """edns-subnet-whitelist=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # ... then the same question as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')

        # The client's request for "no ECS" (0.0.0.0/0) is ignored because
        # use-incoming-edns-subnet is not set.
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted)
class testIncomingECSByName(ECSTest):
    """Whitelist-by-name ECS tests with ``use-incoming-edns-subnet`` enabled.

    The client's subnet is expected back truncated to /24, and a request for
    no ECS is expected to yield the configured ``ecs-scope-zero-address``.
    """

    _confdir = 'ECSIncomingByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=2001:db8::42
""" % os.environ['PREFIX']

    def testSendECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted, ttlECS)

        # A second client inside the same ECS range must be a cache hit.
        same_range = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        neighbour = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[same_range], payload=512)
        self.checkECSQueryHit(neighbour, wanted)

        # A client in a different ECS range must be a miss and get its own
        # range echoed back.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
        other_range = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
        stranger = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[other_range], payload=512)
        self.sendECSQuery(stranger, wanted)

        # Same question again, this time as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted, ttlECS)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source from the client is expected to be answered with
        # the ecs-scope-zero-address configured above.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted, ttlECS)
class testIncomingECSByNameLarger(ECSTest):
    """Incoming-ECS tests that expect a full ``/32`` source in the answer."""

    _confdir = 'ECSIncomingByNameLarger'

    # NOTE(review): the /32 sources expected below imply a source
    # prefix-length setting (e.g. ecs-ipv4-bits) that this template does not
    # contain -- confirm the template is complete.
    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
""" % os.environ['PREFIX']

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted, ttlECS)

        # ... then the same question as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted, ttlECS)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source from the client is expected to be answered with
        # the ecs-scope-zero-address configured above.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted, ttlECS)
class testIncomingECSByNameSmaller(ECSTest):
    """Incoming-ECS tests that expect a truncated ``/16`` source in the answer."""

    _confdir = 'ECSIncomingByNameSmaller'

    # NOTE(review): the /16 sources expected below imply a source
    # prefix-length setting (e.g. ecs-ipv4-bits) that this template does not
    # contain -- confirm the template is complete.
    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
""" % os.environ['PREFIX']

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # ... then the same question as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        # A 0.0.0.0/0 source from the client is expected to be answered with
        # the ecs-scope-zero-address configured above.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted, ttlECS)
class testIncomingECSByNameV6(ECSTest):
    """Incoming-ECS tests using an IPv6 client subnet.

    ``query-local-address6`` is set to ``::1`` and no
    ``ecs-scope-zero-address`` is configured.
    """

    _confdir = 'ECSIncomingByNameV6'

    # NOTE(review): the /128 source expected in testSendECS implies an IPv6
    # source prefix-length setting (e.g. ecs-ipv6-bits) that this template
    # does not contain -- confirm the template is complete.
    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
query-local-address6=::1
""" % os.environ['PREFIX']

    def testSendECS(self):
        # First with a /128 IPv6 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
        subnet = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted, ttlECS)

        # ... then the same question as a plain query without any option.
        # No extra query is sent beforehand: sendECSQuery asserts that the
        # first answer still carries the full ttlECS, which a preceding
        # query (and a passing second boundary) could invalidate.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted, ttlECS)

    def testRequireNoECS(self):
        # ::1/128 is expected because neither ecs-scope-zero-address nor
        # query-local-address is set, while query-local-address6 is ::1.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted, ttlECS)
class testECSNameMismatch(ECSTest):
    """ECS tests where ``edns-subnet-whitelist`` names a different zone.

    Every query in this class expects the ``emptyECSText`` answer.
    """

    _confdir = 'ECSNameMismatch'

    _config_template = """edns-subnet-whitelist=not-the-right-name.example.
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # ... then the same question as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source, i.e. asks for no ECS at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted)
class testECSByIP(ECSTest):
    """ECS tests with the forwarder's address listed in ``edns-subnet-whitelist``.

    ``use-incoming-edns-subnet`` is not set; all queries expect the
    ``127.0.0.0/24`` answer.
    """

    # NOTE(review): unlike the sibling test classes, no ``_confdir`` is set
    # here, so whatever the base classes provide is used -- confirm that this
    # is intended.
    _config_template = """edns-subnet-whitelist=%s.21
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # ... then the same question as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        # The client's request for "no ECS" (0.0.0.0/0) is ignored because
        # use-incoming-edns-subnet is not set.
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted)
class testIncomingECSByIP(ECSTest):
    """Whitelist-by-IP ECS tests with ``use-incoming-edns-subnet`` enabled.

    ``ecs-scope-zero-address`` is set to ``::1``.
    """

    _confdir = 'ECSIncomingByIP'

    _config_template = """edns-subnet-whitelist=%s.21
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=::1
""" % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # ... then the same question as a plain query without any option.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        # ::1 is expected because ecs-scope-zero-address is set to ::1.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted, ttlECS)

    def testSendECSInvalidScope(self):
        # The recursor must not cache with a scope more specific than the
        # source it sent.
        wanted = dns.rrset.from_text(
            nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)
class testECSIPMismatch(ECSTest):
    """ECS tests where ``edns-subnet-whitelist`` lists a different address.

    Every query in this class expects the ``emptyECSText`` answer.
    """

    _confdir = 'ECSIPMismatch'

    _config_template = """edns-subnet-whitelist=192.0.2.1
forward-zones=ecs-echo.example=%s.21
""" % os.environ['PREFIX']

    def testSendECS(self):
        # First with a /32 client subnet attached to the query ...
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        with_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(with_ecs, wanted)

        # ... then the same question as a plain query without any option.
        # sendECSQuery performs the queries itself; the extra query that
        # used to precede it had an unused result and was dropped, matching
        # testECSNameMismatch.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        plain = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(plain, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source, i.e. asks for no ECS at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        no_subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        no_ecs = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[no_subnet], payload=512)
        self.sendECSQuery(no_ecs, wanted)
483 class UDPECSResponder(DatagramProtocol
):
486 if option
.family
== clientsubnetoption
.FAMILY_IPV4
:
487 ip
= socket
.inet_ntop(socket
.AF_INET
, struct
.pack('!L', option
.ip
))
488 elif option
.family
== clientsubnetoption
.FAMILY_IPV6
:
489 ip
= socket
.inet_ntop(socket
.AF_INET6
,
492 option
.ip
& (2 ** 64 - 1)))
495 def datagramReceived(self
, datagram
, address
):
496 request
= dns
.message
.from_wire(datagram
)
498 response
= dns
.message
.make_response(request
)
499 response
.flags |
= dns
.flags
.AA
502 if (request
.question
[0].name
== dns
.name
.from_text(nameECS
) or request
.question
[0].name
== dns
.name
.from_text(nameECSInvalidScope
)) and request
.question
[0].rdtype
== dns
.rdatatype
.TXT
:
505 for option
in request
.options
:
506 if option
.otype
== clientsubnetoption
.ASSIGNED_OPTION_CODE
and isinstance(option
, clientsubnetoption
.ClientSubnetOption
):
507 text
= self
.ipToStr(option
) + '/' + str(option
.mask
)
509 # Send a scope more specific than the received source for nameECSInvalidScope
510 if request
.question
[0].name
== dns
.name
.from_text(nameECSInvalidScope
):
511 ecso
= clientsubnetoption
.ClientSubnetOption("192.0.42.42", 32, 32)
513 ecso
= clientsubnetoption
.ClientSubnetOption(self
.ipToStr(option
), option
.mask
, option
.mask
)
515 answer
= dns
.rrset
.from_text(request
.question
[0].name
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', text
)
516 response
.answer
.append(answer
)
518 elif request
.question
[0].name
== dns
.name
.from_text(nameECS
) and request
.question
[0].rdtype
== dns
.rdatatype
.NS
:
519 answer
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'NS', 'ns1.ecs-echo.example.')
520 response
.answer
.append(answer
)
521 additional
= dns
.rrset
.from_text('ns1.ecs-echo.example.', 15, dns
.rdataclass
.IN
, 'A', cls
._PREFIX
+ '.21')
522 response
.additional
.append(additional
)
525 response
.options
= [ecso
]
527 self
.transport
.write(response
.to_wire(), address
)