"""Common DNSSEC-related functions and constants."""
-from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, cast, Callable, Dict, List, Optional, Set, Tuple, Union
+
+import contextlib
+import functools
import hashlib
import math
import struct
import dns.rdatatype
import dns.rdataclass
import dns.rrset
+import dns.transaction
+import dns.zone
from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
from dns.rdtypes.ANY.CDS import CDS
from dns.rdtypes.ANY.DNSKEY import DNSKEY
from dns.rdtypes.ANY.DS import DS
+from dns.rdtypes.ANY.NSEC import NSEC, Bitmap
+from dns.rdtypes.ANY.NSEC3PARAM import NSEC3PARAM
from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
from dns.rdtypes.dnskeybase import Flag
"ed448.Ed448PrivateKey",
]
+RRsetSigner = Callable[[dns.transaction.Transaction, dns.rrset.RRset], None]
+
def algorithm_from_text(text: str) -> Algorithm:
"""Convert text into a DNSSEC algorithm value.
return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+def default_rrset_signer(
+ txn: dns.transaction.Transaction,
+ rrset: dns.rrset.RRset,
+ signer: dns.name.Name,
+ ksks: List[Tuple[PrivateKey, DNSKEY]],
+ zsks: List[Tuple[PrivateKey, DNSKEY]],
+ inception: Optional[Union[datetime, str, int, float]] = None,
+ expiration: Optional[Union[datetime, str, int, float]] = None,
+ lifetime: Optional[int] = None,
+) -> None:
+ """Default RRset signer"""
+
+ if rrset.rdtype in set(
+ [
+ dns.rdatatype.RdataType.DNSKEY,
+ dns.rdatatype.RdataType.CDS,
+ dns.rdatatype.RdataType.CDNSKEY,
+ ]
+ ):
+ keys = ksks
+ else:
+ keys = zsks
+
+ for (private_key, dnskey) in keys:
+ rrsig = dns.dnssec.sign(
+ rrset=rrset,
+ private_key=private_key,
+ dnskey=dnskey,
+ inception=inception,
+ expiration=expiration,
+ lifetime=lifetime,
+ signer=signer,
+ )
+ txn.add(rrset.name, rrset.ttl, rrsig)
+
+
+def sign_zone(
+ zone: dns.zone.Zone,
+ txn: Optional[dns.transaction.Transaction] = None,
+ keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None,
+ add_dnskey: bool = True,
+ dnskey_ttl: Optional[int] = None,
+ inception: Optional[Union[datetime, str, int, float]] = None,
+ expiration: Optional[Union[datetime, str, int, float]] = None,
+ lifetime: Optional[int] = None,
+ nsec3: Optional[NSEC3PARAM] = None,
+ rrset_signer: Optional[RRsetSigner] = None,
+) -> None:
+ """Sign zone.
+
+ *zone*, a ``dns.zone.Zone``, the zone to sign.
+
+ *txn*, a ``dns.transaction.Transaction``, an optional transaction to use
+ for signing.
+
+ *keys*, a list of (``PrivateKey``, ``DNSKEY``) tuples, to use for signing.
+ KSK/ZSK roles are assigned automatically if the SEP flag is used, otherwise
+ all RRsets are signed by all keys.
+
+ *add_dnskey*, a ``bool``. If ``True``, the default, all specified
+ DNSKEYs are automatically added to the zone on signing.
+
+ *dnskey_ttl*, a``int``, specifies the TTL for DNSKEY RRs. If not specified
+ the TTL of the existing DNSKEY RRset used or the TTL of the SOA RRset.
+
+ *inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the
+ signature inception time. If ``None``, the current time is used. If a ``str``, the
+ format is "YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX
+ epoch in text form; this is the same the RRSIG rdata's text form.
+ Values of type `int` or `float` are interpreted as seconds since the UNIX epoch.
+
+ *expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature
+ expiration time. If ``None``, the expiration time will be the inception time plus
+ the value of the *lifetime* parameter. See the description of *inception* above
+ for how the various parameter types are interpreted.
+
+ *lifetime*, an ``int`` or ``None``, the signature lifetime in seconds. This
+ parameter is only meaningful if *expiration* is ``None``.
+
+ *nsec3*, a ``NSEC3PARAM`` Rdata, configures signing using NSEC3. Not yet implemented.
+
+ *rrset_signer*, a ``Callable``, an optional function for signing RRsets.
+ The function requires two arguments: transaction and RRset. If the not
+ specified, ``dns.dnssec.default_rrset_signer`` will be used.
+
+ Returns ``None``.
+ """
+
+ ksks = []
+ zsks = []
+
+ # if we have both KSKs and ZSKs, split by SEP flag. if not, sign all
+ # records with all keys
+ if keys:
+ for key in keys:
+ if key[1].flags & Flag.SEP:
+ ksks.append(key)
+ else:
+ zsks.append(key)
+ if not ksks:
+ ksks = keys
+ if not zsks:
+ zsks = keys
+ else:
+ keys = []
+
+ if txn:
+ cm: contextlib.AbstractContextManager = contextlib.nullcontext(txn)
+ else:
+ cm = zone.writer()
+
+ with cm as _txn:
+ if add_dnskey:
+ if dnskey_ttl is None:
+ dnskey = _txn.get(zone.origin, dns.rdatatype.DNSKEY)
+ if dnskey:
+ dnskey_ttl = dnskey.ttl
+ else:
+ soa = _txn.get(zone.origin, dns.rdatatype.SOA)
+ dnskey_ttl = soa.ttl
+ for (_, dnskey) in keys:
+ _txn.add(zone.origin, dnskey_ttl, dnskey)
+
+ if nsec3:
+ raise NotImplementedError("Signing with NSEC3 not yet implemented")
+ else:
+ _rrset_signer = rrset_signer or functools.partial(
+ default_rrset_signer,
+ signer=zone.origin,
+ ksks=ksks,
+ zsks=zsks,
+ inception=inception,
+ expiration=expiration,
+ lifetime=lifetime,
+ )
+ return _sign_zone_nsec(zone, _txn, _rrset_signer)
+
+
+def _sign_zone_nsec(
+ zone: dns.zone.Zone,
+ txn: dns.transaction.Transaction,
+ rrset_signer: Optional[RRsetSigner] = None,
+) -> None:
+ """NSEC zone signer"""
+
+ def _txn_add_nsec(
+ txn: dns.transaction.Transaction,
+ name: dns.name.Name,
+ next_secure: Optional[dns.name.Name],
+ rdclass: dns.rdataclass.RdataClass,
+ ttl: int,
+ rrset_signer: Optional[RRsetSigner] = None,
+ ) -> None:
+ """NSEC zone signer helper"""
+ mandatory_types = set(
+ [dns.rdatatype.RdataType.RRSIG, dns.rdatatype.RdataType.NSEC]
+ )
+ node = txn.get_node(name)
+ if node and next_secure:
+ types = (
+ set([rdataset.rdtype for rdataset in node.rdatasets]) | mandatory_types
+ )
+ windows = Bitmap.from_rdtypes(list(types))
+ rrset = dns.rrset.from_rdata(
+ name,
+ ttl,
+ NSEC(
+ rdclass=rdclass,
+ rdtype=dns.rdatatype.RdataType.NSEC,
+ next=next_secure,
+ windows=windows,
+ ),
+ )
+ txn.add(rrset)
+ if rrset_signer:
+ rrset_signer(txn, rrset)
+
+ rrsig_ttl = zone.get_soa().minimum
+ delegation = None
+ last_secure = None
+
+ for name in sorted(txn.iterate_names()):
+ if delegation and name.is_subdomain(delegation):
+ # names below delegations are not secure
+ continue
+ elif txn.get(name, dns.rdatatype.NS) and name != zone.origin:
+ # inside delegation
+ delegation = name
+ else:
+ # outside delegation
+ delegation = None
+
+ if rrset_signer:
+ node = txn.get_node(name)
+ if node:
+ for rdataset in node.rdatasets:
+ if rdataset.rdtype == dns.rdatatype.RRSIG:
+ # do not sign RRSIGs
+ continue
+ elif delegation and rdataset.rdtype != dns.rdatatype.DS:
+ # do not sign delegations except DS records
+ continue
+ else:
+ rrset = dns.rrset.from_rdata(name, rdataset.ttl, *rdataset)
+ rrset_signer(txn, rrset)
+
+ if last_secure:
+ _txn_add_nsec(txn, last_secure, name, zone.rdclass, rrsig_ttl, rrset_signer)
+ last_secure = name
+
+ if last_secure:
+ _txn_add_nsec(
+ txn, last_secure, zone.origin, zone.rdclass, rrsig_ttl, rrset_signer
+ )
+
+
def _need_pyca(*args, **kwargs):
raise ImportError(
"DNSSEC validation requires " + "python cryptography"
from datetime import datetime, timedelta, timezone
from typing import Any
+import functools
import unittest
import dns.dnssec
import dns.rdtypes.ANY.DNSKEY
import dns.rdtypes.ANY.DS
import dns.rrset
+import dns.zone
+
+from dns.rdtypes.dnskeybase import Flag
from .keys import test_dnskeys
" SXTV9hCLVFWU4PS+/fxxfOHCetsY5tWWSxZi zSHfgpGfsHWzQoAamag4XYDyykc=",
)
+test_zone_sans_nsec = """
+example. 3600 IN SOA foo.example. bar.example. 1 2 3 4 5
+example. 3600 IN NS ns1.example.
+example. 3600 IN NS ns2.example.
+bar.foo.example. 3600 IN MX 0 blaz.foo.example.
+ns1.example. 3600 IN A 10.0.0.1
+ns2.example. 3600 IN A 10.0.0.2
+sub.example. 3600 IN NS ns1.example.
+sub.example. 3600 IN NS ns2.example.
+sub.example. 3600 IN NS ns3.sub.example.
+sub.example. 3600 IN DS 12345 13 2 0100D208742A23024DF3C8827DFF3EB3E25126E9B72850E99D6055E18913CB2F
+sub.sub.example. 3600 IN NS ns3.sub.example.
+ns3.sub.example. 3600 IN A 10.0.0.3
+"""
+
+test_zone_rrsigs = set(
+ [
+ ("example.", dns.rdatatype.DNSKEY),
+ ("example.", dns.rdatatype.NS),
+ ("example.", dns.rdatatype.NSEC),
+ ("example.", dns.rdatatype.SOA),
+ ("bar.foo.example.", dns.rdatatype.MX),
+ ("bar.foo.example.", dns.rdatatype.NSEC),
+ ("ns1.example.", dns.rdatatype.A),
+ ("ns1.example.", dns.rdatatype.NSEC),
+ ("ns2.example.", dns.rdatatype.A),
+ ("ns2.example.", dns.rdatatype.NSEC),
+ ("sub.example.", dns.rdatatype.DS),
+ ("sub.example.", dns.rdatatype.NSEC),
+ ]
+)
+
+test_zone_with_nsec = """
+example. 3600 IN SOA foo.example. bar.example. 1 2 3 4 5
+example. 3600 IN NS ns1.example.
+example. 3600 IN NS ns2.example.
+example. 5 IN NSEC bar.foo.example. NS NSEC SOA RRSIG
+bar.foo.example. 3600 IN MX 0 blaz.foo.example.
+bar.foo.example. 5 IN NSEC ns1.example. MX NSEC RRSIG
+ns1.example. 3600 IN A 10.0.0.1
+ns1.example. 5 IN NSEC ns2.example. A NSEC RRSIG
+ns2.example. 3600 IN A 10.0.0.2
+ns2.example. 5 IN NSEC sub.example. A NSEC RRSIG
+sub.example. 3600 IN NS ns1.example.
+sub.example. 3600 IN NS ns2.example.
+sub.example. 3600 IN NS ns3.sub.example.
+sub.example. 3600 IN DS 12345 13 2 0100D208742A23024DF3C8827DFF3EB3E25126E9B72850E99D6055E18913CB2F
+sub.example. 5 IN NSEC example. DS NS NSEC RRSIG
+sub.sub.example. 3600 IN NS ns3.sub.example.
+ns3.sub.example. 3600 IN A 10.0.0.3
+"""
+
@unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
class DNSSECValidatorTestCase(unittest.TestCase):
ts = dns.dnssec.to_timestamp(441812220)
self.assertEqual(ts, REFERENCE_TIMESTAMP)
+ def test_sign_zone(self):
+ zone = dns.zone.from_text(test_zone_sans_nsec, "example.", relativize=False)
+
+ algorithm = dns.dnssec.Algorithm.ED25519
+ lifetime = 3600
+
+ ksk_private_key = ed25519.Ed25519PrivateKey.generate()
+ ksk_dnskey = dns.dnssec.make_dnskey(
+ public_key=ksk_private_key.public_key(),
+ algorithm=algorithm,
+ flags=Flag.ZONE | Flag.SEP,
+ )
+
+ zsk_private_key = ed25519.Ed25519PrivateKey.generate()
+ zsk_dnskey = dns.dnssec.make_dnskey(
+ public_key=zsk_private_key.public_key(),
+ algorithm=algorithm,
+ flags=Flag.ZONE,
+ )
+
+ keys = [(ksk_private_key, ksk_dnskey), (zsk_private_key, zsk_dnskey)]
+
+ with zone.writer() as txn:
+ dns.dnssec.sign_zone(
+ zone=zone,
+ txn=txn,
+ keys=keys,
+ lifetime=lifetime,
+ )
+
+ rrsigs = set(
+ [
+ (str(name), rdataset.covers)
+ for (name, rdataset) in zone.iterate_rdatasets()
+ if rdataset.rdtype == dns.rdatatype.RRSIG
+ ]
+ )
+ self.assertEqual(rrsigs, test_zone_rrsigs)
+
+ signers = set(
+ [
+ (str(name), rdataset.covers, rdataset[0].key_tag)
+ for (name, rdataset) in zone.iterate_rdatasets()
+ if rdataset.rdtype == dns.rdatatype.RRSIG
+ ]
+ )
+ for name, covers, key_tag in signers:
+ if covers in [
+ dns.rdatatype.DNSKEY,
+ dns.rdatatype.CDNSKEY,
+ dns.rdatatype.CDS,
+ ]:
+ self.assertEqual(key_tag, dns.dnssec.key_id(ksk_dnskey))
+ else:
+ self.assertEqual(key_tag, dns.dnssec.key_id(zsk_dnskey))
+
+ def test_sign_zone_nsec_null_signer(self):
+ def rrset_signer(
+ txn: dns.transaction.Transaction,
+ rrset: dns.rrset.RRset,
+ ) -> None:
+ pass
+
+ zone1 = dns.zone.from_text(test_zone_sans_nsec, "example.", relativize=False)
+ dns.dnssec.sign_zone(zone1, rrset_signer=rrset_signer)
+
+ zone2 = dns.zone.from_text(test_zone_with_nsec, "example.", relativize=False)
+ self.assertEqual(zone1.to_text(), zone2.to_text())
+
class DNSSECMakeDSTestCase(unittest.TestCase):
def testMnemonicParser(self):