]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
7 import clientsubnetoption
8 from recursortests
import RecursorTest
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
12 emptyECSText
= 'No ECS received'
13 nameECS
= 'ecs-echo.example.'
14 nameECSInvalidScope
= 'invalid-scope.ecs-echo.example.'
16 ecsReactorRunning
= False
18 class ECSTest(RecursorTest
):
19 _config_template_default
= """
24 local-address=127.0.0.1
26 packetcache-servfail-ttl=0
33 def sendECSQuery(self
, query
, expected
, expectedFirstTTL
=None):
34 res
= self
.sendUDPQuery(query
)
36 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
37 self
.assertRRsetInAnswer(res
, expected
)
38 # this will break if you are not looking for the first RR, sorry!
39 if expectedFirstTTL
is not None:
40 self
.assertEqual(res
.answer
[0].ttl
, expectedFirstTTL
)
42 expectedFirstTTL
= res
.answer
[0].ttl
44 # wait one second, check that the TTL has been
45 # decreased indicating a cache hit
48 res
= self
.sendUDPQuery(query
)
50 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
51 self
.assertRRsetInAnswer(res
, expected
)
52 self
.assertLess(res
.answer
[0].ttl
, expectedFirstTTL
)
54 def checkECSQueryHit(self
, query
, expected
):
55 res
= self
.sendUDPQuery(query
)
57 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
58 self
.assertRRsetInAnswer(res
, expected
)
59 # this will break if you are not looking for the first RR, sorry!
60 self
.assertLess(res
.answer
[0].ttl
, ttlECS
)
63 def startResponders(cls
):
64 global ecsReactorRunning
65 print("Launching responders..")
67 address
= cls
._PREFIX
+ '.21'
70 if not ecsReactorRunning
:
71 reactor
.listenUDP(port
, UDPECSResponder(), interface
=address
)
72 ecsReactorRunning
= True
74 if not reactor
.running
:
75 cls
._UDPResponder
= threading
.Thread(name
='UDP Responder', target
=reactor
.run
, args
=(False,))
76 cls
._UDPResponder
.setDaemon(True)
77 cls
._UDPResponder
.start()
85 confdir
= os
.path
.join('configs', cls
._confdir
)
86 cls
.createConfigDir(confdir
)
88 cls
.generateRecursorConfig(confdir
)
89 cls
.startRecursor(confdir
, cls
._recursorPort
)
91 print("Launching tests..")
94 def tearDownClass(cls
):
95 cls
.tearDownRecursor()
97 class testNoECS(ECSTest
):
100 _config_template
= """edns-subnet-whitelist=
101 forward-zones=ecs-echo.example=%s.21
102 """ % (os
.environ
['PREFIX'])
104 def testSendECS(self
):
105 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
106 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
107 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
108 self
.sendECSQuery(query
, expected
)
111 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
112 query
= dns
.message
.make_query(nameECS
, 'TXT')
113 self
.sendECSQuery(query
, expected
)
115 def testRequireNoECS(self
):
116 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
118 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
119 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
120 self
.sendECSQuery(query
, expected
)
122 class testIncomingNoECS(ECSTest
):
123 _confdir
= 'IncomingNoECS'
125 _config_template
= """edns-subnet-whitelist=
126 use-incoming-edns-subnet=yes
127 forward-zones=ecs-echo.example=%s.21
128 """ % (os
.environ
['PREFIX'])
130 def testSendECS(self
):
131 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
133 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
134 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
135 self
.sendECSQuery(query
, expected
)
138 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
140 query
= dns
.message
.make_query(nameECS
, 'TXT')
141 self
.sendECSQuery(query
, expected
)
143 def testRequireNoECS(self
):
144 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
146 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
147 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
148 self
.sendECSQuery(query
, expected
)
150 class testECSByName(ECSTest
):
151 _confdir
= 'ECSByName'
153 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
154 forward-zones=ecs-echo.example=%s.21
155 """ % (os
.environ
['PREFIX'])
157 def testSendECS(self
):
158 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
159 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
160 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
161 self
.sendECSQuery(query
, expected
)
163 # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
164 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
165 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
166 self
.checkECSQueryHit(query
, expected
)
169 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
170 query
= dns
.message
.make_query(nameECS
, 'TXT')
171 self
.sendECSQuery(query
, expected
)
173 def testRequireNoECS(self
):
174 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
176 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
177 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
178 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
179 self
.sendECSQuery(query
, expected
)
181 class testECSByNameLarger(ECSTest
):
182 _confdir
= 'ECSByNameLarger'
184 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
186 forward-zones=ecs-echo.example=%s.21
187 ecs-ipv4-cache-bits=32
188 ecs-ipv6-cache-bits=128
189 """ % (os
.environ
['PREFIX'])
191 def testSendECS(self
):
192 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
193 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
194 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
195 self
.sendECSQuery(query
, expected
)
197 # check that a query in a different range is a miss
198 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
199 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
200 self
.sendECSQuery(query
, expected
)
203 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
204 query
= dns
.message
.make_query(nameECS
, 'TXT')
205 self
.sendECSQuery(query
, expected
)
207 def testRequireNoECS(self
):
208 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
210 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
211 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
212 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
213 self
.sendECSQuery(query
, expected
)
215 class testECSByNameSmaller(ECSTest
):
216 _confdir
= 'ECSByNameLarger'
218 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
220 forward-zones=ecs-echo.example=%s.21
221 """ % (os
.environ
['PREFIX'])
223 def testSendECS(self
):
224 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
225 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
226 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
227 self
.sendECSQuery(query
, expected
)
230 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
231 query
= dns
.message
.make_query(nameECS
, 'TXT')
232 self
.sendECSQuery(query
, expected
)
234 def testRequireNoECS(self
):
235 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
237 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
238 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
239 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
240 self
.sendECSQuery(query
, expected
)
242 class testIncomingECSByName(ECSTest
):
243 _confdir
= 'ECSIncomingByName'
245 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
246 use-incoming-edns-subnet=yes
247 forward-zones=ecs-echo.example=%s.21
248 ecs-scope-zero-address=2001:db8::42
249 ecs-ipv4-cache-bits=32
250 ecs-ipv6-cache-bits=128
251 """ % (os
.environ
['PREFIX'])
253 def testSendECS(self
):
254 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
255 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
256 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
257 self
.sendECSQuery(query
, expected
, ttlECS
)
259 # check that a query in the same ECS range is a hit
260 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
261 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
262 self
.checkECSQueryHit(query
, expected
)
264 # check that a query in a different ECS range is a miss
265 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.1.2.0/24')
266 ecso
= clientsubnetoption
.ClientSubnetOption('192.1.2.2', 32)
267 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
268 self
.sendECSQuery(query
, expected
)
271 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
272 query
= dns
.message
.make_query(nameECS
, 'TXT')
273 self
.sendECSQuery(query
, expected
, ttlECS
)
275 def testRequireNoECS(self
):
276 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', "2001:db8::42/128")
278 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
279 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
280 self
.sendECSQuery(query
, expected
, ttlECS
)
282 class testIncomingECSByNameLarger(ECSTest
):
283 _confdir
= 'ECSIncomingByNameLarger'
285 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
286 use-incoming-edns-subnet=yes
288 forward-zones=ecs-echo.example=%s.21
289 ecs-scope-zero-address=192.168.0.1
290 ecs-ipv4-cache-bits=32
291 ecs-ipv6-cache-bits=128
292 """ % (os
.environ
['PREFIX'])
294 def testSendECS(self
):
295 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.1/32')
297 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
298 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
299 self
.sendECSQuery(query
, expected
, ttlECS
)
302 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
304 query
= dns
.message
.make_query(nameECS
, 'TXT')
305 self
.sendECSQuery(query
, expected
, ttlECS
)
307 def testRequireNoECS(self
):
308 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.168.0.1/32')
310 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
311 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
312 self
.sendECSQuery(query
, expected
, ttlECS
)
314 class testIncomingECSByNameSmaller(ECSTest
):
315 _confdir
= 'ECSIncomingByNameSmaller'
317 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
318 use-incoming-edns-subnet=yes
320 forward-zones=ecs-echo.example=%s.21
321 ecs-scope-zero-address=192.168.0.1
322 ecs-ipv4-cache-bits=32
323 ecs-ipv6-cache-bits=128
324 """ % (os
.environ
['PREFIX'])
326 def testSendECS(self
):
327 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.0.0/16')
328 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
329 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
330 self
.sendECSQuery(query
, expected
)
333 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
334 query
= dns
.message
.make_query(nameECS
, 'TXT')
335 self
.sendECSQuery(query
, expected
)
337 def testRequireNoECS(self
):
338 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.168.0.1/32')
340 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
341 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
342 self
.sendECSQuery(query
, expected
, ttlECS
)
344 class testIncomingECSByNameV6(ECSTest
):
345 _confdir
= 'ECSIncomingByNameV6'
347 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
348 use-incoming-edns-subnet=yes
350 ecs-ipv4-cache-bits=32
351 ecs-ipv6-cache-bits=128
352 forward-zones=ecs-echo.example=%s.21
353 query-local-address6=::1
354 """ % (os
.environ
['PREFIX'])
356 def testSendECS(self
):
357 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '2001:db8::1/128')
358 ecso
= clientsubnetoption
.ClientSubnetOption('2001:db8::1', 128)
359 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
360 self
.sendECSQuery(query
, expected
, ttlECS
)
363 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
365 query
= dns
.message
.make_query(nameECS
, 'TXT')
366 res
= self
.sendUDPQuery(query
)
367 self
.sendECSQuery(query
, expected
, ttlECS
)
369 def testRequireNoECS(self
):
370 # we should get ::1/128 because neither ecs-scope-zero-addr nor query-local-address are set,
371 # but query-local-address6 is set to ::1
372 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', "::1/128")
374 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
375 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
376 self
.sendECSQuery(query
, expected
, ttlECS
)
378 class testECSNameMismatch(ECSTest
):
379 _confdir
= 'ECSNameMismatch'
381 _config_template
= """edns-subnet-whitelist=not-the-right-name.example.
382 forward-zones=ecs-echo.example=%s.21
383 """ % (os
.environ
['PREFIX'])
385 def testSendECS(self
):
386 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
387 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
388 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
389 self
.sendECSQuery(query
, expected
)
392 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
393 query
= dns
.message
.make_query(nameECS
, 'TXT')
394 self
.sendECSQuery(query
, expected
)
396 def testRequireNoECS(self
):
397 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
399 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
400 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
401 self
.sendECSQuery(query
, expected
)
403 class testECSByIP(ECSTest
):
406 _config_template
= """edns-subnet-whitelist=%s.21
407 forward-zones=ecs-echo.example=%s.21
408 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
410 def testSendECS(self
):
411 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
412 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
413 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
414 self
.sendECSQuery(query
, expected
)
417 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
418 query
= dns
.message
.make_query(nameECS
, 'TXT')
419 self
.sendECSQuery(query
, expected
)
421 def testRequireNoECS(self
):
422 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
424 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
425 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
426 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
427 self
.sendECSQuery(query
, expected
)
429 class testIncomingECSByIP(ECSTest
):
430 _confdir
= 'ECSIncomingByIP'
432 _config_template
= """edns-subnet-whitelist=%s.21
433 use-incoming-edns-subnet=yes
434 forward-zones=ecs-echo.example=%s.21
435 ecs-scope-zero-address=::1
436 ecs-ipv4-cache-bits=32
437 ecs-ipv6-cache-bits=128
438 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
440 def testSendECS(self
):
441 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
443 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
444 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
445 self
.sendECSQuery(query
, expected
)
448 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
449 query
= dns
.message
.make_query(nameECS
, 'TXT')
450 self
.sendECSQuery(query
, expected
)
452 def testRequireNoECS(self
):
453 # we will get ::1 because ecs-scope-zero-addr is set to ::1
454 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '::1/128')
456 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
457 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
458 self
.sendECSQuery(query
, expected
, ttlECS
)
460 def testSendECSInvalidScope(self
):
461 # test that the recursor does not cache with a more specific scope than the source it sent
462 expected
= dns
.rrset
.from_text(nameECSInvalidScope
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
464 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
465 query
= dns
.message
.make_query(nameECSInvalidScope
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
467 self
.sendECSQuery(query
, expected
)
469 class testECSIPMismatch(ECSTest
):
470 _confdir
= 'ECSIPMismatch'
472 _config_template
= """edns-subnet-whitelist=192.0.2.1
473 forward-zones=ecs-echo.example=%s.21
474 """ % (os
.environ
['PREFIX'])
476 def testSendECS(self
):
477 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
478 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
479 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
480 self
.sendECSQuery(query
, expected
)
483 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
484 query
= dns
.message
.make_query(nameECS
, 'TXT')
485 res
= self
.sendUDPQuery(query
)
486 self
.sendECSQuery(query
, expected
)
488 def testRequireNoECS(self
):
489 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
491 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
492 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
493 self
.sendECSQuery(query
, expected
)
495 class UDPECSResponder(DatagramProtocol
):
498 if option
.family
== clientsubnetoption
.FAMILY_IPV4
:
499 ip
= socket
.inet_ntop(socket
.AF_INET
, struct
.pack('!L', option
.ip
))
500 elif option
.family
== clientsubnetoption
.FAMILY_IPV6
:
501 ip
= socket
.inet_ntop(socket
.AF_INET6
,
504 option
.ip
& (2 ** 64 - 1)))
507 def datagramReceived(self
, datagram
, address
):
508 request
= dns
.message
.from_wire(datagram
)
510 response
= dns
.message
.make_response(request
)
511 response
.flags |
= dns
.flags
.AA
514 if (request
.question
[0].name
== dns
.name
.from_text(nameECS
) or request
.question
[0].name
== dns
.name
.from_text(nameECSInvalidScope
)) and request
.question
[0].rdtype
== dns
.rdatatype
.TXT
:
517 for option
in request
.options
:
518 if option
.otype
== clientsubnetoption
.ASSIGNED_OPTION_CODE
and isinstance(option
, clientsubnetoption
.ClientSubnetOption
):
519 text
= self
.ipToStr(option
) + '/' + str(option
.mask
)
521 # Send a scope more specific than the received source for nameECSInvalidScope
522 if request
.question
[0].name
== dns
.name
.from_text(nameECSInvalidScope
):
523 ecso
= clientsubnetoption
.ClientSubnetOption("192.0.42.42", 32, 32)
525 ecso
= clientsubnetoption
.ClientSubnetOption(self
.ipToStr(option
), option
.mask
, option
.mask
)
527 answer
= dns
.rrset
.from_text(request
.question
[0].name
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', text
)
528 response
.answer
.append(answer
)
530 elif request
.question
[0].name
== dns
.name
.from_text(nameECS
) and request
.question
[0].rdtype
== dns
.rdatatype
.NS
:
531 answer
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'NS', 'ns1.ecs-echo.example.')
532 response
.answer
.append(answer
)
533 additional
= dns
.rrset
.from_text('ns1.ecs-echo.example.', 15, dns
.rdataclass
.IN
, 'A', os
.environ
['PREFIX'] + '.21')
534 response
.additional
.append(additional
)
537 response
.options
= [ecso
]
539 self
.transport
.write(response
.to_wire(), address
)