return _wordbreak(base64.b64encode(data), chunksize)
+
__escaped = b'"\\'
def _escapify(qstring):
return dns.rdatatype.RdataType.make(value)
@classmethod
- def _as_bytes(cls, value, encode=False, max_length=None):
+ def _as_bytes(cls, value, encode=False, max_length=None, empty_ok=True):
if encode and isinstance(value, str):
value = value.encode()
elif isinstance(value, bytearray):
raise ValueError('not bytes')
if max_length is not None and len(value) > max_length:
raise ValueError('too long')
+ if not empty_ok and len(value) == 0:
+ raise ValueError('empty bytes not allowed')
return value
@classmethod
else:
raise ValueError('not a TTL')
+ @classmethod
+ def _as_tuple(cls, value, as_value):
+ try:
+ # For user convenience, if value is a singleton of the list
+ # element type, wrap it in a tuple.
+ return (as_value(value),)
+ except Exception:
+ # Otherwise, check each element of the iterable *value*
+ # against *as_value*.
+ return tuple(as_value(v) for v in value)
+
class GenericRdata(Rdata):
super().__init__(rdclass, rdtype)
self.serial = self._as_uint32(serial)
self.flags = self._as_uint16(flags)
- if isinstance(windows, Bitmap):
- bitmap = windows
- else:
- bitmap = Bitmap(windows)
- self.windows = tuple(bitmap.windows)
+ if not isinstance(windows, Bitmap):
+ windows = Bitmap(windows)
+ self.windows = windows.windows
def to_text(self, origin=None, relativize=True, **kw):
text = Bitmap(self.windows).to_text()
self.hit = self._as_bytes(hit, True, 255)
self.algorithm = self._as_uint8(algorithm)
self.key = self._as_bytes(key, True)
- self.servers = tuple([self._as_name(s) for s in servers])
+ self.servers = self._as_tuple(servers, self._as_name)
def to_text(self, origin=None, relativize=True, **kw):
hit = binascii.hexlify(self.hit).decode()
def __init__(self, rdclass, rdtype, next, windows):
super().__init__(rdclass, rdtype)
self.next = self._as_name(next)
- if isinstance(windows, Bitmap):
- bitmap = windows
- else:
- bitmap = Bitmap(windows)
- self.windows = tuple(bitmap.windows)
+ if not isinstance(windows, Bitmap):
+ windows = Bitmap(windows)
+ self.windows = windows.windows
def to_text(self, origin=None, relativize=True, **kw):
next = self.next.choose_relativity(origin, relativize)
self.iterations = self._as_uint16(iterations)
self.salt = self._as_bytes(salt, True, 255)
self.next = self._as_bytes(next, True, 255)
- if isinstance(windows, Bitmap):
- bitmap = windows
- else:
- bitmap = Bitmap(windows)
- self.windows = tuple(bitmap.windows)
+ if not isinstance(windows, Bitmap):
+ windows = Bitmap(windows)
+ self.windows = windows.windows
def to_text(self, origin=None, relativize=True, **kw):
next = base64.b32encode(self.next).translate(
"""
super().__init__(rdclass, rdtype)
- for option in options:
+ def as_option(option):
if not isinstance(option, dns.edns.Option):
raise ValueError('option is not a dns.edns.option')
- self.options = tuple(options)
+ return option
+ self.options = self._as_tuple(options, as_option)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
for opt in self.options:
@dns.immutable.immutable
class ALPNParam(Param):
def __init__(self, ids):
- for id in ids:
- id = dns.rdata.Rdata._as_bytes(id, True, 255)
- if len(id) == 0:
- raise dns.exception.FormError('empty ALPN')
- self.ids = tuple(ids)
+ self.ids = dns.rdata.Rdata._as_tuple(
+ ids, lambda x: dns.rdata.Rdata._as_bytes(x, True, 255, False))
@classmethod
def from_value(cls, value):
@dns.immutable.immutable
class IPv4HintParam(Param):
def __init__(self, addresses):
- for address in addresses:
- # check validity
- dns.ipv4.inet_aton(address)
- self.addresses = tuple(addresses)
+ self.addresses = dns.rdata.Rdata._as_tuple(
+ addresses, dns.rdata.Rdata._as_ipv4_address)
@classmethod
def from_value(cls, value):
@dns.immutable.immutable
class IPv6HintParam(Param):
def __init__(self, addresses):
- for address in addresses:
- # check validity
- dns.ipv6.inet_aton(address)
- self.addresses = tuple(addresses)
+ self.addresses = dns.rdata.Rdata._as_tuple(
+ addresses, dns.rdata.Rdata._as_ipv6_address)
@classmethod
def from_value(cls, value):
super().__init__(rdclass, rdtype)
self.priority = self._as_uint16(priority)
self.target = self._as_name(target)
- self.params = dns.immutable.constify(params)
+ for k, v in params.items():
+ k = ParamKey.make(k)
+ if not isinstance(v, Param) and v is not None:
+ raise ValueError("not a Param")
+ self.params = dns.immutable.Dict(params)
# Make sure any paramater listed as mandatory is present in the
# record.
mandatory = params.get(ParamKey.MANDATORY)
*strings*, a tuple of ``bytes``
"""
super().__init__(rdclass, rdtype)
- if isinstance(strings, (bytes, str)):
- strings = (strings,)
- encoded_strings = []
- for string in strings:
- string = self._as_bytes(string, True, 255)
- encoded_strings.append(string)
- self.strings = tuple(encoded_strings)
+ self.strings = self._as_tuple(strings,
+ lambda x: self._as_bytes(x, True, 255))
def to_text(self, origin=None, relativize=True, **kw):
txt = ''