From ce393dd198ea761be9b73c67354088faf6aebefa Mon Sep 17 00:00:00 2001 From: Bob Halley Date: Sat, 20 Nov 2010 12:09:45 +0000 Subject: [PATCH] convert to dnspython conventions, support relative and absolute names --- dns/dnssec.py | 170 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 138 insertions(+), 32 deletions(-) diff --git a/dns/dnssec.py b/dns/dnssec.py index 332e0127..3831a145 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -15,12 +15,27 @@ """Common DNSSEC-related functions and constants.""" +import cStringIO +import struct +import time + +import dns.exception import dns.hash import dns.name +import dns.node +import dns.rdataset import dns.rdata import dns.rdatatype import dns.rdataclass +class UnsupportedAlgorithm(dns.exception.DNSException): + """Raised if an algorithm is not supported.""" + pass + +class ValidationFailure(dns.exception.DNSException): + """The DNSSEC signature is invalid.""" + pass + RSAMD5 = 1 DH = 2 DSA = 3 @@ -55,10 +70,6 @@ _algorithm_by_text = { _algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.iteritems()]) -class UnknownAlgorithm(Exception): - """Raised if an algorithm is unknown.""" - pass - def algorithm_from_text(text): """Convert text into a DNSSEC algorithm value @rtype: int""" @@ -77,13 +88,13 @@ def algorithm_to_text(value): text = str(value) return text -def _to_rdata(record): +def _to_rdata(record, origin): s = cStringIO.StringIO() - record.to_wire(s) + record.to_wire(s, origin=origin) return s.getvalue() -def key_id(key): - rdata = _to_rdata(key) +def key_id(key, origin=None): + rdata = _to_rdata(key, origin) if key.algorithm == RSAMD5: return (ord(rdata[-3]) << 8) + ord(rdata[-2]) else: @@ -95,7 +106,7 @@ def key_id(key): total += ((total >> 16) & 0xffff); return total & 0xffff -def make_ds(name, key, algorithm): +def make_ds(name, key, algorithm, origin=None): if algorithm.upper() == 'SHA1': dsalg = 1 hash = dns.hash.get('SHA1')() @@ -103,21 +114,34 @@ def make_ds(name, key, algorithm): dsalg = 2 hash = dns.hash.get('SHA256')() else: - raise ValueError, 'unsupported algorithm "%s"' % algorithm + raise UnsupportedAlgorithm, 'unsupported algorithm "%s"' % algorithm if isinstance(name, (str, unicode)): - name = dns.name.from_text(name) + name = dns.name.from_text(name, origin) hash.update(name.canonicalize().to_wire()) - hash.update(_to_rdata(key)) + hash.update(_to_rdata(key, origin)) digest = hash.digest() dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0, len(dsrdata)) + def _find_key(keys, rrsig): - for key in keys: - if key.algorithm == rrsig.algorithm and key_id(key) == rrsig.key_tag: - return key + value = keys.get(rrsig.signer) + if value is None: + return None + if isinstance(value, dns.node.Node): + try: + rdataset = node.find_rdataset(dns.rdataclass.IN, + dns.rdatatype.DNSKEY) + except KeyError: + return None + else: + rdataset = value + for rdata in rdataset: + if rdata.algorithm == rrsig.algorithm and \ + key_id(rdata) == rrsig.key_tag: + return rdata return None def _is_rsa(algorithm): @@ -150,7 +174,7 @@ def _make_hash(algorithm): return dns.hash.get('SHA256')() if _is_sha512(algorithm): return dns.hash.get('SHA512')() - raise ValueError, 'unknown algorithm' + raise ValidationFailure, 'unknown hash for algorithm %u' % algorithm def _make_algorithm_id(algorithm): if _is_md5(algorithm): @@ -162,7 +186,7 @@ def _make_algorithm_id(algorithm): elif _is_sha512(algorithm): oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03] else: - raise ValueError, 'unknown algorithm %u' % algorithm + raise ValidationFailure, 'unknown algorithm %u' % algorithm olen = len(oid) dlen = _make_hash(algorithm).digest_size idbytes = [0x30] + [8 + olen + dlen] + \ @@ -170,16 +194,48 @@ def _make_algorithm_id(algorithm): [0x05, 0x00] + [0x04, dlen] return ''.join(map(chr, idbytes)) -def _validate(rrset, rrsig, keys): +def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): + """Validate an RRset against a single signature rdata + + The owner name of the rrsig is assumed to be the same as the owner name + of the rrset. + + @param rrset: The RRset to validate + @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) + tuple + @param rrsig: The signature rdata + @type rrsig: dns.rrset.Rdata + @param keys: The key dictionary. + @type keys: a dictionary keyed by dns.name.Name with node or rdataset values + @param origin: The origin to use for relative names + @type origin: dns.name.Name or None + @param now: The time to use when validating the signatures. The default + is the current time. + @type now: int + """ + + if isinstance(origin, (str, unicode)): + origin = dns.name.from_text(origin, dns.name.root) + key = _find_key(keys, rrsig) if not key: - raise ValueError, 'unknown key' + raise ValidationFailure, 'unknown key' + + # For convenience, allow the rrset to be specified as a (name, rdataset) + # tuple as well as a proper rrset + if isinstance(rrset, tuple): + rrname = rrset[0] + rdataset = rrset[1] + else: + rrname = rrset.name + rdataset = rrset - now = time.time() + if now is None: + now = time.time() if rrsig.expiration < now: - raise ValueError, 'expired' + raise ValidationFailure, 'expired' if rrsig.inception > now: - raise ValueError, 'not yet valid' + raise ValidationFailure, 'not yet valid' hash = _make_hash(rrsig.algorithm) @@ -216,23 +272,22 @@ def _validate(rrset, rrsig, keys): sig = (Crypto.Util.number.bytes_to_long(dsa_r), Crypto.Util.number.bytes_to_long(dsa_s)) else: - raise ValueError, 'unknown algorithm' + raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm - hash.update(_to_rdata(rrsig)[:18]) - hash.update(rrsig.signer.to_digestable()) + hash.update(_to_rdata(rrsig, origin)[:18]) + hash.update(rrsig.signer.to_digestable(origin)) - rrname = rrset.name if rrsig.labels < len(rrname) - 1: suffix = rrname.split(rrsig.labels + 1)[1] rrname = dns.name.from_text('*', suffix) - rrnamebuf = rrname.to_digestable() - rrfixed = struct.pack('!HHI', rrset.rdtype, rrset.rdclass, + rrnamebuf = rrname.to_digestable(origin) + rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass, rrsig.original_ttl) - rrlist = sorted(rrset); + rrlist = sorted(rdataset); for rr in rrlist: hash.update(rrnamebuf) hash.update(rrfixed) - rrdata = rr.to_digestable() + rrdata = rr.to_digestable(origin) rrlen = struct.pack('!H', len(rrdata)) hash.update(rrlen) hash.update(rrdata) @@ -247,10 +302,59 @@ def _validate(rrset, rrsig, keys): elif _is_dsa(rrsig.algorithm): pass else: - raise ValueError, 'unknown algorithm' + # Raise here for code clarity; this won't actually ever happen + # since if the algorithm is really unknown we'd already have + # raised an exception above + raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm if not pubkey.verify(digest, sig): - raise ValueError, 'verify failure' + raise ValidationFailure, 'verify failure' + +def _validate(rrset, rrsigset, keys, origin=None, now=None): + """Validate an RRset + + @param rrset: The RRset to validate + @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) + tuple + @param rrsigset: The signature RRset + @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) + tuple + @param keys: The key dictionary. + @type keys: a dictionary keyed by dns.name.Name with node or rdataset values + @param origin: The origin to use for relative names + @type origin: dns.name.Name or None + @param now: The time to use when validating the signatures. The default + is the current time. + @type now: int + """ + + if isinstance(origin, (str, unicode)): + origin = dns.name.from_text(origin, dns.name.root) + + if isinstance(rrset, tuple): + rrname = rrset[0] + else: + rrname = rrset.name + + if isinstance(rrsigset, tuple): + rrsigname = rrsigset[0] + rrsigrdataset = rrsigset[1] + else: + rrsigname = rrsigset.name + rrsigrdataset = rrsigset + + rrname = rrname.choose_relativity(origin) + rrsigname = rrname.choose_relativity(origin) + if rrname != rrsigname: + raise ValidationFailure, "owner names do not match" + + for rrsig in rrsigrdataset: + try: + _validate_rrsig(rrset, rrsig, keys, origin, now) + return + except ValidationFailure, e: + pass + raise ValidationFailure, "no RRSIGs validated" def _need_pycrypto(*args, **kwargs): raise NotImplementedError, "DNSSEC validation requires pycrypto" @@ -259,5 +363,7 @@ try: from Crypto.PublicKey import RSA,DSA import Crypto.Util.number validate = _validate + validate_rrsig = _validate_rrsig except ImportError: validate = _need_pycrypto + validate_rrsig = _need_pycrypto -- 2.47.3