]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
2 from recursortests
import RecursorTest
7 class AggressiveNSECCacheBase(RecursorTest
):
11 _wsPassword
= 'secretpassword'
12 _apiKey
= 'secretapikey'
13 _config_template
= """
15 aggressive-nsec-cache-size=10000
18 webserver-address=127.0.0.1
21 """ % (_wsPort
, _wsPassword
, _apiKey
)
25 confdir
= os
.path
.join('configs', cls
._confdir
)
26 cls
.wipeRecursorCache(confdir
)
28 def getMetric(self
, name
):
29 headers
= {'x-api-key': self
._apiKey
}
30 url
= 'http://127.0.0.1:' + str(self
._wsPort
) + '/api/v1/servers/localhost/statistics'
31 r
= requests
.get(url
, headers
=headers
, timeout
=self
._wsTimeout
)
33 self
.assertEqual(r
.status_code
, 200)
34 self
.assertTrue(r
.json())
38 if entry
['name'] == name
:
39 return int(entry
['value'])
41 self
.assertTrue(False)
45 # first we query a non-existent type, to get the NSEC in our cache
46 entries
= self
.getMetric('aggressive-nsec-cache-entries')
47 res
= self
.sendQuery('host1.secure.example.', 'TXT')
48 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
49 self
.assertAnswerEmpty(res
)
50 self
.assertAuthorityHasSOA(res
)
51 self
.assertMessageIsAuthenticated(res
)
52 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
54 # now we ask for a different type, we should generate the answer from the NSEC,
55 # and no outgoing query should be made
56 nbQueries
= self
.getMetric('all-outqueries')
57 entries
= self
.getMetric('aggressive-nsec-cache-entries')
58 res
= self
.sendQuery('host1.secure.example.', 'AAAA')
59 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
60 self
.assertAnswerEmpty(res
)
61 self
.assertAuthorityHasSOA(res
)
62 self
.assertMessageIsAuthenticated(res
)
63 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
64 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
66 class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase
):
67 _confdir
= 'AggressiveNSECCacheNSEC'
70 # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
71 # do not deny the same names as the non-hashed NSECs do
74 # first we query a non-existent name, to get the needed NSECs (name + wildcard) in our cache
75 entries
= self
.getMetric('aggressive-nsec-cache-entries')
76 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
77 res
= self
.sendQuery('host2.secure.example.', 'TXT')
78 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
79 self
.assertAnswerEmpty(res
)
80 self
.assertAuthorityHasSOA(res
)
81 self
.assertMessageIsAuthenticated(res
)
82 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
83 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
85 # now we ask for a different name that is covered by the NSEC,
86 # we should generate the answer from the NSEC and no outgoing query should be made
87 nbQueries
= self
.getMetric('all-outqueries')
88 entries
= self
.getMetric('aggressive-nsec-cache-entries')
89 hits
= self
.getMetric('aggressive-nsec-cache-nsec-hits')
90 res
= self
.sendQuery('host3.secure.example.', 'AAAA')
91 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
92 self
.assertAnswerEmpty(res
)
93 self
.assertAuthorityHasSOA(res
)
94 self
.assertMessageIsAuthenticated(res
)
95 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
96 self
.assertEqual(self
.getMetric('aggressive-nsec-cache-entries'), entries
)
97 self
.assertGreater(self
.getMetric('aggressive-nsec-cache-nsec-hits'), hits
)
def testWildcard(self):
    """Exercise wildcard handling of the aggressive NSEC cache.

    The test runs four steps, each relying on the cache state left behind
    by the previous ones:

    1. Query a name that only exists through wildcard expansion, so that
       the NSEC sent along with the answer ends up in the cache.
    2. Query a sibling name: the answer must be produced without any new
       outgoing query and 'aggressive-nsec-cache-nsec-wc-hits' must grow.
    3. Query the first name for a type that does not exist at the
       wildcard: an empty NOERROR answer is expected, without any new
       outgoing query, and 'aggressive-nsec-cache-nsec-hits' must grow.
    4. Same as step 3, for a third name and another missing type.
    """

    # first we query a non-existent name, but for which a wildcard matches,
    # to get the NSEC in our cache
    res = self.sendQuery('test1.wildcard.secure.example.', 'A')
    expected = dns.rrset.from_text('test1.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertMatchingRRSIGInAnswer(res, expected)
    self.assertMessageIsAuthenticated(res)

    # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
    # and no outgoing query should be made
    hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test2.wildcard.secure.example.', 'A')
    expected = dns.rrset.from_text('test2.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertMatchingRRSIGInAnswer(res, expected)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits)

    # now we ask for a type that does not exist at the wildcard
    hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnswerEmpty(res)
    self.assertAuthorityHasSOA(res)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)

    # we can also ask a different type, for a different name that is covered
    # by the NSEC and matches the wildcard (but the type does not exist)
    #
    # The baseline has to be read from the very counter that is asserted on
    # below: comparing 'aggressive-nsec-cache-nsec-hits' against a value
    # taken from the wildcard counter would relate two independent metrics
    # and would not show that this particular query bumped the counter.
    hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
    nbQueries = self.getMetric('all-outqueries')
    res = self.sendQuery('test3.wildcard.secure.example.', 'TXT')
    self.assertRcodeEqual(res, dns.rcode.NOERROR)
    self.assertAnswerEmpty(res)
    self.assertAuthorityHasSOA(res)
    self.assertMessageIsAuthenticated(res)
    self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
    self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
def test_Bogus(self):
    """Check that denials from a Bogus zone are not reused aggressively.

    A regular query for a name in the zone must fail with SERVFAIL. The
    same name and then a second one are queried with the CD flag set: both
    must return NXDOMAIN, and the second one must cause at least one new
    outgoing query. Over the whole test the number of aggressive cache
    entries must grow by exactly one.
    """
    qtype = 'A'

    # query a name in a Bogus zone
    entriesBefore = self.getMetric('aggressive-nsec-cache-entries')
    response = self.sendQuery('ted1.bogus.example.', qtype)
    self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
    self.assertAnswerEmpty(response)

    # same name again, this time with the CD flag set on the query
    query = dns.message.make_query('ted1.bogus.example.', qtype, want_dnssec=True)
    query.flags |= dns.flags.CD

    response = self.sendUDPQuery(query)
    self.assertRcodeEqual(response, dns.rcode.NXDOMAIN)
    self.assertAnswerEmpty(response)
    self.assertAuthorityHasSOA(response)

    # check that we _do not_ use the aggressive NSEC cache
    outgoingBefore = self.getMetric('all-outqueries')
    query = dns.message.make_query('ted2.bogus.example.', qtype, want_dnssec=True)
    query.flags |= dns.flags.CD

    response = self.sendUDPQuery(query)
    self.assertRcodeEqual(response, dns.rcode.NXDOMAIN)
    self.assertAnswerEmpty(response)
    self.assertAuthorityHasSOA(response)
    outgoingAfter = self.getMetric('all-outqueries')
    self.assertGreater(outgoingAfter, outgoingBefore)

    # we will accept a NSEC for root, which is secure..
    entriesAfter = self.getMetric('aggressive-nsec-cache-entries')
    self.assertEqual(entriesBefore + 1, entriesAfter)
173 class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase
):
174 _confdir
= 'AggressiveNSECCacheNSEC3'
178 def secureZone(cls
, confdir
, zonename
, key
=None):
179 zone
= '.' if zonename
== 'ROOT' else zonename
181 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
182 '--config-dir=%s' % confdir
,
186 keyfile
= os
.path
.join(confdir
, 'dnssec.key')
187 with
open(keyfile
, 'w') as fdKeyfile
:
190 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
191 '--config-dir=%s' % confdir
,
198 print(' '.join(pdnsutilCmd
))
200 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
201 except subprocess
.CalledProcessError
as e
:
202 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
204 params
= "1 0 100 AABBCCDDEEFF112233"
206 if zone
== "optout.example":
207 params
= "1 1 100 AABBCCDDEEFF112233"
209 pdnsutilCmd
= [os
.environ
['PDNSUTIL'],
210 '--config-dir=%s' % confdir
,
215 print(' '.join(pdnsutilCmd
))
217 subprocess
.check_output(pdnsutilCmd
, stderr
=subprocess
.STDOUT
)
218 except subprocess
.CalledProcessError
as e
:
219 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd
, e
.returncode
, e
.output
))
223 # first we query a non-existent name, to get the needed NSEC3s in our cache
224 res
= self
.sendQuery('host2.secure.example.', 'TXT')
225 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
226 self
.assertAnswerEmpty(res
)
227 self
.assertAuthorityHasSOA(res
)
228 self
.assertMessageIsAuthenticated(res
)
230 # now we ask for a different name that is covered by the NSEC3s,
231 # we should generate the answer from the NSEC3s and no outgoing query should be made
232 nbQueries
= self
.getMetric('all-outqueries')
233 res
= self
.sendQuery('host6.secure.example.', 'AAAA')
234 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
235 self
.assertAnswerEmpty(res
)
236 self
.assertAuthorityHasSOA(res
)
237 self
.assertMessageIsAuthenticated(res
)
238 self
.assertEqual(nbQueries
, self
.getMetric('all-outqueries'))
def testWildcard(self):
    """Exercise wildcard handling of the aggressive cache with NSEC3.

    Two priming queries are sent first (a missing type, then an existing
    type, both for names matching the wildcard). Three follow-up queries
    must then be answered without the 'all-outqueries' metric changing.
    """
    qnameTemplate = '{label}.wildcard.secure.example.'

    def wildcardName(label):
        # full query name for a label below the wildcard
        return qnameTemplate.format(label=label)

    def expectedA(qname):
        # the A rrset the wildcard is expected to expand to for qname
        address = '{prefix}.10'.format(prefix=self._PREFIX)
        return dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'A', address)

    def checkNoData(reply):
        # authenticated NOERROR with an empty answer section and a SOA
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertAnswerEmpty(reply)
        self.assertAuthorityHasSOA(reply)
        self.assertMessageIsAuthenticated(reply)

    def checkExpanded(reply, rrset):
        # authenticated NOERROR carrying the signed, expanded rrset
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertMatchingRRSIGInAnswer(reply, rrset)
        self.assertMessageIsAuthenticated(reply)

    # priming, part one: a name matching the wildcard, asked for a type
    # that does not exist there
    reply = self.sendQuery(wildcardName('test1'), 'AAAA')
    checkNoData(reply)

    # priming, part two: a non-existent name matching the wildcard, asked
    # for a type that does exist, to get the NSEC3 in our cache
    reply = self.sendQuery(wildcardName('test5'), 'A')
    checkExpanded(reply, expectedA(wildcardName('test5')))

    # a different name: the answer should be generated from the cached
    # NSEC3s and the wildcard, with no outgoing query
    outgoingBefore = self.getMetric('all-outqueries')
    reply = self.sendQuery(wildcardName('test6'), 'A')
    checkExpanded(reply, expectedA(wildcardName('test6')))
    self.assertEqual(outgoingBefore, self.getMetric('all-outqueries'))

    # a type that does not exist at the wildcard
    outgoingBefore = self.getMetric('all-outqueries')
    reply = self.sendQuery(wildcardName('test5'), 'AAAA')
    checkNoData(reply)
    self.assertEqual(outgoingBefore, self.getMetric('all-outqueries'))

    # yet another type, for a different name that is covered by the NSEC3s
    # and matches the wildcard (but the type does not exist)
    outgoingBefore = self.getMetric('all-outqueries')
    reply = self.sendQuery(wildcardName('test6'), 'TXT')
    checkNoData(reply)
    self.assertEqual(outgoingBefore, self.getMetric('all-outqueries'))
def test_OptOut(self):
    """Check that denials from an opt-out zone are not reused aggressively.

    Two different names are queried in the opt-out zone. Both must return
    NXDOMAIN with an empty answer section and a SOA in the authority
    section, and the second query must raise the 'all-outqueries' metric.
    """
    def checkNXDomain(reply):
        # NXDOMAIN, nothing in the answer section, SOA in the authority
        self.assertRcodeEqual(reply, dns.rcode.NXDOMAIN)
        self.assertAnswerEmpty(reply)
        self.assertAuthorityHasSOA(reply)

    # query a name in an opt-out zone
    firstReply = self.sendQuery('ns2.optout.example.', 'A')
    checkNXDomain(firstReply)

    # check that we _do not_ use the aggressive NSEC cache
    outgoingBefore = self.getMetric('all-outqueries')
    secondReply = self.sendQuery('ns3.optout.example.', 'A')
    checkNXDomain(secondReply)
    outgoingAfter = self.getMetric('all-outqueries')
    self.assertGreater(outgoingAfter, outgoingBefore)