"""Common DNSSEC-related functions and constants."""
+import cStringIO
+import struct
+import time
+
+import dns.exception
import dns.hash
import dns.name
+import dns.node
+import dns.rdataset
import dns.rdata
import dns.rdatatype
import dns.rdataclass
+class UnsupportedAlgorithm(dns.exception.DNSException):
+ """Raised if an algorithm is not supported."""
+ pass
+
+class ValidationFailure(dns.exception.DNSException):
+ """The DNSSEC signature is invalid."""
+ pass
+
RSAMD5 = 1
DH = 2
DSA = 3
_algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.iteritems()])
-class UnknownAlgorithm(Exception):
- """Raised if an algorithm is unknown."""
- pass
-
def algorithm_from_text(text):
"""Convert text into a DNSSEC algorithm value
@rtype: int"""
text = str(value)
return text
-def _to_rdata(record):
+def _to_rdata(record, origin):
s = cStringIO.StringIO()
- record.to_wire(s)
+ record.to_wire(s, origin=origin)
return s.getvalue()
-def key_id(key):
- rdata = _to_rdata(key)
+def key_id(key, origin=None):
+ rdata = _to_rdata(key, origin)
if key.algorithm == RSAMD5:
return (ord(rdata[-3]) << 8) + ord(rdata[-2])
else:
total += ((total >> 16) & 0xffff);
return total & 0xffff
-def make_ds(name, key, algorithm):
+def make_ds(name, key, algorithm, origin=None):
if algorithm.upper() == 'SHA1':
dsalg = 1
hash = dns.hash.get('SHA1')()
dsalg = 2
hash = dns.hash.get('SHA256')()
else:
- raise ValueError, 'unsupported algorithm "%s"' % algorithm
+ raise UnsupportedAlgorithm, 'unsupported algorithm "%s"' % algorithm
if isinstance(name, (str, unicode)):
- name = dns.name.from_text(name)
+ name = dns.name.from_text(name, origin)
hash.update(name.canonicalize().to_wire())
- hash.update(_to_rdata(key))
+ hash.update(_to_rdata(key, origin))
digest = hash.digest()
dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
len(dsrdata))
+
def _find_key(keys, rrsig):
- for key in keys:
- if key.algorithm == rrsig.algorithm and key_id(key) == rrsig.key_tag:
- return key
+ value = keys.get(rrsig.signer)
+ if value is None:
+ return None
+ if isinstance(value, dns.node.Node):
+ try:
+ rdataset = node.find_rdataset(dns.rdataclass.IN,
+ dns.rdatatype.DNSKEY)
+ except KeyError:
+ return None
+ else:
+ rdataset = value
+ for rdata in rdataset:
+ if rdata.algorithm == rrsig.algorithm and \
+ key_id(rdata) == rrsig.key_tag:
+ return rdata
return None
def _is_rsa(algorithm):
return dns.hash.get('SHA256')()
if _is_sha512(algorithm):
return dns.hash.get('SHA512')()
- raise ValueError, 'unknown algorithm'
+ raise ValidationFailure, 'unknown hash for algorithm %u' % algorithm
def _make_algorithm_id(algorithm):
if _is_md5(algorithm):
elif _is_sha512(algorithm):
oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
else:
- raise ValueError, 'unknown algorithm %u' % algorithm
+ raise ValidationFailure, 'unknown algorithm %u' % algorithm
olen = len(oid)
dlen = _make_hash(algorithm).digest_size
idbytes = [0x30] + [8 + olen + dlen] + \
[0x05, 0x00] + [0x04, dlen]
return ''.join(map(chr, idbytes))
-def _validate(rrset, rrsig, keys):
+def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
+ """Validate an RRset against a single signature rdata
+
+ The owner name of the rrsig is assumed to be the same as the owner name
+ of the rrset.
+
+ @param rrset: The RRset to validate
+ @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
+ tuple
+ @param rrsig: The signature rdata
+ @type rrsig: dns.rrset.Rdata
+ @param keys: The key dictionary.
+ @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
+ @param origin: The origin to use for relative names
+ @type origin: dns.name.Name or None
+ @param now: The time to use when validating the signatures. The default
+ is the current time.
+ @type now: int
+ """
+
+ if isinstance(origin, (str, unicode)):
+ origin = dns.name.from_text(origin, dns.name.root)
+
key = _find_key(keys, rrsig)
if not key:
- raise ValueError, 'unknown key'
+ raise ValidationFailure, 'unknown key'
+
+ # For convenience, allow the rrset to be specified as a (name, rdataset)
+ # tuple as well as a proper rrset
+ if isinstance(rrset, tuple):
+ rrname = rrset[0]
+ rdataset = rrset[1]
+ else:
+ rrname = rrset.name
+ rdataset = rrset
- now = time.time()
+ if now is None:
+ now = time.time()
if rrsig.expiration < now:
- raise ValueError, 'expired'
+ raise ValidationFailure, 'expired'
if rrsig.inception > now:
- raise ValueError, 'not yet valid'
+ raise ValidationFailure, 'not yet valid'
hash = _make_hash(rrsig.algorithm)
sig = (Crypto.Util.number.bytes_to_long(dsa_r),
Crypto.Util.number.bytes_to_long(dsa_s))
else:
- raise ValueError, 'unknown algorithm'
+ raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
- hash.update(_to_rdata(rrsig)[:18])
- hash.update(rrsig.signer.to_digestable())
+ hash.update(_to_rdata(rrsig, origin)[:18])
+ hash.update(rrsig.signer.to_digestable(origin))
- rrname = rrset.name
if rrsig.labels < len(rrname) - 1:
suffix = rrname.split(rrsig.labels + 1)[1]
rrname = dns.name.from_text('*', suffix)
- rrnamebuf = rrname.to_digestable()
- rrfixed = struct.pack('!HHI', rrset.rdtype, rrset.rdclass,
+ rrnamebuf = rrname.to_digestable(origin)
+ rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
rrsig.original_ttl)
- rrlist = sorted(rrset);
+ rrlist = sorted(rdataset);
for rr in rrlist:
hash.update(rrnamebuf)
hash.update(rrfixed)
- rrdata = rr.to_digestable()
+ rrdata = rr.to_digestable(origin)
rrlen = struct.pack('!H', len(rrdata))
hash.update(rrlen)
hash.update(rrdata)
elif _is_dsa(rrsig.algorithm):
pass
else:
- raise ValueError, 'unknown algorithm'
+ # Raise here for code clarity; this won't actually ever happen
+ # since if the algorithm is really unknown we'd already have
+ # raised an exception above
+ raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
if not pubkey.verify(digest, sig):
- raise ValueError, 'verify failure'
+ raise ValidationFailure, 'verify failure'
+
+def _validate(rrset, rrsigset, keys, origin=None, now=None):
+ """Validate an RRset
+
+ @param rrset: The RRset to validate
+ @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
+ tuple
+ @param rrsigset: The signature RRset
+ @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
+ tuple
+ @param keys: The key dictionary.
+ @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
+ @param origin: The origin to use for relative names
+ @type origin: dns.name.Name or None
+ @param now: The time to use when validating the signatures. The default
+ is the current time.
+ @type now: int
+ """
+
+ if isinstance(origin, (str, unicode)):
+ origin = dns.name.from_text(origin, dns.name.root)
+
+ if isinstance(rrset, tuple):
+ rrname = rrset[0]
+ else:
+ rrname = rrset.name
+
+ if isinstance(rrsigset, tuple):
+ rrsigname = rrsigset[0]
+ rrsigrdataset = rrsigset[1]
+ else:
+ rrsigname = rrsigset.name
+ rrsigrdataset = rrsigset
+
+ rrname = rrname.choose_relativity(origin)
+ rrsigname = rrname.choose_relativity(origin)
+ if rrname != rrsigname:
+ raise ValidationFailure, "owner names do not match"
+
+ for rrsig in rrsigrdataset:
+ try:
+ _validate_rrsig(rrset, rrsig, keys, origin, now)
+ return
+ except ValidationFailure, e:
+ pass
+ raise ValidationFailure, "no RRSIGs validated"
def _need_pycrypto(*args, **kwargs):
raise NotImplementedError, "DNSSEC validation requires pycrypto"
from Crypto.PublicKey import RSA,DSA
import Crypto.Util.number
validate = _validate
+ validate_rrsig = _validate_rrsig
except ImportError:
validate = _need_pycrypto
+ validate_rrsig = _need_pycrypto