]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
CDS/CDNSKEY utilities (#872)
authorJakob Schlyter <jakob@kirei.se>
Fri, 23 Dec 2022 17:11:31 +0000 (18:11 +0100)
committerGitHub <noreply@github.com>
Fri, 23 Dec 2022 17:11:31 +0000 (09:11 -0800)
Add CDS and CDNSKEY utilities:

make_cdnskey()
make_cds()
make_ds_rdataset()
cds_rdataset_to_ds_rdataset()
dnskey_rdataset_to_cds_rdataset()
dnskey_rdataset_to_cdnskey_rdataset()

dns/dnssec.py
tests/test_dnssec.py

index 4cfb75e3b62933b98db70106b2e8ee8c254acbb3..3589b1f14cf4a09cf14c3fac398dbdbd1002a2bd 100644 (file)
@@ -17,7 +17,7 @@
 
 """Common DNSSEC-related functions and constants."""
 
-from typing import Any, cast, Dict, List, Optional, Tuple, Union
+from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
 
 import hashlib
 import math
@@ -36,6 +36,8 @@ import dns.rdata
 import dns.rdatatype
 import dns.rdataclass
 import dns.rrset
+from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
+from dns.rdtypes.ANY.CDS import CDS
 from dns.rdtypes.ANY.DNSKEY import DNSKEY
 from dns.rdtypes.ANY.DS import DS
 from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
@@ -109,7 +111,7 @@ def to_timestamp(value: Union[datetime, str, float, int]) -> int:
         raise TypeError("Unsupported timestamp type")
 
 
-def key_id(key: DNSKEY) -> int:
+def key_id(key: Union[DNSKEY,CDNSKEY]) -> int:
     """Return the key id (a 16-bit number) for the specified key.
 
     *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``
@@ -193,7 +195,7 @@ def make_ds(
 
     *name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record.
 
-    *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``, the key the DS is about.
+    *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, the key the DS is about.
 
     *algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
     The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
@@ -230,8 +232,8 @@ def make_ds(
         check = policy.ok_to_create_ds
     if not check(algorithm):
         raise DeniedByPolicy
-    if not isinstance(key, DNSKEY):
-        raise ValueError("key is not a DNSKEY")
+    if not isinstance(key, (DNSKEY, CDNSKEY)):
+        raise ValueError("key is not a DNSKEY/CDNSKEY")
     if algorithm == DSDigest.SHA1:
         dshash = hashlib.sha1()
     elif algorithm == DSDigest.SHA256:
@@ -256,6 +258,41 @@ def make_ds(
     return cast(DS, ds)
 
 
+def make_cds(
+    name: Union[dns.name.Name, str],
+    key: dns.rdata.Rdata,
+    algorithm: Union[DSDigest, str],
+    origin: Optional[dns.name.Name] = None,
+) -> CDS:
+    """Create a CDS record for a DNSSEC key.
+
+    *name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record.
+
+    *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``,  key the DS is about.
+
+    *algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
+    The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+    does not matter for these strings.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If `key` is a relative name,
+    then it will be made absolute using the specified origin.
+
+    Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
+
+    Returns a ``dns.rdtypes.ANY.DS.CDS``
+    """
+
+    ds = make_ds(name, key, algorithm, origin)
+    return CDS(
+        rdclass=ds.rdclass,
+        rdtype=dns.rdatatype.CDS,
+        key_tag=ds.key_tag,
+        algorithm=ds.algorithm,
+        digest_type=ds.digest_type,
+        digest=ds.digest,
+    )
+
+
 def _find_candidate_keys(
     keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG
 ) -> Optional[List[DNSKEY]]:
@@ -376,6 +413,15 @@ def _bytes_to_long(b: bytes) -> int:
     return int.from_bytes(b, "big")
 
 
+def _get_rrname_rdataset(
+    rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]:
+    if isinstance(rrset, tuple):
+        return rrset[0], rrset[1]
+    else:
+        return rrset.name, rrset
+
+
 def _validate_signature(sig: bytes, data: bytes, key: DNSKEY, chosen_hash: Any) -> None:
     keyptr: bytes
     if _is_rsa(key.algorithm):
@@ -798,12 +844,7 @@ def _make_rrsig_signature_data(
 
     # For convenience, allow the rrset to be specified as a (name,
     # rdataset) tuple as well as a proper rrset
-    if isinstance(rrset, tuple):
-        rrname = rrset[0]
-        rdataset = rrset[1]
-    else:
-        rrname = rrset.name
-        rdataset = rrset
+    rrname, rdataset = _get_rrname_rdataset(rrset)
 
     data = b""
     data += rrsig.to_wire(origin=signer)[:18]
@@ -927,6 +968,44 @@ def _make_dnskey(
     )
 
 
+def _make_cdnskey(
+    public_key: PublicKey,
+    algorithm: Union[int, str],
+    flags: int = Flag.ZONE,
+    protocol: int = 3,
+) -> CDNSKEY:
+    """Convert a public key to CDNSKEY Rdata
+
+    *public_key*, the public key to convert, a
+    ``cryptography.hazmat.primitives.asymmetric`` public key class applicable
+    for DNSSEC.
+
+    *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
+
+    *flags: DNSKEY flags field as an integer.
+
+    *protocol*: DNSKEY protocol field as an integer.
+
+    Raises ``ValueError`` if the specified key algorithm parameters are not
+    unsupported, ``TypeError`` if the key type is unsupported,
+    `UnsupportedAlgorithm` if the algorithm is unknown and
+    `AlgorithmKeyMismatch` if the algorithm does not match the key type.
+
+    Return CDNSKEY ``Rdata``.
+    """
+
+    dnskey = _make_dnskey(public_key, algorithm, flags, protocol)
+
+    return CDNSKEY(
+        rdclass=dnskey.rdclass,
+        rdtype=dns.rdatatype.CDNSKEY,
+        flags=dnskey.flags,
+        protocol=dnskey.protocol,
+        algorithm=dnskey.algorithm,
+        key=dnskey.key,
+    )
+
+
 def nsec3_hash(
     domain: Union[dns.name.Name, str],
     salt: Optional[Union[str, bytes]],
@@ -988,6 +1067,153 @@ def nsec3_hash(
     return output
 
 
+def make_ds_rdataset(
+    rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+    algorithms: Set[Union[DSDigest, str]],
+    origin: Optional[dns.name.Name] = None,
+) -> dns.rdataset.Rdataset:
+    """Create a DS record from DNSKEY/CDNSKEY/CDS.
+
+    *rrset*, the RRset to create DS Rdataset for.  This can be a
+    ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+    tuple.
+
+    *algorithms*, a set of ``str`` or ``int`` specifying the hash algorithms.
+    The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+    does not matter for these strings. If the RRset is a CDS, only digest
+    algorithms matching algorithms are accepted.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If `key` is a relative name,
+    then it will be made absolute using the specified origin.
+
+    Raises ``UnsupportedAlgorithm`` if any of the algorithms are unknown and
+    ``ValueError`` if the given RRset is not usable.
+
+    Returns a ``dns.rdataset.Rdataset``
+    """
+
+    rrname, rdataset = _get_rrname_rdataset(rrset)
+
+    if rdataset.rdtype not in (
+        dns.rdatatype.DNSKEY,
+        dns.rdatatype.CDNSKEY,
+        dns.rdatatype.CDS,
+    ):
+        raise ValueError("rrset not a DNSKEY/CDNSKEY/CDS")
+
+    _algorithms = set()
+    for algorithm in algorithms:
+        try:
+            if isinstance(algorithm, str):
+                algorithm = DSDigest[algorithm.upper()]
+        except Exception:
+            raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
+        _algorithms.add(algorithm)
+
+    if rdataset.rdtype == dns.rdatatype.CDS:
+        res = []
+        for rdata in cds_rdataset_to_ds_rdataset(rdataset):
+            if rdata.digest_type in _algorithms:
+                res.append(rdata)
+        if not len(res):
+            raise ValueError("no acceptable CDS rdata found")
+        return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+    res = []
+    for algorithm in _algorithms:
+        res.extend(dnskey_rdataset_to_cds_rdataset(rrname, rdataset, algorithm, origin))
+    return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def cds_rdataset_to_ds_rdataset(
+    rdataset: dns.rdataset.Rdataset,
+) -> dns.rdataset.Rdataset:
+    """Create a CDS record from DS.
+
+    *rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for.
+
+    Raises ``ValueError`` if the rdataset is not CDS.
+
+    Returns a ``dns.rdataset.Rdataset``
+    """
+
+    if rdataset.rdtype != dns.rdatatype.CDS:
+        raise ValueError("rdataset not a CDS")
+    res = []
+    for rdata in rdataset:
+        res.append(
+            CDS(
+                rdclass=rdata.rdclass,
+                rdtype=dns.rdatatype.DS,
+                key_tag=rdata.key_tag,
+                algorithm=rdata.algorithm,
+                digest_type=rdata.digest_type,
+                digest=rdata.digest,
+            )
+        )
+    return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def dnskey_rdataset_to_cds_rdataset(
+    name: Union[dns.name.Name, str],
+    rdataset: dns.rdataset.Rdataset,
+    algorithm: Union[DSDigest, str],
+    origin: Optional[dns.name.Name] = None,
+) -> dns.rdataset.Rdataset:
+    """Create a CDS record from DNSKEY/CDNSKEY.
+
+    *name*, a ``dns.name.Name`` or ``str``, the owner name of the CDS record.
+
+    *rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for.
+
+    *algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
+    The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+    does not matter for these strings.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If `key` is a relative name,
+    then it will be made absolute using the specified origin.
+
+    Raises ``UnsupportedAlgorithm`` if the algorithm is unknown or
+    ``ValueError`` if the rdataset is not DNSKEY/CDNSKEY.
+
+    Returns a ``dns.rdataset.Rdataset``
+    """
+
+    if rdataset.rdtype not in (dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY):
+        raise ValueError("rdataset not a DNSKEY/CDNSKEY")
+    res = []
+    for rdata in rdataset:
+        res.append(make_cds(name, rdata, algorithm, origin))
+    return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def dnskey_rdataset_to_cdnskey_rdataset(
+    rdataset: dns.rdataset.Rdataset,
+) -> dns.rdataset.Rdataset:
+    """Create a CDNSKEY record from DNSKEY.
+
+    *rdataset*, a ``dns.rdataset.Rdataset``, to create CDNSKEY Rdataset for.
+
+    Returns a ``dns.rdataset.Rdataset``
+    """
+
+    if rdataset.rdtype != dns.rdatatype.DNSKEY:
+        raise ValueError("rdataset not a DNSKEY")
+    res = []
+    for rdata in rdataset:
+        res.append(
+            CDNSKEY(
+                rdclass=rdataset.rdclass,
+                rdtype=rdataset.rdtype,
+                flags=rdata.flags,
+                protocol=rdata.protocol,
+                algorithm=rdata.algorithm,
+                key=rdata.key,
+            )
+        )
+    return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
 def _need_pyca(*args, **kwargs):
     raise ImportError(
         "DNSSEC validation requires " + "python cryptography"
@@ -1010,12 +1236,14 @@ except ImportError:  # pragma: no cover
     validate_rrsig = _need_pyca
     sign = _need_pyca
     make_dnskey = _need_pyca
+    make_cdnskey = _need_pyca
     _have_pyca = False
 else:
     validate = _validate  # type: ignore
     validate_rrsig = _validate_rrsig  # type: ignore
     sign = _sign
     make_dnskey = _make_dnskey
+    make_cdnskey = _make_cdnskey
     _have_pyca = True
 
 ### BEGIN generated Algorithm constants
index 4a25cd2a0904ee1b22e5670dce97ef517c6ee753..098af693718b425fd5274c1c8c9c142bdf6db7ab 100644 (file)
@@ -25,7 +25,9 @@ import dns.name
 import dns.rdata
 import dns.rdataclass
 import dns.rdatatype
+import dns.rdtypes.ANY.CDNSKEY
 import dns.rdtypes.ANY.CDS
+import dns.rdtypes.ANY.DNSKEY
 import dns.rdtypes.ANY.DS
 import dns.rrset
 
@@ -164,6 +166,12 @@ good_ds = dns.rdata.from_text(
     "57349 5 2 53A79A3E7488AB44FFC56B2D1109F0699D1796DD977E72108B841F96 E47D7013",
 )
 
+good_cds = dns.rdata.from_text(
+    dns.rdataclass.IN,
+    dns.rdatatype.CDS,
+    "57349 5 2 53A79A3E7488AB44FFC56B2D1109F0699D1796DD977E72108B841F96 E47D7013",
+)
+
 when2 = 1290425644
 
 abs_example = dns.name.from_text("example")
@@ -937,6 +945,10 @@ class DNSSECMakeDSTestCase(unittest.TestCase):
         ds = dns.dnssec.make_ds(abs_dnspython_org, sep_key, "SHA256")
         self.assertEqual(ds, good_ds)
 
+    def testMakeSHA256CDS(self):  # type: () -> None
+        cds = dns.dnssec.make_cds(abs_dnspython_org, sep_key, "SHA256")
+        self.assertEqual(cds, good_cds)
+
     def testInvalidAlgorithm(self):  # type: () -> None
         algorithm: Any
         for algorithm in (10, "shax"):
@@ -1006,6 +1018,80 @@ class DNSSECMakeDSTestCase(unittest.TestCase):
                     dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.CDS, record)
                 self.assertEqual(msg, str(cm.exception))
 
+    def testMakeCDS(self):  # type: () -> None
+        name = dns.name.from_text("example.com")
+        key = ed448.Ed448PrivateKey.generate()
+
+        for dnskey in [
+            dns.dnssec.make_dnskey(
+                key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+            ),
+            dns.dnssec.make_cdnskey(
+                key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+            ),
+        ]:
+            dnskey_rdataset = dns.rdataset.from_rdata_list(3600, [dnskey])
+            cds_rdataset = dns.dnssec.dnskey_rdataset_to_cds_rdataset(
+                name, dnskey_rdataset, "SHA256"
+            )
+            self.assertEqual(len(dnskey_rdataset), len(cds_rdataset))
+            for d, c in zip(dnskey_rdataset, cds_rdataset):
+                self.assertTrue(
+                    isinstance(
+                        d,
+                        (
+                            dns.rdtypes.ANY.DNSKEY.DNSKEY,
+                            dns.rdtypes.ANY.CDNSKEY.CDNSKEY,
+                        ),
+                    )
+                )
+                self.assertTrue(isinstance(c, dns.rdtypes.ANY.CDS.CDS))
+                self.assertEqual(dns.dnssec.key_id(d), c.key_tag)
+                self.assertEqual(d.algorithm, c.algorithm)
+
+    def testMakeManyDSfromCDS(self):  # type: () -> None
+        name = dns.name.from_text("example.com")
+        nkeys = 3
+        algorithms = ["SHA256", "SHA384"]
+        keys = [ed448.Ed448PrivateKey.generate() for _ in range(0, nkeys)]
+
+        dnskeys = [
+            dns.dnssec.make_dnskey(
+                key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+            )
+            for key in keys
+        ]
+
+        dnskey_rdataset = dns.rdataset.from_rdata_list(3600, dnskeys)
+
+        cds_rdataset = dns.dnssec.dnskey_rdataset_to_cds_rdataset(
+            name, dnskey_rdataset, "SHA256"
+        )
+        cds_rrset = dns.rrset.from_rdata_list(name, 3600, cds_rdataset)
+
+        ds_rdataset = dns.dnssec.make_ds_rdataset(cds_rrset, algorithms)
+
+        self.assertEqual(len(cds_rdataset), nkeys)
+
+    def testMakeManyDSfromDNSKEY(self):  # type: () -> None
+        name = dns.name.from_text("example.com")
+        nkeys = 3
+        algorithms = ["SHA256", "SHA384"]
+        keys = [ed448.Ed448PrivateKey.generate() for _ in range(0, nkeys)]
+
+        dnskeys = [
+            dns.dnssec.make_dnskey(
+                key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+            )
+            for key in keys
+        ]
+
+        dnskey_rrset = dns.rrset.from_rdata_list(name, 3600, dnskeys)
+
+        ds_rdataset = dns.dnssec.make_ds_rdataset(dnskey_rrset, algorithms)
+
+        self.assertEqual(len(ds_rdataset), nkeys * len(algorithms))
+
 
 @unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
 class DNSSECMakeDNSKEYTestCase(unittest.TestCase):
@@ -1035,6 +1121,30 @@ class DNSSECMakeDNSKEYTestCase(unittest.TestCase):
         with self.assertRaises(ValueError):
             dns.dnssec.make_dnskey(key.public_key(), dns.dnssec.Algorithm.DSA)
 
+    def testMakeCDNSKEY(self):  # type: () -> None
+        key = ed448.Ed448PrivateKey.generate()
+        dnskey = dns.dnssec.make_dnskey(
+            key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+        )
+        cdnskey = dns.dnssec.make_cdnskey(
+            key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+        )
+
+        self.assertEqual(dnskey.flags, cdnskey.flags)
+        self.assertEqual(dnskey.protocol, cdnskey.protocol)
+        self.assertEqual(dnskey.algorithm, cdnskey.algorithm)
+        self.assertEqual(dnskey.key, cdnskey.key)
+
+        dnskey_rdataset = dns.rdataset.from_rdata_list(3600, [dnskey])
+        cdnskey_rdataset = dns.dnssec.dnskey_rdataset_to_cdnskey_rdataset(
+            dnskey_rdataset
+        )
+        self.assertEqual(len(dnskey_rdataset), len(cdnskey_rdataset))
+        for d, c in zip(dnskey_rdataset, cdnskey_rdataset):
+            self.assertTrue(isinstance(d, dns.rdtypes.ANY.DNSKEY.DNSKEY))
+            self.assertTrue(isinstance(c, dns.rdtypes.ANY.CDNSKEY.CDNSKEY))
+            self.assertEqual(d, c)
+
     # XXXRTH This test is fine but is noticably slow, so I have commented it out for
     # now