]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
Merge pull request #12669 from chbruyand/auth-ifurlup-ips
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_AggressiveNSECCache.py
1 import dns
2 from recursortests import RecursorTest
3 import os
4 import requests
5 import subprocess
6 import time
7 import extendederrors
8
9 class AggressiveNSECCacheBase(RecursorTest):
10 __test__ = False
11 _wsPort = 8042
12 _wsTimeout = 10
13 _wsPassword = 'secretpassword'
14 _apiKey = 'secretapikey'
15 #_recursorStartupDelay = 4.0
16 _config_template = """
17 dnssec=validate
18 aggressive-nsec-cache-size=10000
19 webserver=yes
20 webserver-port=%d
21 webserver-address=127.0.0.1
22 webserver-password=%s
23 api-key=%s
24 devonly-regression-test-mode
25 extended-resolution-errors=yes
26 """ % (_wsPort, _wsPassword, _apiKey)
27
28 @classmethod
29 def wipe(cls):
30 confdir = os.path.join('configs', cls._confdir)
31 # Only wipe examples, as wiping the root triggers root NS refreshes
32 cls.wipeRecursorCache(confdir, "example$")
33
34 def getMetric(self, name):
35 headers = {'x-api-key': self._apiKey}
36 url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
37 r = requests.get(url, headers=headers, timeout=self._wsTimeout)
38 self.assertTrue(r)
39 self.assertEqual(r.status_code, 200)
40 self.assertTrue(r.json())
41 content = r.json()
42
43 for entry in content:
44 if entry['name'] == name:
45 return int(entry['value'])
46
47 self.assertTrue(False)
48
49 def testNoEDE(self):
50 # This isn't an aggresive cache check, but the strcuture is very similar to the others,
51 # so letys place it here.
52 # It test the issue that an intermediate EDE does not get reported with the final answer
53 # https://github.com/PowerDNS/pdns/pull/12694
54 self.wipe()
55
56 res = self.sendQuery('host1.sub.secure.example.', 'TXT')
57 self.assertRcodeEqual(res, dns.rcode.NOERROR)
58 self.assertAnswerEmpty(res)
59 self.assertAuthorityHasSOA(res)
60 self.assertMessageIsAuthenticated(res)
61 self.assertEqual(res.edns, 0)
62 self.assertEqual(len(res.options), 0)
63
64 res = self.sendQuery('host1.sub.secure.example.', 'A')
65 self.assertRcodeEqual(res, dns.rcode.NOERROR)
66 self.assertMessageIsAuthenticated(res)
67 self.assertEqual(res.edns, 0)
68 self.assertEqual(len(res.options), 0)
69
70 def testNoData(self):
71 self.wipe()
72
73 # first we query a nonexistent type, to get the NSEC in our cache
74 entries = self.getMetric('aggressive-nsec-cache-entries')
75 res = self.sendQuery('host1.secure.example.', 'TXT')
76 self.assertRcodeEqual(res, dns.rcode.NOERROR)
77 self.assertAnswerEmpty(res)
78 self.assertAuthorityHasSOA(res)
79 self.assertMessageIsAuthenticated(res)
80 self.assertGreater(self.getMetric('aggressive-nsec-cache-entries'), entries)
81
82 # now we ask for a different type, we should generate the answer from the NSEC,
83 # and no outgoing query should be made
84 nbQueries = self.getMetric('all-outqueries')
85 entries = self.getMetric('aggressive-nsec-cache-entries')
86 res = self.sendQuery('host1.secure.example.', 'AAAA')
87 self.assertRcodeEqual(res, dns.rcode.NOERROR)
88 self.assertAnswerEmpty(res)
89 self.assertAuthorityHasSOA(res)
90 self.assertMessageIsAuthenticated(res)
91 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
92 self.assertEqual(self.getMetric('aggressive-nsec-cache-entries'), entries)
93 self.assertEqual(res.edns, 0)
94 self.assertEqual(len(res.options), 1)
95 self.assertEqual(res.options[0].otype, 15)
96 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
97
98 class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase):
99 _confdir = 'AggressiveNSECCacheNSEC'
100 __test__ = True
101
102 # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
103 # do not deny the same names than the non-hashed NSECs do
104 def testNXD(self):
105 self.wipe()
106
107 # first we query a nonexistent name, to get the needed NSECs (name + widcard) in our cache
108 entries = self.getMetric('aggressive-nsec-cache-entries')
109 hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
110 res = self.sendQuery('host2.secure.example.', 'TXT')
111 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
112 self.assertAnswerEmpty(res)
113 self.assertAuthorityHasSOA(res)
114 self.assertMessageIsAuthenticated(res)
115 self.assertGreater(self.getMetric('aggressive-nsec-cache-entries'), entries)
116 self.assertEqual(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
117
118 # now we ask for a different name that is covered by the NSEC,
119 # we should generate the answer from the NSEC and no outgoing query should be made
120 nbQueries = self.getMetric('all-outqueries')
121 entries = self.getMetric('aggressive-nsec-cache-entries')
122 hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
123 res = self.sendQuery('host3.secure.example.', 'AAAA')
124 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
125 self.assertAnswerEmpty(res)
126 self.assertAuthorityHasSOA(res)
127 self.assertMessageIsAuthenticated(res)
128 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
129 self.assertEqual(self.getMetric('aggressive-nsec-cache-entries'), entries)
130 self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
131 self.assertEqual(res.edns, 0)
132 self.assertEqual(len(res.options), 1)
133 self.assertEqual(res.options[0].otype, 15)
134 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
135
136 def testWildcard(self):
137 self.wipe()
138
139 # first we query a nonexistent name, but for which a wildcard matches,
140 # to get the NSEC in our cache
141 res = self.sendQuery('test1.wildcard.secure.example.', 'A')
142 expected = dns.rrset.from_text('test1.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
143 self.assertRcodeEqual(res, dns.rcode.NOERROR)
144 self.assertMatchingRRSIGInAnswer(res, expected)
145 self.assertMessageIsAuthenticated(res)
146
147 # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
148 # and no outgoing query should be made
149 hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
150 nbQueries = self.getMetric('all-outqueries')
151 res = self.sendQuery('test2.wildcard.secure.example.', 'A')
152 expected = dns.rrset.from_text('test2.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
153 self.assertRcodeEqual(res, dns.rcode.NOERROR)
154 self.assertMatchingRRSIGInAnswer(res, expected)
155 self.assertMessageIsAuthenticated(res)
156 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
157 self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits)
158 self.assertEqual(res.edns, 0)
159 self.assertEqual(len(res.options), 1)
160 self.assertEqual(res.options[0].otype, 15)
161 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
162
163 # now we ask for a type that does not exist at the wildcard
164 hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
165 nbQueries = self.getMetric('all-outqueries')
166 res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
167 self.assertRcodeEqual(res, dns.rcode.NOERROR)
168 self.assertAnswerEmpty(res)
169 self.assertAuthorityHasSOA(res)
170 self.assertMessageIsAuthenticated(res)
171 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
172 self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
173 self.assertEqual(res.edns, 0)
174 self.assertEqual(len(res.options), 1)
175 self.assertEqual(res.options[0].otype, 15)
176 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
177
178 # we can also ask a different type, for a different name that is covered
179 # by the NSEC and matches the wildcard (but the type does not exist)
180 hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
181 nbQueries = self.getMetric('all-outqueries')
182 res = self.sendQuery('test3.wildcard.secure.example.', 'TXT')
183 self.assertRcodeEqual(res, dns.rcode.NOERROR)
184 self.assertAnswerEmpty(res)
185 self.assertAuthorityHasSOA(res)
186 self.assertMessageIsAuthenticated(res)
187 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
188 self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
189 self.assertEqual(res.edns, 0)
190 self.assertEqual(len(res.options), 1)
191 self.assertEqual(res.options[0].otype, 15)
192 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
193
194 def test_Bogus(self):
195 self.wipe()
196
197 # query a name in example to fill the aggressive negcache
198 res = self.sendQuery('example.', 'A')
199 self.assertRcodeEqual(res, dns.rcode.NOERROR)
200 self.assertAnswerEmpty(res)
201 self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))
202
203 # query a name in a Bogus zone
204 res = self.sendQuery('ted1.bogus.example.', 'A')
205 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
206 self.assertAnswerEmpty(res)
207
208 # disable validation
209 msg = dns.message.make_query('ted1.bogus.example.', 'A', want_dnssec=True)
210 msg.flags |= dns.flags.CD
211
212 res = self.sendUDPQuery(msg)
213 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
214 self.assertAnswerEmpty(res)
215 self.assertAuthorityHasSOA(res)
216
217 # check that we _do not_ use the aggressive NSEC cache
218 nbQueries = self.getMetric('all-outqueries')
219 msg = dns.message.make_query('ted2.bogus.example.', 'A', want_dnssec=True)
220 msg.flags |= dns.flags.CD
221
222 res = self.sendUDPQuery(msg)
223 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
224 self.assertAnswerEmpty(res)
225 self.assertAuthorityHasSOA(res)
226 self.assertGreater(self.getMetric('all-outqueries'), nbQueries)
227
228 # Check that we stil have one aggressive cache entry
229 self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))
230 print(res.options)
231 self.assertEqual(res.edns, 0)
232 self.assertEqual(len(res.options), 1)
233 self.assertEqual(res.options[0].otype, 15)
234 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(9, b''))
235
236 class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase):
237 _confdir = 'AggressiveNSECCacheNSEC3'
238 __test__ = True
239
240 @classmethod
241 def secureZone(cls, confdir, zonename, key=None):
242 zone = '.' if zonename == 'ROOT' else zonename
243 if not key:
244 pdnsutilCmd = [os.environ['PDNSUTIL'],
245 '--config-dir=%s' % confdir,
246 'secure-zone',
247 zone]
248 else:
249 keyfile = os.path.join(confdir, 'dnssec.key')
250 with open(keyfile, 'w') as fdKeyfile:
251 fdKeyfile.write(key)
252
253 pdnsutilCmd = [os.environ['PDNSUTIL'],
254 '--config-dir=%s' % confdir,
255 'import-zone-key',
256 zone,
257 keyfile,
258 'active',
259 'ksk']
260
261 print(' '.join(pdnsutilCmd))
262 try:
263 subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
264 except subprocess.CalledProcessError as e:
265 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))
266
267 params = "1 0 100 AABBCCDDEEFF112233"
268
269 if zone == "optout.example":
270 params = "1 1 100 AABBCCDDEEFF112233"
271
272 pdnsutilCmd = [os.environ['PDNSUTIL'],
273 '--config-dir=%s' % confdir,
274 'set-nsec3',
275 zone,
276 params]
277
278 print(' '.join(pdnsutilCmd))
279 try:
280 subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
281 except subprocess.CalledProcessError as e:
282 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))
283
284 def testNXD(self):
285 self.wipe()
286
287 # first we query a nonexistent name, to get the needed NSEC3s in our cache
288 res = self.sendQuery('host2.secure.example.', 'TXT')
289 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
290 self.assertAnswerEmpty(res)
291 self.assertAuthorityHasSOA(res)
292 self.assertMessageIsAuthenticated(res)
293
294 # now we ask for a different name that is covered by the NSEC3s,
295 # we should generate the answer from the NSEC3s and no outgoing query should be made
296 nbQueries = self.getMetric('all-outqueries')
297 res = self.sendQuery('host6.secure.example.', 'AAAA')
298 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
299 self.assertAnswerEmpty(res)
300 self.assertAuthorityHasSOA(res)
301 self.assertMessageIsAuthenticated(res)
302 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
303 self.assertEqual(res.edns, 0)
304 self.assertEqual(len(res.options), 1)
305 self.assertEqual(res.options[0].otype, 15)
306 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
307
308 def testWildcard(self):
309 self.wipe()
310
311 # first let's get the SOA and wildcard NSEC in our cache by asking a name that matches the wildcard
312 # but a type that does not exist
313 res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
314 self.assertRcodeEqual(res, dns.rcode.NOERROR)
315 self.assertAnswerEmpty(res)
316 self.assertAuthorityHasSOA(res)
317 self.assertMessageIsAuthenticated(res)
318
319 # we query a nonexistent name, but for which a wildcard matches,
320 # to get the NSEC3 in our cache
321 res = self.sendQuery('test5.wildcard.secure.example.', 'A')
322 expected = dns.rrset.from_text('test5.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
323 self.assertRcodeEqual(res, dns.rcode.NOERROR)
324 self.assertMatchingRRSIGInAnswer(res, expected)
325 self.assertMessageIsAuthenticated(res)
326
327 # now we ask for a different name, we should generate the answer from the NSEC3s and the wildcard,
328 # and no outgoing query should be made
329 nbQueries = self.getMetric('all-outqueries')
330 res = self.sendQuery('test6.wildcard.secure.example.', 'A')
331 expected = dns.rrset.from_text('test6.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
332 self.assertRcodeEqual(res, dns.rcode.NOERROR)
333 self.assertMatchingRRSIGInAnswer(res, expected)
334 self.assertMessageIsAuthenticated(res)
335 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
336 self.assertEqual(res.edns, 0)
337 self.assertEqual(len(res.options), 1)
338 self.assertEqual(res.options[0].otype, 15)
339 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
340
341 # now we ask for a type that does not exist at the wildcard
342 nbQueries = self.getMetric('all-outqueries')
343 res = self.sendQuery('test5.wildcard.secure.example.', 'AAAA')
344 self.assertRcodeEqual(res, dns.rcode.NOERROR)
345 self.assertAnswerEmpty(res)
346 self.assertAuthorityHasSOA(res)
347 self.assertMessageIsAuthenticated(res)
348 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
349 self.assertEqual(res.edns, 0)
350 self.assertEqual(len(res.options), 1)
351 self.assertEqual(res.options[0].otype, 15)
352 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
353
354 # we can also ask a different type, for a different name that is covered
355 # by the NSEC3s and matches the wildcard (but the type does not exist)
356 nbQueries = self.getMetric('all-outqueries')
357 res = self.sendQuery('test6.wildcard.secure.example.', 'TXT')
358 self.assertRcodeEqual(res, dns.rcode.NOERROR)
359 self.assertAnswerEmpty(res)
360 self.assertAuthorityHasSOA(res)
361 self.assertMessageIsAuthenticated(res)
362 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
363 self.assertEqual(res.edns, 0)
364 self.assertEqual(len(res.options), 1)
365 self.assertEqual(res.options[0].otype, 15)
366 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
367
368 def test_OptOut(self):
369 self.wipe()
370
371 # query a name in an opt-out zone
372 res = self.sendQuery('ns2.optout.example.', 'A')
373 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
374 self.assertAnswerEmpty(res)
375 self.assertAuthorityHasSOA(res)
376
377 # check that we _do not_ use the aggressive NSEC cache
378 nbQueries = self.getMetric('all-outqueries')
379 res = self.sendQuery('ns3.optout.example.', 'A')
380 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
381 self.assertAnswerEmpty(res)
382 self.assertAuthorityHasSOA(res)
383 self.assertGreater(self.getMetric('all-outqueries'), nbQueries)
384 self.assertEqual(res.edns, 0)
385 self.assertEqual(len(res.options), 0)