dns.rdatatype.KEY, # RFC 4035 section 2.5, RFC 3007
}
-_delete_for_other_data = {
- (dns.rdatatype.CNAME, dns.rdatatype.NONE),
- (dns.rdatatype.RRSIG, dns.rdatatype.CNAME),
-}
-
class Rdataset(dns.set.Set):
(self.rdtype == dns.rdatatype.RRSIG and
self.covers in _ok_for_cname)
+ def implies_cname(self):
+ """Does this rdataset imply a node is a CNAME node?
+
+ If the rdataset's type is CNAME or RRSIG(CNAME) then it implies a
+ node is a CNAME node, and ``True`` is returned. Otherwise it implies
+ the node is an an "other data" node, and ``False`` is returned.
+ """
+ return self.rdtype == dns.rdatatype.CNAME or \
+ (self.rdtype == dns.rdatatype.RRSIG and
+ self.covers == dns.rdatatype.CNAME)
+
def ok_for_other_data(self):
"""Is this rdataset compatible with an 'other data' (i.e. not CNAME)
node?"""
- return (self.rdtype, self.covers) not in _delete_for_other_data
+ return not self.implies_cname()
@dns.immutable.immutable
"""Tried to use an already-ended transaction."""
+def _ensure_immutable(rdataset):
+ if rdataset is None or isinstance(rdataset, dns.rdataset.ImmutableRdataset):
+ return rdataset
+ return dns.rdataset.ImmutableRdataset(rdataset)
+
+
class Transaction:
def __init__(self, manager, replacement=False, read_only=False):
self.replacement = replacement
self.read_only = read_only
self._ended = False
+ self._check_put_rdataset = []
+ self._check_delete_rdataset = []
+ self._check_delete_name = []
#
# This is the high level API
name = dns.name.from_text(name, None)
rdtype = dns.rdatatype.RdataType.make(rdtype)
rdataset = self._get_rdataset(name, rdtype, covers)
- if rdataset is not None and \
- not isinstance(rdataset, dns.rdataset.ImmutableRdataset):
- rdataset = dns.rdataset.ImmutableRdataset(rdataset)
- return rdataset
+ return _ensure_immutable(rdataset)
+
+ def get_rdatasets(self, name):
+ """Return the rdatasets at *name*, if any.
+
+ The returned rdatasets are immutable.
+ An empty tuple is returned if the name doesn't exist.
+ """
+ return [_ensure_immutable(rds) for rds in self._get_rdatasets(name)]
def _check_read_only(self):
if self.read_only:
"""
self._end(False)
+ def check_put_rdataset(self, check):
+ """Call *check* before putting (storing) an rdataset.
+
+ The function is called with the transaction, the name, and the rdataset.
+
+ The check function may safely make non-mutating transaction method
+ calls, but behavior is undefined if mutating transaction methods are
+ called. The check function should raise an exception if it objects to
+ the put, and otherwise should return ``None``.
+ """
+ self._check_put_rdataset.append(check)
+
+ def check_delete_rdataset(self, check):
+ """Call *check* before deleting an rdataset.
+
+ The function is called with the transaction, the name, the rdatatype,
+ covered rdatatype.
+
+ The check function may safely make non-mutating transaction method
+ calls, but behavior is undefined if mutating transaction methods are
+ called. The check function should raise an exception if it objects to
+ the put, and otherwise should return ``None``.
+ """
+ self._check_delete_rdataset.append(check)
+
+ def check_delete_name(self, check):
+ """Call *check* before putting (storing) an rdataset.
+
+ The function is called with the transaction and the name.
+
+ The check function may safely make non-mutating transaction method
+ calls, but behavior is undefined if mutating transaction methods are
+ called. The check function should raise an exception if it objects to
+ the put, and otherwise should return ``None``.
+ """
+ self._check_delete_name.append(check)
+
#
# Helper methods
#
trds.update(existing)
existing = trds
rdataset = existing.union(rdataset)
- self._put_rdataset(name, rdataset)
+ self._checked_put_rdataset(name, rdataset)
except IndexError:
raise TypeError(f'not enough parameters to {method}')
raise DeleteNotExact(f'{method}: missing rdatas')
rdataset = existing.difference(rdataset)
if len(rdataset) == 0:
- self._delete_rdataset(name, rdataset.rdtype,
- rdataset.covers)
+ self._checked_delete_rdataset(name, rdataset.rdtype,
+ rdataset.covers)
else:
- self._put_rdataset(name, rdataset)
+ self._checked_put_rdataset(name, rdataset)
elif exact:
raise DeleteNotExact(f'{method}: missing rdataset')
else:
if exact and not self._name_exists(name):
raise DeleteNotExact(f'{method}: name not known')
- self._delete_name(name)
+ self._checked_delete_name(name)
except IndexError:
raise TypeError(f'not enough parameters to {method}')
finally:
self._ended = True
+ def _checked_put_rdataset(self, name, rdataset):
+ for check in self._check_put_rdataset:
+ check(self, name, rdataset)
+ self._put_rdataset(name, rdataset)
+
+ def _checked_delete_rdataset(self, name, rdtype, covers):
+ for check in self._check_delete_rdataset:
+ check(self, name, rdtype, covers)
+ self._delete_rdataset(name, rdtype, covers)
+
+ def _checked_delete_name(self, name):
+ for check in self._check_delete_name:
+ check(self, name)
+ self._delete_name(name)
+
#
# Transactions are context managers.
#
def _delete_name(self, name):
"""Delete all data associated with *name*.
- It is not an error if the rdataset does not exist.
+ It is not an error if the name does not exist.
"""
raise NotImplementedError # pragma: no cover
def _iterate_rdatasets(self):
"""Return an iterator that yields (name, rdataset) tuples.
+ """
+ raise NotImplementedError # pragma: no cover
+
+ def _get_rdatasets(self, name):
+ """Return the rdatasets at *name*, if any.
- Not all Transaction subclasses implement this.
+ An empty list is returned if the name doesn't exist.
"""
raise NotImplementedError # pragma: no cover