From 13147146aa8df4b63b5e0b69545f3bed7ab977a2 Mon Sep 17 00:00:00 2001 From: Bob Halley Date: Fri, 26 Nov 2021 15:12:50 -0800 Subject: [PATCH] First draft of CNAME and other data handling in zones. --- dns/node.py | 24 +++++++- dns/rdataset.py | 23 ++++++++ tests/test_zone.py | 141 ++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 178 insertions(+), 10 deletions(-) diff --git a/dns/node.py b/dns/node.py index 68c15266..261de37a 100644 --- a/dns/node.py +++ b/dns/node.py @@ -78,6 +78,26 @@ class Node: def __iter__(self): return iter(self.rdatasets) + def _append_rdataset(self, rdataset): + """Append rdataset to the node with special handling for CNAME and + other data conditions. + + Specifically, if the rdataset being appended is a CNAME, then + all rdatasets other than NSEC, NSEC3, and their covering RRSIGs + are deleted. If the rdataset being appended is NOT a CNAME, then + CNAME and RRSIG(CNAME) are deleted. + """ + # Make having just one rdataset at the node fast. + if len(self.rdatasets) > 0: + if rdataset.rdtype == dns.rdatatype.CNAME: + self.rdatasets = [rds for rds in self.rdatasets + if rds.ok_for_cname()] + else: + self.rdatasets = [rds for rds in self.rdatasets + if rds.ok_for_other_data()] + self.rdatasets.append(rdataset) + + def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE, create=False): """Find an rdataset matching the specified properties in the @@ -111,7 +131,7 @@ class Node: if not create: raise KeyError rds = dns.rdataset.Rdataset(rdclass, rdtype, covers) - self.rdatasets.append(rds) + self._append_rdataset(rds) return rds def get_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE, @@ -186,4 +206,4 @@ class Node: replacement = replacement.to_rdataset() self.delete_rdataset(replacement.rdclass, replacement.rdtype, replacement.covers) - self.rdatasets.append(replacement) + self._append_rdataset(replacement) diff --git a/dns/rdataset.py b/dns/rdataset.py index e69ee232..e9d8fc20 100644 --- a/dns/rdataset.py +++ b/dns/rdataset.py @@ -41,6 +41,20 @@ class IncompatibleTypes(dns.exception.DNSException): """An attempt was made to add DNS RR data of an incompatible type.""" +_ok_for_cname = { + (dns.rdatatype.CNAME, dns.rdatatype.NONE), + (dns.rdatatype.RRSIG, dns.rdatatype.CNAME), + (dns.rdatatype.NSEC, dns.rdatatype.NONE), + (dns.rdatatype.RRSIG, dns.rdatatype.NSEC), + (dns.rdatatype.NSEC3, dns.rdatatype.NONE), + (dns.rdatatype.RRSIG, dns.rdatatype.NSEC3), +} + +_delete_for_other_data = { + (dns.rdatatype.CNAME, dns.rdatatype.NONE), + (dns.rdatatype.RRSIG, dns.rdatatype.CNAME), +} + class Rdataset(dns.set.Set): """A DNS rdataset.""" @@ -323,6 +337,15 @@ class Rdataset(dns.set.Set): else: return self[0]._processing_order(iter(self)) + def ok_for_cname(self): + """Is this rdataset compatible with a CNAME node?""" + return (self.rdtype, self.covers) in _ok_for_cname + + def ok_for_other_data(self): + """Is this rdataset compatible with an 'other data' (i.e. not CNAME) + node?""" + return (self.rdtype, self.covers) not in _delete_for_other_data + @dns.immutable.immutable class ImmutableRdataset(Rdataset): diff --git a/tests/test_zone.py b/tests/test_zone.py index 3e34e720..1cd58dd1 100644 --- a/tests/test_zone.py +++ b/tests/test_zone.py @@ -192,6 +192,42 @@ ns1 3600 IN A 10.0.0.1 ; comment1 ns2 3600 IN A 10.0.0.2 ; comment2 """ + +example_cname = """$TTL 3600 +$ORIGIN example. +@ soa foo bar (1 2 3 4 5) +@ ns ns1 +@ ns ns2 +ns1 a 10.0.0.1 +ns2 a 10.0.0.2 +www a 10.0.0.3 +web cname www + nsec @ CNAME RRSIG + rrsig NSEC 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY= + rrsig CNAME 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY= +web2 cname www + nsec3 1 1 12 aabbccdd 2t7b4g4vsa5smi47k61mv5bv1a22bojr CNAME RRSIG + rrsig NSEC3 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY= + rrsig CNAME 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY= +""" + + +example_other_data = """$TTL 3600 +$ORIGIN example. +@ soa foo bar (1 2 3 4 5) +@ ns ns1 +@ ns ns2 +ns1 a 10.0.0.1 +ns2 a 10.0.0.2 +www a 10.0.0.3 +web a 10.0.0.4 + nsec @ A RRSIG + rrsig A 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY= + rrsig NSEC 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY= + rrsig CNAME 1 3 3600 20200101000000 20030101000000 2143 foo MxFcby9k/yvedMfQgKzhH5er0Mu/vILz 45IkskceFGgiWCn/GxHhai6VAuHAoNUz 4YoU1tVfSCSqQYn6//11U6Nld80jEeC8 aTrO+KKmCaY= +""" + + _keep_output = True def _rdata_sort(a): @@ -839,6 +875,58 @@ class ZoneTestCase(unittest.TestCase): self.assertTrue(rds is not rrs) self.assertFalse(isinstance(rds, dns.rrset.RRset)) + def testCnameAndOtherDataAddOther(self): + z = dns.zone.from_text(example_cname, 'example.', relativize=True) + rds = dns.rdataset.from_text('in', 'a', 300, '10.0.0.1') + z.replace_rdataset('web', rds) + z.replace_rdataset('web2', rds.copy()) + n = z.find_node('web') + self.assertEqual(len(n.rdatasets), 3) + self.assertEqual(n.find_rdataset(dns.rdataclass.IN, dns.rdatatype.A), + rds) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.NSEC)) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.RRSIG, + dns.rdatatype.NSEC)) + n = z.find_node('web2') + self.assertEqual(len(n.rdatasets), 3) + self.assertEqual(n.find_rdataset(dns.rdataclass.IN, dns.rdatatype.A), + rds) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.NSEC3)) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.RRSIG, + dns.rdatatype.NSEC3)) + + def testCnameAndOtherDataAddCname(self): + z = dns.zone.from_text(example_other_data, 'example.', relativize=True) + rds = dns.rdataset.from_text('in', 'cname', 300, 'www') + z.replace_rdataset('web', rds) + n = z.find_node('web') + self.assertEqual(len(n.rdatasets), 4) + self.assertEqual(n.find_rdataset(dns.rdataclass.IN, + dns.rdatatype.CNAME), + rds) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.NSEC)) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.RRSIG, + dns.rdatatype.NSEC)) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.RRSIG, + dns.rdatatype.CNAME)) + + def testNameInZoneWithStr(self): + z = dns.zone.from_text(example_text, 'example.', relativize=False) + self.assertTrue('ns1.example.' in z) + self.assertTrue('bar.foo.example.' in z) + + def testNameInZoneWhereNameIsNotValid(self): + z = dns.zone.from_text(example_text, 'example.', relativize=False) + with self.assertRaises(KeyError): + self.assertTrue(1 in z) + class VersionedZoneTestCase(unittest.TestCase): def testUseTransaction(self): @@ -909,15 +997,52 @@ class VersionedZoneTestCase(unittest.TestCase): rds = txn.get('example.', 'soa') self.assertEqual(rds[0].serial, 1) - def testNameInZoneWithStr(self): - z = dns.zone.from_text(example_text, 'example.', relativize=False) - self.assertTrue('ns1.example.' in z) - self.assertTrue('bar.foo.example.' in z) + def testCnameAndOtherDataAddOther(self): + z = dns.zone.from_text(example_cname, 'example.', relativize=True, + zone_factory=dns.versioned.Zone) + rds = dns.rdataset.from_text('in', 'a', 300, '10.0.0.1') + with z.writer() as txn: + txn.replace('web', rds) + txn.replace('web2', rds.copy()) + n = z.find_node('web') + self.assertEqual(len(n.rdatasets), 3) + self.assertEqual(n.find_rdataset(dns.rdataclass.IN, dns.rdatatype.A), + rds) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.NSEC)) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.RRSIG, + dns.rdatatype.NSEC)) + n = z.find_node('web2') + self.assertEqual(len(n.rdatasets), 3) + self.assertEqual(n.find_rdataset(dns.rdataclass.IN, dns.rdatatype.A), + rds) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.NSEC3)) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.RRSIG, + dns.rdatatype.NSEC3)) + + def testCnameAndOtherDataAddCname(self): + z = dns.zone.from_text(example_other_data, 'example.', relativize=True, + zone_factory=dns.versioned.Zone) + rds = dns.rdataset.from_text('in', 'cname', 300, 'www') + with z.writer() as txn: + txn.replace('web', rds) + n = z.find_node('web') + self.assertEqual(len(n.rdatasets), 4) + self.assertEqual(n.find_rdataset(dns.rdataclass.IN, + dns.rdatatype.CNAME), + rds) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.NSEC)) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.RRSIG, + dns.rdatatype.NSEC)) + self.assertIsNotNone(n.get_rdataset(dns.rdataclass.IN, + dns.rdatatype.RRSIG, + dns.rdatatype.CNAME)) - def testNameInZoneWhereNameIsNotValid(self): - z = dns.zone.from_text(example_text, 'example.', relativize=False) - with self.assertRaises(KeyError): - self.assertTrue(1 in z) if __name__ == '__main__': unittest.main() -- 2.47.3