]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Add regression tests for the aggressive NSEC cache
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 24 Feb 2021 10:12:46 +0000 (11:12 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 24 Feb 2021 10:12:46 +0000 (11:12 +0100)
regression-tests.recursor-dnssec/test_AggressiveNSECCache.py [new file with mode: 0644]

diff --git a/regression-tests.recursor-dnssec/test_AggressiveNSECCache.py b/regression-tests.recursor-dnssec/test_AggressiveNSECCache.py
new file mode 100644 (file)
index 0000000..c114c9e
--- /dev/null
@@ -0,0 +1,292 @@
+import dns
+from recursortests import RecursorTest
+import os
+import requests
+import subprocess
+
+class AggressiveNSECCacheBase(RecursorTest):
+    __test__ = False
+    _wsPort = 8042
+    _wsTimeout = 2
+    _wsPassword = 'secretpassword'
+    _apiKey = 'secretapikey'
+    _config_template = """
+    dnssec=validate
+    aggressive-nsec-cache-size=10000
+    webserver=yes
+    webserver-port=%d
+    webserver-address=127.0.0.1
+    webserver-password=%s
+    api-key=%s
+    """ % (_wsPort, _wsPassword, _apiKey)
+
+    @classmethod
+    def setUp(cls):
+        confdir = os.path.join('configs', cls._confdir)
+        cls.wipeRecursorCache(confdir)
+
+    def getMetric(self, name):
+        headers = {'x-api-key': self._apiKey}
+        url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
+        r = requests.get(url, headers=headers, timeout=self._wsTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+        self.assertTrue(r.json())
+        content = r.json()
+
+        for entry in content:
+            if entry['name'] == name:
+                return int(entry['value'])
+
+        self.assertTrue(False)
+
+    def testNoData(self):
+
+        # first we query a non-existent type, to get the NSEC in our cache
+        entries = self.getMetric('aggressive-nsec-cache-entries')
+        res = self.sendQuery('host1.secure.example.', 'TXT')
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertGreater(self.getMetric('aggressive-nsec-cache-entries'), entries)
+
+        # now we ask for a different type, we should generate the answer from the NSEC,
+        # and no outgoing query should be made
+        nbQueries = self.getMetric('all-outqueries')
+        entries = self.getMetric('aggressive-nsec-cache-entries')
+        res = self.sendQuery('host1.secure.example.', 'AAAA')
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+        self.assertEquals(self.getMetric('aggressive-nsec-cache-entries'), entries)
+
+class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase):
+    _confdir = 'AggressiveNSECCacheNSEC'
+    __test__ = True
+
+    # we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
+    # do not deny the same names than the non-hashed NSECs do
+    def testNXD(self):
+
+        # first we query a non-existent name, to get the needed NSECs (name + widcard) in our cache
+        entries = self.getMetric('aggressive-nsec-cache-entries')
+        hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
+        res = self.sendQuery('host2.secure.example.', 'TXT')
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertGreater(self.getMetric('aggressive-nsec-cache-entries'), entries)
+        self.assertEquals(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
+
+        # now we ask for a different name that is covered by the NSEC,
+        # we should generate the answer from the NSEC and no outgoing query should be made
+        nbQueries = self.getMetric('all-outqueries')
+        entries = self.getMetric('aggressive-nsec-cache-entries')
+        hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
+        res = self.sendQuery('host3.secure.example.', 'AAAA')
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+        self.assertEquals(self.getMetric('aggressive-nsec-cache-entries'), entries)
+        self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
+
+    def testWildcard(self):
+
+        # first we query a non-existent name, but for which a wildcard matches,
+        # to get the NSEC in our cache
+        res = self.sendQuery('test1.wildcard.secure.example.', 'A')
+        expected = dns.rrset.from_text('test1.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertMatchingRRSIGInAnswer(res, expected)
+        self.assertMessageIsAuthenticated(res)
+
+        # now we ask for a different name, we should generate the answer from the NSEC and the wildcard,
+        # and no outgoing query should be made
+        hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
+        nbQueries = self.getMetric('all-outqueries')
+        res = self.sendQuery('test2.wildcard.secure.example.', 'A')
+        expected = dns.rrset.from_text('test2.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertMatchingRRSIGInAnswer(res, expected)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+        self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits)
+
+        # now we ask for a type that does not exist at the wildcard
+        hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
+        nbQueries = self.getMetric('all-outqueries')
+        res = self.sendQuery('test1.wildcard.secure.example.', 'AAAA')
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+        self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
+
+        # we can also ask a different type, for a different name that is covered
+        # by the NSEC and matches the wildcard (but the type does not exist)
+        hits = self.getMetric('aggressive-nsec-cache-nsec-wc-hits')
+        nbQueries = self.getMetric('all-outqueries')
+        res = self.sendQuery('test3.wildcard.secure.example.', 'TXT')
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+        self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
+
+    def test_Bogus(self):
+        # query a name in a Bogus zone
+        entries = self.getMetric('aggressive-nsec-cache-entries')
+        res = self.sendQuery('ted1.bogus.example.', 'A')
+        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+        self.assertAnswerEmpty(res)
+
+        # disable validation
+        msg = dns.message.make_query('ted1.bogus.example.', 'A', want_dnssec=True)
+        msg.flags |= dns.flags.CD
+
+        res = self.sendUDPQuery(msg)
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+
+        # check that we _do not_ use the aggressive NSEC cache
+        nbQueries = self.getMetric('all-outqueries')
+        msg = dns.message.make_query('ted2.bogus.example.', 'A', want_dnssec=True)
+        msg.flags |= dns.flags.CD
+
+        res = self.sendUDPQuery(msg)
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertGreater(self.getMetric('all-outqueries'), nbQueries)
+        # we will accept a NSEC for root, which is secure..
+        self.assertEquals(entries + 1, self.getMetric('aggressive-nsec-cache-entries'))
+
+class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase):
+    _confdir = 'AggressiveNSECCacheNSEC3'
+    __test__ = True
+
+    @classmethod
+    def secureZone(cls, confdir, zonename, key=None):
+        zone = '.' if zonename == 'ROOT' else zonename
+        if not key:
+            pdnsutilCmd = [os.environ['PDNSUTIL'],
+                           '--config-dir=%s' % confdir,
+                           'secure-zone',
+                           zone]
+        else:
+            keyfile = os.path.join(confdir, 'dnssec.key')
+            with open(keyfile, 'w') as fdKeyfile:
+                fdKeyfile.write(key)
+
+            pdnsutilCmd = [os.environ['PDNSUTIL'],
+                           '--config-dir=%s' % confdir,
+                           'import-zone-key',
+                           zone,
+                           keyfile,
+                           'active',
+                           'ksk']
+
+        print(' '.join(pdnsutilCmd))
+        try:
+            subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
+        except subprocess.CalledProcessError as e:
+            raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))
+
+        params = "1 0 100 AABBCCDDEEFF112233"
+
+        if zone == "optout.example":
+            params = "1 1 100 AABBCCDDEEFF112233"
+
+        pdnsutilCmd = [os.environ['PDNSUTIL'],
+                       '--config-dir=%s' % confdir,
+                       'set-nsec3',
+                       zone,
+                       params]
+
+        print(' '.join(pdnsutilCmd))
+        try:
+            subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
+        except subprocess.CalledProcessError as e:
+            raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))
+
+    def testNXD(self):
+
+        # first we query a non-existent name, to get the needed NSEC3s in our cache
+        res = self.sendQuery('host2.secure.example.', 'TXT')
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+
+        # now we ask for a different name that is covered by the NSEC3s,
+        # we should generate the answer from the NSEC3s and no outgoing query should be made
+        nbQueries = self.getMetric('all-outqueries')
+        res = self.sendQuery('host6.secure.example.', 'AAAA')
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+
+    def testWildcard(self):
+
+        # first we query a non-existent name, but for which a wildcard matches,
+        # to get the NSEC3 in our cache
+        res = self.sendQuery('test5.wildcard.secure.example.', 'A')
+        expected = dns.rrset.from_text('test5.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertMatchingRRSIGInAnswer(res, expected)
+        self.assertMessageIsAuthenticated(res)
+
+        # now we ask for a different name, we should generate the answer from the NSEC3s and the wildcard,
+        # and no outgoing query should be made
+        nbQueries = self.getMetric('all-outqueries')
+        res = self.sendQuery('test6.wildcard.secure.example.', 'A')
+        expected = dns.rrset.from_text('test6.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertMatchingRRSIGInAnswer(res, expected)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+
+        # now we ask for a type that does not exist at the wildcard
+        nbQueries = self.getMetric('all-outqueries')
+        res = self.sendQuery('test5.wildcard.secure.example.', 'AAAA')
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+
+        # we can also ask a different type, for a different name that is covered
+        # by the NSEC3s and matches the wildcard (but the type does not exist)
+        nbQueries = self.getMetric('all-outqueries')
+        res = self.sendQuery('test6.wildcard.secure.example.', 'TXT')
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertMessageIsAuthenticated(res)
+        self.assertEquals(nbQueries, self.getMetric('all-outqueries'))
+
+    def test_OptOut(self):
+        # query a name in an opt-out zone
+        res = self.sendQuery('ns2.optout.example.', 'A')
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+
+        # check that we _do not_ use the aggressive NSEC cache
+        nbQueries = self.getMetric('all-outqueries')
+        res = self.sendQuery('ns3.optout.example.', 'A')
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertAnswerEmpty(res)
+        self.assertAuthorityHasSOA(res)
+        self.assertGreater(self.getMetric('all-outqueries'), nbQueries)