import os
import requests
import subprocess
+import time
+import extendederrors
class AggressiveNSECCacheBase(RecursorTest):
__test__ = False
_wsTimeout = 10
_wsPassword = 'secretpassword'
_apiKey = 'secretapikey'
+ #_recursorStartupDelay = 4.0
_config_template = """
dnssec=validate
aggressive-nsec-cache-size=10000
webserver-address=127.0.0.1
webserver-password=%s
api-key=%s
+ devonly-regression-test-mode
+ extended-resolution-errors=yes
""" % (_wsPort, _wsPassword, _apiKey)
@classmethod
- def setUp(cls):
+ def wipe(cls):
confdir = os.path.join('configs', cls._confdir)
- cls.wipeRecursorCache(confdir)
+ # Only wipe examples, as wiping the root triggers root NS refreshes
+ cls.wipeRecursorCache(confdir, "example$")
def getMetric(self, name):
headers = {'x-api-key': self._apiKey}
self.assertTrue(False)
+ def testNoEDE(self):
+ # This isn't an aggresive cache check, but the strcuture is very similar to the others,
+ # so letys place it here.
+ # It test the issue that an intermediate EDE does not get reported with the final answer
+ # https://github.com/PowerDNS/pdns/pull/12694
+ self.wipe()
+
+ res = self.sendQuery('host1.sub.secure.example.', 'TXT')
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertAnswerEmpty(res)
+ self.assertAuthorityHasSOA(res)
+ self.assertMessageIsAuthenticated(res)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 0)
+
+ res = self.sendQuery('host1.sub.secure.example.', 'A')
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageIsAuthenticated(res)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 0)
+
def testNoData(self):
+ self.wipe()
- # first we query a non-existent type, to get the NSEC in our cache
+ # first we query a nonexistent type, to get the NSEC in our cache
entries = self.getMetric('aggressive-nsec-cache-entries')
res = self.sendQuery('host1.secure.example.', 'TXT')
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertMessageIsAuthenticated(res)
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
self.assertEqual(self.getMetric('aggressive-nsec-cache-entries'), entries)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase):
_confdir = 'AggressiveNSECCacheNSEC'
# we can't use the same tests for NSEC and NSEC3 because the hashed NSEC3s
# do not deny the same names than the non-hashed NSECs do
def testNXD(self):
+ self.wipe()
- # first we query a non-existent name, to get the needed NSECs (name + widcard) in our cache
+ # first we query a nonexistent name, to get the needed NSECs (name + widcard) in our cache
entries = self.getMetric('aggressive-nsec-cache-entries')
hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
res = self.sendQuery('host2.secure.example.', 'TXT')
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
self.assertEqual(self.getMetric('aggressive-nsec-cache-entries'), entries)
self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
def testWildcard(self):
+ self.wipe()
- # first we query a non-existent name, but for which a wildcard matches,
+ # first we query a nonexistent name, but for which a wildcard matches,
# to get the NSEC in our cache
res = self.sendQuery('test1.wildcard.secure.example.', 'A')
expected = dns.rrset.from_text('test1.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
self.assertMessageIsAuthenticated(res)
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-wc-hits'), hits)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
# now we ask for a type that does not exist at the wildcard
hits = self.getMetric('aggressive-nsec-cache-nsec-hits')
self.assertMessageIsAuthenticated(res)
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
# we can also ask a different type, for a different name that is covered
# by the NSEC and matches the wildcard (but the type does not exist)
self.assertMessageIsAuthenticated(res)
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
self.assertGreater(self.getMetric('aggressive-nsec-cache-nsec-hits'), hits)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
def test_Bogus(self):
+ self.wipe()
+
+ # query a name in example to fill the aggressive negcache
+ res = self.sendQuery('example.', 'A')
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertAnswerEmpty(res)
+ self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))
+
# query a name in a Bogus zone
- entries = self.getMetric('aggressive-nsec-cache-entries')
res = self.sendQuery('ted1.bogus.example.', 'A')
self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
self.assertAnswerEmpty(res)
self.assertAnswerEmpty(res)
self.assertAuthorityHasSOA(res)
self.assertGreater(self.getMetric('all-outqueries'), nbQueries)
- # we will accept a NSEC for root, which is secure..
- self.assertEqual(entries + 1, self.getMetric('aggressive-nsec-cache-entries'))
+
+ # Check that we stil have one aggressive cache entry
+ self.assertEqual(1, self.getMetric('aggressive-nsec-cache-entries'))
+ print(res.options)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(9, b''))
class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase):
_confdir = 'AggressiveNSECCacheNSEC3'
raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))
def testNXD(self):
+ self.wipe()
- # first we query a non-existent name, to get the needed NSEC3s in our cache
+ # first we query a nonexistent name, to get the needed NSEC3s in our cache
res = self.sendQuery('host2.secure.example.', 'TXT')
self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
self.assertAnswerEmpty(res)
self.assertAuthorityHasSOA(res)
self.assertMessageIsAuthenticated(res)
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
def testWildcard(self):
+ self.wipe()
# first let's get the SOA and wildcard NSEC in our cache by asking a name that matches the wildcard
# but a type that does not exist
self.assertAuthorityHasSOA(res)
self.assertMessageIsAuthenticated(res)
- # we query a non-existent name, but for which a wildcard matches,
+ # we query a nonexistent name, but for which a wildcard matches,
# to get the NSEC3 in our cache
res = self.sendQuery('test5.wildcard.secure.example.', 'A')
expected = dns.rrset.from_text('test5.wildcard.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
self.assertMatchingRRSIGInAnswer(res, expected)
self.assertMessageIsAuthenticated(res)
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
# now we ask for a type that does not exist at the wildcard
nbQueries = self.getMetric('all-outqueries')
self.assertAuthorityHasSOA(res)
self.assertMessageIsAuthenticated(res)
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
# we can also ask a different type, for a different name that is covered
# by the NSEC3s and matches the wildcard (but the type does not exist)
self.assertAuthorityHasSOA(res)
self.assertMessageIsAuthenticated(res)
self.assertEqual(nbQueries, self.getMetric('all-outqueries'))
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 15)
+ self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
def test_OptOut(self):
+ self.wipe()
+
# query a name in an opt-out zone
res = self.sendQuery('ns2.optout.example.', 'A')
self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
self.assertAnswerEmpty(res)
self.assertAuthorityHasSOA(res)
self.assertGreater(self.getMetric('all-outqueries'), nbQueries)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 0)