7 import clientsubnetoption
9 from recursortests
import RecursorTest
, have_ipv6
10 from twisted
.internet
.protocol
import DatagramProtocol
11 from twisted
.internet
import reactor
13 emptyECSText
= 'No ECS received'
14 nameECS
= 'ecs-echo.example.'
15 nameECSInvalidScope
= 'invalid-scope.ecs-echo.example.'
17 ecsReactorRunning
= False
18 ecsReactorv6Running
= False
20 class ECSTest(RecursorTest
):
21 _config_template_default
= """
25 local-address=127.0.0.1
27 packetcache-servfail-ttl=15
37 def sendECSQuery(self
, query
, expected
, expectedFirstTTL
=None, scopeZeroResponse
=None):
38 res
= self
.sendUDPQuery(query
)
40 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
41 self
.assertRRsetInAnswer(res
, expected
)
42 # this will break if you are not looking for the first RR, sorry!
43 if expectedFirstTTL
is not None:
44 self
.assertTrue(res
.answer
[0].ttl
== expectedFirstTTL
or res
.answer
[0].ttl
== expectedFirstTTL
- 1)
46 expectedFirstTTL
= res
.answer
[0].ttl
47 self
.assertEqual(res
.edns
, query
.edns
)
49 if scopeZeroResponse
is not None:
50 self
.assertEqual(res
.edns
, 0)
52 self
.assertEqual(len(res
.options
), 1)
53 self
.assertEqual(res
.options
[0].otype
, 8)
54 self
.assertEqual(res
.options
[0].scope
, 0)
56 self
.assertEqual(len(res
.options
), 1)
57 self
.assertEqual(res
.options
[0].otype
, 8)
58 self
.assertNotEqual(res
.options
[0].scope
, 0)
60 # wait one second, check that the TTL has been
61 # decreased indicating a cache hit
64 res
= self
.sendUDPQuery(query
)
66 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
67 self
.assertRRsetInAnswer(res
, expected
)
68 self
.assertLess(res
.answer
[0].ttl
, expectedFirstTTL
)
69 self
.assertEqual(res
.edns
, query
.edns
)
71 def checkECSQueryHit(self
, query
, expected
):
72 res
= self
.sendUDPQuery(query
)
74 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
75 self
.assertRRsetInAnswer(res
, expected
)
76 # this will break if you are not looking for the first RR, sorry!
77 self
.assertLess(res
.answer
[0].ttl
, ttlECS
)
80 def startResponders(cls
):
81 global ecsReactorRunning
82 global ecsReactorv6Running
83 print("Launching responders..")
85 address
= cls
._PREFIX
+ '.21'
88 if not ecsReactorRunning
:
89 reactor
.listenUDP(port
, UDPECSResponder(), interface
=address
)
90 ecsReactorRunning
= True
92 if not ecsReactorv6Running
and have_ipv6():
93 reactor
.listenUDP(53000, UDPECSResponder(), interface
='::1')
94 ecsReactorv6Running
= True
96 if not reactor
.running
:
97 cls
._UDPResponder
= threading
.Thread(name
='UDP Responder', target
=reactor
.run
, args
=(False,))
98 cls
._UDPResponder
.daemon
= True
99 cls
._UDPResponder
.start()
101 class NoECSTest(ECSTest
):
104 _config_template
= """edns-subnet-allow-list=
105 forward-zones=ecs-echo.example=%s.21
106 """ % (os
.environ
['PREFIX'])
108 def testSendECS(self
):
109 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
110 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
111 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
112 self
.sendECSQuery(query
, expected
)
115 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
116 query
= dns
.message
.make_query(nameECS
, 'TXT')
117 self
.sendECSQuery(query
, expected
)
119 def testRequireNoECS(self
):
120 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
122 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
123 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
124 self
.sendECSQuery(query
, expected
)
126 class IncomingNoECSTest(ECSTest
):
127 _confdir
= 'IncomingNoECS'
129 _config_template
= """edns-subnet-allow-list=
130 use-incoming-edns-subnet=yes
131 forward-zones=ecs-echo.example=%s.21
132 """ % (os
.environ
['PREFIX'])
134 def testSendECS(self
):
135 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
137 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
138 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
139 self
.sendECSQuery(query
, expected
, scopeZeroResponse
=True)
142 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
144 query
= dns
.message
.make_query(nameECS
, 'TXT')
145 self
.sendECSQuery(query
, expected
)
147 def testRequireNoECS(self
):
148 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
150 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
151 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
152 self
.sendECSQuery(query
, expected
, scopeZeroResponse
=True)
154 class ECSByNameTest(ECSTest
):
155 _confdir
= 'ECSByName'
157 _config_template
= """edns-subnet-allow-list=ecs-echo.example.
158 forward-zones=ecs-echo.example=%s.21
159 """ % (os
.environ
['PREFIX'])
161 def testSendECS(self
):
162 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
163 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
164 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
165 self
.sendECSQuery(query
, expected
)
167 # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
168 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
169 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
170 self
.checkECSQueryHit(query
, expected
)
173 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
174 query
= dns
.message
.make_query(nameECS
, 'TXT')
175 self
.sendECSQuery(query
, expected
)
177 def testRequireNoECS(self
):
178 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
180 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
181 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
182 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
183 self
.sendECSQuery(query
, expected
)
185 class ECSByNameLargerTest(ECSTest
):
186 _confdir
= 'ECSByNameLarger'
188 _config_template
= """edns-subnet-allow-list=ecs-echo.example.
190 forward-zones=ecs-echo.example=%s.21
191 ecs-ipv4-cache-bits=32
192 ecs-ipv6-cache-bits=128
193 """ % (os
.environ
['PREFIX'])
195 def testSendECS(self
):
196 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
197 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
198 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
199 self
.sendECSQuery(query
, expected
)
201 # check that a query in a different range is a miss
202 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
203 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
204 self
.sendECSQuery(query
, expected
)
207 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
208 query
= dns
.message
.make_query(nameECS
, 'TXT')
209 self
.sendECSQuery(query
, expected
)
211 def testRequireNoECS(self
):
212 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
214 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
215 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
216 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
217 self
.sendECSQuery(query
, expected
)
219 class ECSByNameSmallerTest(ECSTest
):
220 _confdir
= 'ECSByNameSmaller'
222 _config_template
= """edns-subnet-allow-list=ecs-echo.example.
224 forward-zones=ecs-echo.example=%s.21
225 """ % (os
.environ
['PREFIX'])
227 def testSendECS(self
):
228 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
229 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
230 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
231 self
.sendECSQuery(query
, expected
)
234 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
235 query
= dns
.message
.make_query(nameECS
, 'TXT')
236 self
.sendECSQuery(query
, expected
)
238 def testRequireNoECS(self
):
239 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
241 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
242 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
243 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
244 self
.sendECSQuery(query
, expected
)
246 class IncomingECSByNameTest(ECSTest
):
247 _confdir
= 'IncomingECSByName'
249 _config_template
= """edns-subnet-allow-list=ecs-echo.example.
250 use-incoming-edns-subnet=yes
251 forward-zones=ecs-echo.example=%s.21
252 ecs-scope-zero-address=2001:db8::42
253 ecs-ipv4-cache-bits=32
254 ecs-ipv6-cache-bits=128
255 """ % (os
.environ
['PREFIX'])
257 def testSendECS(self
):
258 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
259 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
260 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
261 self
.sendECSQuery(query
, expected
, ttlECS
)
263 # check that a query in the same ECS range is a hit
264 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
265 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
266 self
.checkECSQueryHit(query
, expected
)
268 # check that a query in a different ECS range is a miss
269 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.1.2.0/24')
270 ecso
= clientsubnetoption
.ClientSubnetOption('192.1.2.2', 32)
271 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
272 self
.sendECSQuery(query
, expected
)
275 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
276 query
= dns
.message
.make_query(nameECS
, 'TXT')
277 self
.sendECSQuery(query
, expected
, ttlECS
)
279 def testRequireNoECS(self
):
280 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', "2001:db8::42/128")
282 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
283 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
284 self
.sendECSQuery(query
, expected
, ttlECS
)
286 class IncomingECSByNameLargerTest(ECSTest
):
287 _confdir
= 'IncomingECSByNameLarger'
289 _config_template
= """edns-subnet-allow-list=ecs-echo.example.
290 use-incoming-edns-subnet=yes
292 forward-zones=ecs-echo.example=%s.21
293 ecs-scope-zero-address=192.168.0.1
294 ecs-ipv4-cache-bits=32
295 ecs-ipv6-cache-bits=128
296 """ % (os
.environ
['PREFIX'])
298 def testSendECS(self
):
299 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.1/32')
301 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
302 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
303 self
.sendECSQuery(query
, expected
, ttlECS
)
306 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
308 query
= dns
.message
.make_query(nameECS
, 'TXT')
309 self
.sendECSQuery(query
, expected
, ttlECS
)
311 def testRequireNoECS(self
):
312 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.168.0.1/32')
314 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
315 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
316 self
.sendECSQuery(query
, expected
, ttlECS
)
318 class IncomingECSByNameSmallerTest(ECSTest
):
319 _confdir
= 'IncomingECSByNameSmaller'
321 _config_template
= """edns-subnet-allow-list=ecs-echo.example.
322 use-incoming-edns-subnet=yes
324 forward-zones=ecs-echo.example=%s.21
325 ecs-scope-zero-address=192.168.0.1
326 ecs-ipv4-cache-bits=32
327 ecs-ipv6-cache-bits=128
328 """ % (os
.environ
['PREFIX'])
330 def testSendECS(self
):
331 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.0.0/16')
332 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
333 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
334 self
.sendECSQuery(query
, expected
)
337 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
338 query
= dns
.message
.make_query(nameECS
, 'TXT')
339 self
.sendECSQuery(query
, expected
)
341 def testRequireNoECS(self
):
342 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.168.0.1/32')
344 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
345 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
346 self
.sendECSQuery(query
, expected
, ttlECS
)
348 @unittest.skipIf(not have_ipv6(), "No IPv6")
349 class IncomingECSByNameV6Test(ECSTest
):
350 _confdir
= 'IncomingECSByNameV6'
352 _config_template
= """edns-subnet-allow-list=ecs-echo.example.
353 use-incoming-edns-subnet=yes
355 ecs-ipv4-cache-bits=32
356 ecs-ipv6-cache-bits=128
357 query-local-address=::1
358 forward-zones=ecs-echo.example=[::1]:53000
361 def testSendECS(self
):
362 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '2001:db8::1/128')
363 ecso
= clientsubnetoption
.ClientSubnetOption('2001:db8::1', 128)
364 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
365 self
.sendECSQuery(query
, expected
, ttlECS
)
368 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
370 query
= dns
.message
.make_query(nameECS
, 'TXT')
371 res
= self
.sendUDPQuery(query
)
372 self
.sendECSQuery(query
, expected
, ttlECS
)
374 def testRequireNoECS(self
):
375 # we should get ::1/128 because ecs-scope-zero-addr is unset and query-local-address is set to ::1
376 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', "::1/128")
378 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
379 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
380 self
.sendECSQuery(query
, expected
, ttlECS
)
382 class ECSNameMismatchTest(ECSTest
):
383 _confdir
= 'ECSNameMismatch'
385 _config_template
= """edns-subnet-allow-list=not-the-right-name.example.
386 forward-zones=ecs-echo.example=%s.21
387 """ % (os
.environ
['PREFIX'])
389 def testSendECS(self
):
390 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
391 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
392 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
393 self
.sendECSQuery(query
, expected
)
396 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
397 query
= dns
.message
.make_query(nameECS
, 'TXT')
398 self
.sendECSQuery(query
, expected
)
400 def testRequireNoECS(self
):
401 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
403 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
404 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
405 self
.sendECSQuery(query
, expected
)
407 class ECSByIPTest(ECSTest
):
410 _config_template
= """edns-subnet-allow-list=%s.21
411 forward-zones=ecs-echo.example=%s.21
412 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
414 def testSendECS(self
):
415 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
416 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
417 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
418 self
.sendECSQuery(query
, expected
)
421 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
422 query
= dns
.message
.make_query(nameECS
, 'TXT')
423 self
.sendECSQuery(query
, expected
)
425 def testRequireNoECS(self
):
426 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
428 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
429 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
430 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
431 self
.sendECSQuery(query
, expected
)
433 class IncomingECSByIPTest(ECSTest
):
434 _confdir
= 'IncomingECSByIP'
436 _config_template
= """edns-subnet-allow-list=%s.21
437 use-incoming-edns-subnet=yes
438 forward-zones=ecs-echo.example=%s.21
439 ecs-scope-zero-address=::1
440 ecs-ipv4-cache-bits=32
441 ecs-ipv6-cache-bits=128
442 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
444 def testSendECS(self
):
445 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
447 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
448 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
449 self
.sendECSQuery(query
, expected
)
452 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
453 query
= dns
.message
.make_query(nameECS
, 'TXT')
454 self
.sendECSQuery(query
, expected
)
456 def testRequireNoECS(self
):
457 # we will get ::1 because ecs-scope-zero-addr is set to ::1
458 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '::1/128')
460 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
461 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
462 self
.sendECSQuery(query
, expected
, ttlECS
)
464 def testSendECSInvalidScope(self
):
465 # test that the recursor does not cache with a more specific scope than the source it sent
466 expected
= dns
.rrset
.from_text(nameECSInvalidScope
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
468 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
469 query
= dns
.message
.make_query(nameECSInvalidScope
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
471 self
.sendECSQuery(query
, expected
)
473 class ECSIPMismatchTest(ECSTest
):
474 _confdir
= 'ECSIPMismatch'
476 _config_template
= """edns-subnet-allow-list=192.0.2.1
477 forward-zones=ecs-echo.example=%s.21
478 """ % (os
.environ
['PREFIX'])
480 def testSendECS(self
):
481 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
482 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
483 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
484 self
.sendECSQuery(query
, expected
)
487 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
488 query
= dns
.message
.make_query(nameECS
, 'TXT')
489 res
= self
.sendUDPQuery(query
)
490 self
.sendECSQuery(query
, expected
)
492 def testRequireNoECS(self
):
493 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
495 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
496 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
497 self
.sendECSQuery(query
, expected
)
499 class ECSWithProxyProtocolRecursorTest(ECSTest
):
500 _confdir
= 'ECSWithProxyProtocolRecursor'
501 _config_template
= """
502 ecs-add-for=2001:db8::1/128
503 edns-subnet-allow-list=ecs-echo.example.
504 forward-zones=ecs-echo.example=%s.21
505 proxy-protocol-from=127.0.0.1/32
506 allow-from=2001:db8::1/128
507 """ % (os
.environ
['PREFIX'])
509 def testProxyProtocolPlusECS(self
):
511 expected
= dns
.rrset
.from_text(qname
, 0, dns
.rdataclass
.IN
, 'TXT', '2001:db8::/56')
513 query
= dns
.message
.make_query(qname
, 'TXT', use_edns
=True)
514 for method
in ("sendUDPQueryWithProxyProtocol", "sendTCPQueryWithProxyProtocol"):
515 sender
= getattr(self
, method
)
516 res
= sender(query
, True, '2001:db8::1', '2001:db8::2', 0, 65535)
517 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
518 self
.assertRRsetInAnswer(res
, expected
)
520 class TooLargeToAddZeroScopeTest(RecursorTest
):
522 _confdir
= 'TooLargeToAddZeroScope'
523 _config_template
= """
524 use-incoming-edns-subnet=yes
527 _lua_dns_script_file
= """
528 function preresolve(dq)
529 if dq.qname == newDN('toolarge.ecs.') then
530 dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
541 def generateRecursorConfig(cls
, confdir
):
542 super(TooLargeToAddZeroScopeTest
, cls
).generateRecursorConfig(confdir
)
544 def testTooLarge(self
):
545 qname
= 'toolarge.ecs.'
546 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 24)
547 query
= dns
.message
.make_query(qname
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
549 # should not have an ECS Option since the packet is too large already
550 res
= self
.sendUDPQuery(query
, timeout
=5.0)
551 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
552 self
.assertEqual(len(res
.answer
), 1)
553 self
.assertEqual(res
.edns
, 0)
554 self
.assertEqual(len(res
.options
), 0)
556 res
= self
.sendTCPQuery(query
, timeout
=5.0)
557 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
558 self
.assertEqual(len(res
.answer
), 1)
559 self
.assertEqual(res
.edns
, 0)
560 self
.assertEqual(len(res
.options
), 1)
561 self
.assertEqual(res
.options
[0].otype
, 8)
562 self
.assertEqual(res
.options
[0].scope
, 0)
564 class UDPECSResponder(DatagramProtocol
):
567 if option
.family
== clientsubnetoption
.FAMILY_IPV4
:
568 ip
= socket
.inet_ntop(socket
.AF_INET
, struct
.pack('!L', option
.ip
))
569 elif option
.family
== clientsubnetoption
.FAMILY_IPV6
:
570 ip
= socket
.inet_ntop(socket
.AF_INET6
,
573 option
.ip
& (2 ** 64 - 1)))
576 def datagramReceived(self
, datagram
, address
):
577 request
= dns
.message
.from_wire(datagram
)
579 response
= dns
.message
.make_response(request
)
580 response
.flags |
= dns
.flags
.AA
583 if (request
.question
[0].name
== dns
.name
.from_text(nameECS
) or request
.question
[0].name
== dns
.name
.from_text(nameECSInvalidScope
)) and request
.question
[0].rdtype
== dns
.rdatatype
.TXT
:
586 for option
in request
.options
:
587 if option
.otype
== clientsubnetoption
.ASSIGNED_OPTION_CODE
and isinstance(option
, clientsubnetoption
.ClientSubnetOption
):
588 text
= self
.ipToStr(option
) + '/' + str(option
.mask
)
590 # Send a scope more specific than the received source for nameECSInvalidScope
591 if request
.question
[0].name
== dns
.name
.from_text(nameECSInvalidScope
):
592 ecso
= clientsubnetoption
.ClientSubnetOption("192.0.42.42", 32, 32)
594 ecso
= clientsubnetoption
.ClientSubnetOption(self
.ipToStr(option
), option
.mask
, option
.mask
)
596 answer
= dns
.rrset
.from_text(request
.question
[0].name
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', text
)
597 response
.answer
.append(answer
)
599 elif request
.question
[0].name
== dns
.name
.from_text(nameECS
) and request
.question
[0].rdtype
== dns
.rdatatype
.NS
:
600 answer
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'NS', 'ns1.ecs-echo.example.')
601 response
.answer
.append(answer
)
602 additional
= dns
.rrset
.from_text('ns1.ecs-echo.example.', 15, dns
.rdataclass
.IN
, 'A', os
.environ
['PREFIX'] + '.21')
603 response
.additional
.append(additional
)
606 response
.use_edns(options
= [ecso
])
608 self
.transport
.write(response
.to_wire(), address
)