]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_ECS.py
Merge pull request #7484 from omoerbeek/no-utility-random
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
CommitLineData
9a0b88e8
RG
1import dns
2import os
3import socket
4import struct
5import threading
a5849a16 6import time
9a0b88e8
RG
7import clientsubnetoption
8from recursortests import RecursorTest
9from twisted.internet.protocol import DatagramProtocol
10from twisted.internet import reactor
11
# TXT payload the echo responder returns when the query it received carried
# no ECS option.
emptyECSText = 'No ECS received'

# Name whose TXT answer echoes the ECS source the responder received.
nameECS = 'ecs-echo.example.'

# Name for which the responder answers with an ECS scope it did not derive
# from the query (a fixed 192.0.42.42/32 option).
nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'

# TTL (seconds) of every TXT/NS record served by the responder.
ttlECS = 60

# Set to True once the responder has been registered with the reactor, so
# the listening port is only bound once per process.
ecsReactorRunning = False
9a0b88e8
RG
17
class ECSTest(RecursorTest):
    """Shared fixture for the EDNS Client Subnet (ECS) recursor tests.

    Subclasses provide ``_confdir`` and ``_config_template``; this base class
    starts the UDP echo responder, generates the recursor configuration and
    launches the recursor once per test class.
    """

    _config_template_default = """
daemon=no
trace=yes
dont-query=
ecs-add-for=0.0.0.0/0
local-address=127.0.0.1
packetcache-ttl=0
packetcache-servfail-ttl=0
max-cache-ttl=600
threads=1
loglevel=9
disable-syslog=yes
"""

    def sendECSQuery(self, query, expected, expectedFirstTTL=None):
        """Send ``query`` twice and check the second answer came from the cache.

        :param query: DNS message to send over UDP.
        :param expected: RRset that must be present in both answers.
        :param expectedFirstTTL: when not None, the exact TTL the first RRset
            of the first answer must carry; otherwise that TTL is simply
            recorded as the reference value.

        The second answer, obtained one second later, must carry a strictly
        smaller TTL on its first RRset than the reference value.
        """
        first = self.sendUDPQuery(query)

        self.assertRcodeEqual(first, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(first, expected)
        # this will break if you are not looking for the first RR, sorry!
        if expectedFirstTTL is None:
            expectedFirstTTL = first.answer[0].ttl
        else:
            self.assertEqual(first.answer[0].ttl, expectedFirstTTL)

        # wait one second, check that the TTL has been
        # decreased indicating a cache hit
        time.sleep(1)

        second = self.sendUDPQuery(query)

        self.assertRcodeEqual(second, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(second, expected)
        self.assertLess(second.answer[0].ttl, expectedFirstTTL)

    def checkECSQueryHit(self, query, expected):
        """Send ``query`` once and assert the answer's first TTL is below ``ttlECS``.

        :param query: DNS message to send over UDP.
        :param expected: RRset that must be present in the answer.
        """
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        # this will break if you are not looking for the first RR, sorry!
        self.assertLess(res.answer[0].ttl, ttlECS)

    @classmethod
    def startResponders(cls):
        """Register the UDP ECS responder and start the reactor thread if needed."""
        global ecsReactorRunning
        print("Launching responders..")

        address = cls._PREFIX + '.21'
        port = 53

        # The responder is bound only once per process; later test classes
        # reuse the same listening port.
        if not ecsReactorRunning:
            reactor.listenUDP(port, UDPECSResponder(), interface=address)
            ecsReactorRunning = True

        if not reactor.running:
            # False is passed as reactor.run's first positional argument
            # (presumably installSignalHandlers, since the reactor runs
            # outside the main thread -- confirm against the Twisted docs).
            cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
            # Thread.setDaemon() is deprecated; the attribute is equivalent.
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()

    @classmethod
    def setUpClass(cls):
        """Open sockets, start the responder, then configure and start the recursor."""
        cls.setUpSockets()

        cls.startResponders()

        confdir = os.path.join('configs', cls._confdir)
        cls.createConfigDir(confdir)

        cls.generateRecursorConfig(confdir)
        cls.startRecursor(confdir, cls._recursorPort)

        print("Launching tests..")

    @classmethod
    def tearDownClass(cls):
        """Stop the recursor; the responder thread is left running for later classes."""
        cls.tearDownRecursor()
96
class testNoECS(ECSTest):
    """Empty ``edns-subnet-whitelist``: the echo responder must report no ECS."""

    _confdir = 'NoECS'

    _config_template = """edns-subnet-whitelist=
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source; the echoed text must still be the
        # "no ECS" marker.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)
121
9a0b88e8
RG
class testIncomingNoECS(ECSTest):
    """Empty whitelist with ``use-incoming-edns-subnet=yes``: still no ECS echoed."""

    _confdir = 'IncomingNoECS'

    _config_template = """edns-subnet-whitelist=
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source; the "no ECS" marker is expected.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)
149
9a0b88e8
RG
class testECSByName(ECSTest):
    """Whitelist by zone name: the responder is expected to echo 127.0.0.0/24."""

    _confdir = 'ECSByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        first_subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        first_msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[first_subnet], payload=512)
        self.sendECSQuery(first_msg, wanted)

        # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
        other_subnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        other_msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[other_subnet], payload=512)
        self.checkECSQueryHit(other_msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(msg, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(msg, wanted)
180
9a0b88e8
RG
class testECSByNameLarger(ECSTest):
    """Whitelist by zone name with ``ecs-ipv4-bits=32``: 127.0.0.1/32 is expected."""

    _confdir = 'ECSByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')

        first_subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        first_msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[first_subnet], payload=512)
        self.sendECSQuery(first_msg, wanted)

        # Repeat with a different client-supplied address; the same echoed
        # subnet is expected.
        other_subnet = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        other_msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[other_subnet], payload=512)
        self.sendECSQuery(other_msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(msg, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(msg, wanted)
212
9a0b88e8
RG
class testECSByNameSmaller(ECSTest):
    """Whitelist by zone name with ``ecs-ipv4-bits=16``: 127.0.0.0/16 is expected."""

    # Own configuration directory: the previous value 'ECSByNameLarger' was a
    # copy-paste leftover that collided with testECSByNameLarger.
    _confdir = 'ECSByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

    def testNoECS(self):
        # Plain query without any EDNS option.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')

        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
239
9a0b88e8
RG
class testIncomingECSByName(ECSTest):
    """Whitelist by zone name with ``use-incoming-edns-subnet=yes``.

    The client-supplied source is expected to be echoed as a /24, and
    2001:db8::42/128 when the client sends a 0.0.0.0/0 source.
    """

    _confdir = 'ECSIncomingByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=2001:db8::42
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        self.sendECSQuery(msg, wanted, expectedFirstTTL=ttlECS)

        # check that a query in the same ECS range is a hit
        same_range = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[same_range], payload=512)
        self.checkECSQueryHit(msg, wanted)

        # check that a query in a different ECS range is a miss
        other_range = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[other_range], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
        self.sendECSQuery(msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(msg, wanted, expectedFirstTTL=ttlECS)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source; the configured
        # ecs-scope-zero-address is expected to be echoed.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "2001:db8::42/128")
        self.sendECSQuery(msg, wanted, expectedFirstTTL=ttlECS)
277
9a0b88e8
RG
class testIncomingECSByNameLarger(ECSTest):
    """Incoming ECS honoured with ``ecs-ipv4-bits=32``: full /32 sources echoed."""

    _confdir = 'ECSIncomingByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source, expected to be echoed unchanged.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
        self.sendECSQuery(msg, wanted, expectedFirstTTL=ttlECS)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.sendECSQuery(msg, wanted, expectedFirstTTL=ttlECS)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source; the configured
        # ecs-scope-zero-address is expected to be echoed.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        self.sendECSQuery(msg, wanted, expectedFirstTTL=ttlECS)
307
9a0b88e8
RG
class testIncomingECSByNameSmaller(ECSTest):
    """Incoming ECS honoured with ``ecs-ipv4-bits=16``: sources echoed as /16."""

    _confdir = 'ECSIncomingByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source; a /16 is expected back.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
        self.sendECSQuery(msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.sendECSQuery(msg, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source; the configured
        # ecs-scope-zero-address is expected to be echoed as a /32.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.168.0.1/32')
        self.sendECSQuery(msg, wanted, expectedFirstTTL=ttlECS)
335
9a0b88e8
RG
class testIncomingECSByNameV6(ECSTest):
    """Incoming ECS honoured with ``ecs-ipv6-bits=128`` and an IPv6 client source."""

    _confdir = 'ECSIncomingByNameV6'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv6-bits=128
forward-zones=ecs-echo.example=%s.21
query-local-address6=::1
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a full /128 IPv6 source, expected to be echoed unchanged.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
        ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)

    def testNoECS(self):
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')

        # No extra query is sent before sendECSQuery(): it would pre-fill the
        # cache and make the "first TTL == ttlECS" assertion depend on both
        # queries landing in the same second.
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected, ttlECS)

    def testRequireNoECS(self):
        # we should get ::1/128 because neither ecs-scope-zero-addr nor query-local-address are set,
        # but query-local-address6 is set to ::1
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', "::1/128")

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected, ttlECS)
367
9a0b88e8
RG
class testECSNameMismatch(ECSTest):
    """Whitelist names a different zone: the responder must report no ECS."""

    _confdir = 'ECSNameMismatch'

    _config_template = """edns-subnet-whitelist=not-the-right-name.example.
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source; the "no ECS" marker is expected.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source.
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.sendECSQuery(msg, wanted)
392
9a0b88e8
RG
class testECSByIP(ECSTest):
    """Whitelist by responder IP address: 127.0.0.0/24 is expected to be echoed."""

    _confdir = 'ECSByIP'

    _config_template = """edns-subnet-whitelist=%s.21
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(msg, wanted)

    def testRequireNoECS(self):
        # the request for no ECS is ignored because use-incoming-edns-subnet is not set
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(msg, wanted)
418
9a0b88e8
RG
class testIncomingECSByIP(ECSTest):
    """Whitelist by responder IP with ``use-incoming-edns-subnet=yes``."""

    _confdir = 'ECSIncomingByIP'

    _config_template = """edns-subnet-whitelist=%s.21
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=::1
    """ % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source; a /24 is expected back.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        self.sendECSQuery(msg, wanted)

    def testNoECS(self):
        # Plain query without any EDNS option.
        msg = dns.message.make_query(nameECS, 'TXT')
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.sendECSQuery(msg, wanted)

    def testRequireNoECS(self):
        # we will get ::1 because ecs-scope-zero-address is set to ::1
        subnet = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        msg = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '::1/128')
        self.sendECSQuery(msg, wanted, expectedFirstTTL=ttlECS)

    def testSendECSInvalidScope(self):
        # test that the recursor does not cache with a more specific scope than the source it sent
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        msg = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        wanted = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        self.sendECSQuery(msg, wanted)
456
9a0b88e8
RG
class testECSIPMismatch(ECSTest):
    """Whitelist holds an address other than the responder's: no ECS expected."""

    _confdir = 'ECSIPMismatch'

    _config_template = """edns-subnet-whitelist=192.0.2.1
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # The client supplies a /32 source; the "no ECS" marker is expected.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)

    def testNoECS(self):
        # Plain query without any EDNS option. The response of a former extra
        # query was never inspected, so that redundant query has been dropped.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
        query = dns.message.make_query(nameECS, 'TXT')
        self.sendECSQuery(query, expected)

    def testRequireNoECS(self):
        # The client sends a 0.0.0.0/0 source.
        expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)

        ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        self.sendECSQuery(query, expected)
482
9a0b88e8
RG
class UDPECSResponder(DatagramProtocol):
    """UDP DNS responder that echoes the received ECS source in a TXT record.

    TXT queries for ``nameECS`` / ``nameECSInvalidScope`` are answered with
    ``<address>/<mask>`` taken from the ECS option of the query, or
    ``emptyECSText`` when the query carried none. NS queries for ``nameECS``
    get a fixed NS record plus glue. Any other query gets an empty response.
    """

    @staticmethod
    def ipToStr(option):
        """Return the textual address stored in ECS ``option``.

        :param option: object exposing ``family`` and an integer ``ip``.
        :raises ValueError: if ``option.family`` is neither IPv4 nor IPv6
            (previously this surfaced as an UnboundLocalError).
        """
        if option.family == clientsubnetoption.FAMILY_IPV4:
            return socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
        if option.family == clientsubnetoption.FAMILY_IPV6:
            # The 128-bit integer is packed as two big-endian 64-bit halves.
            high = option.ip >> 64
            low = option.ip & (2 ** 64 - 1)
            return socket.inet_ntop(socket.AF_INET6, struct.pack('!QQ', high, low))
        raise ValueError('Unsupported ECS address family: %r' % (option.family,))

    def datagramReceived(self, datagram, address):
        """Parse ``datagram`` as a DNS query and send the response to ``address``."""
        request = dns.message.from_wire(datagram)

        response = dns.message.make_response(request)
        response.flags |= dns.flags.AA
        ecso = None

        question = request.question[0]
        isEcho = question.name == dns.name.from_text(nameECS)
        isInvalidScope = question.name == dns.name.from_text(nameECSInvalidScope)

        if (isEcho or isInvalidScope) and question.rdtype == dns.rdatatype.TXT:

            text = emptyECSText
            for option in request.options:
                if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
                    text = self.ipToStr(option) + '/' + str(option.mask)

                    # Send a scope more specific than the received source for nameECSInvalidScope
                    if isInvalidScope:
                        ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
                    else:
                        ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)

            answer = dns.rrset.from_text(question.name, ttlECS, dns.rdataclass.IN, 'TXT', text)
            response.answer.append(answer)

        elif isEcho and question.rdtype == dns.rdatatype.NS:
            answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
            response.answer.append(answer)
            additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
            response.additional.append(additional)

        # An ECS option is attached to the response only when the query had one.
        if ecso:
            response.options = [ecso]

        self.transport.write(response.to_wire(), address)