import struct
import time
import base64
-from datetime import datetime, timezone
+from datetime import datetime
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
"""The DNSSEC signature is invalid."""
+class DeniedByPolicy(dns.exception.DNSException):
+ """Denied by DNSSEC policy."""
+
+
PublicKey = Union[
"rsa.RSAPublicKey",
"ec.EllipticCurvePublicKey",
return total & 0xFFFF
+class Policy:
+ def __init__(self):
+ pass
+
+ def ok_to_sign(self, _: DNSKEY) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_validate(self, _: DNSKEY) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_create_ds(self, _: DSDigest) -> bool: # pragma: no cover
+ return False
+
+ def ok_to_validate_ds(self, _: DSDigest) -> bool: # pragma: no cover
+ return False
+
+
+class SimpleDeny(Policy):
+ def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds):
+ super().__init__()
+ self._deny_sign = deny_sign
+ self._deny_validate = deny_validate
+ self._deny_create_ds = deny_create_ds
+ self._deny_validate_ds = deny_validate_ds
+
+ def ok_to_sign(self, key: DNSKEY) -> bool:
+ return key.algorithm not in self._deny_sign
+
+ def ok_to_validate(self, key: DNSKEY) -> bool:
+ return key.algorithm not in self._deny_validate
+
+ def ok_to_create_ds(self, algorithm: DSDigest) -> bool:
+ return algorithm not in self._deny_create_ds
+
+ def ok_to_validate_ds(self, algorithm: DSDigest) -> bool:
+ return algorithm not in self._deny_validate_ds
+
+
+rfc_8624_policy = SimpleDeny(
+ {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST},
+ {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1},
+ {DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST},
+ {DSDigest.NULL},
+)
+
+allow_all_policy = SimpleDeny(set(), set(), set(), set())
+
+
+default_policy = rfc_8624_policy
+
+
def make_ds(
name: Union[dns.name.Name, str],
key: dns.rdata.Rdata,
algorithm: Union[DSDigest, str],
origin: Optional[dns.name.Name] = None,
+ policy: Optional[Policy] = None,
+ validating: bool = False,
) -> DS:
"""Create a DS record for a DNSSEC key.
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
then it will be made absolute using the specified origin.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+ *validating*, a ``bool``. If ``True``, then policy is checked in
+ validating mode, i.e. "Is it ok to validate using this digest algorithm?".
+ Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with
+ this digest algorithm?".
+
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
+ Raises ``DeniedByPolicy`` if the algorithm is denied by policy.
+
Returns a ``dns.rdtypes.ANY.DS.DS``
"""
+ if policy is None:
+ policy = default_policy
try:
if isinstance(algorithm, str):
algorithm = DSDigest[algorithm.upper()]
except Exception:
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
+ if validating:
+ check = policy.ok_to_validate_ds
+ else:
+ check = policy.ok_to_create_ds
+ if not check(algorithm):
+ raise DeniedByPolicy
if not isinstance(key, DNSKEY):
raise ValueError("key is not a DNSKEY")
if algorithm == DSDigest.SHA1:
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
origin: Optional[dns.name.Name] = None,
now: Optional[float] = None,
+ policy: Optional[Policy] = None,
) -> None:
"""Validate an RRset against a single signature rdata, throwing an
exception if validation is not successful.
use as the current time when validating. If ``None``, the actual current
time is used.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
Raises ``ValidationFailure`` if the signature is expired, not yet valid,
the public key is invalid, the algorithm is unknown, the verification
fails, etc.
dnspython but not implemented.
"""
+ if policy is None:
+ policy = default_policy
+
candidate_keys = _find_candidate_keys(keys, rrsig)
if candidate_keys is None:
raise ValidationFailure("unknown key")
chosen_hash = _make_hash(rrsig.algorithm)
for candidate_key in candidate_keys:
+ if not policy.ok_to_validate(candidate_key):
+ continue
try:
_validate_signature(sig, data, candidate_key, chosen_hash)
return
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
origin: Optional[dns.name.Name] = None,
now: Optional[float] = None,
+ policy: Optional[Policy] = None,
) -> None:
"""Validate an RRset against a signature RRset, throwing an exception
if none of the signatures validate.
use as the current time when validating. If ``None``, the actual current
time is used.
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
Raises ``ValidationFailure`` if the signature is expired, not yet valid,
the public key is invalid, the algorithm is unknown, the verification
fails, etc.
"""
+ if policy is None:
+ policy = default_policy
+
if isinstance(origin, str):
origin = dns.name.from_text(origin, dns.name.root)
if not isinstance(rrsig, RRSIG):
raise ValidationFailure("expected an RRSIG")
try:
- _validate_rrsig(rrset, rrsig, keys, origin, now)
+ _validate_rrsig(rrset, rrsig, keys, origin, now, policy)
return
except (ValidationFailure, UnsupportedAlgorithm):
pass
expiration: Optional[Union[datetime, str, int, float]] = None,
lifetime: Optional[int] = None,
verify: bool = False,
+ policy: Optional[Policy] = None,
) -> RRSIG:
"""Sign RRset using private key.
*verify*, a ``bool``. If set to ``True``, the signer will verify signatures
after they are created; the default is ``False``.
+
+ *policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
+ ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+ Raises ``DeniedByPolicy`` if the signature is denied by policy.
"""
+ if policy is None:
+ policy = default_policy
+ if not policy.ok_to_sign(dnskey):
+ raise DeniedByPolicy
+
if isinstance(rrset, tuple):
rdclass = rrset[1].rdclass
rdtype = rrset[1].rdtype
@unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
class DNSSECValidatorTestCase(unittest.TestCase):
def testAbsoluteRSAMD5Good(self): # type: () -> None
- dns.dnssec.validate(rsamd5_ns, rsamd5_ns_rrsig, rsamd5_keys, None, rsamd5_when)
+ dns.dnssec.validate(
+ rsamd5_ns,
+ rsamd5_ns_rrsig,
+ rsamd5_keys,
+ None,
+ rsamd5_when,
+ policy=dns.dnssec.allow_all_policy,
+ )
+
+ def testAbsoluteRSAMD5GoodDeniedByPolicy(self): # type: () -> None
+ with self.assertRaises(dns.dnssec.ValidationFailure):
+ dns.dnssec.validate(
+ rsamd5_ns, rsamd5_ns_rrsig, rsamd5_keys, None, rsamd5_when
+ )
def testRSAMD5Keyid(self):
self.assertEqual(dns.dnssec.key_id(rsamd5_keys[abs_example][0]), 30239)
self.assertRaises(dns.dnssec.ValidationFailure, bad)
def testAbsoluteDSAGood(self): # type: () -> None
- dns.dnssec.validate(abs_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2)
+ dns.dnssec.validate(
+ abs_dsa_soa,
+ abs_dsa_soa_rrsig,
+ abs_dsa_keys,
+ None,
+ when2,
+ policy=dns.dnssec.allow_all_policy,
+ )
+
+ def testAbsoluteDSAGoodDeniedByPolicy(self): # type: () -> None
+ with self.assertRaises(dns.dnssec.ValidationFailure):
+ dns.dnssec.validate(
+ abs_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2
+ )
def testAbsoluteDSABad(self): # type: () -> None
def bad(): # type: () -> None
dns.dnssec.validate(
- abs_other_dsa_soa, abs_dsa_soa_rrsig, abs_dsa_keys, None, when2
+ abs_other_dsa_soa,
+ abs_dsa_soa_rrsig,
+ abs_dsa_keys,
+ None,
+ when2,
+ policy=dns.dnssec.allow_all_policy,
)
self.assertRaises(dns.dnssec.ValidationFailure, bad)
def testMakeExampleSHA1DS(self): # type: () -> None
algorithm: Any
for algorithm in ("SHA1", "sha1", dns.dnssec.DSDigest.SHA1):
- ds = dns.dnssec.make_ds(abs_example, example_sep_key, algorithm)
+ ds = dns.dnssec.make_ds(
+ abs_example,
+ example_sep_key,
+ algorithm,
+ policy=dns.dnssec.allow_all_policy,
+ )
+ self.assertEqual(ds, example_ds_sha1)
+ ds = dns.dnssec.make_ds(
+ "example.",
+ example_sep_key,
+ algorithm,
+ policy=dns.dnssec.allow_all_policy,
+ )
+ self.assertEqual(ds, example_ds_sha1)
+
+ def testMakeExampleSHA1DSValidationOkByPolicy(self): # type: () -> None
+ algorithm: Any
+ for algorithm in ("SHA1", "sha1", dns.dnssec.DSDigest.SHA1):
+ ds = dns.dnssec.make_ds(
+ abs_example,
+ example_sep_key,
+ algorithm,
+ policy=dns.dnssec.allow_all_policy,
+ )
+ self.assertEqual(ds, example_ds_sha1)
+ ds = dns.dnssec.make_ds(
+ "example.", example_sep_key, algorithm, validating=True
+ )
self.assertEqual(ds, example_ds_sha1)
- ds = dns.dnssec.make_ds("example.", example_sep_key, algorithm)
+
+ def testMakeExampleSHA1DSDeniedByPolicy(self): # type: () -> None
+ with self.assertRaises(dns.dnssec.DeniedByPolicy):
+ ds = dns.dnssec.make_ds(abs_example, example_sep_key, "SHA1")
self.assertEqual(ds, example_ds_sha1)
def testMakeExampleSHA256DS(self): # type: () -> None
with self.assertRaises(ValueError):
dns.dnssec.make_dnskey(key.public_key(), dns.dnssec.Algorithm.DSA)
- def testRSALargeExponent(self): # type: () -> None
- for key_size, public_exponent, dnskey_key_length in [
- (1024, 3, 130),
- (1024, 65537, 132),
- (2048, 3, 258),
- (2048, 65537, 260),
- (4096, 3, 514),
- (4096, 65537, 516),
- ]:
- key = rsa.generate_private_key(
- public_exponent=public_exponent,
- key_size=key_size,
- backend=default_backend(),
- )
- dnskey = dns.dnssec.make_dnskey(
- key.public_key(), algorithm=dns.dnssec.Algorithm.RSASHA256
- )
- self.assertEqual(len(dnskey.key), dnskey_key_length)
+ # XXXRTH This test is fine but is noticably slow, so I have commented it out for
+ # now
+
+ # def testRSALargeExponent(self): # type: () -> None
+ # for key_size, public_exponent, dnskey_key_length in [
+ # (1024, 3, 130),
+ # (1024, 65537, 132),
+ # (2048, 3, 258),
+ # (2048, 65537, 260),
+ # (4096, 3, 514),
+ # (4096, 65537, 516),
+ # ]:
+ # key = rsa.generate_private_key(
+ # public_exponent=public_exponent,
+ # key_size=key_size,
+ # backend=default_backend(),
+ # )
+ # dnskey = dns.dnssec.make_dnskey(
+ # key.public_key(), algorithm=dns.dnssec.Algorithm.RSASHA256
+ # )
+ # self.assertEqual(len(dnskey.key), dnskey_key_length)
@unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported")
def testSignatureDSA(self): # type: () -> None
key = dsa.generate_private_key(key_size=1024)
- self._test_signature(key, dns.dnssec.Algorithm.DSA, abs_soa)
+ self._test_signature(
+ key, dns.dnssec.Algorithm.DSA, abs_soa, policy=dns.dnssec.allow_all_policy
+ )
def testSignatureECDSAP256SHA256(self): # type: () -> None
key = ec.generate_private_key(curve=ec.SECP256R1, backend=default_backend())
rrset = (name, rdataset)
self._test_signature(key, dns.dnssec.Algorithm.ED448, rrset)
- def _test_signature(self, key, algorithm, rrset, signer=None): # type: () -> None
+ def _test_signature(self, key, algorithm, rrset, signer=None, policy=None):
ttl = 60
lifetime = 3600
if isinstance(rrset, tuple):
lifetime=lifetime,
signer=signer,
verify=True,
+ policy=policy,
)
keys = {signer: dnskey_rrset}
rrsigset = dns.rrset.from_rdata(rrname, ttl, rrsig)
- dns.dnssec.validate(rrset=rrset, rrsigset=rrsigset, keys=keys)
+ dns.dnssec.validate(rrset=rrset, rrsigset=rrsigset, keys=keys, policy=policy)
if __name__ == "__main__":