]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ECS.py
7 import clientsubnetoption
8 from recursortests
import RecursorTest
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
12 emptyECSText
= 'No ECS received'
13 nameECS
= 'ecs-echo.example.'
15 ecsReactorRunning
= False
17 class ECSTest(RecursorTest
):
18 _config_template_default
= """
23 local-address=127.0.0.1
25 packetcache-servfail-ttl=0
32 def sendECSQuery(self
, query
, expected
, expectedFirstTTL
=None):
33 res
= self
.sendUDPQuery(query
)
35 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
36 self
.assertRRsetInAnswer(res
, expected
)
37 # this will break if you are not looking for the first RR, sorry!
38 if expectedFirstTTL
is not None:
39 self
.assertEqual(res
.answer
[0].ttl
, expectedFirstTTL
)
41 expectedFirstTTL
= res
.answer
[0].ttl
43 # wait one second, check that the TTL has been
44 # decreased indicating a cache hit
47 res
= self
.sendUDPQuery(query
)
49 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
50 self
.assertRRsetInAnswer(res
, expected
)
51 self
.assertLess(res
.answer
[0].ttl
, expectedFirstTTL
)
53 def checkECSQueryHit(self
, query
, expected
):
54 res
= self
.sendUDPQuery(query
)
56 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
57 self
.assertRRsetInAnswer(res
, expected
)
58 # this will break if you are not looking for the first RR, sorry!
59 self
.assertLess(res
.answer
[0].ttl
, ttlECS
)
62 def startResponders(cls
):
63 global ecsReactorRunning
64 print("Launching responders..")
66 address
= cls
._PREFIX
+ '.21'
69 if not ecsReactorRunning
:
70 reactor
.listenUDP(port
, UDPECSResponder(), interface
=address
)
71 ecsReactorRunning
= True
73 if not reactor
.running
:
74 cls
._UDPResponder
= threading
.Thread(name
='UDP Responder', target
=reactor
.run
, args
=(False,))
75 cls
._UDPResponder
.setDaemon(True)
76 cls
._UDPResponder
.start()
84 confdir
= os
.path
.join('configs', cls
._confdir
)
85 cls
.createConfigDir(confdir
)
87 cls
.generateRecursorConfig(confdir
)
88 cls
.startRecursor(confdir
, cls
._recursorPort
)
90 print("Launching tests..")
93 def tearDownClass(cls
):
94 cls
.tearDownRecursor()
96 class testNoECS(ECSTest
):
99 _config_template
= """edns-subnet-whitelist=
100 forward-zones=ecs-echo.example=%s.21
101 """ % (os
.environ
['PREFIX'])
103 def testSendECS(self
):
104 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
105 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
106 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
107 self
.sendECSQuery(query
, expected
)
110 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
111 query
= dns
.message
.make_query(nameECS
, 'TXT')
112 self
.sendECSQuery(query
, expected
)
114 def testRequireNoECS(self
):
115 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
117 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
118 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
119 self
.sendECSQuery(query
, expected
)
121 class testIncomingNoECS(ECSTest
):
122 _confdir
= 'IncomingNoECS'
124 _config_template
= """edns-subnet-whitelist=
125 use-incoming-edns-subnet=yes
126 forward-zones=ecs-echo.example=%s.21
127 """ % (os
.environ
['PREFIX'])
129 def testSendECS(self
):
130 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
132 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
133 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
134 self
.sendECSQuery(query
, expected
)
137 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
139 query
= dns
.message
.make_query(nameECS
, 'TXT')
140 self
.sendECSQuery(query
, expected
)
142 def testRequireNoECS(self
):
143 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
145 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
146 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
147 self
.sendECSQuery(query
, expected
)
149 class testECSByName(ECSTest
):
150 _confdir
= 'ECSByName'
152 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
153 forward-zones=ecs-echo.example=%s.21
154 """ % (os
.environ
['PREFIX'])
156 def testSendECS(self
):
157 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
158 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
159 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
160 self
.sendECSQuery(query
, expected
)
162 # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
163 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
164 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
165 self
.checkECSQueryHit(query
, expected
)
168 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
169 query
= dns
.message
.make_query(nameECS
, 'TXT')
170 self
.sendECSQuery(query
, expected
)
172 def testRequireNoECS(self
):
173 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
175 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
176 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
177 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
178 self
.sendECSQuery(query
, expected
)
180 class testECSByNameLarger(ECSTest
):
181 _confdir
= 'ECSByNameLarger'
183 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
185 forward-zones=ecs-echo.example=%s.21
186 """ % (os
.environ
['PREFIX'])
188 def testSendECS(self
):
189 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
190 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
191 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
192 self
.sendECSQuery(query
, expected
)
194 # check that a query in a different range is a miss
195 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
196 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
197 self
.sendECSQuery(query
, expected
)
200 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
201 query
= dns
.message
.make_query(nameECS
, 'TXT')
202 self
.sendECSQuery(query
, expected
)
204 def testRequireNoECS(self
):
205 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
207 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
208 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
209 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
210 self
.sendECSQuery(query
, expected
)
212 class testECSByNameSmaller(ECSTest
):
213 _confdir
= 'ECSByNameLarger'
215 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
217 forward-zones=ecs-echo.example=%s.21
218 """ % (os
.environ
['PREFIX'])
220 def testSendECS(self
):
221 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
222 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
223 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
224 self
.sendECSQuery(query
, expected
)
227 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
228 query
= dns
.message
.make_query(nameECS
, 'TXT')
229 self
.sendECSQuery(query
, expected
)
231 def testRequireNoECS(self
):
232 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
234 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
235 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
236 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
237 self
.sendECSQuery(query
, expected
)
239 class testIncomingECSByName(ECSTest
):
240 _confdir
= 'ECSIncomingByName'
242 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
243 use-incoming-edns-subnet=yes
244 forward-zones=ecs-echo.example=%s.21
245 ecs-scope-zero-address=2001:db8::42
246 """ % (os
.environ
['PREFIX'])
248 def testSendECS(self
):
249 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
250 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
251 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
252 self
.sendECSQuery(query
, expected
, ttlECS
)
254 # check that a query in the same ECS range is a hit
255 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.2', 32)
256 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
257 self
.checkECSQueryHit(query
, expected
)
259 # check that a query in a different ECS range is a miss
260 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.1.2.0/24')
261 ecso
= clientsubnetoption
.ClientSubnetOption('192.1.2.2', 32)
262 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
263 self
.sendECSQuery(query
, expected
)
266 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
267 query
= dns
.message
.make_query(nameECS
, 'TXT')
268 self
.sendECSQuery(query
, expected
, ttlECS
)
270 def testRequireNoECS(self
):
271 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', "2001:db8::42/128")
273 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
274 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
275 self
.sendECSQuery(query
, expected
, ttlECS
)
277 class testIncomingECSByNameLarger(ECSTest
):
278 _confdir
= 'ECSIncomingByNameLarger'
280 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
281 use-incoming-edns-subnet=yes
283 forward-zones=ecs-echo.example=%s.21
284 ecs-scope-zero-address=192.168.0.1
285 """ % (os
.environ
['PREFIX'])
287 def testSendECS(self
):
288 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.1/32')
290 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
291 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
292 self
.sendECSQuery(query
, expected
, ttlECS
)
295 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.1/32')
297 query
= dns
.message
.make_query(nameECS
, 'TXT')
298 self
.sendECSQuery(query
, expected
, ttlECS
)
300 def testRequireNoECS(self
):
301 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.168.0.1/32')
303 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
304 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
305 self
.sendECSQuery(query
, expected
, ttlECS
)
307 class testIncomingECSByNameSmaller(ECSTest
):
308 _confdir
= 'ECSIncomingByNameSmaller'
310 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
311 use-incoming-edns-subnet=yes
313 forward-zones=ecs-echo.example=%s.21
314 ecs-scope-zero-address=192.168.0.1
315 """ % (os
.environ
['PREFIX'])
317 def testSendECS(self
):
318 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.0.0/16')
319 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
320 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
321 self
.sendECSQuery(query
, expected
)
324 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/16')
325 query
= dns
.message
.make_query(nameECS
, 'TXT')
326 self
.sendECSQuery(query
, expected
)
328 def testRequireNoECS(self
):
329 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.168.0.1/32')
331 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
332 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
333 self
.sendECSQuery(query
, expected
, ttlECS
)
335 class testIncomingECSByNameV6(ECSTest
):
336 _confdir
= 'ECSIncomingByNameV6'
338 _config_template
= """edns-subnet-whitelist=ecs-echo.example.
339 use-incoming-edns-subnet=yes
341 forward-zones=ecs-echo.example=%s.21
342 query-local-address6=::1
343 """ % (os
.environ
['PREFIX'])
345 def testSendECS(self
):
346 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '2001:db8::1/128')
347 ecso
= clientsubnetoption
.ClientSubnetOption('2001:db8::1', 128)
348 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
349 self
.sendECSQuery(query
, expected
, ttlECS
)
352 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
354 query
= dns
.message
.make_query(nameECS
, 'TXT')
355 res
= self
.sendUDPQuery(query
)
356 self
.sendECSQuery(query
, expected
, ttlECS
)
358 def testRequireNoECS(self
):
359 # we should get ::1/128 because neither ecs-scope-zero-addr nor query-local-address are set,
360 # but query-local-address6 is set to ::1
361 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', "::1/128")
363 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
364 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
365 self
.sendECSQuery(query
, expected
, ttlECS
)
367 class testECSNameMismatch(ECSTest
):
368 _confdir
= 'ECSNameMismatch'
370 _config_template
= """edns-subnet-whitelist=not-the-right-name.example.
371 forward-zones=ecs-echo.example=%s.21
372 """ % (os
.environ
['PREFIX'])
374 def testSendECS(self
):
375 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
376 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
377 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
378 self
.sendECSQuery(query
, expected
)
381 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
382 query
= dns
.message
.make_query(nameECS
, 'TXT')
383 self
.sendECSQuery(query
, expected
)
385 def testRequireNoECS(self
):
386 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
388 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
389 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
390 self
.sendECSQuery(query
, expected
)
392 class testECSByIP(ECSTest
):
395 _config_template
= """edns-subnet-whitelist=%s.21
396 forward-zones=ecs-echo.example=%s.21
397 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
399 def testSendECS(self
):
400 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
401 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
402 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
403 self
.sendECSQuery(query
, expected
)
406 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
407 query
= dns
.message
.make_query(nameECS
, 'TXT')
408 self
.sendECSQuery(query
, expected
)
410 def testRequireNoECS(self
):
411 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
413 # the request for no ECS is ignored because use-incoming-edns-subnet is not set
414 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
415 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
416 self
.sendECSQuery(query
, expected
)
418 class testIncomingECSByIP(ECSTest
):
419 _confdir
= 'ECSIncomingByIP'
421 _config_template
= """edns-subnet-whitelist=%s.21
422 use-incoming-edns-subnet=yes
423 forward-zones=ecs-echo.example=%s.21
424 ecs-scope-zero-address=::1
425 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
427 def testSendECS(self
):
428 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '192.0.2.0/24')
430 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
431 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
432 self
.sendECSQuery(query
, expected
)
435 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '127.0.0.0/24')
436 query
= dns
.message
.make_query(nameECS
, 'TXT')
437 self
.sendECSQuery(query
, expected
)
439 def testRequireNoECS(self
):
440 # we will get ::1 because ecs-scope-zero-addr is set to ::1
441 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', '::1/128')
443 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
444 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
445 self
.sendECSQuery(query
, expected
, ttlECS
)
447 class testECSIPMismatch(ECSTest
):
448 _confdir
= 'ECSIPMismatch'
450 _config_template
= """edns-subnet-whitelist=192.0.2.1
451 forward-zones=ecs-echo.example=%s.21
452 """ % (os
.environ
['PREFIX'])
454 def testSendECS(self
):
455 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
456 ecso
= clientsubnetoption
.ClientSubnetOption('192.0.2.1', 32)
457 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
458 self
.sendECSQuery(query
, expected
)
461 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
462 query
= dns
.message
.make_query(nameECS
, 'TXT')
463 res
= self
.sendUDPQuery(query
)
464 self
.sendECSQuery(query
, expected
)
466 def testRequireNoECS(self
):
467 expected
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', emptyECSText
)
469 ecso
= clientsubnetoption
.ClientSubnetOption('0.0.0.0', 0)
470 query
= dns
.message
.make_query(nameECS
, 'TXT', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
471 self
.sendECSQuery(query
, expected
)
473 class UDPECSResponder(DatagramProtocol
):
476 if option
.family
== clientsubnetoption
.FAMILY_IPV4
:
477 ip
= socket
.inet_ntop(socket
.AF_INET
, struct
.pack('!L', option
.ip
))
478 elif option
.family
== clientsubnetoption
.FAMILY_IPV6
:
479 ip
= socket
.inet_ntop(socket
.AF_INET6
,
482 option
.ip
& (2 ** 64 - 1)))
485 def datagramReceived(self
, datagram
, address
):
486 request
= dns
.message
.from_wire(datagram
)
488 response
= dns
.message
.make_response(request
)
489 response
.flags |
= dns
.flags
.AA
492 if request
.question
[0].name
== dns
.name
.from_text(nameECS
) and request
.question
[0].rdtype
== dns
.rdatatype
.TXT
:
494 for option
in request
.options
:
495 if option
.otype
== clientsubnetoption
.ASSIGNED_OPTION_CODE
and isinstance(option
, clientsubnetoption
.ClientSubnetOption
):
496 text
= self
.ipToStr(option
) + '/' + str(option
.mask
)
497 ecso
= clientsubnetoption
.ClientSubnetOption(self
.ipToStr(option
), option
.mask
, option
.mask
)
499 answer
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'TXT', text
)
500 response
.answer
.append(answer
)
501 elif request
.question
[0].name
== dns
.name
.from_text(nameECS
) and request
.question
[0].rdtype
== dns
.rdatatype
.NS
:
502 answer
= dns
.rrset
.from_text(nameECS
, ttlECS
, dns
.rdataclass
.IN
, 'NS', 'ns1.ecs-echo.example.')
503 response
.answer
.append(answer
)
504 additional
= dns
.rrset
.from_text('ns1.ecs-echo.example.', 15, dns
.rdataclass
.IN
, 'A', cls
._PREFIX
+ '.21')
505 response
.additional
.append(additional
)
508 response
.options
= [ecso
]
510 self
.transport
.write(response
.to_wire(), address
)