]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
7 import clientsubnetoption
8 from recursortests
import RecursorTest
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
12 emptyECSText
= 'No ECS received'
13 nameECS
= 'ecs-echo.example.'
14 nameECSInvalidScope
= 'invalid-scope.ecs-echo.example.'
16 ecsReactorRunning
= False
18 class ECSTest(RecursorTest
):
19 _config_template_default
= """
24 local-address=127.0.0.1
26 packetcache-servfail-ttl=0
33 def sendECSQuery(self
, query
, expected
, expectedFirstTTL
=None):
34 res
= self
.sendUDPQuery(query
)
36 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
37 self
.assertRRsetInAnswer(res
, expected
)
38 # this will break if you are not looking for the first RR, sorry!
39 if expectedFirstTTL
is not None:
40 self
.assertEqual(res
.answer
[0].ttl
, expectedFirstTTL
)
42 expectedFirstTTL
= res
.answer
[0].ttl
44 # wait one second, check that the TTL has been
45 # decreased indicating a cache hit
48 res
= self
.sendUDPQuery(query
)
50 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
51 self
.assertRRsetInAnswer(res
, expected
)
52 self
.assertLess(res
.answer
[0].ttl
, expectedFirstTTL
)
54 def checkECSQueryHit(self
, query
, expected
):
55 res
= self
.sendUDPQuery(query
)
57 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
58 self
.assertRRsetInAnswer(res
, expected
)
59 # this will break if you are not looking for the first RR, sorry!
60 self
.assertLess(res
.answer
[0].ttl
, ttlECS
)
63 def startResponders(cls
):
64 global ecsReactorRunning
65 print("Launching responders..")
67 address
= cls
._PREFIX
+ '.21'
70 if not ecsReactorRunning
:
71 reactor
.listenUDP(port
, UDPECSResponder(), interface
=address
)
72 ecsReactorRunning
= True
74 if not reactor
.running
:
75 cls
._UDPResponder
= threading
.Thread(name
='UDP Responder', target
=reactor
.run
, args
=(False,))
76 cls
._UDPResponder
.setDaemon(True)
77 cls
._UDPResponder
.start()
85 confdir
= os
.path
.join('configs', cls
._confdir
)
86 cls
.createConfigDir(confdir
)
88 cls
.generateRecursorConfig(confdir
)
89 cls
.startRecursor(confdir
, cls
._recursorPort
)
91 print("Launching tests..")
94 def tearDownClass(cls
):
95 cls
.tearDownRecursor()
97 class testNoECS(ECSTest
):
100 _config_template
= """edns-subnet-whitelist=
101 forward-zones=ecs-echo.example=%s.21
102 """ % (os
.environ
['PREFIX'])
104 def testSendECS(self
):
105 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
106 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
107 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
108 self
.sendECSQuery(query
, expected
)
111 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
112 query
= dns
.message
.make_query(nameECS
, 'TXT')
113 self
.sendECSQuery(query
, expected
)
115 def testRequireNoECS(self
):
116 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
118 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
119 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
120 self
.sendECSQuery(query
, expected
)
122 class testIncomingNoECS(ECSTest
):
123 _confdir
= 'IncomingNoECS'
125 _config_template
= """edns-subnet-whitelist=
126 use-incoming-edns-subnet=yes
127 forward-zones=ecs-echo.example=%s.21
128 """ % (os
.environ
['PREFIX'])
130 def testSendECS(self
):
131 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
133 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
134 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
135 self
.sendECSQuery(query
, expected
)
138 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
140 query
= dns
.message
.make_query(nameECS
, 'TXT')
141 self
.sendECSQuery(query
, expected
)
143 def testRequireNoECS(self
):
144 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
146 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
147 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
148 self
.sendECSQuery(query
, expected
)
150 class testECSByName(ECSTest
):
151 _confdir
= 'ECSByName'
153 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
154 forward-zones=ecs-echo.example=%s.21
155 """ % (os
.environ
['PREFIX'])
157 def testSendECS(self
):
158 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
159 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
160 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
161 self
.sendECSQuery(query
, expected
)
163 # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
164 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
165 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
166 self
.checkECSQueryHit(query
, expected
)
169 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
170 query
= dns
.message
.make_query(nameECS
, 'TXT')
171 self
.sendECSQuery(query
, expected
)
173 def testRequireNoECS(self
):
174 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
176 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
177 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
178 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
179 self
.sendECSQuery(query
, expected
)
181 class testECSByNameLarger(ECSTest
):
182 _confdir
= 'ECSByNameLarger'
184 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
186 forward-zones=ecs-echo.example=%s.21
187 ecs-ipv4-cache-bits=32
188 ecs-ipv6-cache-bits=128
189 """ % (os
.environ
['PREFIX'])
191 def testSendECS(self
):
192 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
193 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
194 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
195 self
.sendECSQuery(query
, expected
)
197 # check that a query in a different range is a miss
198 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
199 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
200 self
.sendECSQuery(query
, expected
)
203 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
204 query
= dns
.message
.make_query(nameECS
, 'TXT')
205 self
.sendECSQuery(query
, expected
)
207 def testRequireNoECS(self
):
208 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
210 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
211 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
212 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
213 self
.sendECSQuery(query
, expected
)
215 class testECSByNameSmaller(ECSTest
):
216 _confdir
= 'ECSByNameLarger'
218 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
220 forward-zones=ecs-echo.example=%s.21
221 """ % (os
.environ
['PREFIX'])
223 def testSendECS(self
):
224 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
225 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
226 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
227 self
.sendECSQuery(query
, expected
)
230 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
231 query
= dns
.message
.make_query(nameECS
, 'TXT')
232 self
.sendECSQuery(query
, expected
)
234 def testRequireNoECS(self
):
235 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
237 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
238 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
239 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
240 self
.sendECSQuery(query
, expected
)
242 class testIncomingECSByName(ECSTest
):
243 _confdir
= 'ECSIncomingByName'
245 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
246 use-incoming-edns-subnet=yes
247 forward-zones=ecs-echo.example=%s.21
248 ecs-scope-zero-address=2001:db8::42
249 ecs-ipv4-cache-bits=32
250 ecs-ipv6-cache-bits=128
251 """ % (os
.environ
['PREFIX'])
253 def testSendECS(self
):
254 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
255 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
256 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
257 self
.sendECSQuery(query
, expected
, ttlECS
)
259 # check that a query in the same ECS range is a hit
260 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
261 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
262 self
.checkECSQueryHit(query
, expected
)
264 # check that a query in a different ECS range is a miss
265 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.1.2.0/24')
266 ecso
= clientsubnetoption
.ClientSubnetOption('192.1.2.2', 32)
267 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
268 self
.sendECSQuery(query
, expected
)
271 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
272 query
= dns
.message
.make_query(nameECS
, 'TXT')
273 self
.sendECSQuery(query
, expected
, ttlECS
)
275 def testRequireNoECS(self
):
276 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', "2001:db8::42/128")
278 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
279 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
280 self
.sendECSQuery(query
, expected
, ttlECS
)
282 class testIncomingECSByNameLarger(ECSTest
):
283 _confdir
= 'ECSIncomingByNameLarger'
285 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
286 use-incoming-edns-subnet=yes
288 forward-zones=ecs-echo.example=%s.21
289 ecs-scope-zero-address=192.168.0.1
290 ecs-ipv4-cache-bits=32
291 ecs-ipv6-cache-bits=128
292 """ % (os
.environ
['PREFIX'])
294 def testSendECS(self
):
295 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.1/32')
297 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
298 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
299 self
.sendECSQuery(query
, expected
, ttlECS
)
302 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
304 query
= dns
.message
.make_query(nameECS
, 'TXT')
305 self
.sendECSQuery(query
, expected
, ttlECS
)
307 def testRequireNoECS(self
):
308 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.168.0.1/32')
310 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
311 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
312 self
.sendECSQuery(query
, expected
, ttlECS
)
314 class testIncomingECSByNameSmaller(ECSTest
):
315 _confdir
= 'ECSIncomingByNameSmaller'
317 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
318 use-incoming-edns-subnet=yes
320 forward-zones=ecs-echo.example=%s.21
321 ecs-scope-zero-address=192.168.0.1
322 ecs-ipv4-cache-bits=32
323 ecs-ipv6-cache-bits=128
324 """ % (os
.environ
['PREFIX'])
326 def testSendECS(self
):
327 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.0.0/16')
328 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
329 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
330 self
.sendECSQuery(query
, expected
)
333 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
334 query
= dns
.message
.make_query(nameECS
, 'TXT')
335 self
.sendECSQuery(query
, expected
)
337 def testRequireNoECS(self
):
338 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.168.0.1/32')
340 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
341 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
342 self
.sendECSQuery(query
, expected
, ttlECS
)
344 class testIncomingECSByNameV6(ECSTest
):
345 _confdir
= 'ECSIncomingByNameV6'
347 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
348 use-incoming-edns-subnet=yes
350 ecs-ipv4-cache-bits=32
351 ecs-ipv6-cache-bits=128
352 forward-zones=ecs-echo.example=%s.21
353 query-local-address=::1
354 """ % (os
.environ
['PREFIX'])
356 def testSendECS(self
):
357 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '2001:db8::1/128')
358 ecso
= clientsubnetoption
.ClientSubnetOption('2001:db8::1', 128)
359 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
360 self
.sendECSQuery(query
, expected
, ttlECS
)
363 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
365 query
= dns
.message
.make_query(nameECS
, 'TXT')
366 res
= self
.sendUDPQuery(query
)
367 self
.sendECSQuery(query
, expected
, ttlECS
)
369 def testRequireNoECS(self
):
370 # we should get ::1/128 because ecs-scope-zero-addr is unset and query-local-address is set to ::1
371 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', "::1/128")
373 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
374 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
375 self
.sendECSQuery(query
, expected
, ttlECS
)
377 class testECSNameMismatch(ECSTest
):
378 _confdir
= 'ECSNameMismatch'
380 _config_template
= """edns-subnet-whitelist=not-the-right-name.example.
381 forward-zones=ecs-echo.example=%s.21
382 """ % (os
.environ
['PREFIX'])
384 def testSendECS(self
):
385 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
386 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
387 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
388 self
.sendECSQuery(query
, expected
)
391 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
392 query
= dns
.message
.make_query(nameECS
, 'TXT')
393 self
.sendECSQuery(query
, expected
)
395 def testRequireNoECS(self
):
396 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
398 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
399 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
400 self
.sendECSQuery(query
, expected
)
402 class testECSByIP(ECSTest
):
405 _config_template
= """edns-subnet-whitelist=%s.21
406 forward-zones=ecs-echo.example=%s.21
407 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
409 def testSendECS(self
):
410 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
411 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
412 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
413 self
.sendECSQuery(query
, expected
)
416 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
417 query
= dns
.message
.make_query(nameECS
, 'TXT')
418 self
.sendECSQuery(query
, expected
)
420 def testRequireNoECS(self
):
421 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
423 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
424 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
425 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
426 self
.sendECSQuery(query
, expected
)
428 class testIncomingECSByIP(ECSTest
):
429 _confdir
= 'ECSIncomingByIP'
431 _config_template
= """edns-subnet-whitelist=%s.21
432 use-incoming-edns-subnet=yes
433 forward-zones=ecs-echo.example=%s.21
434 ecs-scope-zero-address=::1
435 ecs-ipv4-cache-bits=32
436 ecs-ipv6-cache-bits=128
437 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
439 def testSendECS(self
):
440 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
442 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
443 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
444 self
.sendECSQuery(query
, expected
)
447 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
448 query
= dns
.message
.make_query(nameECS
, 'TXT')
449 self
.sendECSQuery(query
, expected
)
451 def testRequireNoECS(self
):
452 # we will get ::1 because ecs-scope-zero-addr is set to ::1
453 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '::1/128')
455 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
456 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
457 self
.sendECSQuery(query
, expected
, ttlECS
)
459 def testSendECSInvalidScope(self
):
460 # test that the recursor does not cache with a more specific scope than the source it sent
461 expected
= dns
.rrset
.from_text(nameECSInvalidScope
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
463 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
464 query
= dns
.message
.make_query(nameECSInvalidScope
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
466 self
.sendECSQuery(query
, expected
)
468 class testECSIPMismatch(ECSTest
):
469 _confdir
= 'ECSIPMismatch'
471 _config_template
= """edns-subnet-whitelist=192.0.2.1
472 forward-zones=ecs-echo.example=%s.21
473 """ % (os
.environ
['PREFIX'])
475 def testSendECS(self
):
476 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
477 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
478 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
479 self
.sendECSQuery(query
, expected
)
482 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
483 query
= dns
.message
.make_query(nameECS
, 'TXT')
484 res
= self
.sendUDPQuery(query
)
485 self
.sendECSQuery(query
, expected
)
487 def testRequireNoECS(self
):
488 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
490 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
491 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
492 self
.sendECSQuery(query
, expected
)
494 class UDPECSResponder(DatagramProtocol
):
497 if option
.family
== clientsubnetoption
.FAMILY_IPV4
:
498 ip
= socket
.inet_ntop(socket
.AF_INET
, struct
.pack('!L', option
.ip
))
499 elif option
.family
== clientsubnetoption
.FAMILY_IPV6
:
500 ip
= socket
.inet_ntop(socket
.AF_INET6
,
503 option
.ip
& (2 ** 64 - 1)))
506 def datagramReceived(self
, datagram
, address
):
507 request
= dns
.message
.from_wire(datagram
)
509 response
= dns
.message
.make_response(request
)
510 response
.flags |
= dns
.flags
.AA
513 if (request
.question
[0].name
== dns
.name
.from_text(nameECS
) or request
.question
[0].name
== dns
.name
.from_text(nameECSInvalidScope
)) and request
.question
[0].rdtype
== dns
.rdatatype
.TXT
:
516 for option
in request
.options
:
517 if option
.otype
== clientsubnetoption
.ASSIGNED_OPTION_CODE
and isinstance(option
, clientsubnetoption
.ClientSubnetOption
):
518 text
= self
.ipToStr(option
) + '/' + str(option
.mask
)
520 # Send a scope more specific than the received source for nameECSInvalidScope
521 if request
.question
[0].name
== dns
.name
.from_text(nameECSInvalidScope
):
522 ecso
= clientsubnetoption
.ClientSubnetOption("192.0.42.42", 32, 32)
524 ecso
= clientsubnetoption
.ClientSubnetOption(self
.ipToStr(option
), option
.mask
, option
.mask
)
526 answer
= dns
.rrset
.from_text(request
.question
[0].name
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', text
)
527 response
.answer
.append(answer
)
529 elif request
.question
[0].name
== dns
.name
.from_text(nameECS
) and request
.question
[0].rdtype
== dns
.rdatatype
.NS
:
530 answer
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'NS', 'ns1.ecs-echo.example.')
531 response
.answer
.append(answer
)
532 additional
= dns
.rrset
.from_text('ns1.ecs-echo.example.', 15, dns
.rdataclass
.IN
, 'A', os
.environ
['PREFIX'] + '.21')
533 response
.additional
.append(additional
)
536 response
.options
= [ecso
]
538 self
.transport
.write(response
.to_wire(), address
)