]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
Merge pull request #11019 from omoerbeek/rec-regr-vs-libfaketime
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_AggressiveNSECCache.py
CommitLineData
dd54e182
RG
1import dns
2from recursortests import RecursorTest
3import os
4import requests
5import subprocess
6
class AggressiveNSECCacheBase(RecursorTest):
    """Shared fixture for the aggressive NSEC/NSEC3 cache regression tests.

    The recursor is configured with DNSSEC validation, an aggressive NSEC
    cache of 10000 entries and the built-in webserver, so that tests can read
    counters through the REST API with getMetric().

    Concrete subclasses set ``_confdir`` and flip ``__test__`` to True.
    """

    # NOTE(review): presumably keeps the test collector from running this
    # abstract base class directly - confirm against the runner in use.
    __test__ = False
    _wsPort = 8042
    _wsTimeout = 2
    _wsPassword = 'secretpassword'
    _apiKey = 'secretapikey'
    # The template body is kept flush-left so that no leading whitespace ends
    # up in the generated configuration lines.
    _config_template = """
dnssec=validate
aggressive-nsec-cache-size=10000
webserver=yes
webserver-port=%d
webserver-address=127.0.0.1
webserver-password=%s
api-key=%s
""" % (_wsPort, _wsPassword, _apiKey)

    @classmethod
    def setUp(cls):
        """Wipe the recursor cache of this test configuration before each test."""
        confdir = os.path.join('configs', cls._confdir)
        cls.wipeRecursorCache(confdir)

    def getMetric(self, name):
        """Return the integer value of the statistic called *name*.

        The value is read from the recursor's REST API
        (``/api/v1/servers/localhost/statistics``). The test fails if the
        request is unsuccessful, if the returned document is empty, or if no
        entry with the requested name is present.
        """
        headers = {'x-api-key': self._apiKey}
        url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
        r = requests.get(url, headers=headers, timeout=self._wsTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        # Parse the body once and reuse it for both the check and the lookup.
        content = r.json()
        self.assertTrue(content)

        for entry in content:
            if entry['name'] == name:
                return int(entry['value'])

        # Name the missing metric instead of failing with 'False is not true'.
        self.fail('metric %r not found in the statistics returned by the API' % (name,))

    def testNoData(self):
        """A cached NSEC(3) must answer a NODATA query for another type."""

        # first we query a non-existent type, to get the NSEC in our cache
        entries = self.getMetric('aggressive-nsec-cache-entries')
        res = self.sendQuery('host1.secure.example.', 'TXT')
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnswerEmpty(res)
        self.assertAuthorityHasSOA(res)
        self.assertMessageIsAuthenticated(res)
        self.assertGreater(self.getMetric('aggressive-nsec-cache-entries'), entries)

        # now we ask for a different type, we should generate the answer from the NSEC,
        # and no outgoing query should be made
        nbQueries = self.getMetric('all-outqueries')
        entries = self.getMetric('aggressive-nsec-cache-entries')
        res = self.sendQuery('host1.secure.example.', 'AAAA')
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnswerEmpty(res)
        self.assertAuthorityHasSOA(res)
        self.assertMessageIsAuthenticated(res)
        self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
        self.assertEqual(self.getMetric('aggressive-nsec-cache-entries'), entries)
dd54e182
RG
65
class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase):
    """Aggressive cache tests run against the 'AggressiveNSECCacheNSEC' configuration."""

    _confdir = 'AggressiveNSECCacheNSEC'
    __test__ = True

    # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
    # do not deny the same names as the non-hashed NSECs do
    def testNXD(self):
        """A cached NSEC must answer NXDOMAIN for another name it covers."""

        # first we query a non-existent name, to get the needed NSECs (name + wildcard) in our cache
        entries = self.getMetric('aggressive-nsec-cache-entries')
        hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
        res = self.sendQuery('host2.secure.example.', 'TXT')
        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertAnswerEmpty(res)
        self.assertAuthorityHasSOA(res)
        self.assertMessageIsAuthenticated(res)
        self.assertGreater(self.getMetric('aggressive-nsec-cache-entries'), entries)
        self.assertEqual(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)

        # now we ask for a different name that is covered by the NSEC,
        # we should generate the answer from the NSEC and no outgoing query should be made
        nbQueries = self.getMetric('all-outqueries')
        entries = self.getMetric('aggressive-nsec-cache-entries')
        hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
        res = self.sendQuery('host3.secure.example.', 'AAAA')
        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertAnswerEmpty(res)
        self.assertAuthorityHasSOA(res)
        self.assertMessageIsAuthenticated(res)
        self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
        self.assertEqual(self.getMetric('aggressive-nsec-cache-entries'), entries)
        self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)

    def testWildcard(self):
        """Wildcard answers and wildcard NODATA must be served from the cached NSECs."""

        # first we query a non-existent name, but for which a wildcard matches,
        # to get the NSEC in our cache
        res = self.sendQuery('test1.wildcard.secure.example.', 'A')
        expected = dns.rrset.from_text('test1.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertMatchingRRSIGInAnswer(res, expected)
        self.assertMessageIsAuthenticated(res)

        # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
        # and no outgoing query should be made
        hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
        nbQueries = self.getMetric('all-outqueries')
        res = self.sendQuery('test2.wildcard.secure.example.', 'A')
        expected = dns.rrset.from_text('test2.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertMatchingRRSIGInAnswer(res, expected)
        self.assertMessageIsAuthenticated(res)
        self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
        self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits)

        # now we ask for a type that does not exist at the wildcard
        hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
        nbQueries = self.getMetric('all-outqueries')
        res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnswerEmpty(res)
        self.assertAuthorityHasSOA(res)
        self.assertMessageIsAuthenticated(res)
        self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
        self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)

        # we can also ask a different type, for a different name that is covered
        # by the NSEC and matches the wildcard (but the type does not exist)
        # The baseline must come from the same counter that is asserted below,
        # otherwise the check can pass without that counter moving at all.
        hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
        nbQueries = self.getMetric('all-outqueries')
        res = self.sendQuery('test3.wildcard.secure.example.', 'TXT')
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertAnswerEmpty(res)
        self.assertAuthorityHasSOA(res)
        self.assertMessageIsAuthenticated(res)
        self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
        self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)

    def test_Bogus(self):
        """NSECs from a Bogus zone must not be used by the aggressive cache."""
        # query a name in a Bogus zone
        entries = self.getMetric('aggressive-nsec-cache-entries')
        res = self.sendQuery('ted1.bogus.example.', 'A')
        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
        self.assertAnswerEmpty(res)

        # disable validation
        msg = dns.message.make_query('ted1.bogus.example.', 'A', want_dnssec=True)
        msg.flags |= dns.flags.CD

        res = self.sendUDPQuery(msg)
        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertAnswerEmpty(res)
        self.assertAuthorityHasSOA(res)

        # check that we _do not_ use the aggressive NSEC cache
        nbQueries = self.getMetric('all-outqueries')
        msg = dns.message.make_query('ted2.bogus.example.', 'A', want_dnssec=True)
        msg.flags |= dns.flags.CD

        res = self.sendUDPQuery(msg)
        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertAnswerEmpty(res)
        self.assertAuthorityHasSOA(res)
        # a new outgoing query proves the answer was not synthesized from the cache
        self.assertGreater(self.getMetric('all-outqueries'), nbQueries)
        # we will accept a NSEC for root, which is secure..
        self.assertEqual(entries + 1, self.getMetric('aggressive-nsec-cache-entries'))
dd54e182
RG
172
class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase):
    """Aggressive cache tests run against the 'AggressiveNSECCacheNSEC3' configuration.

    secureZone() is overridden so that every zone is switched to NSEC3 after
    it has been signed.
    """

    _confdir = 'AggressiveNSECCacheNSEC3'
    __test__ = True

    @classmethod
    def _runPdnsutil(cls, confdir, *arguments):
        """Echo and run ``$PDNSUTIL --config-dir=<confdir> <arguments...>``.

        Raises AssertionError, carrying the exit code and the captured output,
        when the command exits with a non-zero status.
        """
        command = [os.environ['PDNSUTIL'], '--config-dir=%s' % confdir]
        command.extend(arguments)

        print(' '.join(command))
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            raise AssertionError('%s failed (%d): %s' % (command, e.returncode, e.output))

    @classmethod
    def secureZone(cls, confdir, zonename, key=None):
        """Sign *zonename* with pdnsutil, then enable NSEC3 on it.

        'ROOT' is translated to '.'. When *key* is given it is written to
        ``dnssec.key`` inside *confdir* and imported as an active KSK;
        otherwise ``secure-zone`` is used. The zone 'optout.example' gets
        NSEC3 parameters whose flags field is 1 instead of 0.
        """
        zone = '.' if zonename == 'ROOT' else zonename

        if key:
            keyfile = os.path.join(confdir, 'dnssec.key')
            with open(keyfile, 'w') as fdKeyfile:
                fdKeyfile.write(key)
            cls._runPdnsutil(confdir, 'import-zone-key', zone, keyfile, 'active', 'ksk')
        else:
            cls._runPdnsutil(confdir, 'secure-zone', zone)

        if zone == "optout.example":
            nsec3Params = "1 1 100 AABBCCDDEEFF112233"
        else:
            nsec3Params = "1 0 100 AABBCCDDEEFF112233"

        cls._runPdnsutil(confdir, 'set-nsec3', zone, nsec3Params)

    def _assertDenial(self, response, rcode, authenticated=True):
        """Check for *rcode*, an empty answer section and a SOA in authority.

        The authenticated-message check is skipped when *authenticated* is False.
        """
        self.assertRcodeEqual(response, rcode)
        self.assertAnswerEmpty(response)
        self.assertAuthorityHasSOA(response)
        if authenticated:
            self.assertMessageIsAuthenticated(response)

    def _assertWildcardAnswer(self, response, qname):
        """Check for an authenticated NOERROR answer holding the wildcard A record for *qname*."""
        wanted = dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
        self.assertRcodeEqual(response, dns.rcode.NOERROR)
        self.assertMatchingRRSIGInAnswer(response, wanted)
        self.assertMessageIsAuthenticated(response)

    def testNXD(self):
        """Cached NSEC3s must answer NXDOMAIN for another name they cover."""

        # prime the cache: a non-existent name brings in the needed NSEC3s
        reply = self.sendQuery('host2.secure.example.', 'TXT')
        self._assertDenial(reply, dns.rcode.NXDOMAIN)

        # a different name covered by the same NSEC3s must be answered from
        # the cache, so the outgoing query counter must not move
        outgoingBefore = self.getMetric('all-outqueries')
        reply = self.sendQuery('host6.secure.example.', 'AAAA')
        self._assertDenial(reply, dns.rcode.NXDOMAIN)
        self.assertEqual(outgoingBefore, self.getMetric('all-outqueries'))

    def testWildcard(self):
        """Wildcard answers and wildcard NODATA must be served from the cached NSEC3s."""

        # first get the SOA and the wildcard NSEC into the cache, by asking for
        # a name that matches the wildcard but a type that does not exist
        reply = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
        self._assertDenial(reply, dns.rcode.NOERROR)

        # then query a non-existent name for which the wildcard matches,
        # to get the NSEC3 into the cache
        reply = self.sendQuery('test5.wildcard.secure.example.', 'A')
        self._assertWildcardAnswer(reply, 'test5.wildcard.secure.example.')

        # a different name: the answer should be generated from the NSEC3s and
        # the wildcard, without any outgoing query
        outgoingBefore = self.getMetric('all-outqueries')
        reply = self.sendQuery('test6.wildcard.secure.example.', 'A')
        self._assertWildcardAnswer(reply, 'test6.wildcard.secure.example.')
        self.assertEqual(outgoingBefore, self.getMetric('all-outqueries'))

        # a type that does not exist at the wildcard
        outgoingBefore = self.getMetric('all-outqueries')
        reply = self.sendQuery('test5.wildcard.secure.example.', 'AAAA')
        self._assertDenial(reply, dns.rcode.NOERROR)
        self.assertEqual(outgoingBefore, self.getMetric('all-outqueries'))

        # a different type for a different name that is covered by the NSEC3s
        # and matches the wildcard (but the type does not exist)
        outgoingBefore = self.getMetric('all-outqueries')
        reply = self.sendQuery('test6.wildcard.secure.example.', 'TXT')
        self._assertDenial(reply, dns.rcode.NOERROR)
        self.assertEqual(outgoingBefore, self.getMetric('all-outqueries'))

    def test_OptOut(self):
        """NSEC3s from an opt-out zone must not be used by the aggressive cache."""
        # query a name in an opt-out zone
        reply = self.sendQuery('ns2.optout.example.', 'A')
        self._assertDenial(reply, dns.rcode.NXDOMAIN, authenticated=False)

        # check that we _do not_ use the aggressive NSEC cache: answering the
        # second name must require at least one new outgoing query
        outgoingBefore = self.getMetric('all-outqueries')
        reply = self.sendQuery('ns3.optout.example.', 'A')
        self._assertDenial(reply, dns.rcode.NXDOMAIN, authenticated=False)
        self.assertGreater(self.getMetric('all-outqueries'), outgoingBefore)