]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
Introducing TCounters
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_AggressiveNSECCache.py
1 import dns
2 from recursortests import RecursorTest
3 import os
4 import requests
5 import subprocess
6 import time
7
8 class AggressiveNSECCacheBase(RecursorTest):
9 __test__ = False
10 _wsPort = 8042
11 _wsTimeout = 10
12 _wsPassword = 'secretpassword'
13 _apiKey = 'secretapikey'
14 #_recursorStartupDelay = 4.0
15 _config_template = """
16 dnssec=validate
17 aggressive-nsec-cache-size=10000
18 webserver=yes
19 webserver-port=%d
20 webserver-address=127.0.0.1
21 webserver-password=%s
22 api-key=%s
23 devonly-regression-test-mode
24 """ % (_wsPort, _wsPassword, _apiKey)
25
26 @classmethod
27 def wipe(cls):
28 confdir = os.path.join('configs', cls._confdir)
29 # Only wipe examples, as wiping the root triggers root NS refreshes
30 cls.wipeRecursorCache(confdir, "example$")
31
32 def getMetric(self, name):
33 headers = {'x-api-key': self._apiKey}
34 url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
35 r = requests.get(url, headers=headers, timeout=self._wsTimeout)
36 self.assertTrue(r)
37 self.assertEqual(r.status_code, 200)
38 self.assertTrue(r.json())
39 content = r.json()
40
41 for entry in content:
42 if entry['name'] == name:
43 return int(entry['value'])
44
45 self.assertTrue(False)
46
47 def testNoData(self):
48 self.wipe()
49
50 # first we query a non-existent type, to get the NSEC in our cache
51 entries = self.getMetric('aggressive-nsec-cache-entries')
52 res = self.sendQuery('host1.secure.example.', 'TXT')
53 self.assertRcodeEqual(res, dns.rcode.NOERROR)
54 self.assertAnswerEmpty(res)
55 self.assertAuthorityHasSOA(res)
56 self.assertMessageIsAuthenticated(res)
57 self.assertGreater(self.getMetric('aggressive-nsec-cache-entries'), entries)
58
59 # now we ask for a different type, we should generate the answer from the NSEC,
60 # and no outgoing query should be made
61 nbQueries = self.getMetric('all-outqueries')
62 entries = self.getMetric('aggressive-nsec-cache-entries')
63 res = self.sendQuery('host1.secure.example.', 'AAAA')
64 self.assertRcodeEqual(res, dns.rcode.NOERROR)
65 self.assertAnswerEmpty(res)
66 self.assertAuthorityHasSOA(res)
67 self.assertMessageIsAuthenticated(res)
68 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
69 self.assertEqual(self.getMetric('aggressive-nsec-cache-entries'), entries)
70
71 class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase):
72 _confdir = 'AggressiveNSECCacheNSEC'
73 __test__ = True
74
75 # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
76 # do not deny the same names than the non-hashed NSECs do
77 def testNXD(self):
78 self.wipe()
79
80 # first we query a non-existent name, to get the needed NSECs (name + widcard) in our cache
81 entries = self.getMetric('aggressive-nsec-cache-entries')
82 hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
83 res = self.sendQuery('host2.secure.example.', 'TXT')
84 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
85 self.assertAnswerEmpty(res)
86 self.assertAuthorityHasSOA(res)
87 self.assertMessageIsAuthenticated(res)
88 self.assertGreater(self.getMetric('aggressive-nsec-cache-entries'), entries)
89 self.assertEqual(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
90
91 # now we ask for a different name that is covered by the NSEC,
92 # we should generate the answer from the NSEC and no outgoing query should be made
93 nbQueries = self.getMetric('all-outqueries')
94 entries = self.getMetric('aggressive-nsec-cache-entries')
95 hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
96 res = self.sendQuery('host3.secure.example.', 'AAAA')
97 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
98 self.assertAnswerEmpty(res)
99 self.assertAuthorityHasSOA(res)
100 self.assertMessageIsAuthenticated(res)
101 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
102 self.assertEqual(self.getMetric('aggressive-nsec-cache-entries'), entries)
103 self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
104
105 def testWildcard(self):
106 self.wipe()
107
108 # first we query a non-existent name, but for which a wildcard matches,
109 # to get the NSEC in our cache
110 res = self.sendQuery('test1.wildcard.secure.example.', 'A')
111 expected = dns.rrset.from_text('test1.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
112 self.assertRcodeEqual(res, dns.rcode.NOERROR)
113 self.assertMatchingRRSIGInAnswer(res, expected)
114 self.assertMessageIsAuthenticated(res)
115
116 # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
117 # and no outgoing query should be made
118 hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
119 nbQueries = self.getMetric('all-outqueries')
120 res = self.sendQuery('test2.wildcard.secure.example.', 'A')
121 expected = dns.rrset.from_text('test2.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
122 self.assertRcodeEqual(res, dns.rcode.NOERROR)
123 self.assertMatchingRRSIGInAnswer(res, expected)
124 self.assertMessageIsAuthenticated(res)
125 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
126 self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits)
127
128 # now we ask for a type that does not exist at the wildcard
129 hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
130 nbQueries = self.getMetric('all-outqueries')
131 res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
132 self.assertRcodeEqual(res, dns.rcode.NOERROR)
133 self.assertAnswerEmpty(res)
134 self.assertAuthorityHasSOA(res)
135 self.assertMessageIsAuthenticated(res)
136 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
137 self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
138
139 # we can also ask a different type, for a different name that is covered
140 # by the NSEC and matches the wildcard (but the type does not exist)
141 hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
142 nbQueries = self.getMetric('all-outqueries')
143 res = self.sendQuery('test3.wildcard.secure.example.', 'TXT')
144 self.assertRcodeEqual(res, dns.rcode.NOERROR)
145 self.assertAnswerEmpty(res)
146 self.assertAuthorityHasSOA(res)
147 self.assertMessageIsAuthenticated(res)
148 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
149 self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
150
151 def test_Bogus(self):
152 self.wipe()
153
154 # query a name in example to fill the aggressive negcache
155 res = self.sendQuery('example.', 'A')
156 self.assertRcodeEqual(res, dns.rcode.NOERROR)
157 self.assertAnswerEmpty(res)
158 self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))
159
160 # query a name in a Bogus zone
161 res = self.sendQuery('ted1.bogus.example.', 'A')
162 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
163 self.assertAnswerEmpty(res)
164
165 # disable validation
166 msg = dns.message.make_query('ted1.bogus.example.', 'A', want_dnssec=True)
167 msg.flags |= dns.flags.CD
168
169 res = self.sendUDPQuery(msg)
170 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
171 self.assertAnswerEmpty(res)
172 self.assertAuthorityHasSOA(res)
173
174 # check that we _do not_ use the aggressive NSEC cache
175 nbQueries = self.getMetric('all-outqueries')
176 msg = dns.message.make_query('ted2.bogus.example.', 'A', want_dnssec=True)
177 msg.flags |= dns.flags.CD
178
179 res = self.sendUDPQuery(msg)
180 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
181 self.assertAnswerEmpty(res)
182 self.assertAuthorityHasSOA(res)
183 self.assertGreater(self.getMetric('all-outqueries'), nbQueries)
184
185 # Check that we stil have one aggressive cache entry
186 self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))
187
188 class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase):
189 _confdir = 'AggressiveNSECCacheNSEC3'
190 __test__ = True
191
192 @classmethod
193 def secureZone(cls, confdir, zonename, key=None):
194 zone = '.' if zonename == 'ROOT' else zonename
195 if not key:
196 pdnsutilCmd = [os.environ['PDNSUTIL'],
197 '--config-dir=%s' % confdir,
198 'secure-zone',
199 zone]
200 else:
201 keyfile = os.path.join(confdir, 'dnssec.key')
202 with open(keyfile, 'w') as fdKeyfile:
203 fdKeyfile.write(key)
204
205 pdnsutilCmd = [os.environ['PDNSUTIL'],
206 '--config-dir=%s' % confdir,
207 'import-zone-key',
208 zone,
209 keyfile,
210 'active',
211 'ksk']
212
213 print(' '.join(pdnsutilCmd))
214 try:
215 subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
216 except subprocess.CalledProcessError as e:
217 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))
218
219 params = "1 0 100 AABBCCDDEEFF112233"
220
221 if zone == "optout.example":
222 params = "1 1 100 AABBCCDDEEFF112233"
223
224 pdnsutilCmd = [os.environ['PDNSUTIL'],
225 '--config-dir=%s' % confdir,
226 'set-nsec3',
227 zone,
228 params]
229
230 print(' '.join(pdnsutilCmd))
231 try:
232 subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
233 except subprocess.CalledProcessError as e:
234 raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))
235
236 def testNXD(self):
237 self.wipe()
238
239 # first we query a non-existent name, to get the needed NSEC3s in our cache
240 res = self.sendQuery('host2.secure.example.', 'TXT')
241 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
242 self.assertAnswerEmpty(res)
243 self.assertAuthorityHasSOA(res)
244 self.assertMessageIsAuthenticated(res)
245
246 # now we ask for a different name that is covered by the NSEC3s,
247 # we should generate the answer from the NSEC3s and no outgoing query should be made
248 nbQueries = self.getMetric('all-outqueries')
249 res = self.sendQuery('host6.secure.example.', 'AAAA')
250 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
251 self.assertAnswerEmpty(res)
252 self.assertAuthorityHasSOA(res)
253 self.assertMessageIsAuthenticated(res)
254 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
255
256 def testWildcard(self):
257 self.wipe()
258
259 # first let's get the SOA and wildcard NSEC in our cache by asking a name that matches the wildcard
260 # but a type that does not exist
261 res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
262 self.assertRcodeEqual(res, dns.rcode.NOERROR)
263 self.assertAnswerEmpty(res)
264 self.assertAuthorityHasSOA(res)
265 self.assertMessageIsAuthenticated(res)
266
267 # we query a non-existent name, but for which a wildcard matches,
268 # to get the NSEC3 in our cache
269 res = self.sendQuery('test5.wildcard.secure.example.', 'A')
270 expected = dns.rrset.from_text('test5.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
271 self.assertRcodeEqual(res, dns.rcode.NOERROR)
272 self.assertMatchingRRSIGInAnswer(res, expected)
273 self.assertMessageIsAuthenticated(res)
274
275 # now we ask for a different name, we should generate the answer from the NSEC3s and the wildcard,
276 # and no outgoing query should be made
277 nbQueries = self.getMetric('all-outqueries')
278 res = self.sendQuery('test6.wildcard.secure.example.', 'A')
279 expected = dns.rrset.from_text('test6.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
280 self.assertRcodeEqual(res, dns.rcode.NOERROR)
281 self.assertMatchingRRSIGInAnswer(res, expected)
282 self.assertMessageIsAuthenticated(res)
283 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
284
285 # now we ask for a type that does not exist at the wildcard
286 nbQueries = self.getMetric('all-outqueries')
287 res = self.sendQuery('test5.wildcard.secure.example.', 'AAAA')
288 self.assertRcodeEqual(res, dns.rcode.NOERROR)
289 self.assertAnswerEmpty(res)
290 self.assertAuthorityHasSOA(res)
291 self.assertMessageIsAuthenticated(res)
292 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
293
294 # we can also ask a different type, for a different name that is covered
295 # by the NSEC3s and matches the wildcard (but the type does not exist)
296 nbQueries = self.getMetric('all-outqueries')
297 res = self.sendQuery('test6.wildcard.secure.example.', 'TXT')
298 self.assertRcodeEqual(res, dns.rcode.NOERROR)
299 self.assertAnswerEmpty(res)
300 self.assertAuthorityHasSOA(res)
301 self.assertMessageIsAuthenticated(res)
302 self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
303
304 def test_OptOut(self):
305 self.wipe()
306
307 # query a name in an opt-out zone
308 res = self.sendQuery('ns2.optout.example.', 'A')
309 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
310 self.assertAnswerEmpty(res)
311 self.assertAuthorityHasSOA(res)
312
313 # check that we _do not_ use the aggressive NSEC cache
314 nbQueries = self.getMetric('all-outqueries')
315 res = self.sendQuery('ns3.optout.example.', 'A')
316 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
317 self.assertAnswerEmpty(res)
318 self.assertAuthorityHasSOA(res)
319 self.assertGreater(self.getMetric('all-outqueries'), nbQueries)