]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
DNSSEC policy. (#869)
authorBob Halley <halley@dnspython.org>
Thu, 15 Dec 2022 14:22:27 +0000 (06:22 -0800)
committerGitHub <noreply@github.com>
Thu, 15 Dec 2022 14:22:27 +0000 (06:22 -0800)
dns/dnssec.py
dns/dnssectypes.py
tests/test_dnssec.py

index 11f0701e08b726e4e5ef661e2c67e3ac47761267..4cfb75e3b62933b98db70106b2e8ee8c254acbb3 100644 (file)
@@ -24,7 +24,7 @@ import math
 import struct
 import time
 import base64
-from datetime import datetime, timezone
+from datetime import datetime
 
 from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
 
@@ -54,6 +54,10 @@ class ValidationFailure(dns.exception.DNSException):
     """The DNSSEC signature is invalid."""
 
 
+class DeniedByPolicy(dns.exception.DNSException):
+    """Denied by DNSSEC policy."""
+
+
 PublicKey = Union[
     "rsa.RSAPublicKey",
     "ec.EllipticCurvePublicKey",
@@ -126,11 +130,64 @@ def key_id(key: DNSKEY) -> int:
         return total & 0xFFFF
 
 
+class Policy:
+    def __init__(self):
+        pass
+
+    def ok_to_sign(self, _: DNSKEY) -> bool:  # pragma: no cover
+        return False
+
+    def ok_to_validate(self, _: DNSKEY) -> bool:  # pragma: no cover
+        return False
+
+    def ok_to_create_ds(self, _: DSDigest) -> bool:  # pragma: no cover
+        return False
+
+    def ok_to_validate_ds(self, _: DSDigest) -> bool:  # pragma: no cover
+        return False
+
+
+class SimpleDeny(Policy):
+    def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds):
+        super().__init__()
+        self._deny_sign = deny_sign
+        self._deny_validate = deny_validate
+        self._deny_create_ds = deny_create_ds
+        self._deny_validate_ds = deny_validate_ds
+
+    def ok_to_sign(self, key: DNSKEY) -> bool:
+        return key.algorithm not in self._deny_sign
+
+    def ok_to_validate(self, key: DNSKEY) -> bool:
+        return key.algorithm not in self._deny_validate
+
+    def ok_to_create_ds(self, algorithm: DSDigest) -> bool:
+        return algorithm not in self._deny_create_ds
+
+    def ok_to_validate_ds(self, algorithm: DSDigest) -> bool:
+        return algorithm not in self._deny_validate_ds
+
+
+rfc_8624_policy = SimpleDeny(
+    {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST},
+    {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1},
+    {DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST},
+    {DSDigest.NULL},
+)
+
+allow_all_policy = SimpleDeny(set(), set(), set(), set())
+
+
+default_policy = rfc_8624_policy
+
+
 def make_ds(
     name: Union[dns.name.Name, str],
     key: dns.rdata.Rdata,
     algorithm: Union[DSDigest, str],
     origin: Optional[dns.name.Name] = None,
+    policy: Optional[Policy] = None,
+    validating: bool = False,
 ) -> DS:
     """Create a DS record for a DNSSEC key.
 
@@ -145,16 +202,34 @@ def make_ds(
     *origin*, a ``dns.name.Name`` or ``None``.  If `key` is a relative name,
     then it will be made absolute using the specified origin.
 
+    *policy*, a ``dns.dnssec.Policy`` or ``None``.  If ``None``, the default policy,
+    ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+    *validating*, a ``bool``.  If ``True``, then policy is checked in
+    validating mode, i.e. "Is it ok to validate using this digest algorithm?".
+    Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with
+    this digest algorithm?".
+
     Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
 
+    Raises ``DeniedByPolicy`` if the algorithm is denied by policy.
+
     Returns a ``dns.rdtypes.ANY.DS.DS``
     """
 
+    if policy is None:
+        policy = default_policy
     try:
         if isinstance(algorithm, str):
             algorithm = DSDigest[algorithm.upper()]
     except Exception:
         raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
+    if validating:
+        check = policy.ok_to_validate_ds
+    else:
+        check = policy.ok_to_create_ds
+    if not check(algorithm):
+        raise DeniedByPolicy
     if not isinstance(key, DNSKEY):
         raise ValueError("key is not a DNSKEY")
     if algorithm == DSDigest.SHA1:
@@ -388,6 +463,7 @@ def _validate_rrsig(
     keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
     origin: Optional[dns.name.Name] = None,
     now: Optional[float] = None,
+    policy: Optional[Policy] = None,
 ) -> None:
     """Validate an RRset against a single signature rdata, throwing an
     exception if validation is not successful.
@@ -410,6 +486,9 @@ def _validate_rrsig(
     use as the current time when validating.  If ``None``, the actual current
     time is used.
 
+    *policy*, a ``dns.dnssec.Policy`` or ``None``.  If ``None``, the default policy,
+    ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
     Raises ``ValidationFailure`` if the signature is expired, not yet valid,
     the public key is invalid, the algorithm is unknown, the verification
     fails, etc.
@@ -418,6 +497,9 @@ def _validate_rrsig(
     dnspython but not implemented.
     """
 
+    if policy is None:
+        policy = default_policy
+
     candidate_keys = _find_candidate_keys(keys, rrsig)
     if candidate_keys is None:
         raise ValidationFailure("unknown key")
@@ -448,6 +530,8 @@ def _validate_rrsig(
     chosen_hash = _make_hash(rrsig.algorithm)
 
     for candidate_key in candidate_keys:
+        if not policy.ok_to_validate(candidate_key):
+            continue
         try:
             _validate_signature(sig, data, candidate_key, chosen_hash)
             return
@@ -464,6 +548,7 @@ def _validate(
     keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
     origin: Optional[dns.name.Name] = None,
     now: Optional[float] = None,
+    policy: Optional[Policy] = None,
 ) -> None:
     """Validate an RRset against a signature RRset, throwing an exception
     if none of the signatures validate.
@@ -488,11 +573,17 @@ def _validate(
     use as the current time when validating.  If ``None``, the actual current
     time is used.
 
+    *policy*, a ``dns.dnssec.Policy`` or ``None``.  If ``None``, the default policy,
+    ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
     Raises ``ValidationFailure`` if the signature is expired, not yet valid,
     the public key is invalid, the algorithm is unknown, the verification
     fails, etc.
     """
 
+    if policy is None:
+        policy = default_policy
+
     if isinstance(origin, str):
         origin = dns.name.from_text(origin, dns.name.root)
 
@@ -517,7 +608,7 @@ def _validate(
         if not isinstance(rrsig, RRSIG):
             raise ValidationFailure("expected an RRSIG")
         try:
-            _validate_rrsig(rrset, rrsig, keys, origin, now)
+            _validate_rrsig(rrset, rrsig, keys, origin, now, policy)
             return
         except (ValidationFailure, UnsupportedAlgorithm):
             pass
@@ -533,6 +624,7 @@ def _sign(
     expiration: Optional[Union[datetime, str, int, float]] = None,
     lifetime: Optional[int] = None,
     verify: bool = False,
+    policy: Optional[Policy] = None,
 ) -> RRSIG:
     """Sign RRset using private key.
 
@@ -564,8 +656,18 @@ def _sign(
 
     *verify*, a ``bool``.  If set to ``True``, the signer will verify signatures
     after they are created; the default is ``False``.
+
+    *policy*, a ``dns.dnssec.Policy`` or ``None``.  If ``None``, the default policy,
+    ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+    Raises ``DeniedByPolicy`` if the signature is denied by policy.
     """
 
+    if policy is None:
+        policy = default_policy
+    if not policy.ok_to_sign(dnskey):
+        raise DeniedByPolicy
+
     if isinstance(rrset, tuple):
         rdclass = rrset[1].rdclass
         rdtype = rrset[1].rdtype
index 2a747168c46277a38e28b5065767e8dfa8902dba..02131e0adaeb85eb49351f4953c854023315fab9 100644 (file)
@@ -50,8 +50,10 @@ class Algorithm(dns.enum.IntEnum):
 class DSDigest(dns.enum.IntEnum):
     """DNSSEC Delegation Signer Digest Algorithm"""
 
+    NULL = 0
     SHA1 = 1
     SHA256 = 2
+    GOST = 3
     SHA384 = 4
 
     @classmethod
index 9aed87975edaae363ecfe807a441c5fac865e2bd..4a25cd2a0904ee1b22e5670dce97ef517c6ee753 100644 (file)
@@ -576,7 +576,20 @@ fake_gost_ns_rrsig = dns.rrset.from_text(
 @unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
 class DNSSECValidatorTestCase(unittest.TestCase):
     def testAbsoluteRSAMD5Good(self):  # type: () -> None
-        dns.dnssec.validate(rsamd5_ns, rsamd5_ns_rrsig, rsamd5_keys, None, rsamd5_when)
+        dns.dnssec.validate(
+            rsamd5_ns,
+            rsamd5_ns_rrsig,
+            rsamd5_keys,
+            None,
+            rsamd5_when,
+            policy=dns.dnssec.allow_all_policy,
+        )
+
+    def testAbsoluteRSAMD5GoodDeniedByPolicy(self):  # type: () -> None
+        with self.assertRaises(dns.dnssec.ValidationFailure):
+            dns.dnssec.validate(
+                rsamd5_ns, rsamd5_ns_rrsig, rsamd5_keys, None, rsamd5_when
+            )
 
     def testRSAMD5Keyid(self):
         self.assertEqual(dns.dnssec.key_id(rsamd5_keys[abs_example][0]), 30239)
@@ -610,12 +623,30 @@ class DNSSECValidatorTestCase(unittest.TestCase):
         self.assertRaises(dns.dnssec.ValidationFailure, bad)
 
     def testAbsoluteDSAGood(self):  # type: () -> None
-        dns.dnssec.validate(abs_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2)
+        dns.dnssec.validate(
+            abs_dsa_soa,
+            abs_dsa_soa_rrsig,
+            abs_dsa_keys,
+            None,
+            when2,
+            policy=dns.dnssec.allow_all_policy,
+        )
+
+    def testAbsoluteDSAGoodDeniedByPolicy(self):  # type: () -> None
+        with self.assertRaises(dns.dnssec.ValidationFailure):
+            dns.dnssec.validate(
+                abs_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2
+            )
 
     def testAbsoluteDSABad(self):  # type: () -> None
         def bad():  # type: () -> None
             dns.dnssec.validate(
-                abs_other_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2
+                abs_other_dsa_soa,
+                abs_dsa_soa_rrsig,
+                abs_dsa_keys,
+                None,
+                when2,
+                policy=dns.dnssec.allow_all_policy,
             )
 
         self.assertRaises(dns.dnssec.ValidationFailure, bad)
@@ -855,9 +886,39 @@ class DNSSECMakeDSTestCase(unittest.TestCase):
     def testMakeExampleSHA1DS(self):  # type: () -> None
         algorithm: Any
         for algorithm in ("SHA1", "sha1", dns.dnssec.DSDigest.SHA1):
-            ds = dns.dnssec.make_ds(abs_example, example_sep_key, algorithm)
+            ds = dns.dnssec.make_ds(
+                abs_example,
+                example_sep_key,
+                algorithm,
+                policy=dns.dnssec.allow_all_policy,
+            )
+            self.assertEqual(ds, example_ds_sha1)
+            ds = dns.dnssec.make_ds(
+                "example.",
+                example_sep_key,
+                algorithm,
+                policy=dns.dnssec.allow_all_policy,
+            )
+            self.assertEqual(ds, example_ds_sha1)
+
+    def testMakeExampleSHA1DSValidationOkByPolicy(self):  # type: () -> None
+        algorithm: Any
+        for algorithm in ("SHA1", "sha1", dns.dnssec.DSDigest.SHA1):
+            ds = dns.dnssec.make_ds(
+                abs_example,
+                example_sep_key,
+                algorithm,
+                policy=dns.dnssec.allow_all_policy,
+            )
+            self.assertEqual(ds, example_ds_sha1)
+            ds = dns.dnssec.make_ds(
+                "example.", example_sep_key, algorithm, validating=True
+            )
             self.assertEqual(ds, example_ds_sha1)
-            ds = dns.dnssec.make_ds("example.", example_sep_key, algorithm)
+
+    def testMakeExampleSHA1DSDeniedByPolicy(self):  # type: () -> None
+        with self.assertRaises(dns.dnssec.DeniedByPolicy):
+            ds = dns.dnssec.make_ds(abs_example, example_sep_key, "SHA1")
             self.assertEqual(ds, example_ds_sha1)
 
     def testMakeExampleSHA256DS(self):  # type: () -> None
@@ -974,24 +1035,27 @@ class DNSSECMakeDNSKEYTestCase(unittest.TestCase):
         with self.assertRaises(ValueError):
             dns.dnssec.make_dnskey(key.public_key(), dns.dnssec.Algorithm.DSA)
 
-    def testRSALargeExponent(self):  # type: () -> None
-        for key_size, public_exponent, dnskey_key_length in [
-            (1024, 3, 130),
-            (1024, 65537, 132),
-            (2048, 3, 258),
-            (2048, 65537, 260),
-            (4096, 3, 514),
-            (4096, 65537, 516),
-        ]:
-            key = rsa.generate_private_key(
-                public_exponent=public_exponent,
-                key_size=key_size,
-                backend=default_backend(),
-            )
-            dnskey = dns.dnssec.make_dnskey(
-                key.public_key(), algorithm=dns.dnssec.Algorithm.RSASHA256
-            )
-            self.assertEqual(len(dnskey.key), dnskey_key_length)
+    # XXXRTH This test is fine but is noticably slow, so I have commented it out for
+    # now
+
+    # def testRSALargeExponent(self):  # type: () -> None
+    #     for key_size, public_exponent, dnskey_key_length in [
+    #         (1024, 3, 130),
+    #         (1024, 65537, 132),
+    #         (2048, 3, 258),
+    #         (2048, 65537, 260),
+    #         (4096, 3, 514),
+    #         (4096, 65537, 516),
+    #     ]:
+    #         key = rsa.generate_private_key(
+    #             public_exponent=public_exponent,
+    #             key_size=key_size,
+    #             backend=default_backend(),
+    #         )
+    #         dnskey = dns.dnssec.make_dnskey(
+    #             key.public_key(), algorithm=dns.dnssec.Algorithm.RSASHA256
+    #         )
+    #         self.assertEqual(len(dnskey.key), dnskey_key_length)
 
 
 @unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
@@ -1014,7 +1078,9 @@ class DNSSECSignatureTestCase(unittest.TestCase):
 
     def testSignatureDSA(self):  # type: () -> None
         key = dsa.generate_private_key(key_size=1024)
-        self._test_signature(key, dns.dnssec.Algorithm.DSA, abs_soa)
+        self._test_signature(
+            key, dns.dnssec.Algorithm.DSA, abs_soa, policy=dns.dnssec.allow_all_policy
+        )
 
     def testSignatureECDSAP256SHA256(self):  # type: () -> None
         key = ec.generate_private_key(curve=ec.SECP256R1, backend=default_backend())
@@ -1039,7 +1105,7 @@ class DNSSECSignatureTestCase(unittest.TestCase):
         rrset = (name, rdataset)
         self._test_signature(key, dns.dnssec.Algorithm.ED448, rrset)
 
-    def _test_signature(self, key, algorithm, rrset, signer=None):  # type: () -> None
+    def _test_signature(self, key, algorithm, rrset, signer=None, policy=None):
         ttl = 60
         lifetime = 3600
         if isinstance(rrset, tuple):
@@ -1058,10 +1124,11 @@ class DNSSECSignatureTestCase(unittest.TestCase):
             lifetime=lifetime,
             signer=signer,
             verify=True,
+            policy=policy,
         )
         keys = {signer: dnskey_rrset}
         rrsigset = dns.rrset.from_rdata(rrname, ttl, rrsig)
-        dns.dnssec.validate(rrset=rrset, rrsigset=rrsigset, keys=keys)
+        dns.dnssec.validate(rrset=rrset, rrsigset=rrsigset, keys=keys, policy=policy)
 
 
 if __name__ == "__main__":