]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
7 import clientsubnetoption
8 from recursortests
import RecursorTest
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
# TXT payload the tests expect back when the echo server saw no ECS option.
emptyECSText = 'No ECS received'

# Owner name used for every query in this file; it is forwarded to the
# local echo responder through the forward-zones setting of each test class.
nameECS = 'ecs-echo.example.'

# Flipped to True once the UDP responder has been registered with the
# Twisted reactor, so later test classes do not register it again.
ecsReactorRunning = False
17 class ECSTest(RecursorTest
):
18 _config_template_default
= """
22 local-address=127.0.0.1
24 packetcache-servfail-ttl=0
def sendECSQuery(self, query, expected, expectedFirstTTL=None):
    """Send *query* twice and verify that the second answer is a cache hit.

    The first response must be NOERROR and contain the *expected* RRset.
    When *expectedFirstTTL* is given, the TTL of the first answer RRset
    must equal it; otherwise that TTL is recorded as the reference value.
    After waiting one second the query is sent again, and the TTL of the
    first answer RRset must be strictly lower than the reference, which
    indicates the answer came from the cache.

    NOTE(review): takes ``self`` and uses unittest-style assertions, so it
    is meant to live on the ECSTest class.
    """
    # Local import so this helper does not depend on a module-level import
    # of ``time``.
    import time

    first = self.sendUDPQuery(query)

    self.assertRcodeEqual(first, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(first, expected)
    # this will break if you are not looking for the first RR, sorry!
    if expectedFirstTTL is not None:
        self.assertEqual(first.answer[0].ttl, expectedFirstTTL)
    else:
        expectedFirstTTL = first.answer[0].ttl

    # wait one second, check that the TTL has been
    # decreased indicating a cache hit
    time.sleep(1)

    second = self.sendUDPQuery(query)

    self.assertRcodeEqual(second, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(second, expected)
    self.assertLess(second.answer[0].ttl, expectedFirstTTL)
def checkECSQueryHit(self, query, expected):
    """Send *query* once and require an answer that has already aged.

    The response must be NOERROR, contain the *expected* RRset, and the
    TTL of its first answer RRset must be strictly below ``ttlECS``.
    """
    reply = self.sendUDPQuery(query)

    self.assertRcodeEqual(reply, dns.rcode.NOERROR)
    self.assertRRsetInAnswer(reply, expected)

    # Only the first answer RRset is inspected, so this check is only
    # meaningful when *expected* is that first RRset.
    first_rrset = reply.answer[0]
    self.assertLess(first_rrset.ttl, ttlECS)
def startResponders(cls):
    """Register the ECS echo responder and make sure the reactor is running.

    The responder is bound to ``cls._PREFIX + '.21'``. Registration happens
    only once per process, guarded by the module-level ``ecsReactorRunning``
    flag. If the Twisted reactor is not running yet, it is started in a
    daemon thread stored on ``cls._UDPResponder``.

    NOTE(review): takes ``cls``, so presumably this is meant to be a
    classmethod of ECSTest -- confirm the decorator is present.
    """
    global ecsReactorRunning
    print("Launching responders..")

    address = cls._PREFIX + '.21'
    # The forward-zones settings in this file point at <PREFIX>.21 without
    # an explicit port, so the responder listens on the default DNS port.
    port = 53

    if not ecsReactorRunning:
        reactor.listenUDP(port, UDPECSResponder(), interface=address)
        ecsReactorRunning = True

    if not reactor.running:
        # False is passed positionally to reactor.run().
        # NOTE(review): in Twisted this is installSignalHandlers -- confirm.
        cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
        # Daemon thread, so a still-running reactor cannot keep the test
        # process alive at exit.
        cls._UDPResponder.daemon = True
        cls._UDPResponder.start()
83 confdir
= os
.path
.join('configs', cls
._confdir
)
84 cls
.createConfigDir(confdir
)
86 cls
.generateRecursorConfig(confdir
)
87 cls
.startRecursor(confdir
, cls
._recursorPort
)
89 print("Launching tests..")
def tearDownClass(cls):
    """Class-level teardown: delegate to ``cls.tearDownRecursor()``."""
    cls.tearDownRecursor()
class testNoECS(ECSTest):
    """Empty edns-subnet-whitelist: the echo server should never see ECS.

    Every query is expected to be answered with the ``emptyECSText`` TXT
    record, whatever ECS option the client sends.
    """
    # Own configuration directory, following the naming used by the other
    # test classes in this file.
    _confdir = 'NoECS'

    _config_template = """edns-subnet-whitelist=
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

        # Same expectation for a plain query carrying no EDNS option.
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
class testIncomingNoECS(ECSTest):
    """Empty whitelist with use-incoming-edns-subnet=yes.

    All queries are expected to yield the ``emptyECSText`` TXT record.
    """
    _confdir = 'IncomingNoECS'

    _config_template = """edns-subnet-whitelist=
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)

        # Plain query, no EDNS option at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        question = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)
class testECSByName(ECSTest):
    """Whitelist by zone name, incoming ECS not used.

    The echo server is expected to report 127.0.0.0/24 for every query.
    """
    _confdir = 'ECSByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Two different client subnets, then a query without ECS."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)

        # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.checkECSQueryHit(question, wanted)

        # Plain query, no EDNS option at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        question = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)
class testECSByNameLarger(ECSTest):
    """Whitelist by zone name with a /32 IPv4 source prefix.

    The echo server is expected to report 127.0.0.1/32 for every query.
    ``ecs-ipv4-bits=32`` is what makes the recursor send a /32 instead of
    its default prefix length.
    """
    _confdir = 'ECSByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Two different client subnets, then a query without ECS."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

        # check that a query in a different range is a miss
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

        # Plain query, no EDNS option at all.
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
class testECSByNameSmaller(ECSTest):
    """Whitelist by zone name with a /16 IPv4 source prefix.

    The echo server is expected to report 127.0.0.0/16 for every query.
    ``ecs-ipv4-bits=16`` is what makes the recursor send a /16 instead of
    its default prefix length.
    """
    # Own configuration directory, so this class does not share one with
    # testECSByNameLarger.
    _confdir = 'ECSByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

        # Plain query, no EDNS option at all.
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')

        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
class testIncomingECSByName(ECSTest):
    """Whitelist by zone name with use-incoming-edns-subnet=yes.

    The echo server is expected to report the client's subnet as a /24,
    and 2001:db8::42/128 (the configured ecs-scope-zero-address) when the
    client sends 0.0.0.0/0.
    """
    _confdir = 'ECSIncomingByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=2001:db8::42
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Cover a first lookup, a hit in the same /24, a miss in another /24, and no ECS."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted, ttlECS)

        # check that a query in the same ECS range is a hit
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.checkECSQueryHit(question, wanted)

        # check that a query in a different ECS range is a miss
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)

        # Plain query, no EDNS option at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        question = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(question, wanted, ttlECS)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")

        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted, ttlECS)
class testIncomingECSByNameLarger(ECSTest):
    """Incoming ECS honoured, with a /32 IPv4 source prefix.

    The echo server is expected to report the client's address as a /32,
    and 192.168.0.1/32 (the configured ecs-scope-zero-address) when the
    client sends 0.0.0.0/0. ``ecs-ipv4-bits=32`` is what makes the recursor
    send a /32 instead of its default prefix length.
    """
    _confdir = 'ECSIncomingByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')

        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)

        # Plain query, no EDNS option at all.
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected, ttlECS)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)
class testIncomingECSByNameSmaller(ECSTest):
    """Incoming ECS honoured, with a /16 IPv4 source prefix.

    The echo server is expected to report the client's subnet as a /16,
    and 192.168.0.1/32 (the configured ecs-scope-zero-address) when the
    client sends 0.0.0.0/0. ``ecs-ipv4-bits=16`` is what makes the recursor
    send a /16 instead of its default prefix length.
    """
    _confdir = 'ECSIncomingByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

        # Plain query, no EDNS option at all.
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)
class testIncomingECSByNameV6(ECSTest):
    """Incoming IPv6 ECS honoured, with a /128 IPv6 source prefix.

    ``ecs-ipv6-bits=128`` is what makes the recursor forward the client's
    full IPv6 address instead of its default prefix length.
    """
    _confdir = 'ECSIncomingByNameV6'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv6-bits=128
forward-zones=ecs-echo.example=%s.21
query-local-address6=::1
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /128 IPv6 client subnet, then again without any ECS."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
        ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)

        # Plain query, no EDNS option at all. No warm-up query is sent
        # first: sendECSQuery asserts that the first answer still carries
        # the full ttlECS, which an earlier cached answer could violate.
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected, ttlECS)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        # we should get ::1/128 because neither ecs-scope-zero-addr nor query-local-address are set,
        # but query-local-address6 is set to ::1
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)
class testECSNameMismatch(ECSTest):
    """The whitelist names a different zone than the one queried.

    All queries are expected to yield the ``emptyECSText`` TXT record.
    """
    _confdir = 'ECSNameMismatch'

    _config_template = """edns-subnet-whitelist=not-the-right-name.example.
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)

        # Plain query, no EDNS option at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        question = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)
class testECSByIP(ECSTest):
    """Whitelist by the responder's IP address, incoming ECS not used.

    The echo server is expected to report 127.0.0.0/24 for every query.
    """
    # Own configuration directory, following the naming used by the other
    # test classes in this file.
    _confdir = 'ECSByIP'

    _config_template = """edns-subnet-whitelist=%s.21
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

        # Plain query, no EDNS option at all.
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
class testIncomingECSByIP(ECSTest):
    """Whitelist by the responder's IP address with use-incoming-edns-subnet=yes.

    The echo server is expected to report the client's subnet as a /24,
    and ::1/128 (the configured ecs-scope-zero-address) when the client
    sends 0.0.0.0/0.
    """
    _confdir = 'ECSIncomingByIP'

    _config_template = """edns-subnet-whitelist=%s.21
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=::1
""" % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')

        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted)

        # Plain query, no EDNS option at all.
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        question = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(question, wanted)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        # we will get ::1 because ecs-scope-zero-addr is set to ::1
        wanted = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')

        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        question = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(question, wanted, ttlECS)
class testECSIPMismatch(ECSTest):
    """The whitelist holds an IP address other than the responder's.

    All queries are expected to yield the ``emptyECSText`` TXT record.
    """
    _confdir = 'ECSIPMismatch'

    _config_template = """edns-subnet-whitelist=192.0.2.1
forward-zones=ecs-echo.example=%s.21
""" % (os.environ['PREFIX'])

    def testSendECS(self):
        """Query with a /32 client subnet, then again without any ECS."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

        # Plain query, no EDNS option at all. sendECSQuery sends the query
        # itself, so no separate warm-up query is issued here.
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        """Query with a 0.0.0.0/0 client subnet."""
        expected = dns.rrset.from_text(
            nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(
            nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
class UDPECSResponder(DatagramProtocol):
    """UDP responder that echoes the received ECS subnet back as a TXT record.

    For TXT queries on ``nameECS`` the answer text is ``<address>/<mask>``
    taken from the client-subnet option of the request, or ``emptyECSText``
    when the request carries no such option. NS queries on ``nameECS`` get
    a fixed NS record plus a glue A record.
    """

    def ipToStr(self, option):
        """Return the address held by the ECS *option* in textual form.

        ``option.ip`` is packed as a 32-bit integer for IPv4 and as two
        64-bit halves for IPv6 before being handed to ``socket.inet_ntop``.

        Raises:
            ValueError: if ``option.family`` is neither IPv4 nor IPv6.
        """
        if option.family == clientsubnetoption.FAMILY_IPV4:
            return socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
        if option.family == clientsubnetoption.FAMILY_IPV6:
            return socket.inet_ntop(socket.AF_INET6,
                                    struct.pack('!QQ',
                                                option.ip >> 64,
                                                option.ip & (2 ** 64 - 1)))
        raise ValueError('unsupported ECS address family: %r' % (option.family,))

    def datagramReceived(self, datagram, address):
        """Parse one DNS query and write the response back to *address*."""
        request = dns.message.from_wire(datagram)

        response = dns.message.make_response(request)
        response.flags |= dns.flags.AA
        # Stays None unless the request carried a client-subnet option.
        ecso = None

        question = request.question[0]
        if question.name == dns.name.from_text(nameECS) and question.rdtype == dns.rdatatype.TXT:
            # Answer used when no client-subnet option is found below.
            text = emptyECSText
            for option in request.options:
                if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
                    text = self.ipToStr(option) + '/' + str(option.mask)
                    # Echo the option back, passing the mask twice as the
                    # original code did.
                    ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)

            answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', text)
            response.answer.append(answer)
        elif question.name == dns.name.from_text(nameECS) and question.rdtype == dns.rdatatype.NS:
            answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
            response.answer.append(answer)
            # Glue record built from ECSTest._PREFIX, the same attribute
            # startResponders uses for the address the responder is bound to.
            additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', ECSTest._PREFIX + '.21')
            response.additional.append(additional)

        if ecso is not None:
            response.options = [ecso]

        self.transport.write(response.to_wire(), address)