From 5bf70eab23bbade7473024e6146f8b757e8805fb Mon Sep 17 00:00:00 2001 From: Brian Wellington Date: Fri, 12 Feb 2021 14:02:48 -0800 Subject: [PATCH] DNSSEC refactoring. Reorders some of the operations related to RRSIG validation. The net effects here are that the code performs all of the parts of validation that are not specific to the keys first before the parts that are specific to keys (and need to be repeated for each key). This also fixes the inconsistency that if there are multiple keys that match an RRSIG, a single malformed key would lead to an exception, but a signle failed signature validation would not. In both cases, an exception should only be raised if no keys successfully validate the RRSIG. --- dns/dnssec.py | 293 ++++++++++++++++++++++++-------------------------- 1 file changed, 141 insertions(+), 152 deletions(-) diff --git a/dns/dnssec.py b/dns/dnssec.py index 095fabd4..f09ecd60 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -166,23 +166,15 @@ def make_ds(name, key, algorithm, origin=None): def _find_candidate_keys(keys, rrsig): - candidate_keys = [] value = keys.get(rrsig.signer) - if value is None: - return None if isinstance(value, dns.node.Node): - try: - rdataset = value.find_rdataset(dns.rdataclass.IN, - dns.rdatatype.DNSKEY) - except KeyError: - return None + rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY) else: rdataset = value - for rdata in rdataset: - if rdata.algorithm == rrsig.algorithm and \ - key_id(rdata) == rrsig.key_tag: - candidate_keys.append(rdata) - return candidate_keys + if rdataset is None: + return None + return [rd for rd in rdataset if + rd.algorithm == rrsig.algorithm and key_id(rd) == rrsig.key_tag] def _is_rsa(algorithm): @@ -251,6 +243,82 @@ def _bytes_to_long(b): return int.from_bytes(b, 'big') +def _validate_signature(sig, data, key, chosen_hash): + if _is_rsa(key.algorithm): + keyptr = key.key + (bytes_,) = struct.unpack('!B', keyptr[0:1]) + keyptr = keyptr[1:] + if bytes_ == 0: + (bytes_,) = struct.unpack('!H', keyptr[0:2]) + keyptr = keyptr[2:] + rsa_e = keyptr[0:bytes_] + rsa_n = keyptr[bytes_:] + try: + public_key = rsa.RSAPublicNumbers( + _bytes_to_long(rsa_e), + _bytes_to_long(rsa_n)).public_key(default_backend()) + except ValueError: + raise ValidationFailure('invalid public key') + public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash) + elif _is_dsa(key.algorithm): + keyptr = key.key + (t,) = struct.unpack('!B', keyptr[0:1]) + keyptr = keyptr[1:] + octets = 64 + t * 8 + dsa_q = keyptr[0:20] + keyptr = keyptr[20:] + dsa_p = keyptr[0:octets] + keyptr = keyptr[octets:] + dsa_g = keyptr[0:octets] + keyptr = keyptr[octets:] + dsa_y = keyptr[0:octets] + try: + public_key = dsa.DSAPublicNumbers( + _bytes_to_long(dsa_y), + dsa.DSAParameterNumbers( + _bytes_to_long(dsa_p), + _bytes_to_long(dsa_q), + _bytes_to_long(dsa_g))).public_key(default_backend()) + except ValueError: + raise ValidationFailure('invalid public key') + public_key.verify(sig, data, chosen_hash) + elif _is_ecdsa(key.algorithm): + keyptr = key.key + if key.algorithm == Algorithm.ECDSAP256SHA256: + curve = ec.SECP256R1() + octets = 32 + else: + curve = ec.SECP384R1() + octets = 48 + ecdsa_x = keyptr[0:octets] + ecdsa_y = keyptr[octets:octets * 2] + try: + public_key = ec.EllipticCurvePublicNumbers( + curve=curve, + x=_bytes_to_long(ecdsa_x), + y=_bytes_to_long(ecdsa_y)).public_key(default_backend()) + except ValueError: + raise ValidationFailure('invalid public key') + public_key.verify(sig, data, ec.ECDSA(chosen_hash)) + elif _is_eddsa(key.algorithm): + keyptr = key.key + if key.algorithm == Algorithm.ED25519: + loader = ed25519.Ed25519PublicKey + else: + loader = ed448.Ed448PublicKey + try: + public_key = loader.from_public_bytes(keyptr) + except ValueError: + raise ValidationFailure('invalid public key') + public_key.verify(sig, data) + elif _is_gost(key.algorithm): + raise UnsupportedAlgorithm( + 'algorithm "%s" not supported by dnspython' % + algorithm_to_text(key.algorithm)) + else: + raise ValidationFailure('unknown algorithm %u' % key.algorithm) + + def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): """Validate an RRset against a single signature rdata, throwing an exception if validation is not successful. @@ -288,148 +356,69 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): if candidate_keys is None: raise ValidationFailure('unknown key') - for candidate_key in candidate_keys: - # For convenience, allow the rrset to be specified as a (name, - # rdataset) tuple as well as a proper rrset - if isinstance(rrset, tuple): - rrname = rrset[0] - rdataset = rrset[1] - else: - rrname = rrset.name - rdataset = rrset - - if now is None: - now = time.time() - if rrsig.expiration < now: - raise ValidationFailure('expired') - if rrsig.inception > now: - raise ValidationFailure('not yet valid') - - if _is_rsa(rrsig.algorithm): - keyptr = candidate_key.key - (bytes_,) = struct.unpack('!B', keyptr[0:1]) - keyptr = keyptr[1:] - if bytes_ == 0: - (bytes_,) = struct.unpack('!H', keyptr[0:2]) - keyptr = keyptr[2:] - rsa_e = keyptr[0:bytes_] - rsa_n = keyptr[bytes_:] - try: - public_key = rsa.RSAPublicNumbers( - _bytes_to_long(rsa_e), - _bytes_to_long(rsa_n)).public_key(default_backend()) - except ValueError: - raise ValidationFailure('invalid public key') - sig = rrsig.signature - elif _is_dsa(rrsig.algorithm): - keyptr = candidate_key.key - (t,) = struct.unpack('!B', keyptr[0:1]) - keyptr = keyptr[1:] - octets = 64 + t * 8 - dsa_q = keyptr[0:20] - keyptr = keyptr[20:] - dsa_p = keyptr[0:octets] - keyptr = keyptr[octets:] - dsa_g = keyptr[0:octets] - keyptr = keyptr[octets:] - dsa_y = keyptr[0:octets] - try: - public_key = dsa.DSAPublicNumbers( - _bytes_to_long(dsa_y), - dsa.DSAParameterNumbers( - _bytes_to_long(dsa_p), - _bytes_to_long(dsa_q), - _bytes_to_long(dsa_g))).public_key(default_backend()) - except ValueError: - raise ValidationFailure('invalid public key') - sig_r = rrsig.signature[1:21] - sig_s = rrsig.signature[21:] - sig = utils.encode_dss_signature(_bytes_to_long(sig_r), - _bytes_to_long(sig_s)) - elif _is_ecdsa(rrsig.algorithm): - keyptr = candidate_key.key - if rrsig.algorithm == Algorithm.ECDSAP256SHA256: - curve = ec.SECP256R1() - octets = 32 - else: - curve = ec.SECP384R1() - octets = 48 - ecdsa_x = keyptr[0:octets] - ecdsa_y = keyptr[octets:octets * 2] - try: - public_key = ec.EllipticCurvePublicNumbers( - curve=curve, - x=_bytes_to_long(ecdsa_x), - y=_bytes_to_long(ecdsa_y)).public_key(default_backend()) - except ValueError: - raise ValidationFailure('invalid public key') - sig_r = rrsig.signature[0:octets] - sig_s = rrsig.signature[octets:] - sig = utils.encode_dss_signature(_bytes_to_long(sig_r), - _bytes_to_long(sig_s)) - - elif _is_eddsa(rrsig.algorithm): - keyptr = candidate_key.key - if rrsig.algorithm == Algorithm.ED25519: - loader = ed25519.Ed25519PublicKey - else: - loader = ed448.Ed448PublicKey - try: - public_key = loader.from_public_bytes(keyptr) - except ValueError: - raise ValidationFailure('invalid public key') - sig = rrsig.signature - elif _is_gost(rrsig.algorithm): - raise UnsupportedAlgorithm( - 'algorithm "%s" not supported by dnspython' % - algorithm_to_text(rrsig.algorithm)) + # For convenience, allow the rrset to be specified as a (name, + # rdataset) tuple as well as a proper rrset + if isinstance(rrset, tuple): + rrname = rrset[0] + rdataset = rrset[1] + else: + rrname = rrset.name + rdataset = rrset + + if now is None: + now = time.time() + if rrsig.expiration < now: + raise ValidationFailure('expired') + if rrsig.inception > now: + raise ValidationFailure('not yet valid') + + if _is_dsa(rrsig.algorithm): + sig_r = rrsig.signature[1:21] + sig_s = rrsig.signature[21:] + sig = utils.encode_dss_signature(_bytes_to_long(sig_r), + _bytes_to_long(sig_s)) + elif _is_ecdsa(rrsig.algorithm): + if rrsig.algorithm == Algorithm.ECDSAP256SHA256: + octets = 32 else: - raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) - - data = b'' - data += rrsig.to_wire(origin=origin)[:18] - data += rrsig.signer.to_digestable(origin) - - # Derelativize the name before considering labels. - rrname = rrname.derelativize(origin) - - if len(rrname) - 1 < rrsig.labels: - raise ValidationFailure('owner name longer than RRSIG labels') - elif rrsig.labels < len(rrname) - 1: - suffix = rrname.split(rrsig.labels + 1)[1] - rrname = dns.name.from_text('*', suffix) - rrnamebuf = rrname.to_digestable() - rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass, - rrsig.original_ttl) - rrlist = sorted(rdataset) - for rr in rrlist: - data += rrnamebuf - data += rrfixed - rrdata = rr.to_digestable(origin) - rrlen = struct.pack('!H', len(rrdata)) - data += rrlen - data += rrdata - - chosen_hash = _make_hash(rrsig.algorithm) + octets = 48 + sig_r = rrsig.signature[0:octets] + sig_s = rrsig.signature[octets:] + sig = utils.encode_dss_signature(_bytes_to_long(sig_r), + _bytes_to_long(sig_s)) + else: + sig = rrsig.signature + + data = b'' + data += rrsig.to_wire(origin=origin)[:18] + data += rrsig.signer.to_digestable(origin) + + # Derelativize the name before considering labels. + rrname = rrname.derelativize(origin) + + if len(rrname) - 1 < rrsig.labels: + raise ValidationFailure('owner name longer than RRSIG labels') + elif rrsig.labels < len(rrname) - 1: + suffix = rrname.split(rrsig.labels + 1)[1] + rrname = dns.name.from_text('*', suffix) + rrnamebuf = rrname.to_digestable() + rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass, + rrsig.original_ttl) + for rr in sorted(rdataset): + data += rrnamebuf + data += rrfixed + rrdata = rr.to_digestable(origin) + rrlen = struct.pack('!H', len(rrdata)) + data += rrlen + data += rrdata + + chosen_hash = _make_hash(rrsig.algorithm) + + for candidate_key in candidate_keys: try: - if _is_rsa(rrsig.algorithm): - public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash) - elif _is_dsa(rrsig.algorithm): - public_key.verify(sig, data, chosen_hash) - elif _is_ecdsa(rrsig.algorithm): - public_key.verify(sig, data, ec.ECDSA(chosen_hash)) - elif _is_eddsa(rrsig.algorithm): - public_key.verify(sig, data) - else: - # Raise here for code clarity; this won't actually ever happen - # since if the algorithm is really unknown we'd already have - # raised an exception above - raise ValidationFailure('unknown algorithm %u' % - rrsig.algorithm) # pragma: no cover - # If we got here, we successfully verified so we can return - # without error + _validate_signature(sig, data, candidate_key, chosen_hash) return - except InvalidSignature: + except (InvalidSignature, ValidationFailure): # this happens on an individual validation failure continue # nothing verified -- raise failure: -- 2.47.3