import dns.wire
import dns.exception
import dns.immutable
+import dns.ipv4
+import dns.ipv6
import dns.name
import dns.rdataclass
import dns.rdatatype
import dns.tokenizer
+import dns.ttl
_chunksize = 32
*rdtype*, an ``int`` is the rdatatype of the Rdata.
"""
- self.rdclass = self.as_rdataclass(rdclass)
- self.rdtype = self.as_rdatatype(rdtype)
+ self.rdclass = self._as_rdataclass(rdclass)
+ self.rdtype = self._as_rdatatype(rdtype)
self.rdcomment = None
def _get_all_slots(self):
# doesn't do any additional checking.
return value
- def as_rdataclass(self, value):
+ # Type checking and conversion helpers. These are class methods as
+ # they don't touch object state and may be useful to others.
+
+ @classmethod
+ def _as_rdataclass(cls, value):
return dns.rdataclass.RdataClass.make(value)
- def as_rdatatype(self, value):
+ @classmethod
+ def _as_rdatatype(cls, value):
return dns.rdatatype.RdataType.make(value)
+ @classmethod
+ def _as_bytes(cls, value, encode=False, max_length=None):
+ if encode and isinstance(value, str):
+ value = value.encode()
+ elif not isinstance(value, bytes):
+ raise ValueError('not bytes')
+ if max_length is not None and len(value) > max_length:
+ raise ValueError('too long')
+ return value
+
+ @classmethod
+ def _as_name(cls, value):
+ # Note that proper name conversion (e.g. with origin and IDNA
+ # awareness) is expected to be done via from_text. This is just
+ # a simple thing for people invoking the constructor directly.
+ if isinstance(value, str):
+ return dns.name.from_text(value)
+ elif not isinstance(value, dns.name.Name):
+ raise ValueError('not a name')
+ return value
+
+ @classmethod
+ def _as_uint8(cls, value):
+ if not isinstance(value, int):
+ raise ValueError('not an integer')
+ if value < 0 or value > 255:
+ raise ValueError('not a uint8')
+ return value
+
+ @classmethod
+ def _as_uint16(cls, value):
+ if not isinstance(value, int):
+ raise ValueError('not an integer')
+ if value < 0 or value > 65535:
+ raise ValueError('not a uint16')
+ return value
+
+ @classmethod
+ def _as_uint32(cls, value):
+ if not isinstance(value, int):
+ raise ValueError('not an integer')
+ if value < 0 or value > 4294967295:
+ raise ValueError('not a uint32')
+ return value
+
+ @classmethod
+ def _as_int(cls, value, low=None, high=None):
+ if not isinstance(value, int):
+ raise ValueError('not an integer')
+ if low is not None and value < low:
+ raise ValueError('value too small')
+ if high is not None and value > high:
+ raise ValueError('value too large')
+ return value
+
+ @classmethod
+ def _as_ipv4_address(cls, value):
+ if isinstance(value, str):
+ # call to check validity
+ dns.ipv4.inet_aton(value)
+ return value
+ elif isinstance(value, bytes):
+ return dns.ipv4.inet_ntoa(value)
+ else:
+ raise ValueError('not an IPv4 address')
+
+ @classmethod
+ def _as_ipv6_address(cls, value):
+ if isinstance(value, str):
+ # call to check validity
+ dns.ipv6.inet_aton(value)
+ return value
+ elif isinstance(value, bytes):
+ return dns.ipv6.inet_ntoa(value)
+ else:
+ raise ValueError('not an IPv6 address')
+
+ @classmethod
+ def _as_bool(cls, value):
+ if isinstance(value, bool):
+ return value
+ else:
+ raise ValueError('not a boolean')
+
+ @classmethod
+ def _as_ttl(cls, value):
+ if isinstance(value, int):
+ return cls._as_int(value, 0, dns.ttl.MAX_TTL)
+ elif isinstance(value, str):
+ value = dns.ttl.from_text(value)
+ else:
+ raise ValueError('not a TTL')
+
class GenericRdata(Rdata):
def __init__(self, rdclass, rdtype, serial, flags, windows):
super().__init__(rdclass, rdtype)
- self.serial = self.as_value(serial)
- self.flags = self.as_value(flags)
- self.windows = self.as_value(dns.rdata._constify(windows))
+ self.serial = self._as_uint32(serial)
+ self.flags = self._as_uint16(flags)
+ self.windows = dns.rdata._constify(windows)
def to_text(self, origin=None, relativize=True, **kw):
text = Bitmap(self.windows).to_text()
def __init__(self, rdclass, rdtype, cpu, os):
super().__init__(rdclass, rdtype)
- if isinstance(cpu, str):
- self.cpu = self.as_value(cpu.encode())
- else:
- self.cpu = self.as_value(cpu)
- if isinstance(os, str):
- self.os = self.as_value(os.encode())
- else:
- self.os = self.as_value(os)
+ self.cpu = self._as_bytes(cpu, True, 255)
+ self.os = self._as_bytes(os, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
return '"{}" "{}"'.format(dns.rdata._escapify(self.cpu),
def __init__(self, rdclass, rdtype, address, subaddress):
super().__init__(rdclass, rdtype)
- if isinstance(address, str):
- self.address = self.as_value(address.encode())
- else:
- self.address = self.as_value(address)
- if isinstance(address, str):
- self.subaddress = self.as_value(subaddress.encode())
- else:
- self.subaddress = self.as_value(subaddress)
+ self.address = self._as_bytes(address, True, 255)
+ self.subaddress = self._as_bytes(subaddress, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
if self.subaddress:
def __init__(self, rdclass, rdtype, next, windows):
super().__init__(rdclass, rdtype)
- self.next = self.as_value(next)
- self.windows = self.as_value(dns.rdata._constify(windows))
+ self.next = self._as_name(next)
+ self.windows = dns.rdata._constify(windows)
def to_text(self, origin=None, relativize=True, **kw):
next = self.next.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, algorithm, flags, iterations, salt,
next, windows):
super().__init__(rdclass, rdtype)
- self.algorithm = self.as_value(algorithm)
- self.flags = self.as_value(flags)
- self.iterations = self.as_value(iterations)
- if isinstance(salt, str):
- self.salt = self.as_value(salt.encode())
- else:
- self.salt = self.as_value(salt)
- self.next = self.as_value(next)
- self.windows = self.as_value(dns.rdata._constify(windows))
+ self.algorithm = self._as_uint8(algorithm)
+ self.flags = self._as_uint8(flags)
+ self.iterations = self._as_uint16(iterations)
+ self.salt = self._as_bytes(salt, True, 255)
+ self.next = self._as_bytes(next, True, 255)
+ self.windows = dns.rdata._constify(windows)
def to_text(self, origin=None, relativize=True, **kw):
next = base64.b32encode(self.next).translate(
def __init__(self, rdclass, rdtype, key):
super().__init__(rdclass, rdtype)
- self.key = self.as_value(key)
+ self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._base64ify(self.key)
def __init__(self, rdclass, rdtype, mbox, txt):
super().__init__(rdclass, rdtype)
- self.mbox = self.as_value(mbox)
- self.txt = self.as_value(txt)
+ self.mbox = self._as_name(mbox)
+ self.txt = self._as_name(txt)
def to_text(self, origin=None, relativize=True, **kw):
mbox = self.mbox.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum):
super().__init__(rdclass, rdtype)
- self.mname = self.as_value(mname)
- self.rname = self.as_value(rname)
- self.serial = self.as_value(serial)
- self.refresh = self.as_value(refresh)
- self.retry = self.as_value(retry)
- self.expire = self.as_value(expire)
- self.minimum = self.as_value(minimum)
+ self.mname = self._as_name(mname)
+ self.rname = self._as_name(rname)
+ self.serial = self._as_uint32(serial)
+ self.refresh = self._as_ttl(refresh)
+ self.retry = self._as_ttl(retry)
+ self.expire = self._as_ttl(expire)
+ self.minimum = self._as_ttl(minimum)
def to_text(self, origin=None, relativize=True, **kw):
mname = self.mname.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
- if isinstance(address, str):
- self.address = self.as_value(address.encode())
- else:
- self.address = self.as_value(address)
+ self.address = self._as_bytes(address, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
return '"%s"' % dns.rdata._escapify(self.address)
def __init__(self, rdclass, rdtype, domain, address):
super().__init__(rdclass, rdtype)
- self.domain = self.as_value(domain)
- self.address = self.as_value(address)
+ self.domain = self._as_name(domain)
+ self.address = self._as_uint16(address)
def to_text(self, origin=None, relativize=True, **kw):
domain = self.domain.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
- # check that it's OK
- dns.ipv4.inet_aton(address)
- self.address = self.as_value(address)
+ self.address = self._as_ipv4_address(address)
def to_text(self, origin=None, relativize=True, **kw):
return self.address
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- address = dns.ipv4.inet_ntoa(parser.get_remaining())
+ address = parser.get_remaining()
return cls(rdclass, rdtype, address)
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
- # check that it's OK
- dns.ipv6.inet_aton(address)
- self.address = self.as_value(address)
+ self.address = self._as_ipv6_address(address)
def to_text(self, origin=None, relativize=True, **kw):
return self.address
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- address = dns.ipv6.inet_ntoa(parser.get_remaining())
+ address = parser.get_remaining()
return cls(rdclass, rdtype, address)
__slots__ = ['family', 'negation', 'address', 'prefix']
def __init__(self, family, negation, address, prefix):
- self.family = family
- self.negation = negation
- self.address = address
- self.prefix = prefix
+ self.family = dns.rdata.Rdata._as_uint16(family)
+ self.negation = dns.rdata.Rdata._as_bool(negation)
+ if self.family == 1:
+ self.address = dns.rdata.Rdata._as_ipv4_address(address)
+ self.prefix = dns.rdata.Rdata._as_int(prefix, 0, 32)
+ elif self.family == 2:
+ self.address = dns.rdata.Rdata._as_ipv6_address(address)
+ self.prefix = dns.rdata.Rdata._as_int(prefix, 0, 128)
+ else:
+ self.address = dns.rdata.Rdata._as_bytes(address)
+ self.prefix = dns.rdata.Rdata._as_uint8(prefix)
def __str__(self):
if self.negation:
def __init__(self, rdclass, rdtype, items):
super().__init__(rdclass, rdtype)
- self.items = self.as_value(dns.rdata._constify(items))
+ for item in items:
+ if not isinstance(item, APLItem):
+ raise ValueError('item not an APLItem')
+ self.items = dns.rdata._constify(items)
def to_text(self, origin=None, relativize=True, **kw):
return ' '.join(map(str, self.items))
if header[0] == 1:
if l < 4:
address += b'\x00' * (4 - l)
- address = dns.ipv4.inet_ntoa(address)
elif header[0] == 2:
if l < 16:
address += b'\x00' * (16 - l)
- address = dns.ipv6.inet_ntoa(address)
else:
#
# This isn't really right according to the RFC, but it
def __init__(self, rdclass, rdtype, data):
super().__init__(rdclass, rdtype)
- self.data = self.as_value(data)
+ self.data = self._as_bytes(data)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._base64ify(self.data)
gateway, key):
super().__init__(rdclass, rdtype)
Gateway(gateway_type, gateway).check()
- self.precedence = self.as_value(precedence)
- self.gateway_type = self.as_value(gateway_type)
- self.algorithm = self.as_value(algorithm)
- self.gateway = self.as_value(gateway)
- self.key = self.as_value(key)
+ self.precedence = self._as_uint8(precedence)
+ self.gateway_type = self._as_uint8(gateway_type)
+ self.algorithm = self._as_uint8(algorithm)
+ self.gateway = gateway
+ self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
gateway = Gateway(self.gateway_type, self.gateway).to_text(origin,
file.write(s)
-def _sanitize(value):
- if isinstance(value, str):
- return value.encode()
- return value
-
-
@dns.immutable.immutable
class NAPTR(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, order, preference, flags, service,
regexp, replacement):
super().__init__(rdclass, rdtype)
- self.flags = self.as_value(_sanitize(flags))
- self.service = self.as_value(_sanitize(service))
- self.regexp = self.as_value(_sanitize(regexp))
- self.order = self.as_value(order)
- self.preference = self.as_value(preference)
- self.replacement = self.as_value(replacement)
+ self.flags = self._as_bytes(flags, True, 255)
+ self.service = self._as_bytes(service, True, 255)
+ self.regexp = self._as_bytes(regexp, True, 255)
+ self.order = self._as_uint16(order)
+ self.preference = self._as_uint16(preference)
+ self.replacement = self._as_name(replacement)
def to_text(self, origin=None, relativize=True, **kw):
replacement = self.replacement.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
- self.address = self.as_value(address)
+ self.address = self._as_bytes(address)
def to_text(self, origin=None, relativize=True, **kw):
return "0x%s" % binascii.hexlify(self.address).decode()
def __init__(self, rdclass, rdtype, preference, map822, mapx400):
super().__init__(rdclass, rdtype)
- self.preference = self.as_value(preference)
- self.map822 = self.as_value(map822)
- self.mapx400 = self.as_value(mapx400)
+ self.preference = self._as_uint16(preference)
+ self.map822 = self._as_name(map822)
+ self.mapx400 = self._as_name(mapx400)
def to_text(self, origin=None, relativize=True, **kw):
map822 = self.map822.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, priority, weight, port, target):
super().__init__(rdclass, rdtype)
- self.priority = self.as_value(priority)
- self.weight = self.as_value(weight)
- self.port = self.as_value(port)
- self.target = self.as_value(target)
+ self.priority = self._as_uint16(priority)
+ self.weight = self._as_uint16(weight)
+ self.port = self._as_uint16(port)
+ self.target = self._as_name(target)
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, address, protocol, bitmap):
super().__init__(rdclass, rdtype)
- self.address = self.as_value(address)
- self.protocol = self.as_value(protocol)
- self.bitmap = self.as_value(dns.rdata._constify(bitmap))
+ self.address = self._as_ipv4_address(address)
+ self.protocol = self._as_uint8(protocol)
+ self.bitmap = self._as_bytes(dns.rdata._constify(bitmap))
def to_text(self, origin=None, relativize=True, **kw):
bits = []
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- address = dns.ipv4.inet_ntoa(parser.get_bytes(4))
+ address = parser.get_bytes(4)
protocol = parser.get_uint8()
bitmap = parser.get_remaining()
return cls(rdclass, rdtype, address, protocol, bitmap)
def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key):
super().__init__(rdclass, rdtype)
- self.flags = self.as_value(flags)
- self.protocol = self.as_value(protocol)
- self.algorithm = self.as_value(algorithm)
- self.key = self.as_value(key)
+ self.flags = self._as_uint16(flags)
+ self.protocol = self._as_uint8(protocol)
+ self.algorithm = dns.dnssec.Algorithm.make(algorithm)
+ self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %d %s' % (self.flags, self.protocol, self.algorithm,
relativize_to=None):
flags = tok.get_uint16()
protocol = tok.get_uint8()
- algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
+ algorithm = tok.get_string()
b64 = tok.concatenate_remaining_identifiers().encode()
key = base64.b64decode(b64)
return cls(rdclass, rdtype, flags, protocol, algorithm, key)
def __init__(self, rdclass, rdtype, key_tag, algorithm, digest_type,
digest):
super().__init__(rdclass, rdtype)
- self.key_tag = self.as_value(key_tag)
- self.algorithm = self.as_value(algorithm)
- self.digest_type = self.as_value(digest_type)
- self.digest = self.as_value(digest)
+ self.key_tag = self._as_uint16(key_tag)
+ self.algorithm = dns.dnssec.Algorithm.make(algorithm)
+ self.digest_type = self._as_uint8(digest_type)
+ self.digest = self._as_bytes(digest)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %d %s' % (self.key_tag, self.algorithm,
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
key_tag = tok.get_uint16()
- algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
+ algorithm = tok.get_string()
digest_type = tok.get_uint8()
digest = tok.concatenate_remaining_identifiers().encode()
digest = binascii.unhexlify(digest)
def __init__(self, rdclass, rdtype, eui):
super().__init__(rdclass, rdtype)
- if len(eui) != self.byte_len:
+ self.eui = self._as_bytes(eui)
+ if len(self.eui) != self.byte_len:
raise dns.exception.FormError('EUI%s rdata has to have %s bytes'
% (self.byte_len * 8, self.byte_len))
- self.eui = self.as_value(eui)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._hexify(self.eui, chunksize=2).replace(' ', '-')
def __init__(self, rdclass, rdtype, preference, exchange):
super().__init__(rdclass, rdtype)
- self.preference = self.as_value(preference)
- self.exchange = self.as_value(exchange)
+ self.preference = self._as_uint16(preference)
+ self.exchange = self._as_name(exchange)
def to_text(self, origin=None, relativize=True, **kw):
exchange = self.exchange.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, target):
super().__init__(rdclass, rdtype)
- self.target = self.as_value(target)
+ self.target = self._as_name(target)
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
def __init__(self, rdclass, rdtype, priority, target, params):
super().__init__(rdclass, rdtype)
- self.priority = self.as_value(priority)
- self.target = self.as_value(target)
- self.params = self.as_value(dns.immutable.constify(params))
+ self.priority = self._as_uint16(priority)
+ self.target = self._as_name(target)
+ self.params = dns.immutable.constify(params)
# Make sure any paramater listed as mandatory is present in the
# record.
mandatory = params.get(ParamKey.MANDATORY)