]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_ECS.py
rec: Add ECS regression tests
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ECS.py
CommitLineData
9a0b88e8
RG
1import dns
2import os
3import socket
4import struct
5import threading
6import clientsubnetoption
7from recursortests import RecursorTest
8from twisted.internet.protocol import DatagramProtocol
9from twisted.internet import reactor
10
# TXT payload the test responder (UDPECSResponder) returns when the query it
# receives carries no EDNS Client Subnet option.
emptyECSText = 'No ECS received'
# Query name the test responder answers for; the tests below all query it.
nameECS = 'ecs-echo.example.'
13
class ECSTest(RecursorTest):
    """Common fixture for the EDNS Client Subnet (ECS) regression tests.

    Subclasses supply ``_confdir`` and ``_config_template``; this class starts
    the UDP responder (UDPECSResponder) and a recursor configured from them.
    """

    @classmethod
    def startResponders(cls):
        """Start the UDP ECS responder on ``<_PREFIX>.21`` port 53.

        The Twisted reactor is process-wide, so the listener and the thread
        running the reactor are only created when the reactor is not already
        running (i.e. an earlier test class has not started it).
        """
        print("Launching responders..")

        address = cls._PREFIX + '.21'
        port = 53

        if not reactor.running:
            reactor.listenUDP(port, UDPECSResponder(), interface=address)

            # installSignalHandlers=False: the reactor runs outside the main
            # thread, where signal handlers cannot be installed.
            cls._UDPResponder = threading.Thread(name='UDP ECS Responder', target=reactor.run, args=(False,))
            # Thread.setDaemon() is deprecated (Python 3.10+); the attribute
            # form is equivalent and keeps the thread from blocking exit.
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()

    @classmethod
    def tearDownResponders(cls):
        """Stop the Twisted reactor.

        NOTE(review): not called by tearDownClass below; the reactor is shared
        by all test classes in this file -- confirm before wiring it in.
        """
        reactor.stop()

    @classmethod
    def setUpClass(cls):
        """Bring up sockets, the responder and a recursor for this test class.

        The configuration is generated under ``configs/<_confdir>``.
        """
        cls.setUpSockets()

        cls.startResponders()

        confdir = os.path.join('configs', cls._confdir)
        cls.createConfigDir(confdir)

        cls.generateRecursorConfig(confdir)
        cls.startRecursor(confdir, cls._recursorPort)

        print("Launching tests..")

    @classmethod
    def tearDownClass(cls):
        """Stop the recursor; the responder is left running for later classes."""
        cls.tearDownRecursor()
51
class testNoECS(ECSTest):
    """Empty edns-subnet-whitelist: the responder must never see an ECS option."""

    _confdir = 'NoECS'

    _config_template = """edns-subnet-whitelist=
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # The client supplies an ECS option; the responder is still expected
        # to report that it received none.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # Plain query without EDNS: same expectation as above.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
77
class testIncomingNoECS(ECSTest):
    """Empty whitelist with use-incoming-edns-subnet=yes: still no ECS at the responder."""

    _confdir = 'IncomingNoECS'

    _config_template = """edns-subnet-whitelist=
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # An incoming ECS option is present, but the responder is expected to
        # report that it received none.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # Plain query without EDNS: same expectation as above.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
104
class testECSByName(ECSTest):
    """Whitelist by domain name: the responder is expected to see 127.0.0.0/24."""

    _confdir = 'ECSByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # use-incoming-edns-subnet is not set here, and the expected subnet is
        # 127.0.0.0/24 rather than the client-supplied 192.0.2.1/32.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # Plain query without EDNS: same expected subnet.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
130
class testECSByNameLarger(ECSTest):
    """Whitelist by name with ecs-ipv4-bits=32: the responder is expected to see 127.0.0.1/32."""

    _confdir = 'ECSByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # use-incoming-edns-subnet is not set here, and the expected subnet is
        # 127.0.0.1/32 rather than the client-supplied 192.0.2.1/32.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # Plain query without EDNS: same expected subnet.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
157
class testECSByNameSmaller(ECSTest):
    """Whitelist by name with ecs-ipv4-bits=16: the responder is expected to see 127.0.0.0/16."""

    # Was 'ECSByNameLarger' (copy-paste slip), which made this class generate
    # its recursor configuration in the same configs/ directory as
    # testECSByNameLarger.
    _confdir = 'ECSByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'])

    def testSendECS(self):
        # use-incoming-edns-subnet is not set here, and the expected subnet is
        # 127.0.0.0/16 rather than the client-supplied 192.0.2.1/32.
        expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')

        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)

    def testNoECS(self):
        # Plain query without EDNS: same expected subnet.
        expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')

        query = dns.message.make_query(nameECS, 'TXT')
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
184
class testIncomingECSByName(ECSTest):
    """Whitelist by name with use-incoming-edns-subnet=yes.

    With an incoming ECS option the responder is expected to see 192.0.2.0/24;
    without one, 127.0.0.0/24.
    """

    _confdir = 'ECSIncomingByName'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # Client sends 192.0.2.1/32; the expected subnet at the responder is
        # that address cut down to /24.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # No incoming ECS option: 127.0.0.0/24 is expected instead.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
211
class testIncomingECSByNameLarger(ECSTest):
    """Whitelist by name, use-incoming-edns-subnet=yes and ecs-ipv4-bits=32.

    With an incoming ECS option the responder is expected to see 192.0.2.1/32;
    without one, 127.0.0.1/32.
    """

    _confdir = 'ECSIncomingByNameLarger'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # Client sends 192.0.2.1/32 and the full /32 is expected upstream.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # No incoming ECS option: 127.0.0.1/32 is expected instead.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
239
class testIncomingECSByNameSmaller(ECSTest):
    """Whitelist by name, use-incoming-edns-subnet=yes and ecs-ipv4-bits=16.

    With an incoming ECS option the responder is expected to see 192.0.0.0/16;
    without one, 127.0.0.0/16.
    """

    _confdir = 'ECSIncomingByNameSmaller'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # Client sends 192.0.2.1/32; the expected subnet at the responder is
        # that address cut down to /16.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # No incoming ECS option: 127.0.0.0/16 is expected instead.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
267
class testIncomingECSByNameV6(ECSTest):
    """Whitelist by name, use-incoming-edns-subnet=yes and ecs-ipv6-bits=128.

    With an incoming IPv6 ECS option the responder is expected to see
    2001:db8::1/128; without one, 127.0.0.0/24.
    """

    _confdir = 'ECSIncomingByNameV6'

    _config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv6-bits=128
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # Client sends an IPv6 /128 and the full prefix is expected upstream.
        subnet = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # No incoming ECS option: 127.0.0.0/24 is expected instead.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
295
class testECSNameMismatch(ECSTest):
    """Whitelist names a different domain: the responder must not see an ECS option."""

    _confdir = 'ECSNameMismatch'

    _config_template = """edns-subnet-whitelist=not-the-right-name.example.
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # The queried name is not the whitelisted one, so the responder is
        # expected to report that it received no ECS.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # Plain query without EDNS: same expectation as above.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
321
class testECSByIP(ECSTest):
    """Whitelist by the responder's IP address: the responder is expected to see 127.0.0.0/24."""

    _confdir = 'ECSByIP'

    # The same <PREFIX>.21 address is both whitelisted and used as forwarder.
    _config_template = """edns-subnet-whitelist=%s.21
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # use-incoming-edns-subnet is not set here, and the expected subnet is
        # 127.0.0.0/24 rather than the client-supplied 192.0.2.1/32.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # Plain query without EDNS: same expected subnet.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
347
class testIncomingECSByIP(ECSTest):
    """Whitelist by the responder's IP with use-incoming-edns-subnet=yes.

    With an incoming ECS option the responder is expected to see 192.0.2.0/24;
    without one, 127.0.0.0/24.
    """

    _confdir = 'ECSIncomingByIP'

    # The same <PREFIX>.21 address is both whitelisted and used as forwarder.
    _config_template = """edns-subnet-whitelist=%s.21
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
    """ % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testSendECS(self):
        # Client sends 192.0.2.1/32; the expected subnet at the responder is
        # that address cut down to /24.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # No incoming ECS option: 127.0.0.0/24 is expected instead.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
374
class testECSIPMismatch(ECSTest):
    """Whitelist holds 192.0.2.1, not the forwarder address: the responder must not see an ECS option."""

    _confdir = 'ECSIPMismatch'

    _config_template = """edns-subnet-whitelist=192.0.2.1
forward-zones=ecs-echo.example=%s.21
    """ % os.environ['PREFIX']

    def testSendECS(self):
        # The forwarder is <PREFIX>.21 while the whitelist entry is 192.0.2.1,
        # so the responder is expected to report that it received no ECS.
        subnet = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
        request = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[subnet], payload=512)
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)

    def testNoECS(self):
        # Plain query without EDNS: same expectation as above.
        request = dns.message.make_query(nameECS, 'TXT')
        reply = self.sendUDPQuery(request)

        wanted = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(reply, wanted)
400
class UDPECSResponder(DatagramProtocol):
    """Twisted UDP protocol that answers queries for ``nameECS``.

    TXT queries are answered with the ECS subnet found in the query, formatted
    as ``address/mask``, or with ``emptyECSText`` when no ECS option is
    present.  NS queries get a fixed NS record plus an A record in the
    additional section.  Any other question gets a response with no records.
    """

    @staticmethod
    def ipToStr(option):
        """Return the textual address carried by an ECS option.

        option -- object with a ``family`` attribute and an integer ``ip``.

        Raises ValueError when the family is neither IPv4 nor IPv6 (the
        original fell through to an unbound local variable in that case).
        """
        if option.family == clientsubnetoption.FAMILY_IPV4:
            return socket.inet_ntop(socket.AF_INET, struct.pack('!L', option.ip))
        if option.family == clientsubnetoption.FAMILY_IPV6:
            # The 128-bit integer is packed as two big-endian 64-bit halves.
            return socket.inet_ntop(socket.AF_INET6,
                                    struct.pack('!QQ',
                                                option.ip >> 64,
                                                option.ip & (2 ** 64 - 1)))
        raise ValueError('Unsupported ECS address family: %r' % (option.family,))

    def datagramReceived(self, datagram, address):
        """Parse a DNS query from ``datagram`` and send the reply to ``address``."""
        request = dns.message.from_wire(datagram)

        response = dns.message.make_response(request)

        question = request.question[0]
        if question.name == dns.name.from_text(nameECS):
            if question.rdtype == dns.rdatatype.TXT:
                text = emptyECSText
                # No early exit: if several ECS options are present, the last
                # one determines the answer.
                for option in request.options:
                    if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
                        text = self.ipToStr(option) + '/' + str(option.mask)

                answer = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', text)
                response.answer.append(answer)
            elif question.rdtype == dns.rdatatype.NS:
                answer = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
                response.answer.append(answer)
                # Was `cls._PREFIX`, but `cls` does not exist in an instance
                # method (NameError on every NS query).  os.environ['PREFIX']
                # is what the config templates in this file use to build the
                # same <PREFIX>.21 address.
                additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
                response.additional.append(additional)

        self.transport.write(response.to_wire(), address)