]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
convert to dnspython conventions, support relative and absolute names
authorBob Halley <halley@nominum.com>
Sat, 20 Nov 2010 12:09:45 +0000 (12:09 +0000)
committerBob Halley <halley@nominum.com>
Sat, 20 Nov 2010 12:09:45 +0000 (12:09 +0000)
dns/dnssec.py

index 332e01275748d346af6db3520065a2e3c91a8ae0..3831a145ddf7b937c6df2a3440a8f12b9595588b 100644 (file)
 
 """Common DNSSEC-related functions and constants."""
 
+import cStringIO
+import struct
+import time
+
+import dns.exception
 import dns.hash
 import dns.name
+import dns.node
+import dns.rdataset
 import dns.rdata
 import dns.rdatatype
 import dns.rdataclass
 
+class UnsupportedAlgorithm(dns.exception.DNSException):
+    """Raised if an algorithm is not supported."""
+    pass
+
+class ValidationFailure(dns.exception.DNSException):
+    """The DNSSEC signature is invalid."""
+    pass
+
 RSAMD5 = 1
 DH = 2
 DSA = 3
@@ -55,10 +70,6 @@ _algorithm_by_text = {
 
 _algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.iteritems()])
 
-class UnknownAlgorithm(Exception):
-    """Raised if an algorithm is unknown."""
-    pass
-
 def algorithm_from_text(text):
     """Convert text into a DNSSEC algorithm value
     @rtype: int"""
@@ -77,13 +88,13 @@ def algorithm_to_text(value):
         text = str(value)
     return text
 
-def _to_rdata(record):
+def _to_rdata(record, origin):
     s = cStringIO.StringIO()
-    record.to_wire(s)
+    record.to_wire(s, origin=origin)
     return s.getvalue()
 
-def key_id(key):
-    rdata = _to_rdata(key)
+def key_id(key, origin=None):
+    rdata = _to_rdata(key, origin)
     if key.algorithm == RSAMD5:
         return (ord(rdata[-3]) << 8) + ord(rdata[-2])
     else:
@@ -95,7 +106,7 @@ def key_id(key):
         total += ((total >> 16) & 0xffff);
         return total & 0xffff
 
-def make_ds(name, key, algorithm):
+def make_ds(name, key, algorithm, origin=None):
     if algorithm.upper() == 'SHA1':
         dsalg = 1
         hash = dns.hash.get('SHA1')()
@@ -103,21 +114,34 @@ def make_ds(name, key, algorithm):
         dsalg = 2
         hash = dns.hash.get('SHA256')()
     else:
-        raise ValueError, 'unsupported algorithm "%s"' % algorithm
+        raise UnsupportedAlgorithm, 'unsupported algorithm "%s"' % algorithm
 
     if isinstance(name, (str, unicode)):
-        name = dns.name.from_text(name)
+        name = dns.name.from_text(name, origin)
     hash.update(name.canonicalize().to_wire())
-    hash.update(_to_rdata(key))
+    hash.update(_to_rdata(key, origin))
     digest = hash.digest()
 
     dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
     return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
                                len(dsrdata))
+
 def _find_key(keys, rrsig):
-    for key in keys:
-        if key.algorithm == rrsig.algorithm and key_id(key) == rrsig.key_tag:
-            return key
+    value = keys.get(rrsig.signer)
+    if value is None:
+        return None
+    if isinstance(value, dns.node.Node):
+        try:
+            rdataset = node.find_rdataset(dns.rdataclass.IN,
+                                          dns.rdatatype.DNSKEY)
+        except KeyError:
+            return None
+    else:
+        rdataset = value
+    for rdata in rdataset:
+        if rdata.algorithm == rrsig.algorithm and \
+               key_id(rdata) == rrsig.key_tag:
+            return rdata
     return None
 
 def _is_rsa(algorithm):
@@ -150,7 +174,7 @@ def _make_hash(algorithm):
         return dns.hash.get('SHA256')()
     if _is_sha512(algorithm):
         return dns.hash.get('SHA512')()
-    raise ValueError, 'unknown algorithm'
+    raise ValidationFailure, 'unknown hash for algorithm %u' % algorithm
 
 def _make_algorithm_id(algorithm):
     if _is_md5(algorithm):
@@ -162,7 +186,7 @@ def _make_algorithm_id(algorithm):
     elif _is_sha512(algorithm):
         oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
     else:
-        raise ValueError, 'unknown algorithm %u' % algorithm
+        raise ValidationFailure, 'unknown algorithm %u' % algorithm
     olen = len(oid)
     dlen = _make_hash(algorithm).digest_size
     idbytes = [0x30] + [8 + olen + dlen] + \
@@ -170,16 +194,48 @@ def _make_algorithm_id(algorithm):
               [0x05, 0x00] + [0x04, dlen]
     return ''.join(map(chr, idbytes))
 
-def _validate(rrset, rrsig, keys):
+def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
+    """Validate an RRset against a single signature rdata
+
+    The owner name of the rrsig is assumed to be the same as the owner name
+    of the rrset.
+
+    @param rrset: The RRset to validate
+    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
+    tuple
+    @param rrsig: The signature rdata
+    @type rrsig: dns.rrset.Rdata
+    @param keys: The key dictionary.
+    @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
+    @param origin: The origin to use for relative names
+    @type origin: dns.name.Name or None
+    @param now: The time to use when validating the signatures.  The default
+    is the current time.
+    @type now: int
+    """
+
+    if isinstance(origin, (str, unicode)):
+        origin = dns.name.from_text(origin, dns.name.root)
+
     key = _find_key(keys, rrsig)
     if not key:
-        raise ValueError, 'unknown key'
+        raise ValidationFailure, 'unknown key'
+
+    # For convenience, allow the rrset to be specified as a (name, rdataset)
+    # tuple as well as a proper rrset
+    if isinstance(rrset, tuple):
+        rrname = rrset[0]
+        rdataset = rrset[1]
+    else:
+        rrname = rrset.name
+        rdataset = rrset
 
-    now = time.time()
+    if now is None:
+        now = time.time()
     if rrsig.expiration < now:
-        raise ValueError, 'expired'
+        raise ValidationFailure, 'expired'
     if rrsig.inception > now:
-        raise ValueError, 'not yet valid'
+        raise ValidationFailure, 'not yet valid'
 
     hash = _make_hash(rrsig.algorithm)
 
@@ -216,23 +272,22 @@ def _validate(rrset, rrsig, keys):
         sig = (Crypto.Util.number.bytes_to_long(dsa_r),
                Crypto.Util.number.bytes_to_long(dsa_s))
     else:
-        raise ValueError, 'unknown algorithm'
+        raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
 
-    hash.update(_to_rdata(rrsig)[:18])
-    hash.update(rrsig.signer.to_digestable())
+    hash.update(_to_rdata(rrsig, origin)[:18])
+    hash.update(rrsig.signer.to_digestable(origin))
 
-    rrname = rrset.name
     if rrsig.labels < len(rrname) - 1:
         suffix = rrname.split(rrsig.labels + 1)[1]
         rrname = dns.name.from_text('*', suffix)
-    rrnamebuf = rrname.to_digestable()
-    rrfixed = struct.pack('!HHI', rrset.rdtype, rrset.rdclass,
+    rrnamebuf = rrname.to_digestable(origin)
+    rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
                           rrsig.original_ttl)
-    rrlist = sorted(rrset);
+    rrlist = sorted(rdataset);
     for rr in rrlist:
         hash.update(rrnamebuf)
         hash.update(rrfixed)
-        rrdata = rr.to_digestable()
+        rrdata = rr.to_digestable(origin)
         rrlen = struct.pack('!H', len(rrdata))
         hash.update(rrlen)
         hash.update(rrdata)
@@ -247,10 +302,59 @@ def _validate(rrset, rrsig, keys):
     elif _is_dsa(rrsig.algorithm):
         pass
     else:
-        raise ValueError, 'unknown algorithm'
+        # Raise here for code clarity; this won't actually ever happen
+        # since if the algorithm is really unknown we'd already have
+        # raised an exception above
+        raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
 
     if not pubkey.verify(digest, sig):
-        raise ValueError, 'verify failure'
+        raise ValidationFailure, 'verify failure'
+
+def _validate(rrset, rrsigset, keys, origin=None, now=None):
+    """Validate an RRset
+
+    @param rrset: The RRset to validate
+    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
+    tuple
+    @param rrsigset: The signature RRset
+    @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
+    tuple
+    @param keys: The key dictionary.
+    @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
+    @param origin: The origin to use for relative names
+    @type origin: dns.name.Name or None
+    @param now: The time to use when validating the signatures.  The default
+    is the current time.
+    @type now: int
+    """
+
+    if isinstance(origin, (str, unicode)):
+        origin = dns.name.from_text(origin, dns.name.root)
+
+    if isinstance(rrset, tuple):
+        rrname = rrset[0]
+    else:
+        rrname = rrset.name
+
+    if isinstance(rrsigset, tuple):
+        rrsigname = rrsigset[0]
+        rrsigrdataset = rrsigset[1]
+    else:
+        rrsigname = rrsigset.name
+        rrsigrdataset = rrsigset
+
+    rrname = rrname.choose_relativity(origin)
+    rrsigname = rrname.choose_relativity(origin)
+    if rrname != rrsigname:
+        raise ValidationFailure, "owner names do not match"
+
+    for rrsig in rrsigrdataset:
+        try:
+            _validate_rrsig(rrset, rrsig, keys, origin, now)
+            return
+        except ValidationFailure, e:
+            pass
+    raise ValidationFailure, "no RRSIGs validated"
 
 def _need_pycrypto(*args, **kwargs):
     raise NotImplementedError, "DNSSEC validation requires pycrypto"
@@ -259,5 +363,7 @@ try:
     from Crypto.PublicKey import RSA,DSA
     import Crypto.Util.number
     validate = _validate
+    validate_rrsig = _validate_rrsig
 except ImportError:
     validate = _need_pycrypto
+    validate_rrsig = _need_pycrypto