]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
refactor to have a get_node() in the txn API
authorBob Halley <halley@dnspython.org>
Thu, 2 Dec 2021 22:44:08 +0000 (14:44 -0800)
committerBob Halley <halley@dnspython.org>
Thu, 2 Dec 2021 22:44:08 +0000 (14:44 -0800)
dns/node.py
dns/rdataset.py
dns/transaction.py
dns/zone.py
dns/zonefile.py

index 173915475cbc70a20bf49ea2d7c90ce0f3ac5566..b8141c425726f917c448064a1de6dd02c32c4dd0 100644 (file)
 
 """DNS nodes.  A node is a set of rdatasets."""
 
+import enum
 import io
 
+import dns.immutable
 import dns.rdataset
 import dns.rdatatype
 import dns.renderer
 
 
+_cname_types = {
+    dns.rdatatype.CNAME,
+}
+
+# "neutral" types can coexist with a CNAME and thus are not "other data"
+_neutral_types = {
+    dns.rdatatype.NSEC,   # RFC 4035 section 2.5
+    dns.rdatatype.NSEC3,  # This is not likely to happen, but not impossible!
+    dns.rdatatype.KEY,    # RFC 4035 section 2.5, RFC 3007
+}
+
+def _matches_type_or_its_signature(rdtypes, rdtype, covers):
+    return rdtype in rdtypes or \
+           (rdtype == dns.rdatatype.RRSIG and covers in rdtypes)
+
+
+@enum.unique
+class NodeKind(enum.Enum):
+    """Rdatasets in nodes
+    """
+    REGULAR = 0      # a.k.a "other data"
+    NEUTRAL = 1
+    CNAME = 2
+
+    @classmethod
+    def classify(cls, rdtype, covers):
+        if _matches_type_or_its_signature(_cname_types, rdtype, covers):
+            return NodeKind.CNAME
+        elif _matches_type_or_its_signature(_neutral_types, rdtype, covers):
+            return NodeKind.NEUTRAL
+        else:
+            return NodeKind.REGULAR
+
+    @classmethod
+    def classify_rdataset(cls, rdataset):
+        return cls.classify(rdataset.rdtype, rdataset.covers)
+
+
 class Node:
 
     """A Node is a set of rdatasets.
@@ -95,22 +135,26 @@ class Node:
         """Append rdataset to the node with special handling for CNAME and
         other data conditions.
 
-        Specifically, if the rdataset being appended is a CNAME, then
-        all rdatasets other than NSEC, NSEC3, and their covering RRSIGs
-        are deleted.  If the rdataset being appended is NOT a CNAME, then
-        CNAME and RRSIG(CNAME) are deleted.
+        Specifically, if the rdataset being appended has ``NodeKind.CNAME``,
+        then all rdatasets other than KEY, NSEC, NSEC3, and their covering
+        RRSIGs are deleted.  If the rdataset being appended has
+        ``NodeKind.REGUALAR`` then CNAME and RRSIG(CNAME) are deleted.
         """
         # Make having just one rdataset at the node fast.
         if len(self.rdatasets) > 0:
-            if rdataset.implies_cname():
-                self.rdatasets = [rds for rds in self.rdatasets
-                                  if rds.ok_for_cname()]
-            elif rdataset.implies_other_data():
-                self.rdatasets = [rds for rds in self.rdatasets
-                                  if rds.ok_for_other_data()]
+            kind = NodeKind.classify_rdataset(rdataset)
+            if kind == NodeKind.CNAME:
+                self.rdatasets = [rds for rds in self.rdatasets if
+                                  NodeKind.classify_rdataset(rds) !=
+                                  NodeKind.REGULAR]
+            elif kind == NodeKind.REGULAR:
+                self.rdatasets = [rds for rds in self.rdatasets if
+                                  NodeKind.classify_rdataset(rds) !=
+                                  NodeKind.CNAME]
+            # Otherwise the rdataset is NodeKind.NEUTRAL and we do not need to
+            # edit self.rdatasets.
         self.rdatasets.append(rdataset)
 
-
     def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
                       create=False):
         """Find an rdataset matching the specified properties in the
@@ -221,11 +265,56 @@ class Node:
                              replacement.covers)
         self._append_rdataset(replacement)
 
-    def is_cname(self):
-        """Is this a CNAME node?
+    def classify(self):
+        """Classify a node.
+
+        A node which contains a CNAME or RRSIG(CNAME) is a
+        ``NodeKind.CNAME`` node.
 
-        If the node has a CNAME or an RRSIG(CNAME) it is considered a CNAME
-        node for CNAME-and-other-data purposes, and ``True`` is returned.
-        Otherwise the node is an "other data" node, and ``False`` is returned.
+        A node which contains only "neutral" types, i.e. types allowed to
+        co-exist with a CNAME, is a ``NodeKind.NEUTRAL`` node.  The neutral
+        types are NSEC, NSEC3, KEY, and their associated RRSIGS.  An empty node
+        is also considered neutral.
+
+        A node which contains some rdataset which is not a CNAME, RRSIG(CNAME),
+        or a neutral type is a a ``NodeKind.REGULAR`` node.  Regular nodes are
+        also commonly referred to as "other data".
         """
-        return any(rdataset.implies_cname() for rdataset in self.rdatasets)
+        for rdataset in self.rdatasets:
+            kind = NodeKind.classify(rdataset.rdtype, rdataset.covers)
+            if kind != NodeKind.NEUTRAL:
+                return kind
+        return NodeKind.NEUTRAL
+
+    def is_immutable(self):
+        return False
+
+
+dns.immutable.immutable
+class ImmutableNode(Node):
+    def __init__(self, node):
+        super().__init__()
+        self.rdatasets = tuple(
+            [dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
+        )
+
+    def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
+                      create=False):
+        if create:
+            raise TypeError("immutable")
+        return super().find_rdataset(rdclass, rdtype, covers, False)
+
+    def get_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
+                     create=False):
+        if create:
+            raise TypeError("immutable")
+        return super().get_rdataset(rdclass, rdtype, covers, False)
+
+    def delete_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE):
+        raise TypeError("immutable")
+
+    def replace_rdataset(self, replacement):
+        raise TypeError("immutable")
+
+    def is_immutable(self):
+        return True
index f948d76170c6c38a558f24d67668db3300b78df4..e69ee2325aee6a74675f02bede9d28ca737b0812 100644 (file)
@@ -41,14 +41,6 @@ class IncompatibleTypes(dns.exception.DNSException):
     """An attempt was made to add DNS RR data of an incompatible type."""
 
 
-_ok_for_cname = {
-    dns.rdatatype.CNAME,
-    dns.rdatatype.NSEC,   # RFC 4035 section 2.5
-    dns.rdatatype.NSEC3,  # This is not likely to happen, but not impossible!
-    dns.rdatatype.KEY,    # RFC 4035 section 2.5, RFC 3007
-}
-
-
 class Rdataset(dns.set.Set):
 
     """A DNS rdataset."""
@@ -331,36 +323,6 @@ class Rdataset(dns.set.Set):
         else:
             return self[0]._processing_order(iter(self))
 
-    def ok_for_cname(self):
-        """Is this rdataset compatible with a CNAME node?"""
-        return self.rdtype in _ok_for_cname or \
-               (self.rdtype == dns.rdatatype.RRSIG and
-                self.covers in _ok_for_cname)
-
-    def implies_cname(self):
-        """Does this rdataset imply a node is a CNAME node?
-
-        If the rdataset's type is CNAME or RRSIG(CNAME) then it implies a
-        node is a CNAME node, and ``True`` is returned.  Otherwise it implies
-        the node is an an "other data" node, and ``False`` is returned.
-        """
-        return self.rdtype == dns.rdatatype.CNAME or \
-            (self.rdtype == dns.rdatatype.RRSIG and
-            self.covers == dns.rdatatype.CNAME)
-
-    def ok_for_other_data(self):
-        """Is this rdataset compatible with an 'other data' (i.e. not CNAME)
-        node?"""
-        return not self.implies_cname()
-
-    def implies_other_data(self):
-        """Does this rdataset imply a node is an other data node?
-
-        Note that implies_other_data() is not simply "not implies_cname()" as
-        some types, e.g. NSEC and RRSIG(NSEC) are neutral.
-        """
-        return not self.ok_for_cname()
-
 
 @dns.immutable.immutable
 class ImmutableRdataset(Rdataset):
index 0df1e56498452204a891835ca063eaa30f35b4a8..ae7417edb10c8e929a92ecab3ccfe3038a2cba61 100644 (file)
@@ -79,11 +79,16 @@ class AlreadyEnded(dns.exception.DNSException):
     """Tried to use an already-ended transaction."""
 
 
-def _ensure_immutable(rdataset):
+def _ensure_immutable_rdataset(rdataset):
     if rdataset is None or isinstance(rdataset, dns.rdataset.ImmutableRdataset):
         return rdataset
     return dns.rdataset.ImmutableRdataset(rdataset)
 
+def _ensure_immutable_node(node):
+    if node is None or node.is_immutable():
+        return node
+    return dns.node.ImmutableNode(node)
+
 
 class Transaction:
 
@@ -111,15 +116,14 @@ class Transaction:
             name = dns.name.from_text(name, None)
         rdtype = dns.rdatatype.RdataType.make(rdtype)
         rdataset = self._get_rdataset(name, rdtype, covers)
-        return _ensure_immutable(rdataset)
+        return _ensure_immutable_rdataset(rdataset)
 
-    def get_rdatasets(self, name):
-        """Return the rdatasets at *name*, if any.
+    def get_node(self, name):
+        """Return the node at *name*, if any.
 
-        The returned rdatasets are immutable.
-        An empty list is returned if the name doesn't exist.
+        Returns an immutable node or ``None``.
         """
-        return [_ensure_immutable(rds) for rds in self._get_rdatasets(name)]
+        return _ensure_immutable_node(self._get_node(name))
 
     def _check_read_only(self):
         if self.read_only:
@@ -575,9 +579,9 @@ class Transaction:
         """
         raise NotImplementedError  # pragma: no cover
 
-    def _get_rdatasets(self, name):
-        """Return the rdatasets at *name*, if any.
+    def _get_node(self, name):
+        """Return the node at *name*, if any.
 
-        An empty list is returned if the name doesn't exist.
+        Returns a node or ``None``.
         """
         raise NotImplementedError  # pragma: no cover
index 9d999c747bcb6c8ca9da44ea6fb48f019579635d..dc4274a2f0b663ee6e1c1330036299d35a649c6e 100644 (file)
@@ -854,6 +854,9 @@ class ImmutableVersionedNode(VersionedNode):
     def replace_rdataset(self, replacement):
         raise TypeError("immutable")
 
+    def is_immutable(self):
+        return True
+
 
 class Version:
     def __init__(self, zone, id, nodes=None, origin=None):
@@ -1024,11 +1027,8 @@ class Transaction(dns.transaction.Transaction):
             for rdataset in node:
                 yield (name, rdataset)
 
-    def _get_rdatasets(self, name):
-        node = self.version.get_node(name)
-        if node is None:
-            return []
-        return node.rdatasets
+    def _get_node(self, name):
+        return self.version.get_node(name)
 
 
 def from_text(text, origin=None, rdclass=dns.rdataclass.IN,
index 4d72c718747cf05f7dca2b865135d25732a6a2d8..bcafe1d4b2c8f06c38a0385426235f9422896d8d 100644 (file)
@@ -43,19 +43,22 @@ class CNAMEAndOtherData(dns.exception.DNSException):
 
 
 def _check_cname_and_other_data(txn, name, rdataset):
-    rdatasets = txn.get_rdatasets(name)
-    if any(rds.implies_cname() for rds in rdatasets):
-        # This is a CNAME node.
-        if not rdataset.ok_for_cname():
-            raise CNAMEAndOtherData('rdataset not ok for CNAME node')
-    elif any(rds.implies_other_data() for rds in rdatasets):
-        # This is an other data node
-        if not rdataset.ok_for_other_data():
-            raise CNAMEAndOtherData('rdataset is a CNAME but node '
-                                    'has other data')
-    # Otherwise the node consists of neutral types that can be
-    # present at either a CNAME or an other data node, e.g. NSEC or
-    # RRSIG(NSEC)
+    rdataset_kind = dns.node.NodeKind.classify_rdataset(rdataset)
+    node = txn.get_node(name)
+    if node is not None:
+        node_kind = node.classify()
+    else:
+        node_kind = dns.node.NodeKind.NEUTRAL
+    if node_kind == dns.node.NodeKind.CNAME and \
+       rdataset_kind == dns.node.NodeKind.REGULAR:
+        raise CNAMEAndOtherData('rdataset type is not compatible with a '
+                                'CNAME node')
+    elif node_kind == dns.node.NodeKind.REGULAR and \
+       rdataset_kind == dns.node.NodeKind.CNAME:
+        raise CNAMEAndOtherData('CNAME rdataset is not compatible with a '
+                                'regular data node')
+    # Otherwise at least one of the node and the rdataset is neutral, so
+    # adding the rdataset is ok
 
 
 class Reader:
@@ -466,12 +469,16 @@ class RRsetsReaderTransaction(dns.transaction.Transaction):
     def _get_rdataset(self, name, rdtype, covers):
         return self.rdatasets.get((name, rdtype, covers))
 
-    def _get_rdatasets(self, name):
+    def _get_node(self, name):
         rdatasets = []
         for (rdataset_name, _, _), rdataset in self.rdatasets.items():
             if name == rdataset_name:
                 rdatasets.append(rdataset)
-        return rdatasets
+        if len(rdatasets) == 0:
+            return None
+        node = dns.node.Node()
+        node.rdatasets = rdatasets
+        return node
 
     def _put_rdataset(self, name, rdataset):
         self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = rdataset