"""Common DNSSEC-related functions and constants."""
-from typing import Any, cast, Dict, List, Optional, Tuple, Union
+from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
import hashlib
import math
import dns.rdatatype
import dns.rdataclass
import dns.rrset
+from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
+from dns.rdtypes.ANY.CDS import CDS
from dns.rdtypes.ANY.DNSKEY import DNSKEY
from dns.rdtypes.ANY.DS import DS
from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
raise TypeError("Unsupported timestamp type")
-def key_id(key: DNSKEY) -> int:
+def key_id(key: Union[DNSKEY,CDNSKEY]) -> int:
"""Return the key id (a 16-bit number) for the specified key.
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``
*name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record.
- *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``, the key the DS is about.
+ *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, the key the DS is about.
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
check = policy.ok_to_create_ds
if not check(algorithm):
raise DeniedByPolicy
- if not isinstance(key, DNSKEY):
- raise ValueError("key is not a DNSKEY")
+ if not isinstance(key, (DNSKEY, CDNSKEY)):
+ raise ValueError("key is not a DNSKEY/CDNSKEY")
if algorithm == DSDigest.SHA1:
dshash = hashlib.sha1()
elif algorithm == DSDigest.SHA256:
return cast(DS, ds)
+def make_cds(
+ name: Union[dns.name.Name, str],
+ key: dns.rdata.Rdata,
+ algorithm: Union[DSDigest, str],
+ origin: Optional[dns.name.Name] = None,
+) -> CDS:
+ """Create a CDS record for a DNSSEC key.
+
+ *name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record.
+
+ *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, key the DS is about.
+
+ *algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
+ The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+ does not matter for these strings.
+
+ *origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
+ then it will be made absolute using the specified origin.
+
+ Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
+
+ Returns a ``dns.rdtypes.ANY.DS.CDS``
+ """
+
+ ds = make_ds(name, key, algorithm, origin)
+ return CDS(
+ rdclass=ds.rdclass,
+ rdtype=dns.rdatatype.CDS,
+ key_tag=ds.key_tag,
+ algorithm=ds.algorithm,
+ digest_type=ds.digest_type,
+ digest=ds.digest,
+ )
+
+
def _find_candidate_keys(
keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG
) -> Optional[List[DNSKEY]]:
return int.from_bytes(b, "big")
+def _get_rrname_rdataset(
+ rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]:
+ if isinstance(rrset, tuple):
+ return rrset[0], rrset[1]
+ else:
+ return rrset.name, rrset
+
+
def _validate_signature(sig: bytes, data: bytes, key: DNSKEY, chosen_hash: Any) -> None:
keyptr: bytes
if _is_rsa(key.algorithm):
# For convenience, allow the rrset to be specified as a (name,
# rdataset) tuple as well as a proper rrset
- if isinstance(rrset, tuple):
- rrname = rrset[0]
- rdataset = rrset[1]
- else:
- rrname = rrset.name
- rdataset = rrset
+ rrname, rdataset = _get_rrname_rdataset(rrset)
data = b""
data += rrsig.to_wire(origin=signer)[:18]
)
+def _make_cdnskey(
+ public_key: PublicKey,
+ algorithm: Union[int, str],
+ flags: int = Flag.ZONE,
+ protocol: int = 3,
+) -> CDNSKEY:
+ """Convert a public key to CDNSKEY Rdata
+
+ *public_key*, the public key to convert, a
+ ``cryptography.hazmat.primitives.asymmetric`` public key class applicable
+ for DNSSEC.
+
+ *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
+
+ *flags: DNSKEY flags field as an integer.
+
+ *protocol*: DNSKEY protocol field as an integer.
+
+ Raises ``ValueError`` if the specified key algorithm parameters are not
+ unsupported, ``TypeError`` if the key type is unsupported,
+ `UnsupportedAlgorithm` if the algorithm is unknown and
+ `AlgorithmKeyMismatch` if the algorithm does not match the key type.
+
+ Return CDNSKEY ``Rdata``.
+ """
+
+ dnskey = _make_dnskey(public_key, algorithm, flags, protocol)
+
+ return CDNSKEY(
+ rdclass=dnskey.rdclass,
+ rdtype=dns.rdatatype.CDNSKEY,
+ flags=dnskey.flags,
+ protocol=dnskey.protocol,
+ algorithm=dnskey.algorithm,
+ key=dnskey.key,
+ )
+
+
def nsec3_hash(
domain: Union[dns.name.Name, str],
salt: Optional[Union[str, bytes]],
return output
+def make_ds_rdataset(
+ rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+ algorithms: Set[Union[DSDigest, str]],
+ origin: Optional[dns.name.Name] = None,
+) -> dns.rdataset.Rdataset:
+ """Create a DS record from DNSKEY/CDNSKEY/CDS.
+
+ *rrset*, the RRset to create DS Rdataset for. This can be a
+ ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+ tuple.
+
+ *algorithms*, a set of ``str`` or ``int`` specifying the hash algorithms.
+ The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+ does not matter for these strings. If the RRset is a CDS, only digest
+ algorithms matching algorithms are accepted.
+
+ *origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
+ then it will be made absolute using the specified origin.
+
+ Raises ``UnsupportedAlgorithm`` if any of the algorithms are unknown and
+ ``ValueError`` if the given RRset is not usable.
+
+ Returns a ``dns.rdataset.Rdataset``
+ """
+
+ rrname, rdataset = _get_rrname_rdataset(rrset)
+
+ if rdataset.rdtype not in (
+ dns.rdatatype.DNSKEY,
+ dns.rdatatype.CDNSKEY,
+ dns.rdatatype.CDS,
+ ):
+ raise ValueError("rrset not a DNSKEY/CDNSKEY/CDS")
+
+ _algorithms = set()
+ for algorithm in algorithms:
+ try:
+ if isinstance(algorithm, str):
+ algorithm = DSDigest[algorithm.upper()]
+ except Exception:
+ raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
+ _algorithms.add(algorithm)
+
+ if rdataset.rdtype == dns.rdatatype.CDS:
+ res = []
+ for rdata in cds_rdataset_to_ds_rdataset(rdataset):
+ if rdata.digest_type in _algorithms:
+ res.append(rdata)
+ if not len(res):
+ raise ValueError("no acceptable CDS rdata found")
+ return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+ res = []
+ for algorithm in _algorithms:
+ res.extend(dnskey_rdataset_to_cds_rdataset(rrname, rdataset, algorithm, origin))
+ return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def cds_rdataset_to_ds_rdataset(
+ rdataset: dns.rdataset.Rdataset,
+) -> dns.rdataset.Rdataset:
+ """Create a CDS record from DS.
+
+ *rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for.
+
+ Raises ``ValueError`` if the rdataset is not CDS.
+
+ Returns a ``dns.rdataset.Rdataset``
+ """
+
+ if rdataset.rdtype != dns.rdatatype.CDS:
+ raise ValueError("rdataset not a CDS")
+ res = []
+ for rdata in rdataset:
+ res.append(
+ CDS(
+ rdclass=rdata.rdclass,
+ rdtype=dns.rdatatype.DS,
+ key_tag=rdata.key_tag,
+ algorithm=rdata.algorithm,
+ digest_type=rdata.digest_type,
+ digest=rdata.digest,
+ )
+ )
+ return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def dnskey_rdataset_to_cds_rdataset(
+ name: Union[dns.name.Name, str],
+ rdataset: dns.rdataset.Rdataset,
+ algorithm: Union[DSDigest, str],
+ origin: Optional[dns.name.Name] = None,
+) -> dns.rdataset.Rdataset:
+ """Create a CDS record from DNSKEY/CDNSKEY.
+
+ *name*, a ``dns.name.Name`` or ``str``, the owner name of the CDS record.
+
+ *rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for.
+
+ *algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
+ The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+ does not matter for these strings.
+
+ *origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
+ then it will be made absolute using the specified origin.
+
+ Raises ``UnsupportedAlgorithm`` if the algorithm is unknown or
+ ``ValueError`` if the rdataset is not DNSKEY/CDNSKEY.
+
+ Returns a ``dns.rdataset.Rdataset``
+ """
+
+ if rdataset.rdtype not in (dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY):
+ raise ValueError("rdataset not a DNSKEY/CDNSKEY")
+ res = []
+ for rdata in rdataset:
+ res.append(make_cds(name, rdata, algorithm, origin))
+ return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def dnskey_rdataset_to_cdnskey_rdataset(
+ rdataset: dns.rdataset.Rdataset,
+) -> dns.rdataset.Rdataset:
+ """Create a CDNSKEY record from DNSKEY.
+
+ *rdataset*, a ``dns.rdataset.Rdataset``, to create CDNSKEY Rdataset for.
+
+ Returns a ``dns.rdataset.Rdataset``
+ """
+
+ if rdataset.rdtype != dns.rdatatype.DNSKEY:
+ raise ValueError("rdataset not a DNSKEY")
+ res = []
+ for rdata in rdataset:
+ res.append(
+ CDNSKEY(
+ rdclass=rdataset.rdclass,
+ rdtype=rdataset.rdtype,
+ flags=rdata.flags,
+ protocol=rdata.protocol,
+ algorithm=rdata.algorithm,
+ key=rdata.key,
+ )
+ )
+ return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
def _need_pyca(*args, **kwargs):
raise ImportError(
"DNSSEC validation requires " + "python cryptography"
validate_rrsig = _need_pyca
sign = _need_pyca
make_dnskey = _need_pyca
+ make_cdnskey = _need_pyca
_have_pyca = False
else:
validate = _validate # type: ignore
validate_rrsig = _validate_rrsig # type: ignore
sign = _sign
make_dnskey = _make_dnskey
+ make_cdnskey = _make_cdnskey
_have_pyca = True
### BEGIN generated Algorithm constants
import dns.rdata
import dns.rdataclass
import dns.rdatatype
+import dns.rdtypes.ANY.CDNSKEY
import dns.rdtypes.ANY.CDS
+import dns.rdtypes.ANY.DNSKEY
import dns.rdtypes.ANY.DS
import dns.rrset
"57349 5 2 53A79A3E7488AB44FFC56B2D1109F0699D1796DD977E72108B841F96 E47D7013",
)
+good_cds = dns.rdata.from_text(
+ dns.rdataclass.IN,
+ dns.rdatatype.CDS,
+ "57349 5 2 53A79A3E7488AB44FFC56B2D1109F0699D1796DD977E72108B841F96 E47D7013",
+)
+
when2 = 1290425644
abs_example = dns.name.from_text("example")
ds = dns.dnssec.make_ds(abs_dnspython_org, sep_key, "SHA256")
self.assertEqual(ds, good_ds)
+ def testMakeSHA256CDS(self): # type: () -> None
+ cds = dns.dnssec.make_cds(abs_dnspython_org, sep_key, "SHA256")
+ self.assertEqual(cds, good_cds)
+
def testInvalidAlgorithm(self): # type: () -> None
algorithm: Any
for algorithm in (10, "shax"):
dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.CDS, record)
self.assertEqual(msg, str(cm.exception))
+ def testMakeCDS(self): # type: () -> None
+ name = dns.name.from_text("example.com")
+ key = ed448.Ed448PrivateKey.generate()
+
+ for dnskey in [
+ dns.dnssec.make_dnskey(
+ key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+ ),
+ dns.dnssec.make_cdnskey(
+ key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+ ),
+ ]:
+ dnskey_rdataset = dns.rdataset.from_rdata_list(3600, [dnskey])
+ cds_rdataset = dns.dnssec.dnskey_rdataset_to_cds_rdataset(
+ name, dnskey_rdataset, "SHA256"
+ )
+ self.assertEqual(len(dnskey_rdataset), len(cds_rdataset))
+ for d, c in zip(dnskey_rdataset, cds_rdataset):
+ self.assertTrue(
+ isinstance(
+ d,
+ (
+ dns.rdtypes.ANY.DNSKEY.DNSKEY,
+ dns.rdtypes.ANY.CDNSKEY.CDNSKEY,
+ ),
+ )
+ )
+ self.assertTrue(isinstance(c, dns.rdtypes.ANY.CDS.CDS))
+ self.assertEqual(dns.dnssec.key_id(d), c.key_tag)
+ self.assertEqual(d.algorithm, c.algorithm)
+
+ def testMakeManyDSfromCDS(self): # type: () -> None
+ name = dns.name.from_text("example.com")
+ nkeys = 3
+ algorithms = ["SHA256", "SHA384"]
+ keys = [ed448.Ed448PrivateKey.generate() for _ in range(0, nkeys)]
+
+ dnskeys = [
+ dns.dnssec.make_dnskey(
+ key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+ )
+ for key in keys
+ ]
+
+ dnskey_rdataset = dns.rdataset.from_rdata_list(3600, dnskeys)
+
+ cds_rdataset = dns.dnssec.dnskey_rdataset_to_cds_rdataset(
+ name, dnskey_rdataset, "SHA256"
+ )
+ cds_rrset = dns.rrset.from_rdata_list(name, 3600, cds_rdataset)
+
+ ds_rdataset = dns.dnssec.make_ds_rdataset(cds_rrset, algorithms)
+
+ self.assertEqual(len(cds_rdataset), nkeys)
+
+ def testMakeManyDSfromDNSKEY(self): # type: () -> None
+ name = dns.name.from_text("example.com")
+ nkeys = 3
+ algorithms = ["SHA256", "SHA384"]
+ keys = [ed448.Ed448PrivateKey.generate() for _ in range(0, nkeys)]
+
+ dnskeys = [
+ dns.dnssec.make_dnskey(
+ key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+ )
+ for key in keys
+ ]
+
+ dnskey_rrset = dns.rrset.from_rdata_list(name, 3600, dnskeys)
+
+ ds_rdataset = dns.dnssec.make_ds_rdataset(dnskey_rrset, algorithms)
+
+ self.assertEqual(len(ds_rdataset), nkeys * len(algorithms))
+
@unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
class DNSSECMakeDNSKEYTestCase(unittest.TestCase):
with self.assertRaises(ValueError):
dns.dnssec.make_dnskey(key.public_key(), dns.dnssec.Algorithm.DSA)
+ def testMakeCDNSKEY(self): # type: () -> None
+ key = ed448.Ed448PrivateKey.generate()
+ dnskey = dns.dnssec.make_dnskey(
+ key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+ )
+ cdnskey = dns.dnssec.make_cdnskey(
+ key.public_key(), algorithm=dns.dnssec.Algorithm.ED448
+ )
+
+ self.assertEqual(dnskey.flags, cdnskey.flags)
+ self.assertEqual(dnskey.protocol, cdnskey.protocol)
+ self.assertEqual(dnskey.algorithm, cdnskey.algorithm)
+ self.assertEqual(dnskey.key, cdnskey.key)
+
+ dnskey_rdataset = dns.rdataset.from_rdata_list(3600, [dnskey])
+ cdnskey_rdataset = dns.dnssec.dnskey_rdataset_to_cdnskey_rdataset(
+ dnskey_rdataset
+ )
+ self.assertEqual(len(dnskey_rdataset), len(cdnskey_rdataset))
+ for d, c in zip(dnskey_rdataset, cdnskey_rdataset):
+ self.assertTrue(isinstance(d, dns.rdtypes.ANY.DNSKEY.DNSKEY))
+ self.assertTrue(isinstance(c, dns.rdtypes.ANY.CDNSKEY.CDNSKEY))
+ self.assertEqual(d, c)
+
# XXXRTH This test is fine but is noticably slow, so I have commented it out for
# now