secure.example. 3600 IN SOA {soa}
secure.example. 3600 IN NS ns.secure.example.
ns.secure.example. 3600 IN A {prefix}.9
+secure.example. 3600 IN MX 10 mx1.secure.example.
+secure.example. 3600 IN MX 20 mx2.secure.example.
+
+naptr.secure.example. 60 IN NAPTR 10 10 "a" "X" "A" s1.secure.example.
+naptr.secure.example. 60 IN NAPTR 10 10 "s" "Y" "B" service1.secure.example.
+naptr.secure.example. 60 IN NAPTR 10 10 "s" "Z" "C" service2.secure.example.
+service1.secure.example. 60 IN SRV 20 100 8080 a.secure.example.
+service2.secure.example. 60 IN SRV 20 100 8080 b.secure.example.
+
secure.example. 3600 IN A 192.0.2.17
+mx1.secure.example. 3600 IN A 192.0.2.18
+mx2.secure.example. 3600 IN AAAA 1::2
+s1.secure.example. 3600 IN A 192.0.2.19
+a.secure.example. 3600 IN A 192.0.2.20
+a.secure.example. 3600 IN A 192.0.2.22
+b.secure.example. 3600 IN A 192.0.2.21
+b.secure.example. 3600 IN AAAA 1::3
host1.secure.example. 3600 IN A 192.0.2.2
cname.secure.example. 3600 IN CNAME host1.secure.example.
if not found:
raise AssertionError("RRset not found in answer\n\n%s" % ret)
+ def assertRRsetInAdditional(self, msg, rrset):
+ """Asserts the rrset (without comparing TTL) exists in the
+ additional section of msg
+
+ @param msg: the dns.message.Message to check
+ @param rrset: a dns.rrset.RRset object"""
+
+ ret = ''
+ if not isinstance(msg, dns.message.Message):
+ raise TypeError("msg is not a dns.message.Message")
+
+ if not isinstance(rrset, dns.rrset.RRset):
+ raise TypeError("rrset is not a dns.rrset.RRset")
+
+ found = False
+ for ans in msg.additional:
+ ret += "%s\n" % ans.to_text()
+ if ans.match(rrset.name, rrset.rdclass, rrset.rdtype, 0, None):
+ self.assertEqual(ans, rrset, "'%s' != '%s'" % (ans.to_text(), rrset.to_text()))
+ found = True
+
+ if not found:
+ raise AssertionError("RRset not found in additional section\n\n%s" % ret)
+
def assertMatchingRRSIGInAnswer(self, msg, coveredRRset, keys=None):
"""Looks for coveredRRset in the answer section and if there is an RRSIG RRset
that covers that RRset. If keys is not None, this function will also try to
except dns.dnssec.ValidationFailure as e:
raise AssertionError("Signature validation failed for %s:\n%s" % (msg.question[0].to_text(), e))
+ def assertMatchingRRSIGInAdditional(self, msg, coveredRRset, keys=None):
+ """Looks for coveredRRset in the additional section and if there is an RRSIG RRset
+ that covers that RRset. If keys is not None, this function will also try to
+ validate the RRset against the RRSIG
+
+ @param msg: The dns.message.Message to check
+ @param coveredRRset: The RRSet to check for
+ @param keys: a dictionary keyed by dns.name.Name with node or rdataset values to use for validation"""
+
+ if not isinstance(msg, dns.message.Message):
+ raise TypeError("msg is not a dns.message.Message")
+
+ if not isinstance(coveredRRset, dns.rrset.RRset):
+ raise TypeError("coveredRRset is not a dns.rrset.RRset")
+
+ msgRRsigRRSet = None
+ msgRRSet = None
+
+ ret = ''
+ for ans in msg.additional:
+ ret += ans.to_text() + "\n"
+
+ if ans.match(coveredRRset.name, coveredRRset.rdclass, coveredRRset.rdtype, 0, None):
+ msgRRSet = ans
+ if ans.match(coveredRRset.name, dns.rdataclass.IN, dns.rdatatype.RRSIG, coveredRRset.rdtype, None):
+ msgRRsigRRSet = ans
+ if msgRRSet and msgRRsigRRSet:
+ break
+
+ if not msgRRSet:
+ raise AssertionError("RRset for '%s' not found in additional" % msg.question[0].to_text())
+
+ if not msgRRsigRRSet:
+ raise AssertionError("No RRSIGs found in additional for %s:\nFull answer:\n%s" % (msg.question[0].to_text(), ret))
+
+ if keys:
+ try:
+ dns.dnssec.validate(msgRRSet, msgRRsigRRSet.to_rdataset(), keys)
+ except dns.dnssec.ValidationFailure as e:
+ raise AssertionError("Signature validation failed for %s:\n%s" % (msg.question[0].to_text(), e))
+
def assertNoRRSIGsInAnswer(self, msg):
"""Checks if there are _no_ RRSIGs in the answer section of msg"""
def assertAnswerEmpty(self, msg):
self.assertTrue(len(msg.answer) == 0, "Data found in the the answer section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.answer])))
+ def assertAdditionalEmpty(self, msg):
+ self.assertTrue(len(msg.additional) == 0, "Data found in the the additional section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.additional])))
+
def assertRcodeEqual(self, msg, rcode):
if not isinstance(msg, dns.message.Message):
raise TypeError("msg is not a dns.message.Message but a %s" % type(msg))
--- /dev/null
+import dns
+import os
+from recursortests import RecursorTest
+
+class testAdditionalsDefault(RecursorTest):
+ _confdir = 'AdditionalsDefault'
+
+ _config_template = """
+ dnssec=validate
+ disable-packetcache
+ """
+ _lua_config_file = """
+ addAllowedAdditionalQType(pdns.MX, {pdns.A, pdns.AAAA})
+ """
+
+ def testMX(self):
+ expected = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
+ adds1 = dns.rrset.from_text('mx1.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.18')
+ adds2 = dns.rrset.from_text('mx2.secure.example.', 0, dns.rdataclass.IN, 'AAAA', '1::2')
+ query1 = dns.message.make_query('secure.example', 'MX', want_dnssec=True)
+ query1.flags |= dns.flags.AD
+ query2 = dns.message.make_query('mx1.secure.example', 'A', want_dnssec=True)
+ query2.flags |= dns.flags.AD
+ query3 = dns.message.make_query('mx2.secure.example', 'AAAA', want_dnssec=True)
+ query3.flags |= dns.flags.AD
+
+ res = self.sendUDPQuery(query1)
+ self.assertMessageIsAuthenticated(res)
+ self.assertRRsetInAnswer(res, expected)
+ self.assertMatchingRRSIGInAnswer(res, expected)
+ self.assertAdditionalEmpty(res)
+ # fill the cache
+ res = self.sendUDPQuery(query2)
+ res = self.sendUDPQuery(query3)
+ # query 1 again
+ res = self.sendUDPQuery(query1)
+ self.assertMessageIsAuthenticated(res)
+ self.assertRRsetInAnswer(res, expected)
+ self.assertMatchingRRSIGInAnswer(res, expected)
+ self.assertRRsetInAdditional(res, adds1)
+ self.assertRRsetInAdditional(res, adds2)
+
+class testAdditionalsResolveImmediately(RecursorTest):
+ _confdir = 'AdditionalsResolveImmediately'
+ _config_template = """
+ dnssec=validate
+ disable-packetcache
+ """
+ _lua_config_file = """
+ addAllowedAdditionalQType(pdns.MX, {pdns.A, pdns.AAAA}, { mode = "ResolveImmediately"})
+ addAllowedAdditionalQType(pdns.NAPTR, {pdns.A, pdns.AAAA, pdns.SRV}, { mode = "ResolveImmediately"})
+ addAllowedAdditionalQType(pdns.SRV, {pdns.A, pdns.AAAA}, { mode = "ResolveImmediately"})
+ """
+
+ def testMX(self):
+ expected = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
+ adds1 = dns.rrset.from_text('mx1.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.18')
+ adds2 = dns.rrset.from_text('mx2.secure.example.', 0, dns.rdataclass.IN, 'AAAA', '1::2')
+ query1 = dns.message.make_query('secure.example', 'MX', want_dnssec=True)
+ query1.flags |= dns.flags.AD
+
+ res = self.sendUDPQuery(query1)
+ self.assertMessageIsAuthenticated(res)
+ self.assertRRsetInAnswer(res, expected)
+ self.assertMatchingRRSIGInAnswer(res, expected)
+ self.assertRRsetInAdditional(res, adds1)
+ self.assertRRsetInAdditional(res, adds2)
+ self.assertMatchingRRSIGInAdditional(res, adds1)
+ self.assertMatchingRRSIGInAdditional(res, adds2)
+
+ def testNAPTR(self):
+ exp = dns.rrset.from_text('naptr.secure.example.', 0, dns.rdataclass.IN, 'NAPTR',
+ '10 10 "s" "Z" "C" service2.secure.example.',
+ '10 10 "s" "Y" "B" service1.secure.example.',
+ '10 10 "a" "X" "A" s1.secure.example.');
+ adds1 = dns.rrset.from_text('s1.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.19')
+ adds2 = dns.rrset.from_text('service1.secure.example.', 0, dns.rdataclass.IN, 'SRV', '20 100 8080 a.secure.example.')
+ adds3 = dns.rrset.from_text('service2.secure.example.', 0, dns.rdataclass.IN, 'SRV', '20 100 8080 b.secure.example.')
+ adds4 = dns.rrset.from_text('a.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.20', '192.0.2.22')
+ adds5 = dns.rrset.from_text('b.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.21')
+ adds6 = dns.rrset.from_text('b.secure.example.', 0, dns.rdataclass.IN, 'AAAA', '1::3')
+ adds7 = dns.rrset.from_text('s1.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.19')
+
+ query1 = dns.message.make_query('naptr.secure.example', 'NAPTR', want_dnssec=True)
+ query1.flags |= dns.flags.AD
+ res = self.sendUDPQuery(query1)
+ self.assertMessageIsAuthenticated(res)
+ self.assertRRsetInAnswer(res, exp)
+ self.assertMatchingRRSIGInAnswer(res, exp)
+ self.assertRRsetInAdditional(res, adds1)
+ self.assertMatchingRRSIGInAdditional(res, adds1)
+ self.assertRRsetInAdditional(res, adds2)
+ self.assertMatchingRRSIGInAdditional(res, adds2)
+ self.assertRRsetInAdditional(res, adds3)
+ self.assertMatchingRRSIGInAdditional(res, adds3)
+ self.assertRRsetInAdditional(res, adds4)
+ self.assertMatchingRRSIGInAdditional(res, adds4)
+ self.assertRRsetInAdditional(res, adds5)
+ self.assertMatchingRRSIGInAdditional(res, adds5)
+ self.assertRRsetInAdditional(res, adds6)
+ self.assertMatchingRRSIGInAdditional(res, adds6)
+ self.assertRRsetInAdditional(res, adds7)
+ self.assertMatchingRRSIGInAdditional(res, adds7)
+
+class testAdditionalsResolveCacheOnly(RecursorTest):
+ _confdir = 'AdditionalsResolveCacheOnly'
+ _config_template = """
+ dnssec=validate
+ disable-packetcache
+ """
+ _lua_config_file = """
+ addAllowedAdditionalQType(pdns.MX, {pdns.A, pdns.AAAA}, { mode = "ResolveImmediately"})
+ """
+
+ def testMX(self):
+ expected = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
+ adds1 = dns.rrset.from_text('mx1.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.18')
+ adds2 = dns.rrset.from_text('mx2.secure.example.', 0, dns.rdataclass.IN, 'AAAA', '1::2')
+ query1 = dns.message.make_query('secure.example', 'MX', want_dnssec=True)
+ query1.flags |= dns.flags.AD
+
+ res = self.sendUDPQuery(query1)
+ self.assertMessageIsAuthenticated(res)
+ self.assertRRsetInAnswer(res, expected)
+ self.assertMatchingRRSIGInAnswer(res, expected)
+ self.assertRRsetInAdditional(res, adds1)
+ self.assertRRsetInAdditional(res, adds2)
+ self.assertMatchingRRSIGInAdditional(res, adds1)
+ self.assertMatchingRRSIGInAdditional(res, adds2)
+