]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
DNSSEC refactoring. 634/head
authorBrian Wellington <bwelling@xbill.org>
Fri, 12 Feb 2021 22:02:48 +0000 (14:02 -0800)
committerBrian Wellington <bwelling@xbill.org>
Fri, 12 Feb 2021 22:02:48 +0000 (14:02 -0800)
Reorders some of the operations related to RRSIG validation.  The net
effects here are that the code performs all of the parts of validation
that are not specific to the keys first before the parts that are
specific to keys (and need to be repeated for each key).

This also fixes the inconsistency that if there are multiple keys that
match an RRSIG, a single malformed key would lead to an exception, but a
signle failed signature validation would not.  In both cases, an
exception should only be raised if no keys successfully validate the
RRSIG.

dns/dnssec.py

index 095fabd45a2366c627a85cea72a93e85c3d6ca08..f09ecd60715c50964cead4d1fed6d4b824e07aa0 100644 (file)
@@ -166,23 +166,15 @@ def make_ds(name, key, algorithm, origin=None):
 
 
 def _find_candidate_keys(keys, rrsig):
-    candidate_keys = []
     value = keys.get(rrsig.signer)
-    if value is None:
-        return None
     if isinstance(value, dns.node.Node):
-        try:
-            rdataset = value.find_rdataset(dns.rdataclass.IN,
-                                           dns.rdatatype.DNSKEY)
-        except KeyError:
-            return None
+        rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY)
     else:
         rdataset = value
-    for rdata in rdataset:
-        if rdata.algorithm == rrsig.algorithm and \
-                key_id(rdata) == rrsig.key_tag:
-            candidate_keys.append(rdata)
-    return candidate_keys
+    if rdataset is None:
+        return None
+    return [rd for rd in rdataset if
+            rd.algorithm == rrsig.algorithm and key_id(rd) == rrsig.key_tag]
 
 
 def _is_rsa(algorithm):
@@ -251,6 +243,82 @@ def _bytes_to_long(b):
     return int.from_bytes(b, 'big')
 
 
+def _validate_signature(sig, data, key, chosen_hash):
+    if _is_rsa(key.algorithm):
+        keyptr = key.key
+        (bytes_,) = struct.unpack('!B', keyptr[0:1])
+        keyptr = keyptr[1:]
+        if bytes_ == 0:
+            (bytes_,) = struct.unpack('!H', keyptr[0:2])
+            keyptr = keyptr[2:]
+        rsa_e = keyptr[0:bytes_]
+        rsa_n = keyptr[bytes_:]
+        try:
+            public_key = rsa.RSAPublicNumbers(
+                _bytes_to_long(rsa_e),
+                _bytes_to_long(rsa_n)).public_key(default_backend())
+        except ValueError:
+            raise ValidationFailure('invalid public key')
+        public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash)
+    elif _is_dsa(key.algorithm):
+        keyptr = key.key
+        (t,) = struct.unpack('!B', keyptr[0:1])
+        keyptr = keyptr[1:]
+        octets = 64 + t * 8
+        dsa_q = keyptr[0:20]
+        keyptr = keyptr[20:]
+        dsa_p = keyptr[0:octets]
+        keyptr = keyptr[octets:]
+        dsa_g = keyptr[0:octets]
+        keyptr = keyptr[octets:]
+        dsa_y = keyptr[0:octets]
+        try:
+            public_key = dsa.DSAPublicNumbers(
+                _bytes_to_long(dsa_y),
+                dsa.DSAParameterNumbers(
+                    _bytes_to_long(dsa_p),
+                    _bytes_to_long(dsa_q),
+                    _bytes_to_long(dsa_g))).public_key(default_backend())
+        except ValueError:
+            raise ValidationFailure('invalid public key')
+        public_key.verify(sig, data, chosen_hash)
+    elif _is_ecdsa(key.algorithm):
+        keyptr = key.key
+        if key.algorithm == Algorithm.ECDSAP256SHA256:
+            curve = ec.SECP256R1()
+            octets = 32
+        else:
+            curve = ec.SECP384R1()
+            octets = 48
+        ecdsa_x = keyptr[0:octets]
+        ecdsa_y = keyptr[octets:octets * 2]
+        try:
+            public_key = ec.EllipticCurvePublicNumbers(
+                curve=curve,
+                x=_bytes_to_long(ecdsa_x),
+                y=_bytes_to_long(ecdsa_y)).public_key(default_backend())
+        except ValueError:
+            raise ValidationFailure('invalid public key')
+        public_key.verify(sig, data, ec.ECDSA(chosen_hash))
+    elif _is_eddsa(key.algorithm):
+        keyptr = key.key
+        if key.algorithm == Algorithm.ED25519:
+            loader = ed25519.Ed25519PublicKey
+        else:
+            loader = ed448.Ed448PublicKey
+        try:
+            public_key = loader.from_public_bytes(keyptr)
+        except ValueError:
+            raise ValidationFailure('invalid public key')
+        public_key.verify(sig, data)
+    elif _is_gost(key.algorithm):
+        raise UnsupportedAlgorithm(
+            'algorithm "%s" not supported by dnspython' %
+            algorithm_to_text(key.algorithm))
+    else:
+        raise ValidationFailure('unknown algorithm %u' % key.algorithm)
+
+
 def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
     """Validate an RRset against a single signature rdata, throwing an
     exception if validation is not successful.
@@ -288,148 +356,69 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
     if candidate_keys is None:
         raise ValidationFailure('unknown key')
 
-    for candidate_key in candidate_keys:
-        # For convenience, allow the rrset to be specified as a (name,
-        # rdataset) tuple as well as a proper rrset
-        if isinstance(rrset, tuple):
-            rrname = rrset[0]
-            rdataset = rrset[1]
-        else:
-            rrname = rrset.name
-            rdataset = rrset
-
-        if now is None:
-            now = time.time()
-        if rrsig.expiration < now:
-            raise ValidationFailure('expired')
-        if rrsig.inception > now:
-            raise ValidationFailure('not yet valid')
-
-        if _is_rsa(rrsig.algorithm):
-            keyptr = candidate_key.key
-            (bytes_,) = struct.unpack('!B', keyptr[0:1])
-            keyptr = keyptr[1:]
-            if bytes_ == 0:
-                (bytes_,) = struct.unpack('!H', keyptr[0:2])
-                keyptr = keyptr[2:]
-            rsa_e = keyptr[0:bytes_]
-            rsa_n = keyptr[bytes_:]
-            try:
-                public_key = rsa.RSAPublicNumbers(
-                    _bytes_to_long(rsa_e),
-                    _bytes_to_long(rsa_n)).public_key(default_backend())
-            except ValueError:
-                raise ValidationFailure('invalid public key')
-            sig = rrsig.signature
-        elif _is_dsa(rrsig.algorithm):
-            keyptr = candidate_key.key
-            (t,) = struct.unpack('!B', keyptr[0:1])
-            keyptr = keyptr[1:]
-            octets = 64 + t * 8
-            dsa_q = keyptr[0:20]
-            keyptr = keyptr[20:]
-            dsa_p = keyptr[0:octets]
-            keyptr = keyptr[octets:]
-            dsa_g = keyptr[0:octets]
-            keyptr = keyptr[octets:]
-            dsa_y = keyptr[0:octets]
-            try:
-                public_key = dsa.DSAPublicNumbers(
-                    _bytes_to_long(dsa_y),
-                    dsa.DSAParameterNumbers(
-                        _bytes_to_long(dsa_p),
-                        _bytes_to_long(dsa_q),
-                        _bytes_to_long(dsa_g))).public_key(default_backend())
-            except ValueError:
-                raise ValidationFailure('invalid public key')
-            sig_r = rrsig.signature[1:21]
-            sig_s = rrsig.signature[21:]
-            sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
-                                             _bytes_to_long(sig_s))
-        elif _is_ecdsa(rrsig.algorithm):
-            keyptr = candidate_key.key
-            if rrsig.algorithm == Algorithm.ECDSAP256SHA256:
-                curve = ec.SECP256R1()
-                octets = 32
-            else:
-                curve = ec.SECP384R1()
-                octets = 48
-            ecdsa_x = keyptr[0:octets]
-            ecdsa_y = keyptr[octets:octets * 2]
-            try:
-                public_key = ec.EllipticCurvePublicNumbers(
-                    curve=curve,
-                    x=_bytes_to_long(ecdsa_x),
-                    y=_bytes_to_long(ecdsa_y)).public_key(default_backend())
-            except ValueError:
-                raise ValidationFailure('invalid public key')
-            sig_r = rrsig.signature[0:octets]
-            sig_s = rrsig.signature[octets:]
-            sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
-                                             _bytes_to_long(sig_s))
-
-        elif _is_eddsa(rrsig.algorithm):
-            keyptr = candidate_key.key
-            if rrsig.algorithm == Algorithm.ED25519:
-                loader = ed25519.Ed25519PublicKey
-            else:
-                loader = ed448.Ed448PublicKey
-            try:
-                public_key = loader.from_public_bytes(keyptr)
-            except ValueError:
-                raise ValidationFailure('invalid public key')
-            sig = rrsig.signature
-        elif _is_gost(rrsig.algorithm):
-            raise UnsupportedAlgorithm(
-                'algorithm "%s" not supported by dnspython' %
-                algorithm_to_text(rrsig.algorithm))
+    # For convenience, allow the rrset to be specified as a (name,
+    # rdataset) tuple as well as a proper rrset
+    if isinstance(rrset, tuple):
+        rrname = rrset[0]
+        rdataset = rrset[1]
+    else:
+        rrname = rrset.name
+        rdataset = rrset
+
+    if now is None:
+        now = time.time()
+    if rrsig.expiration < now:
+        raise ValidationFailure('expired')
+    if rrsig.inception > now:
+        raise ValidationFailure('not yet valid')
+
+    if _is_dsa(rrsig.algorithm):
+        sig_r = rrsig.signature[1:21]
+        sig_s = rrsig.signature[21:]
+        sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
+                                         _bytes_to_long(sig_s))
+    elif _is_ecdsa(rrsig.algorithm):
+        if rrsig.algorithm == Algorithm.ECDSAP256SHA256:
+            octets = 32
         else:
-            raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
-
-        data = b''
-        data += rrsig.to_wire(origin=origin)[:18]
-        data += rrsig.signer.to_digestable(origin)
-
-        # Derelativize the name before considering labels.
-        rrname = rrname.derelativize(origin)
-
-        if len(rrname) - 1 < rrsig.labels:
-            raise ValidationFailure('owner name longer than RRSIG labels')
-        elif rrsig.labels < len(rrname) - 1:
-            suffix = rrname.split(rrsig.labels + 1)[1]
-            rrname = dns.name.from_text('*', suffix)
-        rrnamebuf = rrname.to_digestable()
-        rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
-                              rrsig.original_ttl)
-        rrlist = sorted(rdataset)
-        for rr in rrlist:
-            data += rrnamebuf
-            data += rrfixed
-            rrdata = rr.to_digestable(origin)
-            rrlen = struct.pack('!H', len(rrdata))
-            data += rrlen
-            data += rrdata
-
-        chosen_hash = _make_hash(rrsig.algorithm)
+            octets = 48
+        sig_r = rrsig.signature[0:octets]
+        sig_s = rrsig.signature[octets:]
+        sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
+                                         _bytes_to_long(sig_s))
+    else:
+        sig = rrsig.signature
+
+    data = b''
+    data += rrsig.to_wire(origin=origin)[:18]
+    data += rrsig.signer.to_digestable(origin)
+
+    # Derelativize the name before considering labels.
+    rrname = rrname.derelativize(origin)
+
+    if len(rrname) - 1 < rrsig.labels:
+        raise ValidationFailure('owner name longer than RRSIG labels')
+    elif rrsig.labels < len(rrname) - 1:
+        suffix = rrname.split(rrsig.labels + 1)[1]
+        rrname = dns.name.from_text('*', suffix)
+    rrnamebuf = rrname.to_digestable()
+    rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
+                          rrsig.original_ttl)
+    for rr in sorted(rdataset):
+        data += rrnamebuf
+        data += rrfixed
+        rrdata = rr.to_digestable(origin)
+        rrlen = struct.pack('!H', len(rrdata))
+        data += rrlen
+        data += rrdata
+
+    chosen_hash = _make_hash(rrsig.algorithm)
+
+    for candidate_key in candidate_keys:
         try:
-            if _is_rsa(rrsig.algorithm):
-                public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash)
-            elif _is_dsa(rrsig.algorithm):
-                public_key.verify(sig, data, chosen_hash)
-            elif _is_ecdsa(rrsig.algorithm):
-                public_key.verify(sig, data, ec.ECDSA(chosen_hash))
-            elif _is_eddsa(rrsig.algorithm):
-                public_key.verify(sig, data)
-            else:
-                # Raise here for code clarity; this won't actually ever happen
-                # since if the algorithm is really unknown we'd already have
-                # raised an exception above
-                raise ValidationFailure('unknown algorithm %u' %
-                                        rrsig.algorithm)  # pragma: no cover
-            # If we got here, we successfully verified so we can return
-            # without error
+            _validate_signature(sig, data, candidate_key, chosen_hash)
             return
-        except InvalidSignature:
+        except (InvalidSignature, ValidationFailure):
             # this happens on an individual validation failure
             continue
     # nothing verified -- raise failure: