]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
2 from recursortests
import RecursorTest
8 class AggressiveNSECCacheBase(RecursorTest
):
12 _wsPassword
= 'secretpassword'
13 _apiKey
= 'secretapikey'
14 #_recursorStartupDelay = 4.0
15 _config_template
= """
17 aggressive-nsec-cache-size=10000
20 webserver-address=127.0.0.1
23 devonly-regression-test-mode
24 """ % (_wsPort
, _wsPassword
, _apiKey
)
28 confdir
= os
.path
.join('configs', cls
._confdir
)
29 # Only wipe examples, as wiping the root triggers root NS refreshes
30 cls
.wipeRecursorCache(confdir
, "example$")
32 def getMetric(self
, name
):
33 headers
= {'x-api-key': self
._apiKey
}
34 url
= 'http://127.0.0.1:' + str(self
._wsPort
) + '/api/v1/servers/localhost/statistics'
35 r
= requests
.get(url
, headers
=headers
, timeout
=self
._wsTimeout
)
37 self
.assertEqual(r
.status_code
, 200)
38 self
.assertTrue(r
.json())
42 if entry
['name'] == name
:
43 return int(entry
['value'])
45 self
.assertTrue(False)
50 # first we query a non-existent type, to get the NSEC in our cache
51 entries
= self
.getMetric('aggressive-nsec-cache-entries')
52 res
= self
.sendQuery('host1.secure.example.', 'TXT')
53 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
54 self
.assertAnswerEmpty(res
)
55 self
.assertAuthorityHasSOA(res
)
56 self
.assertMessageIsAuthenticated(res
)
57 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
59 # now we ask for a different type, we should generate the answer from the NSEC,
60 # and no outgoing query should be made
61 nbQueries
= self
.getMetric('all-outqueries')
62 entries
= self
.getMetric('aggressive-nsec-cache-entries')
63 res
= self
.sendQuery('host1.secure.example.', 'AAAA')
64 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
65 self
.assertAnswerEmpty(res
)
66 self
.assertAuthorityHasSOA(res
)
67 self
.assertMessageIsAuthenticated(res
)
68 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
69 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
71 class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase
):
72 _confdir
= 'AggressiveNSECCacheNSEC'
75 # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
76 # do not deny the same names as the non-hashed NSECs do
80 # first we query a non-existent name, to get the needed NSECs (name + wildcard) in our cache
81 entries
= self
.getMetric('aggressive-nsec-cache-entries')
82 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
83 res
= self
.sendQuery('host2.secure.example.', 'TXT')
84 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
85 self
.assertAnswerEmpty(res
)
86 self
.assertAuthorityHasSOA(res
)
87 self
.assertMessageIsAuthenticated(res
)
88 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
89 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
91 # now we ask for a different name that is covered by the NSEC,
92 # we should generate the answer from the NSEC and no outgoing query should be made
93 nbQueries
= self
.getMetric('all-outqueries')
94 entries
= self
.getMetric('aggressive-nsec-cache-entries')
95 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
96 res
= self
.sendQuery('host3.secure.example.', 'AAAA')
97 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
98 self
.assertAnswerEmpty(res
)
99 self
.assertAuthorityHasSOA(res
)
100 self
.assertMessageIsAuthenticated(res
)
101 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
102 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
103 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
105 def testWildcard(self
):
108 # first we query a non-existent name, but for which a wildcard matches,
109 # to get the NSEC in our cache
110 res
= self
.sendQuery('test1.wildcard.secure.example.', 'A')
111 expected
= dns
.rrset
.from_text('test1.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
112 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
113 self
.assertMatchingRRSIGInAnswer(res
, expected
)
114 self
.assertMessageIsAuthenticated(res
)
116 # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
117 # and no outgoing query should be made
118 hits
= self
.getMetric('aggressive-nsec-cache-nsec-wc-hits')
119 nbQueries
= self
.getMetric('all-outqueries')
120 res
= self
.sendQuery('test2.wildcard.secure.example.', 'A')
121 expected
= dns
.rrset
.from_text('test2.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
122 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
123 self
.assertMatchingRRSIGInAnswer(res
, expected
)
124 self
.assertMessageIsAuthenticated(res
)
125 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
126 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits
)
128 # now we ask for a type that does not exist at the wildcard
129 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
130 nbQueries
= self
.getMetric('all-outqueries')
131 res
= self
.sendQuery('test1.wildcard.secure.example.', 'AAAA')
132 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
133 self
.assertAnswerEmpty(res
)
134 self
.assertAuthorityHasSOA(res
)
135 self
.assertMessageIsAuthenticated(res
)
136 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
137 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
139 # we can also ask a different type, for a different name that is covered
140 # by the NSEC and matches the wildcard (but the type does not exist)
141 hits
= self
.getMetric('aggressive-nsec-cache-nsec-wc-hits')
142 nbQueries
= self
.getMetric('all-outqueries')
143 res
= self
.sendQuery('test3.wildcard.secure.example.', 'TXT')
144 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
145 self
.assertAnswerEmpty(res
)
146 self
.assertAuthorityHasSOA(res
)
147 self
.assertMessageIsAuthenticated(res
)
148 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
149 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
151 def test_Bogus(self
):
154 # query a name in example to fill the aggressive negcache
155 res
= self
.sendQuery('example.', 'A')
156 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
157 self
.assertAnswerEmpty(res
)
158 self
.assertEqual(1, self
.getMetric('aggressive-nsec-cache-entries'))
160 # query a name in a Bogus zone
161 res
= self
.sendQuery('ted1.bogus.example.', 'A')
162 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
163 self
.assertAnswerEmpty(res
)
166 msg
= dns
.message
.make_query('ted1.bogus.example.', 'A', want_dnssec
=True)
167 msg
.flags |
= dns
.flags
.CD
169 res
= self
.sendUDPQuery(msg
)
170 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
171 self
.assertAnswerEmpty(res
)
172 self
.assertAuthorityHasSOA(res
)
174 # check that we _do not_ use the aggressive NSEC cache
175 nbQueries
= self
.getMetric('all-outqueries')
176 msg
= dns
.message
.make_query('ted2.bogus.example.', 'A', want_dnssec
=True)
177 msg
.flags |
= dns
.flags
.CD
179 res
= self
.sendUDPQuery(msg
)
180 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
181 self
.assertAnswerEmpty(res
)
182 self
.assertAuthorityHasSOA(res
)
183 self
.assertGreater(self
.getMetric('all-outqueries'), nbQueries
)
185 # Check that we still have one aggressive cache entry
186 self
.assertEqual(1, self
.getMetric('aggressive-nsec-cache-entries'))
188 class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase
):
189 _confdir
= 'AggressiveNSECCacheNSEC3'
193 def secureZone(cls
, confdir
, zonename
, key
=None):
194 zone
= '.' if zonename
== 'ROOT' else zonename
196 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
197 '--config-dir=%s' % confdir
,
201 keyfile
= os
.path
.join(confdir
, 'dnssec.key')
202 with
open(keyfile
, 'w') as fdKeyfile
:
205 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
206 '--config-dir=%s' % confdir
,
213 print(' '.join(pdnsutilCmd
))
215 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
216 except subprocess
.CalledProcessError
as e
:
217 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
219 params
= "1 0 100 AABBCCDDEEFF112233"
221 if zone
== "optout.example":
222 params
= "1 1 100 AABBCCDDEEFF112233"
224 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
225 '--config-dir=%s' % confdir
,
230 print(' '.join(pdnsutilCmd
))
232 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
233 except subprocess
.CalledProcessError
as e
:
234 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
239 # first we query a non-existent name, to get the needed NSEC3s in our cache
240 res
= self
.sendQuery('host2.secure.example.', 'TXT')
241 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
242 self
.assertAnswerEmpty(res
)
243 self
.assertAuthorityHasSOA(res
)
244 self
.assertMessageIsAuthenticated(res
)
246 # now we ask for a different name that is covered by the NSEC3s,
247 # we should generate the answer from the NSEC3s and no outgoing query should be made
248 nbQueries
= self
.getMetric('all-outqueries')
249 res
= self
.sendQuery('host6.secure.example.', 'AAAA')
250 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
251 self
.assertAnswerEmpty(res
)
252 self
.assertAuthorityHasSOA(res
)
253 self
.assertMessageIsAuthenticated(res
)
254 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
256 def testWildcard(self
):
259 # first let's get the SOA and wildcard NSEC in our cache by asking a name that matches the wildcard
260 # but a type that does not exist
261 res
= self
.sendQuery('test1.wildcard.secure.example.', 'AAAA')
262 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
263 self
.assertAnswerEmpty(res
)
264 self
.assertAuthorityHasSOA(res
)
265 self
.assertMessageIsAuthenticated(res
)
267 # we query a non-existent name, but for which a wildcard matches,
268 # to get the NSEC3 in our cache
269 res
= self
.sendQuery('test5.wildcard.secure.example.', 'A')
270 expected
= dns
.rrset
.from_text('test5.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
271 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
272 self
.assertMatchingRRSIGInAnswer(res
, expected
)
273 self
.assertMessageIsAuthenticated(res
)
275 # now we ask for a different name, we should generate the answer from the NSEC3s and the wildcard,
276 # and no outgoing query should be made
277 nbQueries
= self
.getMetric('all-outqueries')
278 res
= self
.sendQuery('test6.wildcard.secure.example.', 'A')
279 expected
= dns
.rrset
.from_text('test6.wildcard.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '{prefix}.10'.format(prefix
=self
._PREFIX
))
280 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
281 self
.assertMatchingRRSIGInAnswer(res
, expected
)
282 self
.assertMessageIsAuthenticated(res
)
283 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
285 # now we ask for a type that does not exist at the wildcard
286 nbQueries
= self
.getMetric('all-outqueries')
287 res
= self
.sendQuery('test5.wildcard.secure.example.', 'AAAA')
288 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
289 self
.assertAnswerEmpty(res
)
290 self
.assertAuthorityHasSOA(res
)
291 self
.assertMessageIsAuthenticated(res
)
292 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
294 # we can also ask a different type, for a different name that is covered
295 # by the NSEC3s and matches the wildcard (but the type does not exist)
296 nbQueries
= self
.getMetric('all-outqueries')
297 res
= self
.sendQuery('test6.wildcard.secure.example.', 'TXT')
298 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
299 self
.assertAnswerEmpty(res
)
300 self
.assertAuthorityHasSOA(res
)
301 self
.assertMessageIsAuthenticated(res
)
302 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
304 def test_OptOut(self
):
307 # query a name in an opt-out zone
308 res
= self
.sendQuery('ns2.optout.example.', 'A')
309 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
310 self
.assertAnswerEmpty(res
)
311 self
.assertAuthorityHasSOA(res
)
313 # check that we _do not_ use the aggressive NSEC cache
314 nbQueries
= self
.getMetric('all-outqueries')
315 res
= self
.sendQuery('ns3.optout.example.', 'A')
316 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
317 self
.assertAnswerEmpty(res
)
318 self
.assertAuthorityHasSOA(res
)
319 self
.assertGreater(self
.getMetric('all-outqueries'), nbQueries
)