]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
2 from recursortests
import RecursorTest
9 class AggressiveNSECCacheBase(RecursorTest
):
13 _wsPassword
= 'secretpassword'
14 _apiKey
= 'secretapikey'
15 #_recursorStartupDelay = 4.0
16 _config_template
= """
18 aggressive-nsec-cache-size=10000
19 aggressive-cache-max-nsec3-hash-cost=204
20 nsec3-max-iterations=150
23 webserver-address=127.0.0.1
26 devonly-regression-test-mode
27 extended-resolution-errors=yes
28 """ % (_wsPort
, _wsPassword
, _apiKey
)
32 confdir
= os
.path
.join('configs', cls
._confdir
)
33 # Only wipe examples, as wiping the root triggers root NS refreshes
34 cls
.wipeRecursorCache(confdir
, "example$")
36 def getMetric(self
, name
):
37 headers
= {'x-api-key': self
._apiKey
}
38 url
= 'http://127.0.0.1:' + str(self
._wsPort
) + '/api/v1/servers/localhost/statistics'
39 r
= requests
.get(url
, headers
=headers
, timeout
=self
._wsTimeout
)
41 self
.assertEqual(r
.status_code
, 200)
42 self
.assertTrue(r
.json())
46 if entry
['name'] == name
:
47 return int(entry
['value'])
49 self
.assertTrue(False)
52 # This isn't an aggresive cache check, but the strcuture is very similar to the others,
53 # so letys place it here.
54 # It test the issue that an intermediate EDE does not get reported with the final answer
55 # https://github.com/PowerDNS/pdns/pull/12694
58 res
= self
.sendQuery('host1.sub.secure.example.', 'TXT')
59 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
60 self
.assertAnswerEmpty(res
)
61 self
.assertAuthorityHasSOA(res
)
62 self
.assertMessageIsAuthenticated(res
)
63 self
.assertEqual(res
.edns
, 0)
64 self
.assertEqual(len(res
.options
), 0)
66 res
= self
.sendQuery('host1.sub.secure.example.', 'A')
67 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
68 self
.assertMessageIsAuthenticated(res
)
69 self
.assertEqual(res
.edns
, 0)
70 self
.assertEqual(len(res
.options
), 0)
75 # first we query a nonexistent type, to get the NSEC in our cache
76 entries
= self
.getMetric('aggressive-nsec-cache-entries')
77 res
= self
.sendQuery('host1.secure.example.', 'TXT')
78 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
79 self
.assertAnswerEmpty(res
)
80 self
.assertAuthorityHasSOA(res
)
81 self
.assertMessageIsAuthenticated(res
)
82 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
84 # now we ask for a different type, we should generate the answer from the NSEC,
85 # and no outgoing query should be made
86 nbQueries
= self
.getMetric('all-outqueries')
87 entries
= self
.getMetric('aggressive-nsec-cache-entries')
88 res
= self
.sendQuery('host1.secure.example.', 'AAAA')
89 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
90 self
.assertAnswerEmpty(res
)
91 self
.assertAuthorityHasSOA(res
)
92 self
.assertMessageIsAuthenticated(res
)
93 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
94 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
95 self
.assertEqual(res
.edns
, 0)
96 self
.assertEqual(len(res
.options
), 1)
97 self
.assertEqual(res
.options
[0].otype
, 15)
98 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
100 class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase
):
101 _confdir
= 'AggressiveNSECCacheNSEC'
104 # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
105 # do not deny the same names than the non-hashed NSECs do
109 # first we query a nonexistent name, to get the needed NSECs (name + widcard) in our cache
110 entries
= self
.getMetric('aggressive-nsec-cache-entries')
111 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
112 res
= self
.sendQuery('host2.secure.example.', 'TXT')
113 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
114 self
.assertAnswerEmpty(res
)
115 self
.assertAuthorityHasSOA(res
)
116 self
.assertMessageIsAuthenticated(res
)
117 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
118 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
120 # now we ask for a different name that is covered by the NSEC,
121 # we should generate the answer from the NSEC and no outgoing query should be made
122 nbQueries
= self
.getMetric('all-outqueries')
123 entries
= self
.getMetric('aggressive-nsec-cache-entries')
124 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
125 res
= self
.sendQuery('host3.secure.example.', 'AAAA')
126 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
127 self
.assertAnswerEmpty(res
)
128 self
.assertAuthorityHasSOA(res
)
129 self
.assertMessageIsAuthenticated(res
)
130 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
131 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
132 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
133 self
.assertEqual(res
.edns
, 0)
134 self
.assertEqual(len(res
.options
), 1)
135 self
.assertEqual(res
.options
[0].otype
, 15)
136 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
138 def testWildcard(self
):
141 # first we query a nonexistent name, but for which a wildcard matches,
142 # to get the NSEC in our cache
143 res
= self
.sendQuery('test1.wildcard.secure.example.', 'A')
144 expected
= dns
.rrset
.from_text('test1.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
145 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
146 self
.assertMatchingRRSIGInAnswer(res
, expected
)
147 self
.assertMessageIsAuthenticated(res
)
149 # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
150 # and no outgoing query should be made
151 hits
= self
.getMetric('aggressive-nsec-cache-nsec-wc-hits')
152 nbQueries
= self
.getMetric('all-outqueries')
153 res
= self
.sendQuery('test2.wildcard.secure.example.', 'A')
154 expected
= dns
.rrset
.from_text('test2.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
155 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
156 self
.assertMatchingRRSIGInAnswer(res
, expected
)
157 self
.assertMessageIsAuthenticated(res
)
158 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
159 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits
)
160 self
.assertEqual(res
.edns
, 0)
161 self
.assertEqual(len(res
.options
), 1)
162 self
.assertEqual(res
.options
[0].otype
, 15)
163 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
165 # now we ask for a type that does not exist at the wildcard
166 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
167 nbQueries
= self
.getMetric('all-outqueries')
168 res
= self
.sendQuery('test1.wildcard.secure.example.', 'AAAA')
169 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
170 self
.assertAnswerEmpty(res
)
171 self
.assertAuthorityHasSOA(res
)
172 self
.assertMessageIsAuthenticated(res
)
173 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
174 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
175 self
.assertEqual(res
.edns
, 0)
176 self
.assertEqual(len(res
.options
), 1)
177 self
.assertEqual(res
.options
[0].otype
, 15)
178 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
180 # we can also ask a different type, for a different name that is covered
181 # by the NSEC and matches the wildcard (but the type does not exist)
182 hits
= self
.getMetric('aggressive-nsec-cache-nsec-wc-hits')
183 nbQueries
= self
.getMetric('all-outqueries')
184 res
= self
.sendQuery('test3.wildcard.secure.example.', 'TXT')
185 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
186 self
.assertAnswerEmpty(res
)
187 self
.assertAuthorityHasSOA(res
)
188 self
.assertMessageIsAuthenticated(res
)
189 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
190 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
191 self
.assertEqual(res
.edns
, 0)
192 self
.assertEqual(len(res
.options
), 1)
193 self
.assertEqual(res
.options
[0].otype
, 15)
194 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
196 def test_Bogus(self
):
199 # query a name in example to fill the aggressive negcache
200 res
= self
.sendQuery('example.', 'A')
201 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
202 self
.assertAnswerEmpty(res
)
203 self
.assertEqual(1, self
.getMetric('aggressive-nsec-cache-entries'))
205 # query a name in a Bogus zone
206 res
= self
.sendQuery('ted1.bogus.example.', 'A')
207 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
208 self
.assertAnswerEmpty(res
)
211 msg
= dns
.message
.make_query('ted1.bogus.example.', 'A', want_dnssec
=True)
212 msg
.flags |
= dns
.flags
.CD
214 res
= self
.sendUDPQuery(msg
)
215 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
216 self
.assertAnswerEmpty(res
)
217 self
.assertAuthorityHasSOA(res
)
219 # check that we _do not_ use the aggressive NSEC cache
220 nbQueries
= self
.getMetric('all-outqueries')
221 msg
= dns
.message
.make_query('ted2.bogus.example.', 'A', want_dnssec
=True)
222 msg
.flags |
= dns
.flags
.CD
224 res
= self
.sendUDPQuery(msg
)
225 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
226 self
.assertAnswerEmpty(res
)
227 self
.assertAuthorityHasSOA(res
)
228 self
.assertGreater(self
.getMetric('all-outqueries'), nbQueries
)
230 # Check that we stil have one aggressive cache entry
231 self
.assertEqual(1, self
.getMetric('aggressive-nsec-cache-entries'))
233 self
.assertEqual(res
.edns
, 0)
234 self
.assertEqual(len(res
.options
), 1)
235 self
.assertEqual(res
.options
[0].otype
, 15)
236 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(9, b
''))
238 class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase
):
239 _confdir
= 'AggressiveNSECCacheNSEC3'
243 def secureZone(cls
, confdir
, zonename
, key
=None):
244 zone
= '.' if zonename
== 'ROOT' else zonename
246 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
247 '--config-dir=%s' % confdir
,
251 keyfile
= os
.path
.join(confdir
, 'dnssec.key')
252 with
open(keyfile
, 'w') as fdKeyfile
:
255 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
256 '--config-dir=%s' % confdir
,
263 print(' '.join(pdnsutilCmd
))
265 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
266 except subprocess
.CalledProcessError
as e
:
267 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
269 params
= "1 0 100 AABBCCDDEEFF112233"
271 if zone
== "optout.example":
272 params
= "1 1 100 AABBCCDDEEFF112233"
274 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
275 '--config-dir=%s' % confdir
,
280 print(' '.join(pdnsutilCmd
))
282 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
283 except subprocess
.CalledProcessError
as e
:
284 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
289 # first we query a nonexistent name, to get the needed NSEC3s in our cache
290 res
= self
.sendQuery('host2.secure.example.', 'TXT')
291 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
292 self
.assertAnswerEmpty(res
)
293 self
.assertAuthorityHasSOA(res
)
294 self
.assertMessageIsAuthenticated(res
)
296 # now we ask for a different name that is covered by the NSEC3s,
297 # we should generate the answer from the NSEC3s and no outgoing query should be made
298 nbQueries
= self
.getMetric('all-outqueries')
299 res
= self
.sendQuery('host6.secure.example.', 'AAAA')
300 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
301 self
.assertAnswerEmpty(res
)
302 self
.assertAuthorityHasSOA(res
)
303 self
.assertMessageIsAuthenticated(res
)
304 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
305 self
.assertEqual(res
.edns
, 0)
306 self
.assertEqual(len(res
.options
), 1)
307 self
.assertEqual(res
.options
[0].otype
, 15)
308 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
310 def testWildcard(self
):
313 # first let's get the SOA and wildcard NSEC in our cache by asking a name that matches the wildcard
314 # but a type that does not exist
315 res
= self
.sendQuery('test1.wildcard.secure.example.', 'AAAA')
316 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
317 self
.assertAnswerEmpty(res
)
318 self
.assertAuthorityHasSOA(res
)
319 self
.assertMessageIsAuthenticated(res
)
321 # we query a nonexistent name, but for which a wildcard matches,
322 # to get the NSEC3 in our cache
323 res
= self
.sendQuery('test5.wildcard.secure.example.', 'A')
324 expected
= dns
.rrset
.from_text('test5.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
325 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
326 self
.assertMatchingRRSIGInAnswer(res
, expected
)
327 self
.assertMessageIsAuthenticated(res
)
329 # now we ask for a different name, we should generate the answer from the NSEC3s and the wildcard,
330 # and no outgoing query should be made
331 nbQueries
= self
.getMetric('all-outqueries')
332 res
= self
.sendQuery('test6.wildcard.secure.example.', 'A')
333 expected
= dns
.rrset
.from_text('test6.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
334 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
335 self
.assertMatchingRRSIGInAnswer(res
, expected
)
336 self
.assertMessageIsAuthenticated(res
)
337 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
338 self
.assertEqual(res
.edns
, 0)
339 self
.assertEqual(len(res
.options
), 1)
340 self
.assertEqual(res
.options
[0].otype
, 15)
341 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
343 # now we ask for a type that does not exist at the wildcard
344 nbQueries
= self
.getMetric('all-outqueries')
345 res
= self
.sendQuery('test5.wildcard.secure.example.', 'AAAA')
346 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
347 self
.assertAnswerEmpty(res
)
348 self
.assertAuthorityHasSOA(res
)
349 self
.assertMessageIsAuthenticated(res
)
350 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
351 self
.assertEqual(res
.edns
, 0)
352 self
.assertEqual(len(res
.options
), 1)
353 self
.assertEqual(res
.options
[0].otype
, 15)
354 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
356 # we can also ask a different type, for a different name that is covered
357 # by the NSEC3s and matches the wildcard (but the type does not exist)
358 nbQueries
= self
.getMetric('all-outqueries')
359 res
= self
.sendQuery('test6.wildcard.secure.example.', 'TXT')
360 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
361 self
.assertAnswerEmpty(res
)
362 self
.assertAuthorityHasSOA(res
)
363 self
.assertMessageIsAuthenticated(res
)
364 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
365 self
.assertEqual(res
.edns
, 0)
366 self
.assertEqual(len(res
.options
), 1)
367 self
.assertEqual(res
.options
[0].otype
, 15)
368 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
370 def test_OptOut(self
):
373 # query a name in an opt-out zone
374 res
= self
.sendQuery('ns2.optout.example.', 'A')
375 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
376 self
.assertAnswerEmpty(res
)
377 self
.assertAuthorityHasSOA(res
)
379 # check that we _do not_ use the aggressive NSEC cache
380 nbQueries
= self
.getMetric('all-outqueries')
381 res
= self
.sendQuery('ns3.optout.example.', 'A')
382 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
383 self
.assertAnswerEmpty(res
)
384 self
.assertAuthorityHasSOA(res
)
385 self
.assertGreater(self
.getMetric('all-outqueries'), nbQueries
)
386 self
.assertEqual(res
.edns
, 0)
387 self
.assertEqual(len(res
.options
), 0)