name = name.relativize(origin)
dlabels = [d for d in name.labels if (d.isdigit() and len(d) == 1)]
if len(dlabels) != len(name.labels):
- raise dns.exception.SyntaxError, 'non-digit labels in ENUM domain name'
+ raise dns.exception.SyntaxError('non-digit labels in ENUM domain name')
dlabels.reverse()
text = ''.join(dlabels)
if want_plus_prefix:
def random_between(self, first, last):
size = last - first + 1
if size > 4294967296L:
- raise ValueError, 'too big'
+ raise ValueError('too big')
if size > 65536:
rand = self.random_32
max = 4294967295L
"""
if len(address) != 16:
- raise ValueError, "IPv6 addresses are 16 bytes long"
+ raise ValueError("IPv6 addresses are 16 bytes long")
hex = address.encode('hex_codec')
chunks = []
i = 0
def inet_aton(text):
"""Convert a text format IPv6 address into network format.
-
+
@param text: the textual address
@type text: string
@rtype: string
@raises dns.exception.SyntaxError: the text was not properly formatted
"""
-
+
#
# Our aim here is not something fast; we just want something that works.
#
elif section is self.additional:
return 3
else:
- raise ValueError, 'unknown section'
+ raise ValueError('unknown section')
def find_rrset(self, section, name, rdclass, rdtype,
covers=dns.rdatatype.NONE, deleting=None, create=False,
i == (count - 1)):
raise BadTSIG
if self.message.keyring is None:
- raise UnknownTSIGKey, 'got signed message without keyring'
+ raise UnknownTSIGKey('got signed message without keyring')
secret = self.message.keyring.get(absolute_name)
if secret is None:
- raise UnknownTSIGKey, "key '%s' unknown" % name
+ raise UnknownTSIGKey("key '%s' unknown" % name)
self.message.tsig_ctx = \
dns.tsig.validate(self.wire,
absolute_name,
@rtype: dns.message.Message object"""
if query.flags & dns.flags.QR:
- raise dns.exception.FormError, 'specified query message is not a query'
+ raise dns.exception.FormError('specified query message is not a query')
response = dns.message.Message(query.id)
response.flags = dns.flags.QR | (query.flags & dns.flags.RD)
if recursion_available:
@raises LabelTooLong: an individual label is too long
@raises EmptyLabel: a label is empty (i.e. the root label) and appears
in a position other than the end of the label sequence"""
-
+
l = len(labels)
total = 0
i = -1
The dns.name.Name class represents a DNS name as a tuple of labels.
Instances of the class are immutable.
-
+
@ivar labels: The tuple of labels in the name. Each label is a string of
up to 63 octets."""
__slots__ = ['labels']
-
+
def __init__(self, labels):
"""Initialize a domain name from a list of labels.
@param labels: the labels
@type labels: any iterable whose values are strings
"""
-
+
super(Name, self).__setattr__('labels', tuple(labels))
_validate_labels(self.labels)
def __setattr__(self, name, value):
- raise TypeError, "object doesn't support attribute assignment"
+ raise TypeError("object doesn't support attribute assignment")
def is_absolute(self):
"""Is the most significant label of this name the root label?
@rtype: bool
"""
-
+
return len(self.labels) > 0 and self.labels[-1] == ''
def is_wild(self):
"""Is this name wild? (I.e. Is the least significant label '*'?)
@rtype: bool
"""
-
+
return len(self.labels) > 0 and self.labels[0] == '*'
def __hash__(self):
"""Return a case-insensitive hash of the name.
@rtype: int
"""
-
+
h = 0L
for label in self.labels:
for c in label:
0 if self == other. A relative name is always less than an
absolute name. If both names have the same relativity, then
the DNSSEC order relation is used to order them.
-
+
I{nlabels} is the number of significant labels that the two names
have in common.
"""
The notion of subdomain includes equality.
@rtype: bool
"""
-
+
(nr, o, nl) = self.fullcompare(other)
if nr == NAMERELN_SUBDOMAIN or nr == NAMERELN_EQUAL:
return True
The notion of subdomain includes equality.
@rtype: bool
"""
-
+
(nr, o, nl) = self.fullcompare(other)
if nr == NAMERELN_SUPERDOMAIN or nr == NAMERELN_EQUAL:
return True
DNSSEC canonical form.
@rtype: dns.name.Name object
"""
-
+
return Name([x.lower() for x in self.labels])
def __eq__(self, other):
def __repr__(self):
return '<DNS name ' + self.__str__() + '>'
-
+
def __str__(self):
return self.to_text(False)
root label) for absolute names. The default is False.
@rtype: string
"""
-
+
if len(self.labels) == 0:
return '@'
if len(self.labels) == 1 and self.labels[0] == '':
root label) for absolute names. The default is False.
@rtype: string
"""
-
+
if len(self.labels) == 0:
return u'@'
if len(self.labels) == 1 and self.labels[0] == '':
"""Convert name to a format suitable for digesting in hashes.
The name is canonicalized and converted to uncompressed wire format.
-
+
@param origin: If the name is relative and origin is not None, then
origin will be appended to it.
@type origin: dns.name.Name object
if it is missing, then this exception is raised
@rtype: string
"""
-
+
if not self.is_absolute():
if origin is None or not origin.is_absolute():
raise NeedAbsoluteNameOrOrigin
labels = self.labels
dlabels = ["%s%s" % (chr(len(x)), x.lower()) for x in labels]
return ''.join(dlabels)
-
+
def to_wire(self, file = None, compress = None, origin = None):
"""Convert name to wire format, possibly compressing it.
"""The length of the name (in labels).
@rtype: int
"""
-
+
return len(self.labels)
def __getitem__(self, index):
@returns: the tuple (prefix, suffix)
@rtype: tuple
"""
-
+
l = len(self.labels)
if depth == 0:
return (self, dns.name.empty)
elif depth == l:
return (dns.name.empty, self)
elif depth < 0 or depth > l:
- raise ValueError, \
- 'depth must be >= 0 and <= the length of the name'
+ raise ValueError('depth must be >= 0 and <= the length of the name')
return (Name(self[: -depth]), Name(self[-depth :]))
def concatenate(self, other):
@raises AbsoluteConcatenation: self is absolute and other is
not the empty name
"""
-
+
if self.is_absolute() and len(other) > 0:
raise AbsoluteConcatenation
labels = list(self.labels)
relative to origin. Otherwise return self.
@rtype: dns.name.Name object
"""
-
+
if not origin is None and self.is_subdomain(origin):
return Name(self[: -len(origin)])
else:
concatenation of self and origin. Otherwise return self.
@rtype: dns.name.Name object
"""
-
+
if not self.is_absolute():
return self.concatenate(origin)
else:
false the name is derelativized.
@rtype: dns.name.Name object
"""
-
+
if origin:
if relativize:
return self.relativize(origin)
if self == root or self == empty:
raise NoParent
return Name(self.labels[1:])
-
+
root = Name([''])
empty = Name([])
"""Convert unicode text into a Name object.
Lables are encoded in IDN ACE form.
-
+
@rtype: dns.name.Name object
"""
if not isinstance(text, unicode):
- raise ValueError, "input to from_unicode() must be a unicode string"
+ raise ValueError("input to from_unicode() must be a unicode string")
if not (origin is None or isinstance(origin, Name)):
- raise ValueError, "origin must be a Name or None"
+ raise ValueError("origin must be a Name or None")
labels = []
label = u''
escaping = False
if isinstance(text, unicode) and sys.hexversion >= 0x02030000:
return from_unicode(text, origin)
else:
- raise ValueError, "input to from_text() must be a string"
+ raise ValueError("input to from_text() must be a string")
if not (origin is None or isinstance(origin, Name)):
- raise ValueError, "origin must be a Name or None"
+ raise ValueError("origin must be a Name or None")
labels = []
label = ''
escaping = False
"""
if not isinstance(message, str):
- raise ValueError, "input to from_wire() must be a byte string"
+ raise ValueError("input to from_wire() must be a byte string")
labels = []
biggest_pointer = current
hops = 0
def __setitem__(self, key, value):
if not isinstance(key, dns.name.Name):
- raise ValueError, 'NameDict key must be a name'
+ raise ValueError('NameDict key must be a name')
depth = len(key)
if depth > self.max_depth:
self.max_depth = depth
def get_deepest_match(self, name):
"""Find the deepest match to I{name} in the dictionary.
-
+
The deepest match is the longest name in the dictionary which is
a superdomain of I{name}.
@type name: dns.name.Name object
@rtype: (key, value) tuple
"""
-
+
depth = len(name)
if depth > self.max_depth:
depth = self.max_depth
from_address[1:] == destination[1:]):
break
if not ignore_unexpected:
- raise UnexpectedSource, \
- 'got a response from %s instead of %s' % (from_address,
- destination)
+ raise UnexpectedSource('got a response from '
+ '%s instead of %s' % (from_address,
+ destination))
finally:
s.close()
r = dns.message.from_wire(wire, keyring=q.keyring, request_mac=q.mac,
if v[0] != errno.EINPROGRESS and \
v[0] != errno.EWOULDBLOCK and \
v[0] != errno.EALREADY:
- raise ty, v
+ raise v
def tcp(q, where, timeout=None, port=53, af=None, source=None, source_port=0,
one_rr_per_rrset=False):
source = (source, source_port, 0, 0)
if use_udp:
if rdtype != dns.rdatatype.IXFR:
- raise ValueError, 'cannot do a UDP AXFR'
+ raise ValueError('cannot do a UDP AXFR')
s = socket.socket(af, socket.SOCK_DGRAM, 0)
else:
s = socket.socket(af, socket.SOCK_STREAM, 0)
raise dns.exception.FormError
rrset = r.answer[0]
if rrset.rdtype != dns.rdatatype.SOA:
- raise dns.exception.FormError, "first RRset is not an SOA"
+ raise dns.exception.FormError("first RRset is not an SOA")
answer_index = 1
soa_rrset = rrset.copy()
if rdtype == dns.rdatatype.IXFR:
#
for rrset in r.answer[answer_index:]:
if done:
- raise dns.exception.FormError, "answers after final SOA"
+ raise dns.exception.FormError("answers after final SOA")
if rrset.rdtype == dns.rdatatype.SOA and rrset.name == oname:
if expecting_SOA:
if rrset[0].serial != serial:
- raise dns.exception.FormError, \
- "IXFR base serial mismatch"
+ raise dns.exception.FormError("IXFR base serial mismatch")
expecting_SOA = False
elif rdtype == dns.rdatatype.IXFR:
delete_mode = not delete_mode
rdtype = dns.rdatatype.AXFR
expecting_SOA = False
if done and q.keyring and not r.had_tsig:
- raise dns.exception.FormError, "missing TSIG"
+ raise dns.exception.FormError("missing TSIG")
yield r
s.close()
def from_text(text):
"""Convert text into an rcode.
-
+
@param text: the texual rcode
@type text: string
@raises UnknownRcode: the rcode is unknown
value = (flags & 0x000f) | ((ednsflags >> 20) & 0xff0)
if value < 0 or value > 4095:
- raise ValueError, 'rcode must be >= 0 and <= 4095'
+ raise ValueError('rcode must be >= 0 and <= 4095')
return value
def to_flags(value):
"""
if value < 0 or value > 4095:
- raise ValueError, 'rcode must be >= 0 and <= 4095'
+ raise ValueError('rcode must be >= 0 and <= 4095')
v = value & 0xf
ev = long(value & 0xff0) << 20
return (v, ev)
@type value: int
@rtype: string
"""
-
+
text = _by_value.get(value)
if text is None:
text = str(value)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
token = tok.get()
if not token.is_identifier() or token.value != '\#':
- raise dns.exception.SyntaxError, \
- r'generic rdata does not start with \#'
+ raise dns.exception.SyntaxError(r'generic rdata does not start with \#')
length = tok.get_int()
chunks = []
while 1:
hex = ''.join(chunks)
data = hex.decode('hex_codec')
if len(data) != length:
- raise dns.exception.SyntaxError, \
- 'generic rdata hex data has wrong length'
+ raise dns.exception.SyntaxError('generic rdata hex data has wrong length')
return cls(rdclass, rdtype, data)
from_text = classmethod(from_text)
@raises dns.rdataclass.UnknownRdataClass: the class is unknown
@raises ValueError: the rdata class value is not >= 0 and <= 65535
"""
-
+
value = _by_text.get(text.upper())
if value is None:
match = _unknown_class_pattern.match(text)
raise UnknownRdataclass
value = int(match.group(1))
if value < 0 or value > 65535:
- raise ValueError, "class must be between >= 0 and <= 65535"
+ raise ValueError("class must be between >= 0 and <= 65535")
return value
def to_text(value):
"""
if value < 0 or value > 65535:
- raise ValueError, "class must be between >= 0 and <= 65535"
+ raise ValueError("class must be between >= 0 and <= 65535")
text = _by_value.get(value)
if text is None:
text = 'CLASS' + `value`
@param rdclass: the rdata class
@type rdclass: int
@rtype: bool"""
-
+
if _metaclasses.has_key(rdclass):
return True
return False
"""
__slots__ = ['rdclass', 'rdtype', 'covers', 'ttl']
-
+
def __init__(self, rdclass, rdtype, covers=dns.rdatatype.NONE):
"""Create a new rdataset of the specified class and type.
@see: the description of the class instance variables for the
meaning of I{rdclass} and I{rdtype}"""
-
+
super(Rdataset, self).__init__()
self.rdclass = rdclass
self.rdtype = rdtype
to the specified TTL.
@param ttl: The TTL
@type ttl: int"""
-
+
if len(self) == 0:
self.ttl = ttl
elif ttl < self.ttl:
If the optional I{ttl} parameter is supplied, then
self.update_ttl(ttl) will be called prior to adding the rdata.
-
+
@param rd: The rdata
@type rd: dns.rdata.Rdata object
@param ttl: The TTL
@type ttl: int"""
-
+
#
# If we're adding a signature, do some special handling to
# check that the signature covers the same type as the
@param other: The rdataset from which to update
@type other: dns.rdataset.Rdataset object"""
-
+
self.update_ttl(other.ttl)
super(Rdataset, self).update(other)
ctext = '(' + dns.rdatatype.to_text(self.covers) + ')'
return '<DNS ' + dns.rdataclass.to_text(self.rdclass) + ' ' + \
dns.rdatatype.to_text(self.rdtype) + ctext + ' rdataset>'
-
+
def __str__(self):
return self.to_text()
"""Two rdatasets are equal if they have the same class, type, and
covers, and contain the same rdata.
@rtype: bool"""
-
+
if not isinstance(other, Rdataset):
return False
if self.rdclass != other.rdclass or \
self.covers != other.covers:
return False
return super(Rdataset, self).__eq__(other)
-
+
def __ne__(self, other):
return not self.__eq__(other)
Any additional keyword arguments are passed on to the rdata
to_text() method.
-
+
@param name: If name is not None, emit a RRs with I{name} as
the owner name.
@type name: dns.name.Name object
file.write(stuff)
file.seek(0, 2)
return len(self)
-
+
def match(self, rdclass, rdtype, covers):
"""Returns True if this rdataset matches the specified class, type,
and covers"""
rd = dns.rdata.from_text(r.rdclass, r.rdtype, t)
r.add(rd)
return r
-
+
def from_text(rdclass, rdtype, ttl, *text_rdatas):
"""Create an rdataset with the specified class, type, and TTL, and with
the specified rdatas in text format.
-
+
@rtype: dns.rdataset.Rdataset object
"""
def from_rdata_list(ttl, rdatas):
"""Create an rdataset with the specified TTL, and with
the specified list of rdata objects.
-
+
@rtype: dns.rdataset.Rdataset object
"""
if len(rdatas) == 0:
- raise ValueError, "rdata list must not be empty"
+ raise ValueError("rdata list must not be empty")
r = None
for rd in rdatas:
if r is None:
first_time = False
r.add(rd)
return r
-
+
def from_rdata(ttl, *rdatas):
"""Create an rdataset with the specified TTL, and with
the specified rdata objects.
-
+
@rtype: dns.rdataset.Rdataset object
"""
raise UnknownRdatatype
value = int(match.group(1))
if value < 0 or value > 65535:
- raise ValueError, "type must be between >= 0 and <= 65535"
+ raise ValueError("type must be between >= 0 and <= 65535")
return value
def to_text(value):
@rtype: string"""
if value < 0 or value > 65535:
- raise ValueError, "type must be between >= 0 and <= 65535"
+ raise ValueError("type must be between >= 0 and <= 65535")
text = _by_value.get(value)
if text is None:
text = 'TYPE' + `value`
key_tag = tok.get_uint16()
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
if algorithm < 0 or algorithm > 255:
- raise dns.exception.SyntaxError, "bad algorithm type"
+ raise dns.exception.SyntaxError("bad algorithm type")
chunks = []
while 1:
t = tok.get().unescape()
algorithm = tok.get_uint8()
hit = tok.get_string().decode('hex-codec')
if len(hit) > 255:
- raise dns.exception.SyntaxError, "HIT too long"
+ raise dns.exception.SyntaxError("HIT too long")
key = tok.get_string().decode('base64-codec')
servers = []
while 1:
exp = i - 1
break
if exp is None or exp < 0:
- raise dns.exception.SyntaxError, "%s value out of bounds" % desc
+ raise dns.exception.SyntaxError("%s value out of bounds" % desc)
return exp
def _float_to_tuple(what):
def _decode_size(what, desc):
exponent = what & 0x0F
if exponent > 9:
- raise dns.exception.SyntaxError, "bad %s exponent" % desc
+ raise dns.exception.SyntaxError("bad %s exponent" % desc)
base = (what & 0xF0) >> 4
if base > 9:
- raise dns.exception.SyntaxError, "bad %s base" % desc
+ raise dns.exception.SyntaxError("bad %s base" % desc)
return long(base) * pow(10, exponent)
class LOC(dns.rdata.Rdata):
if '.' in t:
(seconds, milliseconds) = t.split('.')
if not seconds.isdigit():
- raise dns.exception.SyntaxError, \
- 'bad latitude seconds value'
+ raise dns.exception.SyntaxError('bad latitude seconds value')
latitude[2] = int(seconds)
if latitude[2] >= 60:
- raise dns.exception.SyntaxError, \
- 'latitude seconds >= 60'
+ raise dns.exception.SyntaxError('latitude seconds >= 60')
l = len(milliseconds)
if l == 0 or l > 3 or not milliseconds.isdigit():
- raise dns.exception.SyntaxError, \
- 'bad latitude milliseconds value'
+ raise dns.exception.SyntaxError('bad latitude milliseconds value')
if l == 1:
m = 100
elif l == 2:
if t == 'S':
latitude[0] *= -1
elif t != 'N':
- raise dns.exception.SyntaxError, 'bad latitude hemisphere value'
+ raise dns.exception.SyntaxError('bad latitude hemisphere value')
longitude[0] = tok.get_int()
t = tok.get_string()
if '.' in t:
(seconds, milliseconds) = t.split('.')
if not seconds.isdigit():
- raise dns.exception.SyntaxError, \
- 'bad longitude seconds value'
+ raise dns.exception.SyntaxError('bad longitude seconds value')
longitude[2] = int(seconds)
if longitude[2] >= 60:
- raise dns.exception.SyntaxError, \
- 'longitude seconds >= 60'
+ raise dns.exception.SyntaxError('longitude seconds >= 60')
l = len(milliseconds)
if l == 0 or l > 3 or not milliseconds.isdigit():
- raise dns.exception.SyntaxError, \
- 'bad longitude milliseconds value'
+ raise dns.exception.SyntaxError('bad longitude milliseconds value')
if l == 1:
m = 100
elif l == 2:
if t == 'W':
longitude[0] *= -1
elif t != 'E':
- raise dns.exception.SyntaxError, 'bad longitude hemisphere value'
+ raise dns.exception.SyntaxError('bad longitude hemisphere value')
t = tok.get_string()
if t[-1] == 'm':
else:
latitude = -1 * float(0x80000000L - latitude) / 3600000
if latitude < -90.0 or latitude > 90.0:
- raise dns.exception.FormError, "bad latitude"
+ raise dns.exception.FormError("bad latitude")
if longitude > 0x80000000L:
longitude = float(longitude - 0x80000000L) / 3600000
else:
longitude = -1 * float(0x80000000L - longitude) / 3600000
if longitude < -180.0 or longitude > 180.0:
- raise dns.exception.FormError, "bad longitude"
+ raise dns.exception.FormError("bad longitude")
altitude = float(altitude) - 10000000.0
size = _decode_size(size, "size")
hprec = _decode_size(hprec, "horizontal precision")
break
nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
- raise dns.exception.SyntaxError, "NSEC with bit 0"
+ raise dns.exception.SyntaxError("NSEC with bit 0")
if nrdtype > 65535:
- raise dns.exception.SyntaxError, "NSEC with bit > 65535"
+ raise dns.exception.SyntaxError("NSEC with bit > 65535")
rdtypes.append(nrdtype)
rdtypes.sort()
window = 0
windows = []
while rdlen > 0:
if rdlen < 3:
- raise dns.exception.FormError, "NSEC too short"
+ raise dns.exception.FormError("NSEC too short")
window = ord(wire[current])
octets = ord(wire[current + 1])
if octets == 0 or octets > 32:
- raise dns.exception.FormError, "bad NSEC octets"
+ raise dns.exception.FormError("bad NSEC octets")
current += 2
rdlen -= 2
if rdlen < octets:
- raise dns.exception.FormError, "bad NSEC bitmap length"
+ raise dns.exception.FormError("bad NSEC bitmap length")
bitmap = wire[current : current + octets]
current += octets
rdlen -= octets
break
nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
- raise dns.exception.SyntaxError, "NSEC3 with bit 0"
+ raise dns.exception.SyntaxError("NSEC3 with bit 0")
if nrdtype > 65535:
- raise dns.exception.SyntaxError, "NSEC3 with bit > 65535"
+ raise dns.exception.SyntaxError("NSEC3 with bit > 65535")
rdtypes.append(nrdtype)
rdtypes.sort()
window = 0
windows = []
while rdlen > 0:
if rdlen < 3:
- raise dns.exception.FormError, "NSEC3 too short"
+ raise dns.exception.FormError("NSEC3 too short")
window = ord(wire[current])
octets = ord(wire[current + 1])
if octets == 0 or octets > 32:
- raise dns.exception.FormError, "bad NSEC3 octets"
+ raise dns.exception.FormError("bad NSEC3 octets")
current += 2
rdlen -= 2
if rdlen < octets:
- raise dns.exception.FormError, "bad NSEC3 bitmap length"
+ raise dns.exception.FormError("bad NSEC3 bitmap length")
bitmap = wire[current : current + octets]
current += octets
rdlen -= octets
else:
nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
- raise dns.exception.SyntaxError, "NXT with bit 0"
+ raise dns.exception.SyntaxError("NXT with bit 0")
if nrdtype > 127:
- raise dns.exception.SyntaxError, "NXT with bit > 127"
+ raise dns.exception.SyntaxError("NXT with bit > 127")
i = nrdtype // 8
bitmap[i] = chr(ord(bitmap[i]) | (0x80 >> (nrdtype % 8)))
bitmap = dns.rdata._truncate_bitmap(bitmap)
super(IPSECKEY, self).__init__(rdclass, rdtype)
if gateway_type == 0:
if gateway != '.' and not gateway is None:
- raise SyntaxError, 'invalid gateway for gateway type 0'
+ raise SyntaxError('invalid gateway for gateway type 0')
gateway = None
elif gateway_type == 1:
# check that it's OK
elif gateway_type == 3:
pass
else:
- raise SyntaxError, \
- 'invalid IPSECKEY gateway type: %d' % gateway_type
+ raise SyntaxError('invalid IPSECKEY gateway type: %d' % gateway_type)
self.precedence = precedence
self.gateway_type = gateway_type
self.algorithm = algorithm
elif self.gateway_type == 3:
gateway = str(self.gateway.choose_relativity(origin, relativize))
else:
- raise ValueError, 'invalid gateway type'
+ raise ValueError('invalid gateway type')
return '%d %d %d %s %s' % (self.precedence, self.gateway_type,
self.algorithm, gateway,
dns.rdata._base64ify(self.key))
elif self.gateway_type == 3:
self.gateway.to_wire(file, None, origin)
else:
- raise ValueError, 'invalid gateway type'
+ raise ValueError('invalid gateway type')
file.write(self.key)
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
current += cused
rdlen -= cused
else:
- raise dns.exception.FormError, 'invalid IPSECKEY gateway type'
+ raise dns.exception.FormError('invalid IPSECKEY gateway type')
key = wire[current : current + rdlen]
return cls(rdclass, rdtype, header[0], gateway_type, header[2],
gateway, key)
@see: RFC 1706"""
__slots__ = ['address']
-
+
def __init__(self, rdclass, rdtype, address):
super(NSAP, self).__init__(rdclass, rdtype)
self.address = address
def to_text(self, origin=None, relativize=True, **kw):
return "0x%s" % self.address.encode('hex_codec')
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
address = tok.get_string()
t = tok.get_eol()
if address[0:2] != '0x':
- raise dns.exception.SyntaxError, 'string does not start with 0x'
+ raise dns.exception.SyntaxError('string does not start with 0x')
address = address[2:].replace('.', '')
if len(address) % 2 != 0:
- raise dns.exception.SyntaxError, 'hexstring has odd length'
+ raise dns.exception.SyntaxError('hexstring has odd length')
address = address.decode('hex_codec')
return cls(rdclass, rdtype, address)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(self.address)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
address = wire[current : current + rdlen]
return cls(rdclass, rdtype, address)
serv = int(token.value)
else:
if protocol != _proto_udp and protocol != _proto_tcp:
- raise NotImplementedError, "protocol must be TCP or UDP"
+ raise NotImplementedError("protocol must be TCP or UDP")
if protocol == _proto_udp:
protocol_text = "udp"
else:
for flag in flag_names:
v = _flags_from_text.get(flag)
if v is None:
- raise dns.exception.SyntaxError, 'unknown flag %s' % flag
+ raise dns.exception.SyntaxError('unknown flag %s' % flag)
flags &= ~v[1]
flags |= v[0]
protocol = tok.get_string()
else:
protocol = _protocol_from_text.get(protocol)
if protocol is None:
- raise dns.exception.SyntaxError, \
- 'unknown protocol %s' % protocol
+ raise dns.exception.SyntaxError('unknown protocol %s' % protocol)
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
chunks = []
if token.is_eol_or_eof():
break
if not (token.is_quoted_string() or token.is_identifier()):
- raise dns.exception.SyntaxError, "expected a string"
+ raise dns.exception.SyntaxError("expected a string")
if len(token.value) > 255:
- raise dns.exception.SyntaxError, "string too long"
+ raise dns.exception.SyntaxError("string too long")
strings.append(token.value)
if len(strings) == 0:
raise dns.exception.UnexpectedEnd
elif attr == 'rdtype':
return self.rrset.rdtype
else:
- raise AttributeError, attr
+ raise AttributeError(attr)
def __len__(self):
return len(self.rrset)
since the epoch.)
@type next_cleaning: float
"""
-
+
def __init__(self, cleaning_interval=300.0):
"""Initialize a DNS cache.
cleanings. The default is 300.0
@type cleaning_interval: float.
"""
-
+
self.data = {}
self.cleaning_interval = cleaning_interval
self.next_cleaning = time.time() + self.cleaning_interval
def maybe_clean(self):
"""Clean the cache if it's time to do so."""
-
+
now = time.time()
if self.next_cleaning <= now:
keys_to_delete = []
del self.data[k]
now = time.time()
self.next_cleaning = now + self.cleaning_interval
-
+
def get(self, key):
"""Get the answer associated with I{key}. Returns None if
no answer is cached for the key.
query name, rdtype, and rdclass.
@rtype: dns.resolver.Answer object or None
"""
-
+
self.maybe_clean()
v = self.data.get(key)
if v is None or v.expiration <= time.time():
@param value: The answer being cached
@type value: dns.resolver.Answer object
"""
-
+
self.maybe_clean()
self.data[key] = value
@param key: the key to flush
@type key: (dns.name.Name, int, int) tuple or None
"""
-
+
if not key is None:
if self.data.has_key(key):
del self.data[key]
of the appropriate type, or strings that can be converted into objects
of the appropriate type. E.g. For I{rdtype} the integer 2 and the
the string 'NS' both mean to query for records with DNS rdata type NS.
-
+
@param qname: the query name
@type qname: dns.name.Name object or string
@param rdtype: the query type
@raises NoAnswer: the response did not contain an answer
@raises NoNameservers: no non-broken nameservers are available to
answer the question."""
-
+
if isinstance(qname, (str, unicode)):
qname = dns.name.from_text(qname, None)
if isinstance(rdtype, str):
if resolver is None:
resolver = get_default_resolver()
if not name.is_absolute():
- raise NotAbsolute, name
+ raise NotAbsolute(name)
while 1:
try:
answer = resolver.query(name, dns.rdatatype.SOA, rdclass, tcp)
# run through inet_aton() to check syntax and make pretty.
return dns.ipv6.inet_ntoa(dns.ipv6.inet_aton(text))
else:
- raise dns.exception.SyntaxError, 'unknown reverse-map address family'
+ raise dns.exception.SyntaxError('unknown reverse-map address family')
"""
__slots__ = ['name', 'deleting']
-
+
def __init__(self, name, rdclass, rdtype, covers=dns.rdatatype.NONE,
deleting=None):
"""Create a new RRset."""
def __eq__(self, other):
"""Two RRsets are equal if they have the same name and the same
rdataset
-
+
@rtype: bool"""
if not isinstance(other, RRset):
return False
if self.name != name or self.deleting != deleting:
return False
return True
-
+
def to_text(self, origin=None, relativize=True, **kw):
"""Convert the RRset into DNS master file format.
Any additional keyword arguments are passed on to the rdata
to_text() method.
-
+
@param origin: The origin for relative names, or None.
@type origin: dns.name.Name object
@param relativize: True if names should names be relativized
@rtype: dns.rrset.RRset object
"""
-
+
if isinstance(name, (str, unicode)):
name = dns.name.from_text(name, None)
if isinstance(rdclass, str):
rd = dns.rdata.from_text(r.rdclass, r.rdtype, t)
r.add(rd)
return r
-
+
def from_text(name, ttl, rdclass, rdtype, *text_rdatas):
"""Create an RRset with the specified name, TTL, class, and type and with
the specified rdatas in text format.
name = dns.name.from_text(name, None)
if len(rdatas) == 0:
- raise ValueError, "rdata list must not be empty"
+ raise ValueError("rdata list must not be empty")
r = None
for rd in rdatas:
if r is None:
first_time = False
r.add(rd)
return r
-
+
def from_rdata(name, ttl, *rdatas):
"""Create an RRset with the specified name and TTL, and with
the specified rdata objects.
@type items: list"""
__slots__ = ['items']
-
+
def __init__(self, items=None):
"""Initialize the set.
@param items: the initial set of items
@type items: any iterable or None
"""
-
+
self.items = []
if not items is None:
for item in items:
def __repr__(self):
return "dns.simpleset.Set(%s)" % repr(self.items)
-
+
def add(self, item):
"""Add an item to the set."""
if not item in self.items:
return new instances (e.g. union) once, and keep using them in
subclasses.
"""
-
+
cls = self.__class__
obj = cls.__new__(cls)
obj.items = list(self.items)
def __copy__(self):
"""Make a (shallow) copy of the set."""
return self._clone()
-
+
def copy(self):
"""Make a (shallow) copy of the set."""
return self._clone()
@type other: Set object
"""
if not isinstance(other, Set):
- raise ValueError, 'other must be a Set instance'
+ raise ValueError('other must be a Set instance')
if self is other:
return
for item in other.items:
@type other: Set object
"""
if not isinstance(other, Set):
- raise ValueError, 'other must be a Set instance'
+ raise ValueError('other must be a Set instance')
if self is other:
return
# we make a copy of the list so that we can remove items from
@type other: Set object
"""
if not isinstance(other, Set):
- raise ValueError, 'other must be a Set instance'
+ raise ValueError('other must be a Set instance')
if self is other:
self.items = []
else:
@type other: Set object
@rtype: the same type as I{self}
"""
-
+
obj = self._clone()
obj.union_update(other)
return obj
@rtype: bool
"""
-
+
if not isinstance(other, Set):
- raise ValueError, 'other must be a Set instance'
+ raise ValueError('other must be a Set instance')
for item in self.items:
if not item in other.items:
return False
"""
if not isinstance(other, Set):
- raise ValueError, 'other must be a Set instance'
+ raise ValueError('other must be a Set instance')
for item in other.items:
if not item in self.items:
return False
return Token(COMMENT, token)
elif c == '':
if self.multiline:
- raise dns.exception.SyntaxError, \
- 'unbalanced parentheses'
+ raise dns.exception.SyntaxError('unbalanced parentheses')
return Token(EOF)
elif self.multiline:
self.skip_whitespace()
raise dns.exception.SyntaxError
c = chr(int(c) * 100 + int(c2) * 10 + int(c3))
elif c == '\n':
- raise dns.exception.SyntaxError, 'newline in quoted string'
+ raise dns.exception.SyntaxError('newline in quoted string')
elif c == '\\':
#
# It's an escape. Put it and the next character into
token += c
if token == '' and ttype != QUOTED_STRING:
if self.multiline:
- raise dns.exception.SyntaxError, 'unbalanced parentheses'
+ raise dns.exception.SyntaxError('unbalanced parentheses')
ttype = EOF
return Token(ttype, token, has_escape)
token = self.get().unescape()
if not token.is_identifier():
- raise dns.exception.SyntaxError, 'expecting an identifier'
+ raise dns.exception.SyntaxError('expecting an identifier')
if not token.value.isdigit():
- raise dns.exception.SyntaxError, 'expecting an integer'
+ raise dns.exception.SyntaxError('expecting an integer')
return int(token.value)
def get_uint8(self):
value = self.get_int()
if value < 0 or value > 255:
- raise dns.exception.SyntaxError, \
- '%d is not an unsigned 8-bit integer' % value
+ raise dns.exception.SyntaxError('%d is not an unsigned 8-bit integer' % value)
return value
def get_uint16(self):
value = self.get_int()
if value < 0 or value > 65535:
- raise dns.exception.SyntaxError, \
- '%d is not an unsigned 16-bit integer' % value
+ raise dns.exception.SyntaxError('%d is not an unsigned 16-bit integer' % value)
return value
def get_uint32(self):
token = self.get().unescape()
if not token.is_identifier():
- raise dns.exception.SyntaxError, 'expecting an identifier'
+ raise dns.exception.SyntaxError('expecting an identifier')
if not token.value.isdigit():
- raise dns.exception.SyntaxError, 'expecting an integer'
+ raise dns.exception.SyntaxError('expecting an integer')
value = long(token.value)
if value < 0 or value > 4294967296L:
- raise dns.exception.SyntaxError, \
- '%d is not an unsigned 32-bit integer' % value
+ raise dns.exception.SyntaxError('%d is not an unsigned 32-bit integer' % value)
return value
def get_string(self, origin=None):
token = self.get().unescape()
if not (token.is_identifier() or token.is_quoted_string()):
- raise dns.exception.SyntaxError, 'expecting a string'
+ raise dns.exception.SyntaxError('expecting a string')
return token.value
def get_identifier(self, origin=None):
token = self.get().unescape()
if not token.is_identifier():
- raise dns.exception.SyntaxError, 'expecting an identifier'
+ raise dns.exception.SyntaxError('expecting an identifier')
return token.value
def get_name(self, origin=None):
token = self.get()
if not token.is_identifier():
- raise dns.exception.SyntaxError, 'expecting an identifier'
+ raise dns.exception.SyntaxError('expecting an identifier')
return dns.name.from_text(token.value, origin)
def get_eol(self):
token = self.get()
if not token.is_eol_or_eof():
- raise dns.exception.SyntaxError, \
- 'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value)
+ raise dns.exception.SyntaxError('expected EOL or EOF, got %d "%s"' % (token.ttype, token.value))
return token.value
def get_ttl(self):
token = self.get().unescape()
if not token.is_identifier():
- raise dns.exception.SyntaxError, 'expecting an identifier'
+ raise dns.exception.SyntaxError('expecting an identifier')
return dns.ttl.from_text(token.value)
pre_mac = algorithm_name + time_mac
ol = len(other_data)
if ol > 65535:
- raise ValueError, 'TSIG Other Data is > 65535 bytes'
+ raise ValueError('TSIG Other Data is > 65535 bytes')
post_mac = struct.pack('!HH', error, ol) + other_data
if first:
ctx.update(pre_mac)
elif error == BADTRUNC:
raise PeerBadTruncation
else:
- raise PeerError, 'unknown TSIG error code %d' % error
+ raise PeerError('unknown TSIG error code %d' % error)
time_low = time - fudge
time_high = time + fudge
if now < time_low or now > time_high:
if algorithm in hashes:
return (algorithm.to_digestable(), hashes[algorithm])
- raise NotImplementedError, "TSIG algorithm " + str(algorithm) + \
- " is not supported"
+ raise NotImplementedError("TSIG algorithm " + str(algorithm) +
+ " is not supported")
class BadTTL(dns.exception.SyntaxError):
pass
-
+
def from_text(text):
"""Convert the text form of a TTL to an integer.
@raises dns.ttl.BadTTL: the TTL is not well-formed
@rtype: int
"""
-
+
if text.isdigit():
total = long(text)
else:
elif c == 's':
total += current
else:
- raise BadTTL, "unknown unit '%s'" % c
+ raise BadTTL("unknown unit '%s'" % c)
current = 0
if not current == 0:
- raise BadTTL, "trailing integer"
+ raise BadTTL("trailing integer")
if total < 0L or total > 2147483647L:
- raise BadTTL, "TTL should be between 0 and 2^31 - 1 (inclusive)"
+ raise BadTTL("TTL should be between 0 and 2^31 - 1 (inclusive)")
return total
if isinstance(name, (str, unicode)):
name = dns.name.from_text(name, None)
elif not isinstance(name, dns.name.Name):
- raise KeyError, \
- "name parameter must be convertable to a DNS name"
+ raise KeyError("name parameter must be convertable to a DNS name")
if name.is_absolute():
if not name.is_subdomain(self.origin):
- raise KeyError, \
- "name parameter must be a subdomain of the zone origin"
+ raise KeyError("name parameter must be a subdomain of the zone origin")
if self.relativize:
name = name.relativize(self.origin)
return name
"""
if replacement.rdclass != self.rdclass:
- raise ValueError, 'replacement.rdclass != zone.rdclass'
+ raise ValueError('replacement.rdclass != zone.rdclass')
node = self.find_node(name, True)
node.replace_rdataset(replacement)
except:
rdclass = self.zone.rdclass
if rdclass != self.zone.rdclass:
- raise dns.exception.SyntaxError, "RR class is not zone's class"
+ raise dns.exception.SyntaxError("RR class is not zone's class")
# Type
try:
rdtype = dns.rdatatype.from_text(token.value)
except:
- raise dns.exception.SyntaxError, \
- "unknown rdatatype '%s'" % token.value
+ raise dns.exception.SyntaxError("unknown rdatatype '%s'" % token.value)
n = self.zone.nodes.get(name)
if n is None:
n = self.zone.node_factory()
except dns.exception.SyntaxError:
# Catch and reraise.
(ty, va) = sys.exc_info()[:2]
- raise ty, va
+ raise va
except:
# All exceptions that occur in the processing of rdata
# are treated as syntax errors. This is not strictly
# We convert them to syntax errors so that we can emit
# helpful filename:line info.
(ty, va) = sys.exc_info()[:2]
- raise dns.exception.SyntaxError, \
- "caught exception %s: %s" % (str(ty), str(va))
+ raise dns.exception.SyntaxError("caught exception %s: %s" % (str(ty), str(va)))
rd.choose_relativity(self.zone.origin, self.relativize)
covers = rd.covers()
if u == '$TTL':
token = self.tok.get()
if not token.is_identifier():
- raise dns.exception.SyntaxError, "bad $TTL"
+ raise dns.exception.SyntaxError("bad $TTL")
self.ttl = dns.ttl.from_text(token.value)
self.tok.get_eol()
elif u == '$ORIGIN':
elif u == '$INCLUDE' and self.allow_include:
token = self.tok.get()
if not token.is_quoted_string():
- raise dns.exception.SyntaxError, \
- "bad filename in $INCLUDE"
+ raise dns.exception.SyntaxError("bad filename in $INCLUDE")
filename = token.value
token = self.tok.get()
if token.is_identifier():
self.current_origin)
self.tok.get_eol()
elif not token.is_eol_or_eof():
- raise dns.exception.SyntaxError, \
- "bad origin in $INCLUDE"
+ raise dns.exception.SyntaxError("bad origin in $INCLUDE")
else:
new_origin = self.current_origin
self.saved_state.append((self.tok,
filename)
self.current_origin = new_origin
else:
- raise dns.exception.SyntaxError, \
- "Unknown master file directive '" + u + "'"
+ raise dns.exception.SyntaxError("Unknown master file directive '" + u + "'")
continue
self.tok.unget(token)
self._rr_line()
(filename, line_number) = self.tok.where()
if detail is None:
detail = "syntax error"
- raise dns.exception.SyntaxError, \
- "%s:%d: %s" % (filename, line_number, detail)
+ raise dns.exception.SyntaxError("%s:%d: %s" % (filename, line_number, detail))
# Now that we're done reading, do some basic checking of the zone.
if self.check_origin: