]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
2 from recursortests
import RecursorTest
9 class AggressiveNSECCacheBase(RecursorTest
):
13 _wsPassword
= 'secretpassword'
14 _apiKey
= 'secretapikey'
15 #_recursorStartupDelay = 4.0
16 _config_template
= """
18 aggressive-nsec-cache-size=10000
21 webserver-address=127.0.0.1
24 devonly-regression-test-mode
25 extended-resolution-errors=yes
26 """ % (_wsPort
, _wsPassword
, _apiKey
)
30 confdir
= os
.path
.join('configs', cls
._confdir
)
31 # Only wipe examples, as wiping the root triggers root NS refreshes
32 cls
.wipeRecursorCache(confdir
, "example$")
34 def getMetric(self
, name
):
35 headers
= {'x-api-key': self
._apiKey
}
36 url
= 'http://127.0.0.1:' + str(self
._wsPort
) + '/api/v1/servers/localhost/statistics'
37 r
= requests
.get(url
, headers
=headers
, timeout
=self
._wsTimeout
)
39 self
.assertEqual(r
.status_code
, 200)
40 self
.assertTrue(r
.json())
44 if entry
['name'] == name
:
45 return int(entry
['value'])
47 self
.assertTrue(False)
50 # This isn't an aggressive cache check, but the structure is very similar to the others,
51 # so let's place it here.
52 # It tests that an intermediate EDE does not get reported with the final answer
53 # https://github.com/PowerDNS/pdns/pull/12694
56 res
= self
.sendQuery('host1.sub.secure.example.', 'TXT')
57 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
58 self
.assertAnswerEmpty(res
)
59 self
.assertAuthorityHasSOA(res
)
60 self
.assertMessageIsAuthenticated(res
)
61 self
.assertEqual(res
.edns
, 0)
62 self
.assertEqual(len(res
.options
), 0)
64 res
= self
.sendQuery('host1.sub.secure.example.', 'A')
65 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
66 self
.assertMessageIsAuthenticated(res
)
67 self
.assertEqual(res
.edns
, 0)
68 self
.assertEqual(len(res
.options
), 0)
73 # first we query a nonexistent type, to get the NSEC in our cache
74 entries
= self
.getMetric('aggressive-nsec-cache-entries')
75 res
= self
.sendQuery('host1.secure.example.', 'TXT')
76 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
77 self
.assertAnswerEmpty(res
)
78 self
.assertAuthorityHasSOA(res
)
79 self
.assertMessageIsAuthenticated(res
)
80 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
82 # now we ask for a different type, we should generate the answer from the NSEC,
83 # and no outgoing query should be made
84 nbQueries
= self
.getMetric('all-outqueries')
85 entries
= self
.getMetric('aggressive-nsec-cache-entries')
86 res
= self
.sendQuery('host1.secure.example.', 'AAAA')
87 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
88 self
.assertAnswerEmpty(res
)
89 self
.assertAuthorityHasSOA(res
)
90 self
.assertMessageIsAuthenticated(res
)
91 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
92 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
93 self
.assertEqual(res
.edns
, 0)
94 self
.assertEqual(len(res
.options
), 1)
95 self
.assertEqual(res
.options
[0].otype
, 15)
96 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
98 class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase
):
99 _confdir
= 'AggressiveNSECCacheNSEC'
102 # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
103 # do not deny the same names as the non-hashed NSECs do
107 # first we query a nonexistent name, to get the needed NSECs (name + wildcard) in our cache
108 entries
= self
.getMetric('aggressive-nsec-cache-entries')
109 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
110 res
= self
.sendQuery('host2.secure.example.', 'TXT')
111 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
112 self
.assertAnswerEmpty(res
)
113 self
.assertAuthorityHasSOA(res
)
114 self
.assertMessageIsAuthenticated(res
)
115 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
116 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
118 # now we ask for a different name that is covered by the NSEC,
119 # we should generate the answer from the NSEC and no outgoing query should be made
120 nbQueries
= self
.getMetric('all-outqueries')
121 entries
= self
.getMetric('aggressive-nsec-cache-entries')
122 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
123 res
= self
.sendQuery('host3.secure.example.', 'AAAA')
124 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
125 self
.assertAnswerEmpty(res
)
126 self
.assertAuthorityHasSOA(res
)
127 self
.assertMessageIsAuthenticated(res
)
128 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
129 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
130 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
131 self
.assertEqual(res
.edns
, 0)
132 self
.assertEqual(len(res
.options
), 1)
133 self
.assertEqual(res
.options
[0].otype
, 15)
134 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
def testWildcard(self):
    """Exercise the aggressive NSEC cache when a wildcard is involved.

    A first lookup for a name matched by the wildcard is sent so that the
    NSEC gets cached. Each of the three following lookups must then:

    * leave the 'all-outqueries' metric unchanged,
    * increase the hit counter sampled just before the lookup, and
    * carry exactly one EDNS option, the extended error with info code 29
      and the "synthesized from aggressive NSEC cache" text.
    """
    # NOTE(review): the listing this block was taken from does not show the
    # two source lines that follow the ``def`` line; confirm whether a setup
    # step belongs here before the first query.

    # Built once: every synthesized answer below must carry this exact option.
    synthesized = extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)')

    def assertSynthesized(response):
        # EDNS version 0, and a single option of type 15 equal to `synthesized`.
        self.assertEqual(response.edns, 0)
        self.assertEqual(len(response.options), 1)
        self.assertEqual(response.options[0].otype, 15)
        self.assertEqual(response.options[0], synthesized)

    # first we query a nonexistent name, but for which a wildcard matches,
    # to get the NSEC in our cache
    res = self.sendQuery('test1.wildcard.secure.example.', 'A')
    expected = dns.rrset.from_text('test1.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertMatchingRRSIGInAnswer(res, expected)
    self.assertMessageIsAuthenticated(res)

    # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
    # and no outgoing query should be made
    hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test2.wildcard.secure.example.', 'A')
    expected = dns.rrset.from_text('test2.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertMatchingRRSIGInAnswer(res, expected)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits)
    assertSynthesized(res)

    # now we ask for a type that does not exist at the wildcard
    hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnswerEmpty(res)
    self.assertAuthorityHasSOA(res)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
    assertSynthesized(res)

    # we can also ask a different type, for a different name that is covered
    # by the NSEC and matches the wildcard (but the type does not exist)
    # The baseline is sampled from the same counter that is asserted below;
    # sampling 'aggressive-nsec-cache-nsec-wc-hits' here would compare two
    # unrelated counters and could pass without any new hit being recorded.
    hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test3.wildcard.secure.example.', 'TXT')
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnswerEmpty(res)
    self.assertAuthorityHasSOA(res)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
    assertSynthesized(res)
def test_Bogus(self):
    """Check that the aggressive NSEC cache is not used for a Bogus zone.

    After one cache entry has been created through a lookup of 'example.',
    two different names below 'bogus.example.' are queried with the CD flag
    set. The second of them must still cause outgoing queries, and the
    number of aggressive cache entries must stay at one.
    """
    # NOTE(review): the listing this block was taken from does not show the
    # two source lines that follow the ``def`` line; confirm whether a setup
    # step belongs here before the first query.

    def checking_disabled_query(qname):
        # DNSSEC-aware A query with the CD (checking disabled) flag set.
        query = dns.message.make_query(qname, 'A', want_dnssec=True)
        query.flags |= dns.flags.CD
        return query

    # Fill the aggressive negative cache with a lookup in 'example.'.
    reply = self.sendQuery('example.', 'A')
    self.assertRcodeEqual(reply, dns.rcode.NOERROR)
    self.assertAnswerEmpty(reply)
    self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))

    # A plain query for a name in the Bogus zone is answered with SERVFAIL.
    reply = self.sendQuery('ted1.bogus.example.', 'A')
    self.assertRcodeEqual(reply, dns.rcode.SERVFAIL)
    self.assertAnswerEmpty(reply)

    # The same name with the CD flag set yields NXDOMAIN and a SOA.
    reply = self.sendUDPQuery(checking_disabled_query('ted1.bogus.example.'))
    self.assertRcodeEqual(reply, dns.rcode.NXDOMAIN)
    self.assertAnswerEmpty(reply)
    self.assertAuthorityHasSOA(reply)

    # A sibling name must _not_ be answered from the aggressive NSEC cache:
    # the outgoing query counter has to grow.
    outgoing_before = self.getMetric('all-outqueries')
    reply = self.sendUDPQuery(checking_disabled_query('ted2.bogus.example.'))
    self.assertRcodeEqual(reply, dns.rcode.NXDOMAIN)
    self.assertAnswerEmpty(reply)
    self.assertAuthorityHasSOA(reply)
    self.assertGreater(self.getMetric('all-outqueries'), outgoing_before)

    # There is still exactly one aggressive cache entry.
    self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))

    # The last reply carries a single EDNS option (type 15): the extended
    # error with info code 9 and an empty text.
    self.assertEqual(reply.edns, 0)
    self.assertEqual(len(reply.options), 1)
    self.assertEqual(reply.options[0].otype, 15)
    self.assertEqual(reply.options[0], extendederrors.ExtendedErrorOption(9, b''))
236 class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase
):
237 _confdir
= 'AggressiveNSECCacheNSEC3'
241 def secureZone(cls
, confdir
, zonename
, key
=None):
242 zone
= '.' if zonename
== 'ROOT' else zonename
244 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
245 '--config-dir=%s' % confdir
,
249 keyfile
= os
.path
.join(confdir
, 'dnssec.key')
250 with
open(keyfile
, 'w') as fdKeyfile
:
253 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
254 '--config-dir=%s' % confdir
,
261 print(' '.join(pdnsutilCmd
))
263 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
264 except subprocess
.CalledProcessError
as e
:
265 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
267 params
= "1 0 100 AABBCCDDEEFF112233"
269 if zone
== "optout.example":
270 params
= "1 1 100 AABBCCDDEEFF112233"
272 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
273 '--config-dir=%s' % confdir
,
278 print(' '.join(pdnsutilCmd
))
280 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
281 except subprocess
.CalledProcessError
as e
:
282 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
287 # first we query a nonexistent name, to get the needed NSEC3s in our cache
288 res
= self
.sendQuery('host2.secure.example.', 'TXT')
289 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
290 self
.assertAnswerEmpty(res
)
291 self
.assertAuthorityHasSOA(res
)
292 self
.assertMessageIsAuthenticated(res
)
294 # now we ask for a different name that is covered by the NSEC3s,
295 # we should generate the answer from the NSEC3s and no outgoing query should be made
296 nbQueries
= self
.getMetric('all-outqueries')
297 res
= self
.sendQuery('host6.secure.example.', 'AAAA')
298 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
299 self
.assertAnswerEmpty(res
)
300 self
.assertAuthorityHasSOA(res
)
301 self
.assertMessageIsAuthenticated(res
)
302 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
303 self
.assertEqual(res
.edns
, 0)
304 self
.assertEqual(len(res
.options
), 1)
305 self
.assertEqual(res
.options
[0].otype
, 15)
306 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
def testWildcard(self):
    """Exercise the aggressive cache with NSEC3 records and a wildcard.

    Two lookups prime the cache. The three lookups that follow must each
    leave the 'all-outqueries' metric unchanged and carry exactly one EDNS
    option: the extended error with info code 29 and the "synthesized from
    aggressive NSEC cache" text.
    """
    # NOTE(review): the listing this block was taken from does not show the
    # two source lines that follow the ``def`` line; confirm whether a setup
    # step belongs here before the first query.

    def wildcard_a_rrset(owner):
        # A record that the wildcard expands to for `owner`.
        return dns.rrset.from_text(owner, 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))

    def assert_nodata(reply):
        # Authenticated NOERROR answer with an empty answer section and a SOA.
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertAnswerEmpty(reply)
        self.assertAuthorityHasSOA(reply)
        self.assertMessageIsAuthenticated(reply)

    def assert_from_cache(reply, outgoing_before):
        # No new outgoing query, and the single option (type 15) is the
        # "synthesized" extended error.
        self.assertEqual(outgoing_before, self.getMetric('all-outqueries'))
        self.assertEqual(reply.edns, 0)
        self.assertEqual(len(reply.options), 1)
        self.assertEqual(reply.options[0].otype, 15)
        self.assertEqual(reply.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))

    # Get the SOA and the wildcard NSEC into the cache: the name matches the
    # wildcard, but the requested type does not exist.
    reply = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
    assert_nodata(reply)

    # A nonexistent name that the wildcard matches, to get the NSEC3 cached.
    reply = self.sendQuery('test5.wildcard.secure.example.', 'A')
    wanted = wildcard_a_rrset('test5.wildcard.secure.example.')
    self.assertRcodeEqual(reply, dns.rcode.NOERROR)
    self.assertMatchingRRSIGInAnswer(reply, wanted)
    self.assertMessageIsAuthenticated(reply)

    # A different name: the answer is generated from the NSEC3s and the
    # wildcard, with no outgoing query.
    outgoing_before = self.getMetric('all-outqueries')
    reply = self.sendQuery('test6.wildcard.secure.example.', 'A')
    wanted = wildcard_a_rrset('test6.wildcard.secure.example.')
    self.assertRcodeEqual(reply, dns.rcode.NOERROR)
    self.assertMatchingRRSIGInAnswer(reply, wanted)
    self.assertMessageIsAuthenticated(reply)
    assert_from_cache(reply, outgoing_before)

    # Types that do not exist at the wildcard: first for the name queried
    # above, then a different type for a different name covered by the NSEC3s.
    for qname, qtype in (('test5.wildcard.secure.example.', 'AAAA'),
                         ('test6.wildcard.secure.example.', 'TXT')):
        outgoing_before = self.getMetric('all-outqueries')
        reply = self.sendQuery(qname, qtype)
        assert_nodata(reply)
        assert_from_cache(reply, outgoing_before)
def test_OptOut(self):
    """Check that the aggressive NSEC cache is not used for an opt-out zone.

    Two different names below 'optout.example.' are queried. The second
    lookup must still cause outgoing queries, and its reply must carry no
    EDNS option.
    """
    # NOTE(review): the listing this block was taken from does not show the
    # two source lines that follow the ``def`` line; confirm whether a setup
    # step belongs here before the first query.

    def assert_nxdomain(reply):
        # NXDOMAIN with an empty answer section and a SOA in the authority.
        self.assertRcodeEqual(reply, dns.rcode.NXDOMAIN)
        self.assertAnswerEmpty(reply)
        self.assertAuthorityHasSOA(reply)

    # A name in the opt-out zone.
    assert_nxdomain(self.sendQuery('ns2.optout.example.', 'A'))

    # A sibling name must _not_ be answered from the aggressive NSEC cache:
    # the outgoing query counter has to grow.
    outgoing_before = self.getMetric('all-outqueries')
    reply = self.sendQuery('ns3.optout.example.', 'A')
    assert_nxdomain(reply)
    self.assertGreater(self.getMetric('all-outqueries'), outgoing_before)
    self.assertEqual(reply.edns, 0)
    self.assertEqual(len(reply.options), 0)