]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
2 from recursortests
import RecursorTest
9 class AggressiveNSECCacheBase(RecursorTest
):
13 _wsPassword
= 'secretpassword'
14 _apiKey
= 'secretapikey'
15 #_recursorStartupDelay = 4.0
16 _config_template
= """
18 aggressive-nsec-cache-size=10000
21 webserver-address=127.0.0.1
24 devonly-regression-test-mode
25 extended-resolution-errors=yes
26 """ % (_wsPort
, _wsPassword
, _apiKey
)
30 confdir
= os
.path
.join('configs', cls
._confdir
)
31 # Only wipe examples, as wiping the root triggers root NS refreshes
32 cls
.wipeRecursorCache(confdir
, "example$")
def getMetric(self, name):
    """Return the integer value of the recursor statistic called *name*.

    The statistics are fetched over HTTP from the web API on 127.0.0.1,
    using ``self._wsPort`` as the port, ``self._apiKey`` as the API key and
    ``self._wsTimeout`` as the request timeout.

    The test fails if the request does not return status 200, if the JSON
    payload is empty, or if no entry carries the requested name.
    """
    headers = {'x-api-key': self._apiKey}
    url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
    r = requests.get(url, headers=headers, timeout=self._wsTimeout)
    self.assertEqual(r.status_code, 200)

    # decode the body once and reuse it for both the check and the lookup
    content = r.json()
    self.assertTrue(content)

    # assumes the payload is a list of entries exposing 'name' and 'value'
    # keys -- TODO confirm against the API
    for entry in content:
        if entry['name'] == name:
            return int(entry['value'])

    # an explicit failure message makes a missing metric easy to diagnose
    self.fail('metric %r not found in the recursor statistics' % name)
52 # first we query a nonexistent type, to get the NSEC in our cache
53 entries
= self
.getMetric('aggressive-nsec-cache-entries')
54 res
= self
.sendQuery('host1.secure.example.', 'TXT')
55 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
56 self
.assertAnswerEmpty(res
)
57 self
.assertAuthorityHasSOA(res
)
58 self
.assertMessageIsAuthenticated(res
)
59 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
61 # now we ask for a different type, we should generate the answer from the NSEC,
62 # and no outgoing query should be made
63 nbQueries
= self
.getMetric('all-outqueries')
64 entries
= self
.getMetric('aggressive-nsec-cache-entries')
65 res
= self
.sendQuery('host1.secure.example.', 'AAAA')
66 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
67 self
.assertAnswerEmpty(res
)
68 self
.assertAuthorityHasSOA(res
)
69 self
.assertMessageIsAuthenticated(res
)
70 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
71 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
72 self
.assertEqual(res
.edns
, 0)
73 self
.assertEqual(len(res
.options
), 1)
74 self
.assertEqual(res
.options
[0].otype
, 15)
75 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(0, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
77 class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase
):
78 _confdir
= 'AggressiveNSECCacheNSEC'
81 # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
82 # do not deny the same names as the non-hashed NSECs do
86 # first we query a nonexistent name, to get the needed NSECs (name + wildcard) in our cache
87 entries
= self
.getMetric('aggressive-nsec-cache-entries')
88 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
89 res
= self
.sendQuery('host2.secure.example.', 'TXT')
90 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
91 self
.assertAnswerEmpty(res
)
92 self
.assertAuthorityHasSOA(res
)
93 self
.assertMessageIsAuthenticated(res
)
94 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
95 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
97 # now we ask for a different name that is covered by the NSEC,
98 # we should generate the answer from the NSEC and no outgoing query should be made
99 nbQueries
= self
.getMetric('all-outqueries')
100 entries
= self
.getMetric('aggressive-nsec-cache-entries')
101 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
102 res
= self
.sendQuery('host3.secure.example.', 'AAAA')
103 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
104 self
.assertAnswerEmpty(res
)
105 self
.assertAuthorityHasSOA(res
)
106 self
.assertMessageIsAuthenticated(res
)
107 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
108 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
109 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
110 self
.assertEqual(res
.edns
, 0)
111 self
.assertEqual(len(res
.options
), 1)
112 self
.assertEqual(res
.options
[0].otype
, 15)
113 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(0, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
def testWildcard(self):
    """Exercise wildcard handling of the NSEC aggressive cache.

    A first wildcard-expanded answer is fetched to seed the cache. Each of
    the three follow-up queries must then be answered without any new
    outgoing query, must increase the hit counter that is sampled just
    before it, and must carry the "synthesized from aggressive NSEC cache"
    extended error option.
    """
    # option that every synthesized answer is expected to carry
    synthesized = extendederrors.ExtendedErrorOption(0, b'Result synthesized from aggressive NSEC cache (RFC8198)')

    # first we query a nonexistent name, but for which a wildcard matches,
    # to get the NSEC in our cache
    res = self.sendQuery('test1.wildcard.secure.example.', 'A')
    expected = dns.rrset.from_text('test1.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertMatchingRRSIGInAnswer(res, expected)
    self.assertMessageIsAuthenticated(res)

    # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
    # and no outgoing query should be made
    hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test2.wildcard.secure.example.', 'A')
    expected = dns.rrset.from_text('test2.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertMatchingRRSIGInAnswer(res, expected)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits)
    self.assertEqual(res.edns, 0)
    self.assertEqual(len(res.options), 1)
    self.assertEqual(res.options[0].otype, 15)
    self.assertEqual(res.options[0], synthesized)

    # now we ask for a type that does not exist at the wildcard
    hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnswerEmpty(res)
    self.assertAuthorityHasSOA(res)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
    self.assertEqual(res.edns, 0)
    self.assertEqual(len(res.options), 1)
    self.assertEqual(res.options[0].otype, 15)
    self.assertEqual(res.options[0], synthesized)

    # we can also ask a different type, for a different name that is covered
    # by the NSEC and matches the wildcard (but the type does not exist)
    # The baseline has to come from the same counter that is compared
    # below. It used to be sampled from the wildcard-hit counter, so the
    # comparison could succeed without this query registering a hit.
    hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test3.wildcard.secure.example.', 'TXT')
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnswerEmpty(res)
    self.assertAuthorityHasSOA(res)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
    self.assertEqual(res.edns, 0)
    self.assertEqual(len(res.options), 1)
    self.assertEqual(res.options[0].otype, 15)
    self.assertEqual(res.options[0], synthesized)
def test_Bogus(self):
    """Names in a Bogus zone must not be answered from the aggressive cache.

    After one cache entry is created by a query for 'example.', two
    checking-disabled queries are sent for names under 'bogus.example.'.
    The second one must still trigger outgoing queries, the number of
    cache entries must stay at one, and the reply must carry a single
    extended error option with info code 9 and empty text.
    """
    def send_checking_disabled(qname):
        # build a DNSSEC-aware query with the CD flag set and send it over UDP
        query = dns.message.make_query(qname, 'A', want_dnssec=True)
        query.flags |= dns.flags.CD
        return self.sendUDPQuery(query)

    def assert_nxdomain_with_soa(reply):
        self.assertRcodeEqual(reply, dns.rcode.NXDOMAIN)
        self.assertAnswerEmpty(reply)
        self.assertAuthorityHasSOA(reply)

    # query a name in example to fill the aggressive negcache
    # (the exact count below assumes the cache holds nothing else at this
    # point -- TODO confirm how the cache is reset before this test)
    reply = self.sendQuery('example.', 'A')
    self.assertRcodeEqual(reply, dns.rcode.NOERROR)
    self.assertAnswerEmpty(reply)
    self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))

    # query a name in a Bogus zone
    reply = self.sendQuery('ted1.bogus.example.', 'A')
    self.assertRcodeEqual(reply, dns.rcode.SERVFAIL)
    self.assertAnswerEmpty(reply)

    # same name again, this time with checking disabled
    reply = send_checking_disabled('ted1.bogus.example.')
    assert_nxdomain_with_soa(reply)

    # check that we _do not_ use the aggressive NSEC cache
    outgoing_before = self.getMetric('all-outqueries')
    reply = send_checking_disabled('ted2.bogus.example.')
    assert_nxdomain_with_soa(reply)
    self.assertGreater(self.getMetric('all-outqueries'), outgoing_before)

    # Check that we still have one aggressive cache entry
    self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))

    self.assertEqual(reply.edns, 0)
    self.assertEqual(len(reply.options), 1)
    self.assertEqual(reply.options[0].otype, 15)
    self.assertEqual(reply.options[0], extendederrors.ExtendedErrorOption(9, b''))
215 class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase
):
216 _confdir
= 'AggressiveNSECCacheNSEC3'
220 def secureZone(cls
, confdir
, zonename
, key
=None):
221 zone
= '.' if zonename
== 'ROOT' else zonename
223 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
224 '--config-dir=%s' % confdir
,
228 keyfile
= os
.path
.join(confdir
, 'dnssec.key')
229 with
open(keyfile
, 'w') as fdKeyfile
:
232 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
233 '--config-dir=%s' % confdir
,
240 print(' '.join(pdnsutilCmd
))
242 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
243 except subprocess
.CalledProcessError
as e
:
244 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
246 params
= "1 0 100 AABBCCDDEEFF112233"
248 if zone
== "optout.example":
249 params
= "1 1 100 AABBCCDDEEFF112233"
251 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
252 '--config-dir=%s' % confdir
,
257 print(' '.join(pdnsutilCmd
))
259 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
260 except subprocess
.CalledProcessError
as e
:
261 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
266 # first we query a nonexistent name, to get the needed NSEC3s in our cache
267 res
= self
.sendQuery('host2.secure.example.', 'TXT')
268 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
269 self
.assertAnswerEmpty(res
)
270 self
.assertAuthorityHasSOA(res
)
271 self
.assertMessageIsAuthenticated(res
)
273 # now we ask for a different name that is covered by the NSEC3s,
274 # we should generate the answer from the NSEC3s and no outgoing query should be made
275 nbQueries
= self
.getMetric('all-outqueries')
276 res
= self
.sendQuery('host6.secure.example.', 'AAAA')
277 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
278 self
.assertAnswerEmpty(res
)
279 self
.assertAuthorityHasSOA(res
)
280 self
.assertMessageIsAuthenticated(res
)
281 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
282 self
.assertEqual(res
.edns
, 0)
283 self
.assertEqual(len(res
.options
), 1)
284 self
.assertEqual(res
.options
[0].otype
, 15)
285 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(0, b
'Result synthesized from aggressive NSEC cache (RFC8198)'))
def testWildcard(self):
    """Exercise wildcard handling of the NSEC3 aggressive cache.

    Two priming queries populate the cache. Each of the three follow-up
    queries must then be answered without any additional outgoing query
    and must carry the "synthesized from aggressive NSEC cache" extended
    error option.
    """
    def assert_nodata(reply):
        # authenticated NOERROR reply with an empty answer and a SOA
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertAnswerEmpty(reply)
        self.assertAuthorityHasSOA(reply)
        self.assertMessageIsAuthenticated(reply)

    def assert_wildcard_answer(reply, qname):
        # authenticated NOERROR reply holding the signed wildcard A record
        rrset = dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertMatchingRRSIGInAnswer(reply, rrset)
        self.assertMessageIsAuthenticated(reply)

    def assert_synthesized(reply):
        # exactly one EDNS option: the "synthesized" extended error
        self.assertEqual(reply.edns, 0)
        self.assertEqual(len(reply.options), 1)
        self.assertEqual(reply.options[0].otype, 15)
        self.assertEqual(reply.options[0], extendederrors.ExtendedErrorOption(0, b'Result synthesized from aggressive NSEC cache (RFC8198)'))

    # first let's get the SOA and wildcard NSEC in our cache by asking a name
    # that matches the wildcard but a type that does not exist
    reply = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
    assert_nodata(reply)

    # we query a nonexistent name, but for which a wildcard matches,
    # to get the NSEC3 in our cache
    reply = self.sendQuery('test5.wildcard.secure.example.', 'A')
    assert_wildcard_answer(reply, 'test5.wildcard.secure.example.')

    # now we ask for a different name, we should generate the answer from
    # the NSEC3s and the wildcard, and no outgoing query should be made
    outgoing_before = self.getMetric('all-outqueries')
    reply = self.sendQuery('test6.wildcard.secure.example.', 'A')
    assert_wildcard_answer(reply, 'test6.wildcard.secure.example.')
    self.assertEqual(outgoing_before, self.getMetric('all-outqueries'))
    assert_synthesized(reply)

    # now we ask for a type that does not exist at the wildcard
    outgoing_before = self.getMetric('all-outqueries')
    reply = self.sendQuery('test5.wildcard.secure.example.', 'AAAA')
    assert_nodata(reply)
    self.assertEqual(outgoing_before, self.getMetric('all-outqueries'))
    assert_synthesized(reply)

    # we can also ask a different type, for a different name that is covered
    # by the NSEC3s and matches the wildcard (but the type does not exist)
    outgoing_before = self.getMetric('all-outqueries')
    reply = self.sendQuery('test6.wildcard.secure.example.', 'TXT')
    assert_nodata(reply)
    self.assertEqual(outgoing_before, self.getMetric('all-outqueries'))
    assert_synthesized(reply)
def test_OptOut(self):
    """Names in an opt-out zone must not be answered from the aggressive cache.

    Two different nonexistent names under 'optout.example.' are queried.
    The second query must still cause outgoing queries, and its reply
    must not carry any EDNS option.
    """
    def assert_nxdomain_with_soa(reply):
        self.assertRcodeEqual(reply, dns.rcode.NXDOMAIN)
        self.assertAnswerEmpty(reply)
        self.assertAuthorityHasSOA(reply)

    # query a name in an opt-out zone
    first_reply = self.sendQuery('ns2.optout.example.', 'A')
    assert_nxdomain_with_soa(first_reply)

    # check that we _do not_ use the aggressive NSEC cache
    outgoing_before = self.getMetric('all-outqueries')
    second_reply = self.sendQuery('ns3.optout.example.', 'A')
    assert_nxdomain_with_soa(second_reply)
    self.assertGreater(self.getMetric('all-outqueries'), outgoing_before)
    self.assertEqual(second_reply.edns, 0)
    self.assertEqual(len(second_reply.options), 0)