]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
2 from recursortests
import RecursorTest
9 class AggressiveNSECCacheBase(RecursorTest
):
13 _wsPassword
= 'secretpassword'
14 _apiKey
= 'secretapikey'
15 #_recursorStartupDelay = 4.0
16 _config_template
= """
18 aggressive-nsec-cache-size=10000
19 nsec3-max-iterations=150
22 webserver-address=127.0.0.1
25 devonly-regression-test-mode
26 extended-resolution-errors=yes
27 """ % (_wsPort
, _wsPassword
, _apiKey
)
31 confdir
= os
.path
.join('configs', cls
._confdir
)
32 # Only wipe examples, as wiping the root triggers root NS refreshes
33 cls
.wipeRecursorCache(confdir
, "example$")
35 def getMetric(self
, name
):
36 headers
= {'x-api-key': self
._apiKey
}
37 url
= 'http://127.0.0.1:' + str(self
._wsPort
) + '/api/v1/servers/localhost/statistics'
38 r
= requests
.get(url
, headers
=headers
, timeout
=self
._wsTimeout
)
40 self
.assertEqual(r
.status_code
, 200)
41 self
.assertTrue(r
.json())
45 if entry
['name'] == name
:
46 return int(entry
['value'])
48 self
.assertTrue(False)
51 # This isn't an aggresive cache check, but the strcuture is very similar to the others,
52 # so letys place it here.
53 # It test the issue that an intermediate EDE does not get reported with the final answer
54 # https://github.com/PowerDNS/pdns/pull/12694
57 res
= self
.sendQuery('host1.sub.secure.example.', 'TXT')
58 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
59 self
.assertAnswerEmpty(res
)
60 self
.assertAuthorityHasSOA(res
)
61 self
.assertMessageIsAuthenticated(res
)
62 self
.assertEqual(res
.edns
, 0)
63 self
.assertEqual(len(res
.options
), 0)
65 res
= self
.sendQuery('host1.sub.secure.example.', 'A')
66 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
67 self
.assertMessageIsAuthenticated(res
)
68 self
.assertEqual(res
.edns
, 0)
69 self
.assertEqual(len(res
.options
), 0)
74 # first we query a nonexistent type, to get the NSEC in our cache
75 entries
= self
.getMetric('aggressive-nsec-cache-entries')
76 res
= self
.sendQuery('host1.secure.example.', 'TXT')
77 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
78 self
.assertAnswerEmpty(res
)
79 self
.assertAuthorityHasSOA(res
)
80 self
.assertMessageIsAuthenticated(res
)
81 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
83 # now we ask for a different type, we should generate the answer from the NSEC,
84 # and no outgoing query should be made
85 nbQueries
= self
.getMetric('all-outqueries')
86 entries
= self
.getMetric('aggressive-nsec-cache-entries')
87 res
= self
.sendQuery('host1.secure.example.', 'AAAA')
88 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
89 self
.assertAnswerEmpty(res
)
90 self
.assertAuthorityHasSOA(res
)
91 self
.assertMessageIsAuthenticated(res
)
92 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
93 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
94 self
.assertEqual(res
.edns
, 0)
95 self
.assertEqual(len(res
.options
), 1)
96 self
.assertEqual(res
.options
[0].otype
, 15)
97 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
99 class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase
):
100 _confdir
= 'AggressiveNSECCacheNSEC'
103 # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
104 # do not deny the same names than the non-hashed NSECs do
108 # first we query a nonexistent name, to get the needed NSECs (name + widcard) in our cache
109 entries
= self
.getMetric('aggressive-nsec-cache-entries')
110 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
111 res
= self
.sendQuery('host2.secure.example.', 'TXT')
112 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
113 self
.assertAnswerEmpty(res
)
114 self
.assertAuthorityHasSOA(res
)
115 self
.assertMessageIsAuthenticated(res
)
116 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
117 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
119 # now we ask for a different name that is covered by the NSEC,
120 # we should generate the answer from the NSEC and no outgoing query should be made
121 nbQueries
= self
.getMetric('all-outqueries')
122 entries
= self
.getMetric('aggressive-nsec-cache-entries')
123 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
124 res
= self
.sendQuery('host3.secure.example.', 'AAAA')
125 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
126 self
.assertAnswerEmpty(res
)
127 self
.assertAuthorityHasSOA(res
)
128 self
.assertMessageIsAuthenticated(res
)
129 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
130 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
131 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
132 self
.assertEqual(res
.edns
, 0)
133 self
.assertEqual(len(res
.options
), 1)
134 self
.assertEqual(res
.options
[0].otype
, 15)
135 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
137 def testWildcard(self
):
140 # first we query a nonexistent name, but for which a wildcard matches,
141 # to get the NSEC in our cache
142 res
= self
.sendQuery('test1.wildcard.secure.example.', 'A')
143 expected
= dns
.rrset
.from_text('test1.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
144 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
145 self
.assertMatchingRRSIGInAnswer(res
, expected
)
146 self
.assertMessageIsAuthenticated(res
)
148 # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
149 # and no outgoing query should be made
150 hits
= self
.getMetric('aggressive-nsec-cache-nsec-wc-hits')
151 nbQueries
= self
.getMetric('all-outqueries')
152 res
= self
.sendQuery('test2.wildcard.secure.example.', 'A')
153 expected
= dns
.rrset
.from_text('test2.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
154 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
155 self
.assertMatchingRRSIGInAnswer(res
, expected
)
156 self
.assertMessageIsAuthenticated(res
)
157 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
158 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits
)
159 self
.assertEqual(res
.edns
, 0)
160 self
.assertEqual(len(res
.options
), 1)
161 self
.assertEqual(res
.options
[0].otype
, 15)
162 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
164 # now we ask for a type that does not exist at the wildcard
165 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
166 nbQueries
= self
.getMetric('all-outqueries')
167 res
= self
.sendQuery('test1.wildcard.secure.example.', 'AAAA')
168 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
169 self
.assertAnswerEmpty(res
)
170 self
.assertAuthorityHasSOA(res
)
171 self
.assertMessageIsAuthenticated(res
)
172 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
173 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
174 self
.assertEqual(res
.edns
, 0)
175 self
.assertEqual(len(res
.options
), 1)
176 self
.assertEqual(res
.options
[0].otype
, 15)
177 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
179 # we can also ask a different type, for a different name that is covered
180 # by the NSEC and matches the wildcard (but the type does not exist)
181 hits
= self
.getMetric('aggressive-nsec-cache-nsec-wc-hits')
182 nbQueries
= self
.getMetric('all-outqueries')
183 res
= self
.sendQuery('test3.wildcard.secure.example.', 'TXT')
184 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
185 self
.assertAnswerEmpty(res
)
186 self
.assertAuthorityHasSOA(res
)
187 self
.assertMessageIsAuthenticated(res
)
188 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
189 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
190 self
.assertEqual(res
.edns
, 0)
191 self
.assertEqual(len(res
.options
), 1)
192 self
.assertEqual(res
.options
[0].otype
, 15)
193 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
195 def test_Bogus(self
):
198 # query a name in example to fill the aggressive negcache
199 res
= self
.sendQuery('example.', 'A')
200 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
201 self
.assertAnswerEmpty(res
)
202 self
.assertEqual(1, self
.getMetric('aggressive-nsec-cache-entries'))
204 # query a name in a Bogus zone
205 res
= self
.sendQuery('ted1.bogus.example.', 'A')
206 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
207 self
.assertAnswerEmpty(res
)
210 msg
= dns
.message
.make_query('ted1.bogus.example.', 'A', want_dnssec
=True)
211 msg
.flags |
= dns
.flags
.CD
213 res
= self
.sendUDPQuery(msg
)
214 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
215 self
.assertAnswerEmpty(res
)
216 self
.assertAuthorityHasSOA(res
)
218 # check that we _do not_ use the aggressive NSEC cache
219 nbQueries
= self
.getMetric('all-outqueries')
220 msg
= dns
.message
.make_query('ted2.bogus.example.', 'A', want_dnssec
=True)
221 msg
.flags |
= dns
.flags
.CD
223 res
= self
.sendUDPQuery(msg
)
224 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
225 self
.assertAnswerEmpty(res
)
226 self
.assertAuthorityHasSOA(res
)
227 self
.assertGreater(self
.getMetric('all-outqueries'), nbQueries
)
229 # Check that we stil have one aggressive cache entry
230 self
.assertEqual(1, self
.getMetric('aggressive-nsec-cache-entries'))
232 self
.assertEqual(res
.edns
, 0)
233 self
.assertEqual(len(res
.options
), 1)
234 self
.assertEqual(res
.options
[0].otype
, 15)
235 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(9, b
''))
237 class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase
):
238 _confdir
= 'AggressiveNSECCacheNSEC3'
242 def secureZone(cls
, confdir
, zonename
, key
=None):
243 zone
= '.' if zonename
== 'ROOT' else zonename
245 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
246 '--config-dir=%s' % confdir
,
250 keyfile
= os
.path
.join(confdir
, 'dnssec.key')
251 with
open(keyfile
, 'w') as fdKeyfile
:
254 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
255 '--config-dir=%s' % confdir
,
262 print(' '.join(pdnsutilCmd
))
264 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
265 except subprocess
.CalledProcessError
as e
:
266 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
268 params
= "1 0 100 AABBCCDDEEFF112233"
270 if zone
== "optout.example":
271 params
= "1 1 100 AABBCCDDEEFF112233"
273 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
274 '--config-dir=%s' % confdir
,
279 print(' '.join(pdnsutilCmd
))
281 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
282 except subprocess
.CalledProcessError
as e
:
283 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
288 # first we query a nonexistent name, to get the needed NSEC3s in our cache
289 res
= self
.sendQuery('host2.secure.example.', 'TXT')
290 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
291 self
.assertAnswerEmpty(res
)
292 self
.assertAuthorityHasSOA(res
)
293 self
.assertMessageIsAuthenticated(res
)
295 # now we ask for a different name that is covered by the NSEC3s,
296 # we should generate the answer from the NSEC3s and no outgoing query should be made
297 nbQueries
= self
.getMetric('all-outqueries')
298 res
= self
.sendQuery('host6.secure.example.', 'AAAA')
299 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
300 self
.assertAnswerEmpty(res
)
301 self
.assertAuthorityHasSOA(res
)
302 self
.assertMessageIsAuthenticated(res
)
303 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
304 self
.assertEqual(res
.edns
, 0)
305 self
.assertEqual(len(res
.options
), 1)
306 self
.assertEqual(res
.options
[0].otype
, 15)
307 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
309 def testWildcard(self
):
312 # first let's get the SOA and wildcard NSEC in our cache by asking a name that matches the wildcard
313 # but a type that does not exist
314 res
= self
.sendQuery('test1.wildcard.secure.example.', 'AAAA')
315 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
316 self
.assertAnswerEmpty(res
)
317 self
.assertAuthorityHasSOA(res
)
318 self
.assertMessageIsAuthenticated(res
)
320 # we query a nonexistent name, but for which a wildcard matches,
321 # to get the NSEC3 in our cache
322 res
= self
.sendQuery('test5.wildcard.secure.example.', 'A')
323 expected
= dns
.rrset
.from_text('test5.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
324 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
325 self
.assertMatchingRRSIGInAnswer(res
, expected
)
326 self
.assertMessageIsAuthenticated(res
)
328 # now we ask for a different name, we should generate the answer from the NSEC3s and the wildcard,
329 # and no outgoing query should be made
330 nbQueries
= self
.getMetric('all-outqueries')
331 res
= self
.sendQuery('test6.wildcard.secure.example.', 'A')
332 expected
= dns
.rrset
.from_text('test6.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
333 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
334 self
.assertMatchingRRSIGInAnswer(res
, expected
)
335 self
.assertMessageIsAuthenticated(res
)
336 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
337 self
.assertEqual(res
.edns
, 0)
338 self
.assertEqual(len(res
.options
), 1)
339 self
.assertEqual(res
.options
[0].otype
, 15)
340 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
342 # now we ask for a type that does not exist at the wildcard
343 nbQueries
= self
.getMetric('all-outqueries')
344 res
= self
.sendQuery('test5.wildcard.secure.example.', 'AAAA')
345 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
346 self
.assertAnswerEmpty(res
)
347 self
.assertAuthorityHasSOA(res
)
348 self
.assertMessageIsAuthenticated(res
)
349 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
350 self
.assertEqual(res
.edns
, 0)
351 self
.assertEqual(len(res
.options
), 1)
352 self
.assertEqual(res
.options
[0].otype
, 15)
353 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
355 # we can also ask a different type, for a different name that is covered
356 # by the NSEC3s and matches the wildcard (but the type does not exist)
357 nbQueries
= self
.getMetric('all-outqueries')
358 res
= self
.sendQuery('test6.wildcard.secure.example.', 'TXT')
359 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
360 self
.assertAnswerEmpty(res
)
361 self
.assertAuthorityHasSOA(res
)
362 self
.assertMessageIsAuthenticated(res
)
363 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
364 self
.assertEqual(res
.edns
, 0)
365 self
.assertEqual(len(res
.options
), 1)
366 self
.assertEqual(res
.options
[0].otype
, 15)
367 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(29, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
369 def test_OptOut(self
):
372 # query a name in an opt-out zone
373 res
= self
.sendQuery('ns2.optout.example.', 'A')
374 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
375 self
.assertAnswerEmpty(res
)
376 self
.assertAuthorityHasSOA(res
)
378 # check that we _do not_ use the aggressive NSEC cache
379 nbQueries
= self
.getMetric('all-outqueries')
380 res
= self
.sendQuery('ns3.optout.example.', 'A')
381 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
382 self
.assertAnswerEmpty(res
)
383 self
.assertAuthorityHasSOA(res
)
384 self
.assertGreater(self
.getMetric('all-outqueries'), nbQueries
)
385 self
.assertEqual(res
.edns
, 0)
386 self
.assertEqual(len(res
.options
), 0)