]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
Zone signer (#911)
authorJakob Schlyter <jakob@kirei.se>
Tue, 21 Mar 2023 01:14:59 +0000 (02:14 +0100)
committerGitHub <noreply@github.com>
Tue, 21 Mar 2023 01:14:59 +0000 (18:14 -0700)
* first cut at NSEC support

* use transactions, fix delegations

* rename to add_nsec_to_zone

* optimize NSEC generation

* split out function to get all secure names (could be useful for NSEC3 later)

* add `Bitmap.from_rdtypes()` and add missing typing

* more typing

* add missing import

* add more typing

* fix tok type

* remove _get_secure_names, optimize

* better zone testing (compare as text)
add test example with delegation below other delegation

* include NSEC itself in the bitmap

* lint

* Add names iteration to transactions via iterate_names().

Also make rdataset iteration more obvious by adding an
explicit iterate_rdatasets() API.

* use iterate_names()

* typo

* black

* use single iteration

* better type fix

* add optional transaction to add_nsec_to_zone

* idea for zone signer

* do not sign RRSIGs

* fix signer

* correctly sign DS

* simplify

* simplify by passing rrset to signer

* fix typing

* nit

* add DS

* add more test

* rewrite zone signer

* compact

* simplify

* make easier to read

* bring back rrset_signer

* move default RRset signer

* more

* more

* prettier context handling (mypy issue pending)

* make NSEC zone signer less complex

* update

* fix txn, sign as defined by SEP

* docs

* add back missing dnskey_include

* rename dnskey_include to add_dnskey

* check KSK/ZSK key tags in signed zone

---------

Co-authored-by: Bob Halley <halley@dnspython.org>
dns/dnssec.py
tests/test_dnssec.py

index 5dc26223f2e683cd720608b7b299b4fbf7e5c133..74ab4c28b6a9f805859486c964527af0ebce032c 100644 (file)
 
 """Common DNSSEC-related functions and constants."""
 
-from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
 
+from typing import Any, cast, Callable, Dict, List, Optional, Set, Tuple, Union
+
+import contextlib
+import functools
 import hashlib
 import math
 import struct
@@ -36,10 +39,14 @@ import dns.rdata
 import dns.rdatatype
 import dns.rdataclass
 import dns.rrset
+import dns.transaction
+import dns.zone
 from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
 from dns.rdtypes.ANY.CDS import CDS
 from dns.rdtypes.ANY.DNSKEY import DNSKEY
 from dns.rdtypes.ANY.DS import DS
+from dns.rdtypes.ANY.NSEC import NSEC, Bitmap
+from dns.rdtypes.ANY.NSEC3PARAM import NSEC3PARAM
 from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
 from dns.rdtypes.dnskeybase import Flag
 
@@ -74,6 +81,8 @@ PrivateKey = Union[
     "ed448.Ed448PrivateKey",
 ]
 
+RRsetSigner = Callable[[dns.transaction.Transaction, dns.rrset.RRset], None]
+
 
 def algorithm_from_text(text: str) -> Algorithm:
     """Convert text into a DNSSEC algorithm value.
@@ -1216,6 +1225,222 @@ def dnskey_rdataset_to_cdnskey_rdataset(
     return dns.rdataset.from_rdata_list(rdataset.ttl, res)
 
 
+def default_rrset_signer(
+    txn: dns.transaction.Transaction,
+    rrset: dns.rrset.RRset,
+    signer: dns.name.Name,
+    ksks: List[Tuple[PrivateKey, DNSKEY]],
+    zsks: List[Tuple[PrivateKey, DNSKEY]],
+    inception: Optional[Union[datetime, str, int, float]] = None,
+    expiration: Optional[Union[datetime, str, int, float]] = None,
+    lifetime: Optional[int] = None,
+) -> None:
+    """Default RRset signer"""
+
+    if rrset.rdtype in set(
+        [
+            dns.rdatatype.RdataType.DNSKEY,
+            dns.rdatatype.RdataType.CDS,
+            dns.rdatatype.RdataType.CDNSKEY,
+        ]
+    ):
+        keys = ksks
+    else:
+        keys = zsks
+
+    for (private_key, dnskey) in keys:
+        rrsig = dns.dnssec.sign(
+            rrset=rrset,
+            private_key=private_key,
+            dnskey=dnskey,
+            inception=inception,
+            expiration=expiration,
+            lifetime=lifetime,
+            signer=signer,
+        )
+        txn.add(rrset.name, rrset.ttl, rrsig)
+
+
+def sign_zone(
+    zone: dns.zone.Zone,
+    txn: Optional[dns.transaction.Transaction] = None,
+    keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None,
+    add_dnskey: bool = True,
+    dnskey_ttl: Optional[int] = None,
+    inception: Optional[Union[datetime, str, int, float]] = None,
+    expiration: Optional[Union[datetime, str, int, float]] = None,
+    lifetime: Optional[int] = None,
+    nsec3: Optional[NSEC3PARAM] = None,
+    rrset_signer: Optional[RRsetSigner] = None,
+) -> None:
+    """Sign zone.
+
+    *zone*, a ``dns.zone.Zone``, the zone to sign.
+
+    *txn*, a ``dns.transaction.Transaction``, an optional transaction to use
+    for signing.
+
+    *keys*, a list of (``PrivateKey``, ``DNSKEY``) tuples, to use for signing.
+    KSK/ZSK roles are assigned automatically if the SEP flag is used, otherwise
+    all RRsets are signed by all keys.
+
+    *add_dnskey*, a ``bool``.  If ``True``, the default, all specified
+    DNSKEYs are automatically added to the zone on signing.
+
+    *dnskey_ttl*, a``int``, specifies the TTL for DNSKEY RRs. If not specified
+    the TTL of the existing DNSKEY RRset used or the TTL of the SOA RRset.
+
+    *inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the
+    signature inception time.  If ``None``, the current time is used.  If a ``str``, the
+    format is "YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX
+    epoch in text form; this is the same the RRSIG rdata's text form.
+    Values of type `int` or `float` are interpreted as seconds since the UNIX epoch.
+
+    *expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature
+    expiration time.  If ``None``, the expiration time will be the inception time plus
+    the value of the *lifetime* parameter.  See the description of *inception* above
+    for how the various parameter types are interpreted.
+
+    *lifetime*, an ``int`` or ``None``, the signature lifetime in seconds.  This
+    parameter is only meaningful if *expiration* is ``None``.
+
+    *nsec3*, a ``NSEC3PARAM`` Rdata, configures signing using NSEC3. Not yet implemented.
+
+    *rrset_signer*, a ``Callable``, an optional function for signing RRsets.
+    The function requires two arguments: transaction and RRset. If the not
+    specified, ``dns.dnssec.default_rrset_signer`` will be used.
+
+    Returns ``None``.
+    """
+
+    ksks = []
+    zsks = []
+
+    # if we have both KSKs and ZSKs, split by SEP flag. if not, sign all
+    # records with all keys
+    if keys:
+        for key in keys:
+            if key[1].flags & Flag.SEP:
+                ksks.append(key)
+            else:
+                zsks.append(key)
+        if not ksks:
+            ksks = keys
+        if not zsks:
+            zsks = keys
+    else:
+        keys = []
+
+    if txn:
+        cm: contextlib.AbstractContextManager = contextlib.nullcontext(txn)
+    else:
+        cm = zone.writer()
+
+    with cm as _txn:
+        if add_dnskey:
+            if dnskey_ttl is None:
+                dnskey = _txn.get(zone.origin, dns.rdatatype.DNSKEY)
+                if dnskey:
+                    dnskey_ttl = dnskey.ttl
+                else:
+                    soa = _txn.get(zone.origin, dns.rdatatype.SOA)
+                    dnskey_ttl = soa.ttl
+            for (_, dnskey) in keys:
+                _txn.add(zone.origin, dnskey_ttl, dnskey)
+
+        if nsec3:
+            raise NotImplementedError("Signing with NSEC3 not yet implemented")
+        else:
+            _rrset_signer = rrset_signer or functools.partial(
+                default_rrset_signer,
+                signer=zone.origin,
+                ksks=ksks,
+                zsks=zsks,
+                inception=inception,
+                expiration=expiration,
+                lifetime=lifetime,
+            )
+            return _sign_zone_nsec(zone, _txn, _rrset_signer)
+
+
+def _sign_zone_nsec(
+    zone: dns.zone.Zone,
+    txn: dns.transaction.Transaction,
+    rrset_signer: Optional[RRsetSigner] = None,
+) -> None:
+    """NSEC zone signer"""
+
+    def _txn_add_nsec(
+        txn: dns.transaction.Transaction,
+        name: dns.name.Name,
+        next_secure: Optional[dns.name.Name],
+        rdclass: dns.rdataclass.RdataClass,
+        ttl: int,
+        rrset_signer: Optional[RRsetSigner] = None,
+    ) -> None:
+        """NSEC zone signer helper"""
+        mandatory_types = set(
+            [dns.rdatatype.RdataType.RRSIG, dns.rdatatype.RdataType.NSEC]
+        )
+        node = txn.get_node(name)
+        if node and next_secure:
+            types = (
+                set([rdataset.rdtype for rdataset in node.rdatasets]) | mandatory_types
+            )
+            windows = Bitmap.from_rdtypes(list(types))
+            rrset = dns.rrset.from_rdata(
+                name,
+                ttl,
+                NSEC(
+                    rdclass=rdclass,
+                    rdtype=dns.rdatatype.RdataType.NSEC,
+                    next=next_secure,
+                    windows=windows,
+                ),
+            )
+            txn.add(rrset)
+            if rrset_signer:
+                rrset_signer(txn, rrset)
+
+    rrsig_ttl = zone.get_soa().minimum
+    delegation = None
+    last_secure = None
+
+    for name in sorted(txn.iterate_names()):
+        if delegation and name.is_subdomain(delegation):
+            # names below delegations are not secure
+            continue
+        elif txn.get(name, dns.rdatatype.NS) and name != zone.origin:
+            # inside delegation
+            delegation = name
+        else:
+            # outside delegation
+            delegation = None
+
+        if rrset_signer:
+            node = txn.get_node(name)
+            if node:
+                for rdataset in node.rdatasets:
+                    if rdataset.rdtype == dns.rdatatype.RRSIG:
+                        # do not sign RRSIGs
+                        continue
+                    elif delegation and rdataset.rdtype != dns.rdatatype.DS:
+                        # do not sign delegations except DS records
+                        continue
+                    else:
+                        rrset = dns.rrset.from_rdata(name, rdataset.ttl, *rdataset)
+                        rrset_signer(txn, rrset)
+
+        if last_secure:
+            _txn_add_nsec(txn, last_secure, name, zone.rdclass, rrsig_ttl, rrset_signer)
+        last_secure = name
+
+    if last_secure:
+        _txn_add_nsec(
+            txn, last_secure, zone.origin, zone.rdclass, rrsig_ttl, rrset_signer
+        )
+
+
 def _need_pyca(*args, **kwargs):
     raise ImportError(
         "DNSSEC validation requires " + "python cryptography"
index f52f98040587fd493e6250b7419c88bf677a85f0..3074741f4f394ed6bfa555f8b0a5bf7cb567537c 100644 (file)
@@ -18,6 +18,7 @@
 from datetime import datetime, timedelta, timezone
 from typing import Any
 
+import functools
 import unittest
 
 import dns.dnssec
@@ -30,6 +31,9 @@ import dns.rdtypes.ANY.CDS
 import dns.rdtypes.ANY.DNSKEY
 import dns.rdtypes.ANY.DS
 import dns.rrset
+import dns.zone
+
+from dns.rdtypes.dnskeybase import Flag
 
 from .keys import test_dnskeys
 
@@ -580,6 +584,58 @@ fake_gost_ns_rrsig = dns.rrset.from_text(
     " SXTV9hCLVFWU4PS+/fxxfOHCetsY5tWWSxZi zSHfgpGfsHWzQoAamag4XYDyykc=",
 )
 
+test_zone_sans_nsec = """
+example. 3600 IN SOA foo.example. bar.example. 1 2 3 4 5
+example. 3600 IN NS ns1.example.
+example. 3600 IN NS ns2.example.
+bar.foo.example. 3600 IN MX 0 blaz.foo.example.
+ns1.example. 3600 IN A 10.0.0.1
+ns2.example. 3600 IN A 10.0.0.2
+sub.example. 3600 IN NS ns1.example.
+sub.example. 3600 IN NS ns2.example.
+sub.example. 3600 IN NS ns3.sub.example.
+sub.example. 3600 IN DS 12345 13 2 0100D208742A23024DF3C8827DFF3EB3E25126E9B72850E99D6055E18913CB2F
+sub.sub.example. 3600 IN NS ns3.sub.example.
+ns3.sub.example. 3600 IN A 10.0.0.3
+"""
+
+test_zone_rrsigs = set(
+    [
+        ("example.", dns.rdatatype.DNSKEY),
+        ("example.", dns.rdatatype.NS),
+        ("example.", dns.rdatatype.NSEC),
+        ("example.", dns.rdatatype.SOA),
+        ("bar.foo.example.", dns.rdatatype.MX),
+        ("bar.foo.example.", dns.rdatatype.NSEC),
+        ("ns1.example.", dns.rdatatype.A),
+        ("ns1.example.", dns.rdatatype.NSEC),
+        ("ns2.example.", dns.rdatatype.A),
+        ("ns2.example.", dns.rdatatype.NSEC),
+        ("sub.example.", dns.rdatatype.DS),
+        ("sub.example.", dns.rdatatype.NSEC),
+    ]
+)
+
+test_zone_with_nsec = """
+example. 3600 IN SOA foo.example. bar.example. 1 2 3 4 5
+example. 3600 IN NS ns1.example.
+example. 3600 IN NS ns2.example.
+example. 5 IN NSEC bar.foo.example. NS NSEC SOA RRSIG
+bar.foo.example. 3600 IN MX 0 blaz.foo.example.
+bar.foo.example. 5 IN NSEC ns1.example. MX NSEC RRSIG
+ns1.example. 3600 IN A 10.0.0.1
+ns1.example. 5 IN NSEC ns2.example. A NSEC RRSIG
+ns2.example. 3600 IN A 10.0.0.2
+ns2.example. 5 IN NSEC sub.example. A NSEC RRSIG
+sub.example. 3600 IN NS ns1.example.
+sub.example. 3600 IN NS ns2.example.
+sub.example. 3600 IN NS ns3.sub.example.
+sub.example. 3600 IN DS 12345 13 2 0100D208742A23024DF3C8827DFF3EB3E25126E9B72850E99D6055E18913CB2F
+sub.example. 5 IN NSEC example. DS NS NSEC RRSIG
+sub.sub.example. 3600 IN NS ns3.sub.example.
+ns3.sub.example. 3600 IN A 10.0.0.3
+"""
+
 
 @unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
 class DNSSECValidatorTestCase(unittest.TestCase):
@@ -880,6 +936,75 @@ class DNSSECMiscTestCase(unittest.TestCase):
         ts = dns.dnssec.to_timestamp(441812220)
         self.assertEqual(ts, REFERENCE_TIMESTAMP)
 
+    def test_sign_zone(self):
+        zone = dns.zone.from_text(test_zone_sans_nsec, "example.", relativize=False)
+
+        algorithm = dns.dnssec.Algorithm.ED25519
+        lifetime = 3600
+
+        ksk_private_key = ed25519.Ed25519PrivateKey.generate()
+        ksk_dnskey = dns.dnssec.make_dnskey(
+            public_key=ksk_private_key.public_key(),
+            algorithm=algorithm,
+            flags=Flag.ZONE | Flag.SEP,
+        )
+
+        zsk_private_key = ed25519.Ed25519PrivateKey.generate()
+        zsk_dnskey = dns.dnssec.make_dnskey(
+            public_key=zsk_private_key.public_key(),
+            algorithm=algorithm,
+            flags=Flag.ZONE,
+        )
+
+        keys = [(ksk_private_key, ksk_dnskey), (zsk_private_key, zsk_dnskey)]
+
+        with zone.writer() as txn:
+            dns.dnssec.sign_zone(
+                zone=zone,
+                txn=txn,
+                keys=keys,
+                lifetime=lifetime,
+            )
+
+        rrsigs = set(
+            [
+                (str(name), rdataset.covers)
+                for (name, rdataset) in zone.iterate_rdatasets()
+                if rdataset.rdtype == dns.rdatatype.RRSIG
+            ]
+        )
+        self.assertEqual(rrsigs, test_zone_rrsigs)
+
+        signers = set(
+            [
+                (str(name), rdataset.covers, rdataset[0].key_tag)
+                for (name, rdataset) in zone.iterate_rdatasets()
+                if rdataset.rdtype == dns.rdatatype.RRSIG
+            ]
+        )
+        for name, covers, key_tag in signers:
+            if covers in [
+                dns.rdatatype.DNSKEY,
+                dns.rdatatype.CDNSKEY,
+                dns.rdatatype.CDS,
+            ]:
+                self.assertEqual(key_tag, dns.dnssec.key_id(ksk_dnskey))
+            else:
+                self.assertEqual(key_tag, dns.dnssec.key_id(zsk_dnskey))
+
+    def test_sign_zone_nsec_null_signer(self):
+        def rrset_signer(
+            txn: dns.transaction.Transaction,
+            rrset: dns.rrset.RRset,
+        ) -> None:
+            pass
+
+        zone1 = dns.zone.from_text(test_zone_sans_nsec, "example.", relativize=False)
+        dns.dnssec.sign_zone(zone1, rrset_signer=rrset_signer)
+
+        zone2 = dns.zone.from_text(test_zone_with_nsec, "example.", relativize=False)
+        self.assertEqual(zone1.to_text(), zone2.to_text())
+
 
 class DNSSECMakeDSTestCase(unittest.TestCase):
     def testMnemonicParser(self):