def _is_sha512(algorithm):
return algorithm == RSASHA512
-class _IdentityHasher:
- def __init__(self):
- self.value = b''
- def update(self, s):
- self.value += s
def _make_hash(algorithm):
if _is_md5(algorithm):
- return MD5.new()
+ return hashes.MD5()
if _is_sha1(algorithm):
- return SHA1.new()
+ return hashes.SHA1()
if _is_sha256(algorithm):
- return SHA256.new()
+ return hashes.SHA256()
if _is_sha384(algorithm):
- return SHA384.new()
+ return hashes.SHA384()
if _is_sha512(algorithm):
- return SHA512.new()
- if _is_eddsa(algorithm):
- return _IdentityHasher()
+ return hashes.SHA512()
+ if algorithm == ED25519:
+ return hashes.SHA512()
+ if algorithm == ED448:
+ return hashes.SHAKE256(114)
raise ValidationFailure('unknown hash for algorithm %u' % algorithm)
+def _bytes_to_long(b):
+ return int.from_bytes(b, 'big')
+
+
def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
"""Validate an RRset against a single signature rdata
rsa_e = keyptr[0:bytes_]
rsa_n = keyptr[bytes_:]
try:
- pubkey = CryptoRSA.construct(
- (number.bytes_to_long(rsa_n),
- number.bytes_to_long(rsa_e)))
+ public_key = rsa.RSAPublicNumbers(
+ _bytes_to_long(rsa_e),
+ _bytes_to_long(rsa_n)).public_key(default_backend())
except ValueError:
raise ValidationFailure('invalid public key')
sig = rrsig.signature
dsa_g = keyptr[0:octets]
keyptr = keyptr[octets:]
dsa_y = keyptr[0:octets]
- pubkey = CryptoDSA.construct(
- (number.bytes_to_long(dsa_y),
- number.bytes_to_long(dsa_g),
- number.bytes_to_long(dsa_p),
- number.bytes_to_long(dsa_q)))
- sig = rrsig.signature[1:]
+ try:
+ public_key = dsa.DSAPublicNumbers(
+ _bytes_to_long(dsa_y),
+ dsa.DSAParameterNumbers(
+ _bytes_to_long(dsa_p),
+ _bytes_to_long(dsa_q),
+ _bytes_to_long(dsa_g))).public_key(default_backend())
+ except ValueError:
+ raise ValidationFailure('invalid public key')
+ sig_r = rrsig.signature[1:21]
+ sig_s = rrsig.signature[21:]
+ sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
+ _bytes_to_long(sig_s))
elif _is_ecdsa(rrsig.algorithm):
keyptr = candidate_key.key
if rrsig.algorithm == ECDSAP256SHA256:
- curve = 'secp256r1'
+ curve = ec.SECP256R1()
octets = 32
else:
- curve = 'secp384r1'
+ curve = ec.SECP384R1()
octets = 48
ecdsa_x = keyptr[0:octets]
ecdsa_y = keyptr[octets:octets * 2]
- pubkey = CryptoECC.construct(
- curve=curve,
- point_x=number.bytes_to_long(ecdsa_x),
- point_y=number.bytes_to_long(ecdsa_y))
- sig = rrsig.signature
+ try:
+ public_key = ec.EllipticCurvePublicNumbers(
+ curve=curve,
+ x=_bytes_to_long(ecdsa_x),
+ y=_bytes_to_long(ecdsa_y)).public_key(default_backend())
+ except ValueError:
+ raise ValidationFailure('invalid public key')
+ sig_r = rrsig.signature[0:octets]
+ sig_s = rrsig.signature[octets:]
+ sig = utils.encode_dss_signature(_bytes_to_long(sig_r),
+ _bytes_to_long(sig_s))
elif _is_eddsa(rrsig.algorithm):
keyptr = candidate_key.key
- if not (_have_ecpy and sys.version_info >= (3, 6)):
+ if not (_have_pyca):
#pylint: disable=line-too-long
- raise ImportError('DNSSEC validation for algorithm %u requires ecpy library and Python 3.6 or newer' % rrsig.algorithm)
+ raise ImportError('DNSSEC validation for algorithm %u requires python cryptography library' % rrsig.algorithm)
if rrsig.algorithm == ED25519:
- curve = 'Ed25519'
+ loader = ed25519.Ed25519PublicKey
else:
- curve = 'Ed448'
- point = Curve.get_curve(curve).decode_point(keyptr)
- pubkey = ECPublicKey(point)
+ loader = ed448.Ed448PublicKey
+ try:
+ public_key = loader.from_public_bytes(keyptr)
+ except ValueError:
+ raise ValidationFailure('invalid public key')
sig = rrsig.signature
elif _is_gost(rrsig.algorithm):
raise UnsupportedAlgorithm(
else:
raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
- hash = _make_hash(rrsig.algorithm)
- hash.update(_to_rdata(rrsig, origin)[:18])
- hash.update(rrsig.signer.to_digestable(origin))
+ data = b''
+ data += _to_rdata(rrsig, origin)[:18]
+ data += rrsig.signer.to_digestable(origin)
if rrsig.labels < len(rrname) - 1:
suffix = rrname.split(rrsig.labels + 1)[1]
rrsig.original_ttl)
rrlist = sorted(rdataset)
for rr in rrlist:
- hash.update(rrnamebuf)
- hash.update(rrfixed)
+ data += rrnamebuf
+ data += rrfixed
rrdata = rr.to_digestable(origin)
rrlen = struct.pack('!H', len(rrdata))
- hash.update(rrlen)
- hash.update(rrdata)
+ data += rrlen
+ data += rrdata
+ chosen_hash = _make_hash(rrsig.algorithm)
try:
if _is_rsa(rrsig.algorithm):
- verifier = pkcs1_15.new(pubkey)
- # will raise ValueError if verify fails:
- verifier.verify(hash, sig)
- elif _is_dsa(rrsig.algorithm) or _is_ecdsa(rrsig.algorithm):
- verifier = DSS.new(pubkey, 'fips-186-3')
- verifier.verify(hash, sig)
+ public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash)
+ elif _is_dsa(rrsig.algorithm):
+ public_key.verify(sig, data, chosen_hash)
+ elif _is_ecdsa(rrsig.algorithm):
+ public_key.verify(sig, data, ec.ECDSA(chosen_hash))
elif _is_eddsa(rrsig.algorithm):
- if rrsig.algorithm == ED25519:
- verifier = EDDSA(hashlib.sha512)
- else:
- verifier = EDDSA(hashlib.shake_256, 114)
- if not verifier.verify(hash.value, sig, pubkey):
- raise ValueError
+ public_key.verify(sig, data)
else:
# Raise here for code clarity; this won't actually ever happen
# since if the algorithm is really unknown we'd already have
raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
# If we got here, we successfully verified so we can return without error
return
- except ValueError:
+ except InvalidSignature:
# this happens on an individual validation failure
continue
# nothing verified -- raise failure:
return output
-def _need_pycrypto(*args, **kwargs):
- raise ImportError("DNSSEC validation requires pycryptodome/pycryptodomex")
+def _need_pyca(*args, **kwargs):
+ raise ImportError("DNSSEC validation requires python cryptography")
try:
- try:
- # test we're using pycryptodome, not pycrypto (which misses SHA1 for example)
- from Crypto.Hash import MD5, SHA1, SHA256, SHA384, SHA512
- from Crypto.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA
- from Crypto.PublicKey import ECC as CryptoECC
- from Crypto.Signature import pkcs1_15, DSS
- from Crypto.Util import number
- except ImportError:
- from Cryptodome.Hash import MD5, SHA1, SHA256, SHA384, SHA512
- from Cryptodome.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA
- from Cryptodome.PublicKey import ECC as CryptoECC
- from Cryptodome.Signature import pkcs1_15, DSS
- from Cryptodome.Util import number
+ from cryptography.exceptions import InvalidSignature
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import padding
+ from cryptography.hazmat.primitives.asymmetric import utils
+ from cryptography.hazmat.primitives.asymmetric import dsa
+ from cryptography.hazmat.primitives.asymmetric import ec
+ from cryptography.hazmat.primitives.asymmetric import ed25519
+ from cryptography.hazmat.primitives.asymmetric import ed448
+ from cryptography.hazmat.primitives.asymmetric import rsa
except ImportError:
- validate = _need_pycrypto
- validate_rrsig = _need_pycrypto
- _have_pycrypto = False
- _have_ecpy = False
+ validate = _need_pyca
+ validate_rrsig = _need_pyca
+ _have_pyca = False
else:
validate = _validate
validate_rrsig = _validate_rrsig
- _have_pycrypto = True
-
- try:
- from ecpy.curves import Curve
- from ecpy.keys import ECPublicKey
- from ecpy.eddsa import EDDSA
- except ImportError:
- _have_ecpy = False
- else:
- _have_ecpy = True
+ _have_pyca = True
when5 = 1440021600
-@unittest.skipUnless(dns.dnssec._have_pycrypto,
- "Pycryptodome cannot be imported")
+@unittest.skipUnless(dns.dnssec._have_pyca,
+ "Python Cryptography cannot be imported")
class DNSSECValidatorTestCase(unittest.TestCase):
def testAbsoluteRSAGood(self): # type: () -> None
abs_ecdsa384_keys, None, when4)
self.assertRaises(dns.dnssec.ValidationFailure, bad)
- @unittest.skipUnless(dns.dnssec._have_ecpy,
- "python EDDSA cannot be imported")
- @unittest.skipUnless(sys.version_info >= (3, 6),
- "Python 3.6 or later is needed")
def testAbsoluteED25519Good(self): # type: () -> None
dns.dnssec.validate(abs_ed25519_mx, abs_ed25519_mx_rrsig_1,
abs_ed25519_keys_1, None, when5)
dns.dnssec.validate(abs_ed25519_mx, abs_ed25519_mx_rrsig_2,
abs_ed25519_keys_2, None, when5)
- @unittest.skipUnless(dns.dnssec._have_ecpy,
- "python EDDSA cannot be imported")
- @unittest.skipUnless(sys.version_info >= (3, 6),
- "Python 3.6 or later is needed")
def testAbsoluteED25519Bad(self): # type: () -> None
with self.assertRaises(dns.dnssec.ValidationFailure):
dns.dnssec.validate(abs_other_ed25519_mx, abs_ed25519_mx_rrsig_1,
dns.dnssec.validate(abs_other_ed25519_mx, abs_ed25519_mx_rrsig_2,
abs_ed25519_keys_2, None, when5)
- @unittest.skipUnless(dns.dnssec._have_ecpy,
- "python EDDSA cannot be imported")
- @unittest.skipUnless(sys.version_info >= (3, 6),
- "Python 3.6 or later is needed")
def testAbsoluteED448Good(self): # type: () -> None
dns.dnssec.validate(abs_ed448_mx, abs_ed448_mx_rrsig_1,
abs_ed448_keys_1, None, when5)
dns.dnssec.validate(abs_ed448_mx, abs_ed448_mx_rrsig_2,
abs_ed448_keys_2, None, when5)
- @unittest.skipUnless(dns.dnssec._have_ecpy,
- "python EDDSA cannot be imported")
- @unittest.skipUnless(sys.version_info >= (3, 6),
- "Python 3.6 or later is needed")
def testAbsoluteED448Bad(self): # type: () -> None
with self.assertRaises(dns.dnssec.ValidationFailure):
dns.dnssec.validate(abs_other_ed448_mx, abs_ed448_mx_rrsig_1,