'ttl',
'rdtypes',
'update',
+ 'util',
'version',
'zone',
]
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be true inverse.
-_algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.iteritems()])
+_algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.items()])
class UnknownAlgorithm(Exception):
"""Raised if an algorithm is unknown."""
if len(dlabels) != len(name.labels):
raise dns.exception.SyntaxError('non-digit labels in ENUM domain name')
dlabels.reverse()
- text = ''.join(dlabels)
+ text = ''.join([x.decode('ascii') for x in dlabels])
if want_plus_prefix:
text = '+' + text
return text
"""
raise NotImplementedError
+ @classmethod
def from_wire(cls, otype, wire, current, olen):
"""Build an EDNS option object from wire format
@rtype: dns.ends.Option instance"""
raise NotImplementedError
- from_wire = classmethod(from_wire)
-
def _cmp(self, other):
"""Compare an ENDS option with another option of the same type.
Return < 0 if self < other, 0 if self == other, and > 0 if self > other.
def to_wire(self, file):
file.write(self.data)
+ def _cmp(self, other):
+ return cmp(self.data, other.data)
+
+ @classmethod
def from_wire(cls, otype, wire, current, olen):
return cls(otype, wire[current : current + olen])
- from_wire = classmethod(from_wire)
-
- def _cmp(self, other):
- return cmp(self.data, other.data)
-
_type_to_class = {
}
import md5
self.hash = md5.new()
self.hash_len = 16
- self.pool = '\0' * self.hash_len
+ self.pool = bytearray(self.hash_len)
if not seed is None:
self.stir(seed)
self.seeded = True
if not already_locked:
self.lock.acquire()
try:
- bytes = [ord(c) for c in self.pool]
for c in entropy:
if self.pool_index == self.hash_len:
self.pool_index = 0
- b = ord(c) & 0xff
- bytes[self.pool_index] ^= b
+ self.pool[self.pool_index] ^= c
self.pool_index += 1
- self.pool = ''.join([chr(c) for c in bytes])
finally:
if not already_locked:
self.lock.release()
seed = os.urandom(16)
except:
try:
- r = file('/dev/urandom', 'r', 0)
+ r = open('/dev/urandom', 'rb', 0)
try:
seed = r.read(16)
finally:
r.close()
except:
- seed = str(time.time())
+ seed = str(time.time()).encode('utf-8')
self.seeded = True
self.stir(seed, True)
self.digest = self.hash.digest()
self.stir(self.digest, True)
self.next_byte = 0
- value = ord(self.digest[self.next_byte])
+ value = self.digest[self.next_byte]
self.next_byte += 1
finally:
self.lock.release()
def random_between(self, first, last):
size = last - first + 1
- if size > 4294967296L:
+ if size > 4294967296:
raise ValueError('too big')
if size > 65536:
rand = self.random_32
- max = 4294967295L
+ max = 4294967295
elif size > 256:
rand = self.random_16
max = 65535
else:
rand = self.random_8
max = 255
- return (first + size * rand() // (max + 1))
+ return (first + size * rand() // (max + 1))
pool = EntropyPool()
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mappings not to be true inverses.
-_by_value = dict([(y, x) for x, y in _by_text.iteritems()])
+_by_value = dict([(y, x) for x, y in _by_text.items()])
-_edns_by_value = dict([(y, x) for x, y in _edns_by_text.iteritems()])
+_edns_by_value = dict([(y, x) for x, y in _edns_by_text.items()])
def _order_flags(table):
- order = list(table.iteritems())
- order.sort()
- order.reverse()
- return order
+ return sorted(table.items(), reverse=True)
_flags_order = _order_flags(_by_value)
import socket
import sys
-if sys.hexversion < 0x02030000 or sys.platform == 'win32':
+if sys.platform == 'win32':
#
- # Some versions of Python 2.2 have an inet_aton which rejects
- # the valid IP address '255.255.255.255'. It appears this
- # problem is still present on the Win32 platform even in 2.3.
- # We'll work around the problem.
+ # XXX Does the Win32 python 3 inet_aton still reject 255.255.255.255?
+ # Until we know it doesn't, we'll keep our workaround in place.
#
def inet_aton(text):
if text == '255.255.255.255':
- return '\xff' * 4
+ return b'\xff' * 4
else:
return socket.inet_aton(text)
else:
"""IPv6 helper functions."""
+import base64
import re
import dns.exception
import dns.ipv4
-_leading_zero = re.compile(r'0+([0-9a-f]+)')
-
def inet_ntoa(address):
"""Convert a network format IPv6 address into text.
if len(address) != 16:
raise ValueError("IPv6 addresses are 16 bytes long")
- hex = address.encode('hex_codec')
+ hex = str(base64.b16encode(address), encoding='utf_8').lower()
chunks = []
i = 0
l = len(hex)
while i < l:
- chunk = hex[i : i + 4]
- # strip leading zeros. we do this with an re instead of
- # with lstrip() because lstrip() didn't support chars until
- # python 2.2.2
- m = _leading_zero.match(chunk)
- if not m is None:
- chunk = m.group(1)
+ chunk = hex[i : i + 4].lstrip('0')
+ if chunk == '':
+ chunk = '0'
chunks.append(chunk)
i += 4
#
best_len = 0
start = -1
last_was_zero = False
- for i in xrange(8):
+ for i in range(8):
if chunks[i] != '0':
if last_was_zero:
end = i
if seen_empty:
raise dns.exception.SyntaxError
seen_empty = True
- for i in xrange(0, 8 - l + 1):
+ for i in range(0, 8 - l + 1):
canonical.append('0000')
else:
lc = len(c)
# Finally we can go to binary.
#
try:
- return text.decode('hex_codec')
+ return bytes.fromhex(text)
except TypeError:
raise dns.exception.SyntaxError
"""DNS Messages"""
-import cStringIO
+import io
import random
import struct
import sys
self.index = {}
def __repr__(self):
- return '<DNS message, ID ' + `self.id` + '>'
+ return '<DNS message, ID ' + str(self.id) + '>'
def __str__(self):
return self.to_text()
@rtype: string
"""
- s = cStringIO.StringIO()
- print >> s, 'id %d' % self.id
- print >> s, 'opcode %s' % \
- dns.opcode.to_text(dns.opcode.from_flags(self.flags))
+ s = io.StringIO()
+ print('id %d' % self.id, file=s)
+ print('opcode %s' % \
+ dns.opcode.to_text(dns.opcode.from_flags(self.flags)),
+ file=s)
rc = dns.rcode.from_flags(self.flags, self.ednsflags)
- print >> s, 'rcode %s' % dns.rcode.to_text(rc)
- print >> s, 'flags %s' % dns.flags.to_text(self.flags)
+ print('rcode %s' % dns.rcode.to_text(rc), file=s)
+ print('flags %s' % dns.flags.to_text(self.flags), file=s)
if self.edns >= 0:
- print >> s, 'edns %s' % self.edns
+ print('edns %s' % self.edns, file=s)
if self.ednsflags != 0:
- print >> s, 'eflags %s' % \
- dns.flags.edns_to_text(self.ednsflags)
- print >> s, 'payload', self.payload
+ print('eflags %s' % \
+ dns.flags.edns_to_text(self.ednsflags), file=s)
+ print('payload', self.payload, file=s)
is_update = dns.opcode.is_update(self.flags)
if is_update:
- print >> s, ';ZONE'
+ print(';ZONE', file=s)
else:
- print >> s, ';QUESTION'
+ print(';QUESTION', file=s)
for rrset in self.question:
- print >> s, rrset.to_text(origin, relativize, **kw)
+ print(rrset.to_text(origin, relativize, **kw), file=s)
if is_update:
- print >> s, ';PREREQ'
+ print(';PREREQ', file=s)
else:
- print >> s, ';ANSWER'
+ print(';ANSWER', file=s)
for rrset in self.answer:
- print >> s, rrset.to_text(origin, relativize, **kw)
+ print(rrset.to_text(origin, relativize, **kw), file=s)
if is_update:
- print >> s, ';UPDATE'
+ print(';UPDATE', file=s)
else:
- print >> s, ';AUTHORITY'
+ print(';AUTHORITY', file=s)
for rrset in self.authority:
- print >> s, rrset.to_text(origin, relativize, **kw)
- print >> s, ';ADDITIONAL'
+ print(rrset.to_text(origin, relativize, **kw), file=s)
+ print(';ADDITIONAL', file=s)
for rrset in self.additional:
- print >> s, rrset.to_text(origin, relativize, **kw)
+ print(rrset.to_text(origin, relativize, **kw), file=s)
#
# We strip off the final \n so the caller can print the result without
# doing weird things to get around eccentricities in Python print
if keyname is None:
self.keyname = self.keyring.keys()[0]
else:
- if isinstance(keyname, (str, unicode)):
+ if isinstance(keyname, str):
keyname = dns.name.from_text(keyname)
self.keyname = keyname
self.keyalgorithm = algorithm
options = []
else:
# make sure the EDNS version in ednsflags agrees with edns
- ednsflags &= 0xFF00FFFFL
+ ednsflags &= 0xFF00FFFF
ednsflags |= (edns << 16)
if options is None:
options = []
(value, evalue) = dns.rcode.to_flags(rcode)
self.flags &= 0xFFF0
self.flags |= value
- self.ednsflags &= 0x00FFFFFFL
+ self.ednsflags &= 0x00FFFFFF
self.ednsflags |= evalue
if self.ednsflags != 0 and self.edns < 0:
self.edns = 0
if self.updating and qcount > 1:
raise dns.exception.FormError
- for i in xrange(0, qcount):
+ for i in range(0, qcount):
(qname, used) = dns.name.from_wire(self.wire, self.current)
if not self.message.origin is None:
qname = qname.relativize(self.message.origin)
else:
force_unique = False
seen_opt = False
- for i in xrange(0, count):
+ for i in range(0, count):
rr_start = self.current
(name, used) = dns.name.from_wire(self.wire, self.current)
absolute_name = name
@raises dns.exception.SyntaxError:
@rtype: dns.message.Message object"""
- if sys.hexversion >= 0x02030000:
- # allow Unicode filenames; turn on universal newline support
- str_type = basestring
- opts = 'rU'
- else:
- str_type = str
- opts = 'r'
- if isinstance(f, str_type):
- f = file(f, opts)
+ if isinstance(f, str):
+ f = open(f, 'rU')
want_close = True
else:
want_close = False
@type want_dnssec: bool
@rtype: dns.message.Message object"""
- if isinstance(qname, (str, unicode)):
+ if isinstance(qname, str):
qname = dns.name.from_text(qname)
if isinstance(rdtype, str):
rdtype = dns.rdatatype.from_text(rdtype)
@type empty: dns.name.Name object
"""
-import cStringIO
+import encodings.idna
+import io
import struct
import sys
-if sys.hexversion >= 0x02030000:
- import encodings.idna
-
import dns.exception
+import dns.util
NAMERELN_NONE = 0
NAMERELN_SUPERDOMAIN = 1
or the empty name."""
pass
-_escaped = {
- '"' : True,
- '(' : True,
- ')' : True,
- '.' : True,
- ';' : True,
- '\\' : True,
- '@' : True,
- '$' : True
- }
+class LabelMixesUnicodeAndASCII(dns.exception.SyntaxError):
+ """Raised if a label mixes Unicode characters and ASCII escapes."""
+ pass
+
+_escaped = frozenset([ord(c) for c in '"().;\\@$'])
def _escapify(label):
"""Escape the characters in label which need it.
text = ''
for c in label:
if c in _escaped:
- text += '\\' + c
- elif ord(c) > 0x20 and ord(c) < 0x7F:
- text += c
+ text += '\\' + chr(c)
+ elif c > 0x20 and c < 0x7F:
+ text += chr(c)
else:
- text += '\\%03d' % ord(c)
+ text += '\\%03d' % c
return text
+def _bytesify(label):
+ if isinstance(label, str):
+ return label.encode('latin_1')
+ elif not isinstance(label, bytes):
+ raise ValueError('label is not a bytes or a string')
+ else:
+ return label
+
def _validate_labels(labels):
"""Check for empty labels in the middle of a label sequence,
labels that are too long, and for too many labels.
i = -1
j = 0
for label in labels:
+ if not isinstance(label, bytes):
+ raise ValueError("label is not a bytes object or a string")
ll = len(label)
total += ll + 1
if ll > 63:
raise LabelTooLong
- if i < 0 and label == '':
+ if i < 0 and label == b'':
i = j
j += 1
if total > 255:
def __init__(self, labels):
"""Initialize a domain name from a list of labels.
@param labels: the labels
- @type labels: any iterable whose values are strings
+ @type labels: any iterable whose values are bytes objects or strings
+ containing only ISO Latin 1 characters (i.e. characters whose unicode
+ code points have values <= 255).
"""
- super(Name, self).__setattr__('labels', tuple(labels))
- _validate_labels(self.labels)
+ labels = tuple([_bytesify(l) for l in labels])
+ _validate_labels(labels)
+ super(Name, self).__setattr__('labels', labels)
def __setattr__(self, name, value):
raise TypeError("object doesn't support attribute assignment")
@rtype: bool
"""
- return len(self.labels) > 0 and self.labels[-1] == ''
+ return len(self.labels) > 0 and self.labels[-1] == b''
def is_wild(self):
"""Is this name wild? (I.e. Is the least significant label '*'?)
@rtype: bool
"""
- return len(self.labels) > 0 and self.labels[0] == '*'
+ return len(self.labels) > 0 and self.labels[0] == b'*'
def __hash__(self):
"""Return a case-insensitive hash of the name.
@rtype: int
"""
- h = 0L
+ h = 0
for label in self.labels:
+ label = label.lower()
for c in label:
- h += ( h << 3 ) + ord(c.lower())
- return int(h % sys.maxint)
+ h += ( h << 3 ) + c
+ return int(h % 18446744073709551616)
def fullcompare(self, other):
"""Compare two names, returning a 3-tuple (relation, order, nlabels).
if len(self.labels) == 0:
return '@'
- if len(self.labels) == 1 and self.labels[0] == '':
+ if len(self.labels) == 1 and self.labels[0] == b'':
return '.'
if omit_final_dot and self.is_absolute():
l = self.labels[:-1]
"""
if len(self.labels) == 0:
- return u'@'
- if len(self.labels) == 1 and self.labels[0] == '':
- return u'.'
+ return '@'
+ if len(self.labels) == 1 and self.labels[0] == b'':
+ return '.'
if omit_final_dot and self.is_absolute():
l = self.labels[:-1]
else:
l = self.labels
- s = u'.'.join([encodings.idna.ToUnicode(_escapify(x)) for x in l])
+ s = '.'.join([encodings.idna.ToUnicode(_escapify(x)) for x in l])
return s
def to_digestable(self, origin=None):
@raises NeedAbsoluteNameOrOrigin: All names in wire format are
absolute. If self is a relative name, then an origin must be supplied;
if it is missing, then this exception is raised
- @rtype: string
+ @rtype: bytes
"""
if not self.is_absolute():
labels.extend(list(origin.labels))
else:
labels = self.labels
- dlabels = ["%s%s" % (chr(len(x)), x.lower()) for x in labels]
- return ''.join(dlabels)
+ ba = bytearray()
+ for label in labels:
+ ba.append(len(label))
+ ba.extend(label.lower())
+ return bytes(ba)
def to_wire(self, file = None, compress = None, origin = None):
"""Convert name to wire format, possibly compressing it.
@param file: the file where the name is emitted (typically
- a cStringIO file). If None, a string containing the wire name
+ a io.BytesIO file). If None, a string containing the wire name
will be returned.
- @type file: file or None
+ @type file: bytearray or None
@param compress: The compression table. If None (the default) names
will not be compressed.
@type compress: dict
"""
if file is None:
- file = cStringIO.StringIO()
+ file = io.BytesIO()
want_return = True
else:
want_return = False
pos = None
if not pos is None:
value = 0xc000 + pos
- s = struct.pack('!H', value)
- file.write(s)
+ dns.util.write_uint16(file, value)
break
else:
if not compress is None and len(n) > 1:
if pos < 0xc000:
compress[n] = pos
l = len(label)
- file.write(chr(l))
+ dns.util.write_uint8(file, l)
if l > 0:
file.write(label)
if want_return:
raise NoParent
return Name(self.labels[1:])
-root = Name([''])
+root = Name([b''])
empty = Name([])
-def from_unicode(text, origin = root):
+def from_text(text, origin = root):
"""Convert unicode text into a Name object.
Lables are encoded in IDN ACE form.
@rtype: dns.name.Name object
"""
- if not isinstance(text, unicode):
- raise ValueError("input to from_unicode() must be a unicode string")
- if not (origin is None or isinstance(origin, Name)):
- raise ValueError("origin must be a Name or None")
- labels = []
- label = u''
- escaping = False
- edigits = 0
- total = 0
- if text == u'@':
- text = u''
- if text:
- if text == u'.':
- return Name(['']) # no Unicode "u" on this constant!
- for c in text:
- if escaping:
- if edigits == 0:
- if c.isdigit():
- total = int(c)
- edigits += 1
- else:
- label += c
- escaping = False
- else:
- if not c.isdigit():
- raise BadEscape
- total *= 10
- total += int(c)
- edigits += 1
- if edigits == 3:
- escaping = False
- label += chr(total)
- elif c == u'.' or c == u'\u3002' or \
- c == u'\uff0e' or c == u'\uff61':
- if len(label) == 0:
- raise EmptyLabel
- labels.append(encodings.idna.ToASCII(label))
- label = u''
- elif c == u'\\':
- escaping = True
- edigits = 0
- total = 0
- else:
- label += c
- if escaping:
- raise BadEscape
- if len(label) > 0:
- labels.append(encodings.idna.ToASCII(label))
- else:
- labels.append('')
- if (len(labels) == 0 or labels[-1] != '') and not origin is None:
- labels.extend(list(origin.labels))
- return Name(labels)
-
-def from_text(text, origin = root):
- """Convert text into a Name object.
- @rtype: dns.name.Name object
- """
-
- if not isinstance(text, str):
- if isinstance(text, unicode) and sys.hexversion >= 0x02030000:
- return from_unicode(text, origin)
- else:
- raise ValueError("input to from_text() must be a string")
if not (origin is None or isinstance(origin, Name)):
raise ValueError("origin must be a Name or None")
labels = []
label = ''
escaping = False
+ seen_non_ascii = False
+ seen_non_ascii_escape = False
edigits = 0
total = 0
if text == '@':
text = ''
if text:
if text == '.':
- return Name([''])
+ return Name([b''])
for c in text:
if escaping:
if edigits == 0:
else:
label += c
escaping = False
+ if ord(c) > 127:
+ seen_non_ascii = True
else:
if not c.isdigit():
raise BadEscape
if edigits == 3:
escaping = False
label += chr(total)
- elif c == '.':
+ if total > 127:
+ seen_non_ascii_escape = True
+ elif c == '.' or c == '\u3002' or \
+ c == '\uff0e' or c == '\uff61':
if len(label) == 0:
raise EmptyLabel
- labels.append(label)
+ if seen_non_ascii:
+ if seen_non_ascii_escape:
+ raise LabelMixesUnicodeAndASCII
+ labels.append(encodings.idna.ToASCII(label))
+ else:
+ labels.append(label.encode('latin_1'))
label = ''
+ seen_non_ascii = False
+ seen_non_ascii_escape = False
elif c == '\\':
escaping = True
edigits = 0
total = 0
else:
label += c
+ if ord(c) > 127:
+ seen_non_ascii = True
if escaping:
raise BadEscape
if len(label) > 0:
- labels.append(label)
+ if seen_non_ascii:
+ if seen_non_ascii_escape:
+ raise LabelMixesUnicodeAndASCII
+ labels.append(encodings.idna.ToASCII(label))
+ else:
+ labels.append(label.encode('latin_1'))
else:
- labels.append('')
- if (len(labels) == 0 or labels[-1] != '') and not origin is None:
+ labels.append(b'')
+ if (len(labels) == 0 or labels[-1] != b'') and not origin is None:
labels.extend(list(origin.labels))
return Name(labels)
def from_wire(message, current):
"""Convert possibly compressed wire format into a Name.
@param message: the entire DNS message
- @type message: string
+ @type message: bytes
@param current: the offset of the beginning of the name from the start
of the message
@type current: int
@rtype: (dns.name.Name object, int) tuple
"""
- if not isinstance(message, str):
+ if not isinstance(message, bytes):
raise ValueError("input to from_wire() must be a byte string")
labels = []
biggest_pointer = current
hops = 0
- count = ord(message[current])
+ count = message[current]
current += 1
cused = 1
while count != 0:
if hops == 0:
cused += count
elif count >= 192:
- current = (count & 0x3f) * 256 + ord(message[current])
+ current = (count & 0x3f) * 256 + message[current]
if hops == 0:
cused += 1
if current >= biggest_pointer:
hops += 1
else:
raise BadLabelType
- count = ord(message[current])
+ count = message[current]
current += 1
if hops == 0:
cused += 1
- labels.append('')
+ labels.append(b'')
return (Name(labels), cused)
depth = len(name)
if depth > self.max_depth:
depth = self.max_depth
- for i in xrange(-depth, 0):
+ for i in range(-depth, 0):
n = dns.name.Name(name[i:])
- if self.has_key(n):
+ if n in self:
return (n, self[n])
v = self[dns.name.empty]
return (dns.name.empty, v)
"""DNS nodes. A node is a set of rdatasets."""
-import StringIO
+import io
import dns.rdataset
import dns.rdatatype
class Node(object):
"""A DNS node.
-
+
A node is a set of rdatasets
@ivar rdatasets: the node's rdatasets
@type rdatasets: list of dns.rdataset.Rdataset objects"""
__slots__ = ['rdatasets']
-
+
def __init__(self):
"""Initialize a DNS node.
"""
-
+
self.rdatasets = [];
def to_text(self, name, **kw):
@type name: dns.name.Name object
@rtype: string
"""
-
- s = StringIO.StringIO()
+
+ s = io.StringIO()
for rds in self.rdatasets:
- print >> s, rds.to_text(name, **kw)
+ print(rds.to_text(name, **kw), file=s)
return s.getvalue()[:-1]
def __repr__(self):
return '<DNS node ' + str(id(self)) + '>'
-
+
def __eq__(self, other):
"""Two nodes are equal if they have the same rdatasets.
def __ne__(self, other):
return not self.__eq__(other)
-
+
def __len__(self):
return len(self.rdatasets)
def replace_rdataset(self, replacement):
"""Replace an rdataset.
-
+
It is not an error if there is no rdataset matching I{replacement}.
Ownership of the I{replacement} object is transferred to the node;
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be true inverse.
-_by_value = dict([(y, x) for x, y in _by_text.iteritems()])
+_by_value = dict([(y, x) for x, y in _by_text.items()])
class UnknownOpcode(dns.exception.DNSException):
(r, w, x) = select.select(ir, iw, ix)
else:
(r, w, x) = select.select(ir, iw, ix, timeout)
- except select.error, e:
+ except select.error as e:
if e.args[0] != errno.EINTR:
raise e
done = True
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be a true inverse.
-_by_value = dict([(y, x) for x, y in _by_text.iteritems()])
+_by_value = dict([(y, x) for x, y in _by_text.items()])
class UnknownRcode(dns.exception.DNSException):
if value < 0 or value > 4095:
raise ValueError('rcode must be >= 0 and <= 4095')
v = value & 0xf
- ev = long(value & 0xff0) << 20
+ ev = (value & 0xff0) << 20
return (v, ev)
def to_text(value):
chunk of hexstring that _hexify() produces before whitespace occurs.
@type _hex_chunk: int"""
-import cStringIO
+import base64
+import io
import dns.exception
import dns.rdataclass
import dns.rdatatype
import dns.tokenizer
+import dns.util
_hex_chunksize = 32
if chunksize is None:
chunksize = _hex_chunksize
- hex = data.encode('hex_codec')
+ hex = base64.b16encode(data).decode('ascii').lower()
l = len(hex)
if l > chunksize:
chunks = []
if chunksize is None:
chunksize = _base64_chunksize
- b64 = data.encode('base64_codec')
- b64 = b64.replace('\n', '')
+ b64 = base64.b64encode(data).decode('ascii')
l = len(b64)
if l > chunksize:
chunks = []
b64 = ' '.join(chunks)
return b64
-__escaped = {
- '"' : True,
- '\\' : True,
- }
+_escaped = frozenset('"\\')
def _escapify(qstring):
"""Escape the characters in a quoted string which need it.
@rtype: string
"""
+ if isinstance(qstring, bytes):
+ qstring = qstring.decode('latin_1')
text = ''
for c in qstring:
- if c in __escaped:
+ if c in _escaped:
text += '\\' + c
elif ord(c) >= 0x20 and ord(c) < 0x7F:
text += c
@rtype: string
"""
- for i in xrange(len(what) - 1, -1, -1):
- if what[i] != '\x00':
+ for i in range(len(what) - 1, -1, -1):
+ if what[i] != 0:
break
- return ''.join(what[0 : i + 1])
+ return bytes(what[0 : i + 1])
class Rdata(object):
"""Base class for all DNS rdata types.
def to_digestable(self, origin = None):
"""Convert rdata to a format suitable for digesting in hashes. This
is also the DNSSEC canonical form."""
- f = cStringIO.StringIO()
+ f = io.BytesIO()
self.to_wire(f, None, origin)
return f.getvalue()
return self._cmp(other) != 0
def __lt__(self, other):
- if not isinstance(other, Rdata) or \
- self.rdclass != other.rdclass or \
- self.rdtype != other.rdtype:
+ if not isinstance(other, Rdata):
return NotImplemented
+ if self.rdclass != other.rdclass or \
+ self.rdtype != other.rdtype:
+ return dns.util.cmp((self.rdclass, self.rdtype), (other.rdclass, other.rdtype))
return self._cmp(other) < 0
def __le__(self, other):
- if not isinstance(other, Rdata) or \
- self.rdclass != other.rdclass or \
- self.rdtype != other.rdtype:
+ if not isinstance(other, Rdata):
return NotImplemented
+ if self.rdclass != other.rdclass or \
+ self.rdtype != other.rdtype:
+ return dns.util.cmp((self.rdclass, self.rdtype), (other.rdclass, other.rdtype))
return self._cmp(other) <= 0
def __ge__(self, other):
- if not isinstance(other, Rdata) or \
- self.rdclass != other.rdclass or \
- self.rdtype != other.rdtype:
+ if not isinstance(other, Rdata):
return NotImplemented
+ if self.rdclass != other.rdclass or \
+ self.rdtype != other.rdtype:
+ return dns.util.cmp((self.rdclass, self.rdtype), (other.rdclass, other.rdtype))
return self._cmp(other) >= 0
def __gt__(self, other):
- if not isinstance(other, Rdata) or \
- self.rdclass != other.rdclass or \
- self.rdtype != other.rdtype:
+ if not isinstance(other, Rdata):
return NotImplemented
+ if self.rdclass != other.rdclass or \
+ self.rdtype != other.rdtype:
+ return dns.util.cmp((self.rdclass, self.rdtype), (other.rdclass, other.rdtype))
return self._cmp(other) > 0
+ @classmethod
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
"""Build an rdata object from text format.
raise NotImplementedError
- from_text = classmethod(from_text)
-
+ @classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
"""Build an rdata object from wire format
raise NotImplementedError
- from_wire = classmethod(from_wire)
-
def choose_relativity(self, origin = None, relativize = True):
"""Convert any domain names in the rdata to the specified
relativization.
def to_text(self, origin=None, relativize=True, **kw):
return r'\# %d ' % len(self.data) + _hexify(self.data)
+ @classmethod
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
token = tok.get()
if not token.is_identifier() or token.value != '\#':
+ print('XXX %u %u' % (rdclass, rdtype))
raise dns.exception.SyntaxError(r'generic rdata does not start with \#')
length = tok.get_int()
chunks = []
if token.is_eol_or_eof():
break
chunks.append(token.value)
- hex = ''.join(chunks)
- data = hex.decode('hex_codec')
+ data = bytes.fromhex(''.join(chunks))
if len(data) != length:
raise dns.exception.SyntaxError('generic rdata hex data has wrong length')
return cls(rdclass, rdtype, data)
- from_text = classmethod(from_text)
-
def to_wire(self, file, compress = None, origin = None):
file.write(self.data)
+ @classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
return cls(rdclass, rdtype, wire[current : current + rdlen])
- from_wire = classmethod(from_wire)
-
def _cmp(self, other):
- return cmp(self.data, other.data)
+ return dns.util.cmp(self.data, other.data)
_rdata_modules = {}
_module_prefix = 'dns.rdtypes'
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be true inverse.
-_by_value = dict([(y, x) for x, y in _by_text.iteritems()])
+_by_value = dict([(y, x) for x, y in _by_text.items()])
# Now that we've built the inverse map, we can add class aliases to
# the _by_text mapping.
'HESIOD' : HS
})
-_metaclasses = {
- NONE : True,
- ANY : True
- }
+_metaclasses = frozenset([NONE, ANY])
_unknown_class_pattern = re.compile('CLASS([0-9]+)$', re.I);
raise ValueError("class must be between >= 0 and <= 65535")
text = _by_value.get(value)
if text is None:
- text = 'CLASS' + `value`
+ text = 'CLASS' + str(value)
return text
def is_metaclass(rdclass):
@type rdclass: int
@rtype: bool"""
- if _metaclasses.has_key(rdclass):
+ if rdclass in _metaclasses:
return True
return False
"""DNS rdatasets (an rdataset is a set of rdatas of a given type and class)"""
+import io
import random
-import StringIO
import struct
import dns.exception
else:
ntext = ''
pad = ''
- s = StringIO.StringIO()
+ s = io.StringIO()
if not override_rdclass is None:
rdclass = override_rdclass
else:
# some dynamic updates, so we don't need to print out the TTL
# (which is meaningless anyway).
#
- print >> s, '%s%s%s %s' % (ntext, pad,
- dns.rdataclass.to_text(rdclass),
- dns.rdatatype.to_text(self.rdtype))
+ print('%s%s%s %s' % (ntext, pad,
+ dns.rdataclass.to_text(rdclass),
+ dns.rdatatype.to_text(self.rdtype)),
+ file=s)
else:
for rd in self:
- print >> s, '%s%s%d %s %s %s' % \
+ print('%s%s%d %s %s %s' % \
(ntext, pad, self.ttl, dns.rdataclass.to_text(rdclass),
dns.rdatatype.to_text(self.rdtype),
- rd.to_text(origin=origin, relativize=relativize, **kw))
+ rd.to_text(origin=origin, relativize=relativize, **kw)),
+ file=s)
#
# We strip off the final \n for the caller's convenience in printing
#
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be true inverse.
-_by_value = dict([(y, x) for x, y in _by_text.iteritems()])
+_by_value = dict([(y, x) for x, y in _by_text.items()])
+_metatypes = frozenset([OPT])
-_metatypes = {
- OPT : True
- }
-
-_singletons = {
- SOA : True,
- NXT : True,
- DNAME : True,
- NSEC : True,
- # CNAME is technically a singleton, but we allow multiple CNAMEs.
- }
+# CNAME is technically a singleton, but we allow multiple CNAMEs.
+_singletons = frozenset([SOA, NXT, DNAME, NSEC, NSEC3, NSEC3PARAM])
_unknown_type_pattern = re.compile('TYPE([0-9]+)$', re.I);
raise ValueError("type must be between >= 0 and <= 65535")
text = _by_value.get(value)
if text is None:
- text = 'TYPE' + `value`
+ text = 'TYPE' + str(value)
return text
def is_metatype(rdtype):
@type rdtype: int
@rtype: bool"""
- if rdtype >= TKEY and rdtype <= ANY or _metatypes.has_key(rdtype):
+ if rdtype >= TKEY and rdtype <= ANY or rdtype in _metatypes:
return True
return False
@type rdtype: int
@rtype: bool"""
- if _singletons.has_key(rdtype):
+ if rdtype in _singletons:
return True
return False
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import base64
+import io
import struct
import dns.exception
import dns.dnssec
import dns.rdata
import dns.tokenizer
+import dns.util
_ctype_by_value = {
1 : 'PKIX',
raise dns.exception.SyntaxError
chunks.append(t.value)
b64 = ''.join(chunks)
- certificate = b64.decode('base64_codec')
+ certificate = base64.b64decode(b64.encode('ascii'))
return cls(rdclass, rdtype, certificate_type, key_tag,
algorithm, certificate)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- f = cStringIO.StringIO()
+ f = io.BytesIO()
self.to_wire(f)
wire1 = f.getvalue()
f.seek(0)
wire2 = f.getvalue()
f.close()
- return cmp(wire1, wire2)
+ return dns.util.cmp(wire1, wire2)
import dns.exception
import dns.rdata
import dns.tokenizer
+import dns.util
def _validate_float_string(what):
if what[0] == '-' or what[0] == '+':
raise dns.exception.FormError
if not right == '' and not right.isdigit():
raise dns.exception.FormError
-
+
class GPOS(dns.rdata.Rdata):
"""GPOS record
@see: RFC 1712"""
__slots__ = ['latitude', 'longitude', 'altitude']
-
+
def __init__(self, rdclass, rdtype, latitude, longitude, altitude):
super(GPOS, self).__init__(rdclass, rdtype)
if isinstance(latitude, float) or \
- isinstance(latitude, int) or \
- isinstance(latitude, long):
+ isinstance(latitude, int):
latitude = str(latitude)
if isinstance(longitude, float) or \
- isinstance(longitude, int) or \
- isinstance(longitude, long):
+ isinstance(longitude, int):
longitude = str(longitude)
if isinstance(altitude, float) or \
- isinstance(altitude, int) or \
- isinstance(altitude, long):
+ isinstance(altitude, int):
altitude = str(altitude)
_validate_float_string(latitude)
_validate_float_string(longitude)
def to_text(self, origin=None, relativize=True, **kw):
return '%s %s %s' % (self.latitude, self.longitude, self.altitude)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
latitude = tok.get_string()
longitude = tok.get_string()
altitude = tok.get_string()
tok.get_eol()
return cls(rdclass, rdtype, latitude, longitude, altitude)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
l = len(self.latitude)
assert l < 256
- byte = chr(l)
- file.write(byte)
- file.write(self.latitude)
+ dns.util.write_uint8(file, l)
+ file.write(self.latitude.encode('latin_1'))
l = len(self.longitude)
assert l < 256
- byte = chr(l)
- file.write(byte)
- file.write(self.longitude)
+ dns.util.write_uint8(file, l)
+ file.write(self.longitude.encode('latin_1'))
l = len(self.altitude)
assert l < 256
- byte = chr(l)
- file.write(byte)
- file.write(self.altitude)
-
+ dns.util.write_uint8(file, l)
+ file.write(self.altitude.encode('latin_1'))
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
- latitude = wire[current : current + l]
+ latitude = wire[current : current + l].decode('latin_1')
current += l
rdlen -= l
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
- longitude = wire[current : current + l]
+ longitude = wire[current : current + l].decode('latin_1')
current += l
rdlen -= l
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l != rdlen:
raise dns.exception.FormError
- altitude = wire[current : current + l]
+ altitude = wire[current : current + l].decode('latin_1')
return cls(rdclass, rdtype, latitude, longitude, altitude)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- v = cmp(self.latitude, other.latitude)
+ v = dns.util.cmp(self.latitude, other.latitude)
if v == 0:
- v = cmp(self.longitude, other.longitude)
+ v = dns.util.cmp(self.longitude, other.longitude)
if v == 0:
- v = cmp(self.altitude, other.altitude)
+ v = dns.util.cmp(self.altitude, other.altitude)
return v
def _get_float_latitude(self):
import dns.exception
import dns.rdata
import dns.tokenizer
+import dns.util
class HINFO(dns.rdata.Rdata):
"""HINFO record
@see: RFC 1035"""
__slots__ = ['cpu', 'os']
-
+
def __init__(self, rdclass, rdtype, cpu, os):
super(HINFO, self).__init__(rdclass, rdtype)
self.cpu = cpu
def to_text(self, origin=None, relativize=True, **kw):
return '"%s" "%s"' % (dns.rdata._escapify(self.cpu),
dns.rdata._escapify(self.os))
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
cpu = tok.get_string()
os = tok.get_string()
tok.get_eol()
return cls(rdclass, rdtype, cpu, os)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
l = len(self.cpu)
assert l < 256
- byte = chr(l)
- file.write(byte)
- file.write(self.cpu)
+ dns.util.write_uint8(file, l)
+ file.write(self.cpu.encode('latin_1'))
l = len(self.os)
assert l < 256
- byte = chr(l)
- file.write(byte)
- file.write(self.os)
-
+ dns.util.write_uint8(file, l)
+ file.write(self.os.encode('latin_1'))
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
- cpu = wire[current : current + l]
+ cpu = wire[current : current + l].decode('latin_1')
current += l
rdlen -= l
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l != rdlen:
raise dns.exception.FormError
- os = wire[current : current + l]
+ os = wire[current : current + l].decode('latin_1')
return cls(rdclass, rdtype, cpu, os)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- v = cmp(self.cpu, other.cpu)
+ v = dns.util.cmp(self.cpu, other.cpu)
if v == 0:
- v = cmp(self.os, other.os)
+ v = dns.util.cmp(self.os, other.os)
return v
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import base64
+import io
import string
import struct
import dns.exception
import dns.rdata
import dns.rdatatype
+import dns.util
class HIP(dns.rdata.Rdata):
"""HIP record
self.servers = servers
def to_text(self, origin=None, relativize=True, **kw):
- hit = self.hit.encode('hex-codec')
- key = self.key.encode('base64-codec').replace('\n', '')
+ hit = base64.b16encode(self.hit).decode('ascii').lower()
+ key = base64.b64encode(self.key).decode('ascii')
text = ''
servers = []
for server in self.servers:
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
algorithm = tok.get_uint8()
- hit = tok.get_string().decode('hex-codec')
+ hit = bytes.fromhex(tok.get_string())
if len(hit) > 255:
raise dns.exception.SyntaxError("HIT too long")
- key = tok.get_string().decode('base64-codec')
+ key = base64.b64decode(tok.get_string().encode('ascii'))
servers = []
while 1:
token = tok.get()
self.servers = servers
def _cmp(self, other):
- b1 = cStringIO.StringIO()
+ b1 = io.BytesIO()
lh = len(self.hit)
lk = len(self.key)
b1.write(struct.pack("!BBH", lh, self.algorithm, lk))
b1.write(self.hit)
b1.write(self.key)
- b2 = cStringIO.StringIO()
+ b2 = io.BytesIO()
lh = len(other.hit)
lk = len(other.key)
b2.write(struct.pack("!BBH", lh, other.algorithm, lk))
b2.write(other.hit)
b2.write(other.key)
- v = cmp(b1.getvalue(), b2.getvalue())
+ v = dns.util.cmp(b1.getvalue(), b2.getvalue())
if v != 0:
return v
ls = len(self.servers)
count = min(ls, lo)
i = 0
while i < count:
- v = cmp(self.servers[i], other.servers[i])
+ v = dns.util.cmp(self.servers[i], other.servers[i])
if v != 0:
return v
i += 1
import dns.exception
import dns.rdata
import dns.tokenizer
+import dns.util
class ISDN(dns.rdata.Rdata):
"""ISDN record
return '"%s"' % dns.rdata._escapify(self.address)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
- address = tok.get_string()
+ address = tok.get_string().encode('ascii')
t = tok.get()
if not t.is_eol_or_eof():
tok.unget(t)
- subaddress = tok.get_string()
+ subaddress = tok.get_string().encode('ascii')
else:
tok.unget(t)
- subaddress = ''
+ subaddress = b''
tok.get_eol()
return cls(rdclass, rdtype, address, subaddress)
def to_wire(self, file, compress = None, origin = None):
l = len(self.address)
assert l < 256
- byte = chr(l)
- file.write(byte)
+ dns.util.write_uint8(file, l)
file.write(self.address)
l = len(self.subaddress)
if l > 0:
assert l < 256
- byte = chr(l)
- file.write(byte)
+ dns.util.write_uint8(file, l)
file.write(self.subaddress)
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
current += l
rdlen -= l
if rdlen > 0:
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l != rdlen:
raise dns.exception.FormError
subaddress = wire[current : current + l]
else:
- subaddress = ''
+ subaddress = b''
return cls(rdclass, rdtype, address, subaddress)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- v = cmp(self.address, other.address)
+ v = dns.util.cmp(self.address, other.address)
if v == 0:
- v = cmp(self.subaddress, other.subaddress)
+ v = dns.util.cmp(self.subaddress, other.subaddress)
return v
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import io
import struct
import dns.exception
import dns.rdata
+import dns.util
-_pows = (1L, 10L, 100L, 1000L, 10000L, 100000L, 1000000L, 10000000L,
- 100000000L, 1000000000L, 10000000000L)
+_pows = (1, 10, 100, 1000, 10000, 100000, 1000000, 10000000,
+ 100000000, 1000000000, 10000000000)
def _exponent_of(what, desc):
exp = None
- for i in xrange(len(_pows)):
- if what // _pows[i] == 0L:
+ for i in range(len(_pows)):
+ if what // _pows[i] == 0:
exp = i - 1
break
if exp is None or exp < 0:
what *= -1
else:
sign = 1
- what = long(round(what * 3600000))
- degrees = int(what // 3600000)
+ what = int(round(what * 3600000))
+ degrees = what // 3600000
what -= degrees * 3600000
- minutes = int(what // 60000)
+ minutes = what // 60000
what -= minutes * 60000
- seconds = int(what // 1000)
- what -= int(seconds * 1000)
- what = int(what)
+ seconds = what // 1000
+ what -= seconds * 1000
return (degrees * sign, minutes, seconds, what)
def _tuple_to_float(what):
return sign * value
def _encode_size(what, desc):
- what = long(what);
+ what = int(what);
exponent = _exponent_of(what, desc) & 0xF
base = what // pow(10, exponent) & 0xF
return base * 16 + exponent
base = (what & 0xF0) >> 4
if base > 9:
raise dns.exception.SyntaxError("bad %s base" % desc)
- return long(base) * pow(10, exponent)
+ return int(base) * pow(10, exponent)
class LOC(dns.rdata.Rdata):
"""LOC record
degrees. The other parameters are floats."""
super(LOC, self).__init__(rdclass, rdtype)
- if isinstance(latitude, int) or isinstance(latitude, long):
+ if isinstance(latitude, int):
latitude = float(latitude)
if isinstance(latitude, float):
latitude = _float_to_tuple(latitude)
self.latitude = latitude
- if isinstance(longitude, int) or isinstance(longitude, long):
+ if isinstance(longitude, int):
longitude = float(longitude)
if isinstance(longitude, float):
longitude = _float_to_tuple(longitude)
def to_wire(self, file, compress = None, origin = None):
if self.latitude[0] < 0:
sign = -1
- degrees = long(-1 * self.latitude[0])
+ degrees = -1 * self.latitude[0]
else:
sign = 1
- degrees = long(self.latitude[0])
+ degrees = self.latitude[0]
milliseconds = (degrees * 3600000 +
self.latitude[1] * 60000 +
self.latitude[2] * 1000 +
self.latitude[3]) * sign
- latitude = 0x80000000L + milliseconds
+ latitude = 0x80000000 + milliseconds
if self.longitude[0] < 0:
sign = -1
- degrees = long(-1 * self.longitude[0])
+ degrees = -1 * self.longitude[0]
else:
sign = 1
- degrees = long(self.longitude[0])
+ degrees = self.longitude[0]
milliseconds = (degrees * 3600000 +
self.longitude[1] * 60000 +
self.longitude[2] * 1000 +
self.longitude[3]) * sign
- longitude = 0x80000000L + milliseconds
- altitude = long(self.altitude) + 10000000L
+ longitude = 0x80000000 + milliseconds
+ altitude = int(self.altitude) + 10000000
size = _encode_size(self.size, "size")
hprec = _encode_size(self.horizontal_precision, "horizontal precision")
vprec = _encode_size(self.vertical_precision, "vertical precision")
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(version, size, hprec, vprec, latitude, longitude, altitude) = \
struct.unpack("!BBBBIII", wire[current : current + rdlen])
- if latitude > 0x80000000L:
- latitude = float(latitude - 0x80000000L) / 3600000
+ if latitude > 0x80000000:
+ latitude = float(latitude - 0x80000000) / 3600000
else:
- latitude = -1 * float(0x80000000L - latitude) / 3600000
+ latitude = -1 * float(0x80000000 - latitude) / 3600000
if latitude < -90.0 or latitude > 90.0:
raise dns.exception.FormError("bad latitude")
- if longitude > 0x80000000L:
- longitude = float(longitude - 0x80000000L) / 3600000
+ if longitude > 0x80000000:
+ longitude = float(longitude - 0x80000000) / 3600000
else:
- longitude = -1 * float(0x80000000L - longitude) / 3600000
+ longitude = -1 * float(0x80000000 - longitude) / 3600000
if longitude < -180.0 or longitude > 180.0:
raise dns.exception.FormError("bad longitude")
altitude = float(altitude) - 10000000.0
from_wire = classmethod(from_wire)
def _cmp(self, other):
- f = cStringIO.StringIO()
+ f = io.BytesIO()
self.to_wire(f)
wire1 = f.getvalue()
f.seek(0)
wire2 = f.getvalue()
f.close()
- return cmp(wire1, wire2)
+ return dns.util.cmp(wire1, wire2)
def _get_float_latitude(self):
return _tuple_to_float(self.latitude)
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import io
import dns.exception
import dns.rdata
import dns.rdatatype
import dns.name
+import dns.util
class NSEC(dns.rdata.Rdata):
"""NSEC record
text = ''
for (window, bitmap) in self.windows:
bits = []
- for i in xrange(0, len(bitmap)):
- byte = ord(bitmap[i])
- for j in xrange(0, 8):
+ for i in range(0, len(bitmap)):
+ byte = bitmap[i]
+ for j in range(0, 8):
if byte & (0x80 >> j):
bits.append(dns.rdatatype.to_text(window * 256 + \
i * 8 + j))
window = 0
octets = 0
prior_rdtype = 0
- bitmap = ['\0'] * 32
+ bitmap = bytearray(32)
windows = []
for nrdtype in rdtypes:
if nrdtype == prior_rdtype:
prior_rdtype = nrdtype
new_window = nrdtype // 256
if new_window != window:
- windows.append((window, ''.join(bitmap[0:octets])))
- bitmap = ['\0'] * 32
+ windows.append((window, bytes(bitmap[0:octets])))
+ bitmap = bytearray(32)
window = new_window
offset = nrdtype % 256
- byte = offset / 8
+ byte = offset // 8
bit = offset % 8
octets = byte + 1
- bitmap[byte] = chr(ord(bitmap[byte]) | (0x80 >> bit))
- windows.append((window, ''.join(bitmap[0:octets])))
+ bitmap[byte] = bitmap[byte] | (0x80 >> bit)
+ windows.append((window, bytes(bitmap[0:octets])))
return cls(rdclass, rdtype, next, windows)
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
self.next.to_wire(file, None, origin)
for (window, bitmap) in self.windows:
- file.write(chr(window))
- file.write(chr(len(bitmap)))
+ dns.util.write_uint8(file, window)
+ dns.util.write_uint8(file, len(bitmap))
file.write(bitmap)
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
while rdlen > 0:
if rdlen < 3:
raise dns.exception.FormError("NSEC too short")
- window = ord(wire[current])
- octets = ord(wire[current + 1])
+ window = wire[current]
+ octets = wire[current + 1]
if octets == 0 or octets > 32:
raise dns.exception.FormError("bad NSEC octets")
current += 2
self.next = self.next.choose_relativity(origin, relativize)
def _cmp(self, other):
- v = cmp(self.next, other.next)
+ v = dns.util.cmp(self.next, other.next)
if v == 0:
- b1 = cStringIO.StringIO()
+ b1 = io.BytesIO()
for (window, bitmap) in self.windows:
- b1.write(chr(window))
- b1.write(chr(len(bitmap)))
+ dns.util.write_uint8(b1, window)
+ dns.util.write_uint8(b1, len(bitmap))
b1.write(bitmap)
- b2 = cStringIO.StringIO()
+ b2 = io.BytesIO()
for (window, bitmap) in other.windows:
- b2.write(chr(window))
- b2.write(chr(len(bitmap)))
+ dns.util.write_uint8(b2, window)
+ dns.util.write_uint8(b2, len(bitmap))
b2.write(bitmap)
- v = cmp(b1.getvalue(), b2.getvalue())
+ v = dns.util.cmp(b1.getvalue(), b2.getvalue())
return v
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import base64
-import cStringIO
+import io
import string
import struct
import dns.exception
import dns.rdata
import dns.rdatatype
+import dns.util
-b32_hex_to_normal = string.maketrans('0123456789ABCDEFGHIJKLMNOPQRSTUV',
- 'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567')
-b32_normal_to_hex = string.maketrans('ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
- '0123456789ABCDEFGHIJKLMNOPQRSTUV')
+b32_hex_to_normal = bytes.maketrans(b'0123456789ABCDEFGHIJKLMNOPQRSTUV',
+ b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567')
+b32_normal_to_hex = bytes.maketrans(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
+ b'0123456789ABCDEFGHIJKLMNOPQRSTUV')
# hash algorithm constants
SHA1 = 1
def to_text(self, origin=None, relativize=True, **kw):
next = base64.b32encode(self.next).translate(b32_normal_to_hex).lower()
- if self.salt == '':
+ next = next.decode('ascii')
+ if self.salt == b'':
salt = '-'
else:
- salt = self.salt.encode('hex-codec')
+ salt = base64.b16encode(self.salt).decode('ascii').lower()
text = ''
for (window, bitmap) in self.windows:
bits = []
- for i in xrange(0, len(bitmap)):
- byte = ord(bitmap[i])
- for j in xrange(0, 8):
+ for i in range(0, len(bitmap)):
+ byte = bitmap[i]
+ for j in range(0, 8):
if byte & (0x80 >> j):
bits.append(dns.rdatatype.to_text(window * 256 + \
i * 8 + j))
iterations = tok.get_uint16()
salt = tok.get_string()
if salt == '-':
- salt = ''
+ salt = b''
else:
- salt = salt.decode('hex-codec')
- next = tok.get_string().upper().translate(b32_hex_to_normal)
+ salt = bytes.fromhex(salt)
+ next = tok.get_string().upper().encode('ascii')
+ next = next.translate(b32_hex_to_normal)
next = base64.b32decode(next)
rdtypes = []
while 1:
window = 0
octets = 0
prior_rdtype = 0
- bitmap = ['\0'] * 32
+ bitmap = bytearray(32)
windows = []
for nrdtype in rdtypes:
if nrdtype == prior_rdtype:
prior_rdtype = nrdtype
new_window = nrdtype // 256
if new_window != window:
- windows.append((window, ''.join(bitmap[0:octets])))
- bitmap = ['\0'] * 32
+ windows.append((window, bytes(bitmap[0:octets])))
+ bitmap = bytearray(32)
window = new_window
offset = nrdtype % 256
- byte = offset / 8
+ byte = offset // 8
bit = offset % 8
octets = byte + 1
- bitmap[byte] = chr(ord(bitmap[byte]) | (0x80 >> bit))
- windows.append((window, ''.join(bitmap[0:octets])))
+ bitmap[byte] = bitmap[byte] | (0x80 >> bit)
+ windows.append((window, bytes(bitmap[0:octets])))
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next, windows)
from_text = classmethod(from_text)
file.write(struct.pack("!B", l))
file.write(self.next)
for (window, bitmap) in self.windows:
- file.write(chr(window))
- file.write(chr(len(bitmap)))
+ dns.util.write_uint8(file, window)
+ dns.util.write_uint8(file, len(bitmap))
file.write(bitmap)
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
salt = wire[current : current + slen]
current += slen
rdlen -= slen
- (nlen, ) = struct.unpack('!B', wire[current])
+ nlen = wire[current]
current += 1
rdlen -= 1
next = wire[current : current + nlen]
while rdlen > 0:
if rdlen < 3:
raise dns.exception.FormError("NSEC3 too short")
- window = ord(wire[current])
- octets = ord(wire[current + 1])
+ window = wire[current]
+ octets = wire[current + 1]
if octets == 0 or octets > 32:
raise dns.exception.FormError("bad NSEC3 octets")
current += 2
from_wire = classmethod(from_wire)
def _cmp(self, other):
- b1 = cStringIO.StringIO()
+ b1 = io.BytesIO()
self.to_wire(b1)
- b2 = cStringIO.StringIO()
+ b2 = io.BytesIO()
other.to_wire(b2)
- return cmp(b1.getvalue(), b2.getvalue())
+ return dns.util.cmp(b1.getvalue(), b2.getvalue())
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import base64
+import io
import struct
import dns.exception
import dns.rdata
+import dns.util
class NSEC3PARAM(dns.rdata.Rdata):
"""NSEC3PARAM record
self.salt = salt
def to_text(self, origin=None, relativize=True, **kw):
- if self.salt == '':
+ if self.salt == b'':
salt = '-'
else:
- salt = self.salt.encode('hex-codec')
+ salt = base64.b16encode(self.salt).decode('ascii').lower()
return '%u %u %u %s' % (self.algorithm, self.flags, self.iterations, salt)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
iterations = tok.get_uint16()
salt = tok.get_string()
if salt == '-':
- salt = ''
+ salt = b''
else:
- salt = salt.decode('hex-codec')
+ salt = bytes.fromhex(salt)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt)
from_text = classmethod(from_text)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- b1 = cStringIO.StringIO()
+ b1 = io.BytesIO()
self.to_wire(b1)
- b2 = cStringIO.StringIO()
+ b2 = io.BytesIO()
other.to_wire(b2)
- return cmp(b1.getvalue(), b2.getvalue())
+ return dns.util.cmp(b1.getvalue(), b2.getvalue())
import dns.rdata
import dns.rdatatype
import dns.name
+import dns.util
class NXT(dns.rdata.Rdata):
"""NXT record
def to_text(self, origin=None, relativize=True, **kw):
next = self.next.choose_relativity(origin, relativize)
bits = []
- for i in xrange(0, len(self.bitmap)):
- byte = ord(self.bitmap[i])
- for j in xrange(0, 8):
+ for i in range(0, len(self.bitmap)):
+ byte = self.bitmap[i]
+ for j in range(0, 8):
if byte & (0x80 >> j):
bits.append(dns.rdatatype.to_text(i * 8 + j))
text = ' '.join(bits)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
next = tok.get_name()
next = next.choose_relativity(origin, relativize)
- bitmap = ['\x00', '\x00', '\x00', '\x00',
- '\x00', '\x00', '\x00', '\x00',
- '\x00', '\x00', '\x00', '\x00',
- '\x00', '\x00', '\x00', '\x00' ]
+ bitmap = bytearray(32)
while 1:
token = tok.get().unescape()
if token.is_eol_or_eof():
if nrdtype > 127:
raise dns.exception.SyntaxError("NXT with bit > 127")
i = nrdtype // 8
- bitmap[i] = chr(ord(bitmap[i]) | (0x80 >> (nrdtype % 8)))
+ bitmap[i] = bitmap[i] | (0x80 >> (nrdtype % 8))
bitmap = dns.rdata._truncate_bitmap(bitmap)
return cls(rdclass, rdtype, next, bitmap)
self.next = self.next.choose_relativity(origin, relativize)
def _cmp(self, other):
- v = cmp(self.next, other.next)
+ v = dns.util.cmp(self.next, other.next)
if v == 0:
- v = cmp(self.bitmap, other.bitmap)
+ v = dns.util.cmp(self.bitmap, other.bitmap)
return v
import dns.exception
import dns.rdata
import dns.name
+import dns.util
class RP(dns.rdata.Rdata):
"""RP record
self.txt = self.txt.choose_relativity(origin, relativize)
def _cmp(self, other):
- v = cmp(self.mbox, other.mbox)
+ v = dns.util.cmp(self.mbox, other.mbox)
if v == 0:
- v = cmp(self.txt, other.txt)
+ v = dns.util.cmp(self.txt, other.txt)
return v
import dns.exception
import dns.rdata
import dns.name
+import dns.util
class SOA(dns.rdata.Rdata):
"""SOA record
__slots__ = ['mname', 'rname', 'serial', 'refresh', 'retry', 'expire',
'minimum']
-
+
def __init__(self, rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum):
super(SOA, self).__init__(rdclass, rdtype)
return '%s %s %d %d %d %d %d' % (
mname, rname, self.serial, self.refresh, self.retry,
self.expire, self.minimum )
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
mname = tok.get_name()
rname = tok.get_name()
tok.get_eol()
return cls(rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum )
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
self.rname = self.rname.choose_relativity(origin, relativize)
def _cmp(self, other):
- v = cmp(self.mname, other.mname)
+ v = dns.util.cmp(self.mname, other.mname)
if v == 0:
- v = cmp(self.rname, other.rname)
+ v = dns.util.cmp(self.rname, other.rname)
if v == 0:
self_ints = struct.pack('!IIIII', self.serial, self.refresh,
self.retry, self.expire, self.minimum)
other_ints = struct.pack('!IIIII', other.serial, other.refresh,
other.retry, other.expire,
other.minimum)
- v = cmp(self_ints, other_ints)
+ v = dns.util.cmp(self_ints, other_ints)
return v
import dns.rdata
import dns.rdatatype
+import dns.util
class SSHFP(dns.rdata.Rdata):
"""SSHFP record
@see: draft-ietf-secsh-dns-05.txt"""
__slots__ = ['algorithm', 'fp_type', 'fingerprint']
-
+
def __init__(self, rdclass, rdtype, algorithm, fp_type,
fingerprint):
super(SSHFP, self).__init__(rdclass, rdtype)
self.fp_type,
dns.rdata._hexify(self.fingerprint,
chunksize=128))
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
algorithm = tok.get_uint8()
fp_type = tok.get_uint8()
- fingerprint = tok.get_string()
- fingerprint = fingerprint.decode('hex_codec')
+ fingerprint = bytes.fromhex(tok.get_string())
tok.get_eol()
return cls(rdclass, rdtype, algorithm, fp_type, fingerprint)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
header = struct.pack("!BB", self.algorithm, self.fp_type)
file.write(header)
file.write(self.fingerprint)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
header = struct.unpack("!BB", wire[current : current + 2])
current += 2
def _cmp(self, other):
hs = struct.pack("!BB", self.algorithm, self.fp_type)
ho = struct.pack("!BB", other.algorithm, other.fp_type)
- v = cmp(hs, ho)
+ v = dns.util.cmp(hs, ho)
if v == 0:
- v = cmp(self.fingerprint, other.fingerprint)
+ v = dns.util.cmp(self.fingerprint, other.fingerprint)
return v
import dns.exception
import dns.rdata
import dns.tokenizer
+import dns.util
class X25(dns.rdata.Rdata):
"""X25 record
@see: RFC 1183"""
__slots__ = ['address']
-
+
def __init__(self, rdclass, rdtype, address):
super(X25, self).__init__(rdclass, rdtype)
self.address = address
def to_text(self, origin=None, relativize=True, **kw):
return '"%s"' % dns.rdata._escapify(self.address)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
address = tok.get_string()
tok.get_eol()
return cls(rdclass, rdtype, address)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
l = len(self.address)
assert l < 256
- byte = chr(l)
- file.write(byte)
- file.write(self.address)
-
+ dns.util.write_uint8(file, l)
+ file.write(self.address.encode('latin_1'))
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l != rdlen:
raise dns.exception.FormError
- address = wire[current : current + l]
+ address = wire[current : current + l].decode('latin_1')
return cls(rdclass, rdtype, address)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- return cmp(self.address, other.address)
+ return dns.util.cmp(self.address, other.address)
import dns.ipv4
import dns.rdata
import dns.tokenizer
+import dns.util
class A(dns.rdata.Rdata):
"""A record.
def _cmp(self, other):
sa = dns.ipv4.inet_aton(self.address)
oa = dns.ipv4.inet_aton(other.address)
- return cmp(sa, oa)
+ return dns.util.cmp(sa, oa)
import dns.inet
import dns.rdata
import dns.tokenizer
+import dns.util
class AAAA(dns.rdata.Rdata):
"""AAAA record.
def _cmp(self, other):
sa = dns.inet.inet_pton(dns.inet.AF_INET6, self.address)
oa = dns.inet.inet_pton(dns.inet.AF_INET6, other.address)
- return cmp(sa, oa)
+ return dns.util.cmp(sa, oa)
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import io
import struct
import dns.exception
import dns.inet
import dns.rdata
import dns.tokenizer
+import dns.util
class APLItem(object):
"""An APL list item.
# Truncate least significant zero bytes.
#
last = 0
- for i in xrange(len(address) - 1, -1, -1):
+ for i in range(len(address) - 1, -1, -1):
if address[i] != chr(0):
last = i + 1
break
l = len(address)
if header[0] == 1:
if l < 4:
- address += '\x00' * (4 - l)
+ address += b'\x00' * (4 - l)
address = dns.inet.inet_ntop(dns.inet.AF_INET, address)
elif header[0] == 2:
if l < 16:
- address += '\x00' * (16 - l)
+ address += b'\x00' * (16 - l)
address = dns.inet.inet_ntop(dns.inet.AF_INET6, address)
else:
#
# This isn't really right according to the RFC, but it
# seems better than throwing an exception
#
- address = address.encode('hex_codec')
+ address = dns.rdata._hexify(address)
current += afdlen
rdlen -= afdlen
item = APLItem(header[0], negation, address, header[1])
from_wire = classmethod(from_wire)
def _cmp(self, other):
- f = cStringIO.StringIO()
+ f = io.BytesIO()
self.to_wire(f)
wire1 = f.getvalue()
f.seek(0)
wire2 = f.getvalue()
f.close()
- return cmp(wire1, wire2)
+ return dns.util.cmp(wire1, wire2)
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+import base64
+
import dns.exception
+import dns.util
class DHCID(dns.rdata.Rdata):
"""DHCID record
raise dns.exception.SyntaxError
chunks.append(t.value)
b64 = ''.join(chunks)
- data = b64.decode('base64_codec')
+ data = base64.b64decode(b64.encode('ascii'))
return cls(rdclass, rdtype, data)
from_text = classmethod(from_text)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- return cmp(self.data, other.data)
+ return dns.util.cmp(self.data, other.data)
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import base64
+import io
import struct
import dns.exception
import dns.inet
import dns.name
+import dns.util
class IPSECKEY(dns.rdata.Rdata):
"""IPSECKEY record
raise dns.exception.SyntaxError
chunks.append(t.value)
b64 = ''.join(chunks)
- key = b64.decode('base64_codec')
+ key = base64.b64decode(b64.encode('ascii'))
return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
gateway, key)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- f = cStringIO.StringIO()
+ f = io.BytesIO()
self.to_wire(f)
wire1 = f.getvalue()
f.seek(0)
wire2 = f.getvalue()
f.close()
- return cmp(wire1, wire2)
+ return dns.util.cmp(wire1, wire2)
import dns.exception
import dns.name
import dns.rdata
+import dns.util
def _write_string(file, s):
l = len(s)
assert l < 256
- byte = chr(l)
- file.write(byte)
- file.write(s)
+ dns.util.write_uint8(file, l)
+ file.write(s.encode('ascii'))
class NAPTR(dns.rdata.Rdata):
"""NAPTR record
__slots__ = ['order', 'preference', 'flags', 'service', 'regexp',
'replacement']
-
+
def __init__(self, rdclass, rdtype, order, preference, flags, service,
regexp, replacement):
super(NAPTR, self).__init__(rdclass, rdtype)
tok.get_eol()
return cls(rdclass, rdtype, order, preference, flags, service,
regexp, replacement)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
_write_string(file, self.service)
_write_string(file, self.regexp)
self.replacement.to_wire(file, compress, origin)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(order, preference) = struct.unpack('!HH', wire[current : current + 4])
current += 4
rdlen -= 4
strings = []
- for i in xrange(3):
- l = ord(wire[current])
+ for i in range(3):
+ l = wire[current]
current += 1
rdlen -= 1
if l > rdlen or rdlen < 0:
raise dns.exception.FormError
- s = wire[current : current + l]
+ s = wire[current : current + l].decode('latin_1')
current += l
rdlen -= l
strings.append(s)
def choose_relativity(self, origin = None, relativize = True):
self.replacement = self.replacement.choose_relativity(origin,
relativize)
-
+
def _cmp(self, other):
sp = struct.pack("!HH", self.order, self.preference)
op = struct.pack("!HH", other.order, other.preference)
- v = cmp(sp, op)
+ v = dns.util.cmp(sp, op)
if v == 0:
- v = cmp(self.flags, other.flags)
+ v = dns.util.cmp(self.flags, other.flags)
if v == 0:
- v = cmp(self.service, other.service)
+ v = dns.util.cmp(self.service, other.service)
if v == 0:
- v = cmp(self.regexp, other.regexp)
+ v = dns.util.cmp(self.regexp, other.regexp)
if v == 0:
- v = cmp(self.replacement, other.replacement)
+ v = dns.util.cmp(self.replacement, other.replacement)
return v
import dns.exception
import dns.rdata
import dns.tokenizer
+import dns.util
class NSAP(dns.rdata.Rdata):
"""NSAP record.
self.address = address
def to_text(self, origin=None, relativize=True, **kw):
- return "0x%s" % self.address.encode('hex_codec')
+ return "0x%s" % dns.rdata._hexify(self.address, 256)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
address = tok.get_string()
address = address[2:].replace('.', '')
if len(address) % 2 != 0:
raise dns.exception.SyntaxError('hexstring has odd length')
- address = address.decode('hex_codec')
+ address = bytes.fromhex(address)
return cls(rdclass, rdtype, address)
from_text = classmethod(from_text)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- return cmp(self.address, other.address)
+ return dns.util.cmp(self.address, other.address)
import dns.exception
import dns.rdata
import dns.name
+import dns.util
class PX(dns.rdata.Rdata):
"""PX record.
@see: RFC 2163"""
__slots__ = ['preference', 'map822', 'mapx400']
-
+
def __init__(self, rdclass, rdtype, preference, map822, mapx400):
super(PX, self).__init__(rdclass, rdtype)
self.preference = preference
map822 = self.map822.choose_relativity(origin, relativize)
mapx400 = self.mapx400.choose_relativity(origin, relativize)
return '%d %s %s' % (self.preference, map822, mapx400)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
preference = tok.get_uint16()
map822 = tok.get_name()
mapx400 = mapx400.choose_relativity(origin, relativize)
tok.get_eol()
return cls(rdclass, rdtype, preference, map822, mapx400)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(pref)
self.map822.to_wire(file, None, origin)
self.mapx400.to_wire(file, None, origin)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(preference, ) = struct.unpack('!H', wire[current : current + 2])
current += 2
def _cmp(self, other):
sp = struct.pack("!H", self.preference)
op = struct.pack("!H", other.preference)
- v = cmp(sp, op)
+ v = dns.util.cmp(sp, op)
if v == 0:
- v = cmp(self.map822, other.map822)
+ v = dns.util.cmp(self.map822, other.map822)
if v == 0:
- v = cmp(self.mapx400, other.mapx400)
+ v = dns.util.cmp(self.mapx400, other.mapx400)
return v
import dns.exception
import dns.rdata
import dns.name
+import dns.util
class SRV(dns.rdata.Rdata):
"""SRV record
def _cmp(self, other):
sp = struct.pack("!HHH", self.priority, self.weight, self.port)
op = struct.pack("!HHH", other.priority, other.weight, other.port)
- v = cmp(sp, op)
+ v = dns.util.cmp(sp, op)
if v == 0:
- v = cmp(self.target, other.target)
+ v = dns.util.cmp(self.target, other.target)
return v
import dns.ipv4
import dns.rdata
+import dns.util
_proto_tcp = socket.getprotobyname('tcp')
_proto_udp = socket.getprotobyname('udp')
def to_text(self, origin=None, relativize=True, **kw):
bits = []
- for i in xrange(0, len(self.bitmap)):
- byte = ord(self.bitmap[i])
- for j in xrange(0, 8):
+ for i in range(0, len(self.bitmap)):
+ byte = self.bitmap[i]
+ for j in range(0, 8):
if byte & (0x80 >> j):
bits.append(str(i * 8 + j))
text = ' '.join(bits)
protocol = int(protocol)
else:
protocol = socket.getprotobyname(protocol)
- bitmap = []
+ bitmap = bytearray(32 * 256)
while 1:
token = tok.get().unescape()
if token.is_eol_or_eof():
protocol_text = "tcp"
serv = socket.getservbyname(token.value, protocol_text)
i = serv // 8
- l = len(bitmap)
- if l < i + 1:
- for j in xrange(l, i + 1):
- bitmap.append('\x00')
- bitmap[i] = chr(ord(bitmap[i]) | (0x80 >> (serv % 8)))
+ bitmap[i] = bitmap[i] | (0x80 >> (serv % 8))
bitmap = dns.rdata._truncate_bitmap(bitmap)
return cls(rdclass, rdtype, address, protocol, bitmap)
def _cmp(self, other):
sa = dns.ipv4.inet_aton(self.address)
oa = dns.ipv4.inet_aton(other.address)
- v = cmp(sa, oa)
+ v = dns.util.cmp(sa, oa)
if v == 0:
sp = struct.pack('!B', self.protocol)
op = struct.pack('!B', other.protocol)
- v = cmp(sp, op)
+ v = dns.util.cmp(sp, op)
if v == 0:
- v = cmp(self.bitmap, other.bitmap)
+ v = dns.util.cmp(self.bitmap, other.bitmap)
return v
import dns.rdata
import dns.rdatatype
+import dns.util
class DSBase(dns.rdata.Rdata):
"""Base class for rdata that is like a DS record
dns.rdata._hexify(self.digest,
chunksize=128))
+ @classmethod
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
key_tag = tok.get_uint16()
algorithm = tok.get_uint8()
if not t.is_identifier():
raise dns.exception.SyntaxError
chunks.append(t.value)
- digest = ''.join(chunks)
- digest = digest.decode('hex_codec')
+ digest = bytes.fromhex(''.join(chunks))
return cls(rdclass, rdtype, key_tag, algorithm, digest_type,
digest)
- from_text = classmethod(from_text)
-
def to_wire(self, file, compress = None, origin = None):
header = struct.pack("!HBB", self.key_tag, self.algorithm,
self.digest_type)
file.write(header)
file.write(self.digest)
+ @classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
header = struct.unpack("!HBB", wire[current : current + 4])
current += 4
digest = wire[current : current + rdlen]
return cls(rdclass, rdtype, header[0], header[1], header[2], digest)
- from_wire = classmethod(from_wire)
-
def _cmp(self, other):
hs = struct.pack("!HBB", self.key_tag, self.algorithm,
self.digest_type)
ho = struct.pack("!HBB", other.key_tag, other.algorithm,
other.digest_type)
- v = cmp(hs, ho)
+ v = dns.util.cmp(hs, ho)
if v == 0:
- v = cmp(self.digest, other.digest)
+ v = dns.util.cmp(self.digest, other.digest)
return v
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+import base64
import struct
import dns.exception
import dns.dnssec
import dns.rdata
+import dns.util
_flags_from_text = {
'NOCONF': (0x4000, 0xC000),
return '%d %d %d %s' % (self.flags, self.protocol, self.algorithm,
dns.rdata._base64ify(self.key))
+ @classmethod
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
flags = tok.get_string()
if flags.isdigit():
raise dns.exception.SyntaxError
chunks.append(t.value)
b64 = ''.join(chunks)
- key = b64.decode('base64_codec')
+ key = base64.b64decode(b64.encode('ascii'))
return cls(rdclass, rdtype, flags, protocol, algorithm, key)
- from_text = classmethod(from_text)
-
def to_wire(self, file, compress = None, origin = None):
header = struct.pack("!HBB", self.flags, self.protocol, self.algorithm)
file.write(header)
file.write(self.key)
+ @classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
if rdlen < 4:
raise dns.exception.FormError
return cls(rdclass, rdtype, header[0], header[1], header[2],
key)
- from_wire = classmethod(from_wire)
-
def _cmp(self, other):
hs = struct.pack("!HBB", self.flags, self.protocol, self.algorithm)
ho = struct.pack("!HBB", other.flags, other.protocol, other.algorithm)
- v = cmp(hs, ho)
+ v = dns.util.cmp(hs, ho)
if v == 0:
- v = cmp(self.key, other.key)
+ v = dns.util.cmp(self.key, other.key)
return v
"""MX-like base classes."""
-import cStringIO
import struct
import dns.exception
import dns.rdata
import dns.name
+import dns.util
class MXBase(dns.rdata.Rdata):
"""Base class for rdata that is like an MX record.
exchange = self.exchange.choose_relativity(origin, relativize)
return '%d %s' % (self.preference, exchange)
+ @classmethod
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
preference = tok.get_uint16()
exchange = tok.get_name()
tok.get_eol()
return cls(rdclass, rdtype, preference, exchange)
- from_text = classmethod(from_text)
-
def to_wire(self, file, compress = None, origin = None):
pref = struct.pack("!H", self.preference)
file.write(pref)
return struct.pack("!H", self.preference) + \
self.exchange.to_digestable(origin)
+ @classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(preference, ) = struct.unpack('!H', wire[current : current + 2])
current += 2
exchange = exchange.relativize(origin)
return cls(rdclass, rdtype, preference, exchange)
- from_wire = classmethod(from_wire)
-
def choose_relativity(self, origin = None, relativize = True):
self.exchange = self.exchange.choose_relativity(origin, relativize)
def _cmp(self, other):
sp = struct.pack("!H", self.preference)
op = struct.pack("!H", other.preference)
- v = cmp(sp, op)
+ v = dns.util.cmp(sp, op)
if v == 0:
- v = cmp(self.exchange, other.exchange)
+ v = dns.util.cmp(self.exchange, other.exchange)
return v
class UncompressedMX(MXBase):
super(UncompressedMX, self).to_wire(file, None, origin)
def to_digestable(self, origin = None):
- f = cStringIO.StringIO()
- self.to_wire(f, None, origin)
- return f.getvalue()
+ return self.to_wire(f, None, origin)
class UncompressedDowncasingMX(MXBase):
"""Base class for rdata that is like an MX record, but whose name
"""NS-like base classes."""
-import cStringIO
+import io
import dns.exception
import dns.rdata
import dns.name
+import dns.util
class NSBase(dns.rdata.Rdata):
"""Base class for rdata that is like an NS record.
target = self.target.choose_relativity(origin, relativize)
return str(target)
+ @classmethod
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
target = tok.get_name()
target = target.choose_relativity(origin, relativize)
tok.get_eol()
return cls(rdclass, rdtype, target)
- from_text = classmethod(from_text)
-
def to_wire(self, file, compress = None, origin = None):
self.target.to_wire(file, compress, origin)
def to_digestable(self, origin = None):
return self.target.to_digestable(origin)
+ @classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(target, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
target = target.relativize(origin)
return cls(rdclass, rdtype, target)
- from_wire = classmethod(from_wire)
-
def choose_relativity(self, origin = None, relativize = True):
self.target = self.target.choose_relativity(origin, relativize)
def _cmp(self, other):
- return cmp(self.target, other.target)
+ return dns.util.cmp(self.target, other.target)
class UncompressedNS(NSBase):
"""Base class for rdata that is like an NS record, but whose name
super(UncompressedNS, self).to_wire(file, None, origin)
def to_digestable(self, origin = None):
- f = cStringIO.StringIO()
- self.to_wire(f, None, origin)
- return f.getvalue()
+ return self.to_wire(None, None, origin)
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+import base64
import calendar
import struct
import time
import dns.exception
import dns.rdata
import dns.rdatatype
+import dns.util
class BadSigTime(dns.exception.DNSException):
"""Raised when a SIG or RRSIG RR's time cannot be parsed."""
dns.rdata._base64ify(self.signature)
)
+ @classmethod
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
type_covered = dns.rdatatype.from_text(tok.get_string())
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
raise dns.exception.SyntaxError
chunks.append(t.value)
b64 = ''.join(chunks)
- signature = b64.decode('base64_codec')
+ signature = base64.b64decode(b64.encode('ascii'))
return cls(rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature)
- from_text = classmethod(from_text)
-
def to_wire(self, file, compress = None, origin = None):
header = struct.pack('!HBBIIIH', self.type_covered,
self.algorithm, self.labels,
self.signer.to_wire(file, None, origin)
file.write(self.signature)
+ @classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
header = struct.unpack('!HBBIIIH', wire[current : current + 18])
current += 18
header[3], header[4], header[5], header[6], signer,
signature)
- from_wire = classmethod(from_wire)
-
def choose_relativity(self, origin = None, relativize = True):
self.signer = self.signer.choose_relativity(origin, relativize)
other.algorithm, other.labels,
other.original_ttl, other.expiration,
other.inception, other.key_tag)
- v = cmp(hs, ho)
+ v = dns.util.cmp(hs, ho)
if v == 0:
- v = cmp(self.signer, other.signer)
+ v = dns.util.cmp(self.signer, other.signer)
if v == 0:
- v = cmp(self.signature, other.signature)
+ v = dns.util.cmp(self.signature, other.signature)
return v
import dns.exception
import dns.rdata
import dns.tokenizer
+import dns.util
class TXTBase(dns.rdata.Rdata):
"""Base class for rdata that is like a TXT record
prefix = ' '
return txt
+ @classmethod
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
strings = []
while 1:
raise dns.exception.UnexpectedEnd
return cls(rdclass, rdtype, strings)
- from_text = classmethod(from_text)
-
def to_wire(self, file, compress = None, origin = None):
for s in self.strings:
l = len(s)
assert l < 256
- byte = chr(l)
- file.write(byte)
- file.write(s)
+ dns.util.write_uint8(file, l)
+ file.write(s.encode('latin_1'))
+ @classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
strings = []
while rdlen > 0:
- l = ord(wire[current])
+ l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
- s = wire[current : current + l]
+ s = wire[current : current + l].decode('latin_1')
current += l
rdlen -= l
strings.append(s)
return cls(rdclass, rdtype, strings)
- from_wire = classmethod(from_wire)
-
def _cmp(self, other):
- return cmp(self.strings, other.strings)
+ return dns.util.cmp(self.strings, other.strings)
"""Help for building DNS wire format messages"""
-import cStringIO
+import io
import struct
import random
import time
wire = r.get_wire()
@ivar output: where rendering is written
- @type output: cStringIO.StringIO object
+ @type output: io.BytesIO object
@ivar id: the message id
@type id: int
@ivar flags: the message flags
@type origin: dns.name.Namem or None.
"""
- self.output = cStringIO.StringIO()
+ self.output = io.BytesIO()
if id is None:
self.id = random.randint(0, 65535)
else:
self.compress = {}
self.section = QUESTION
self.counts = [0, 0, 0, 0]
- self.output.write('\x00' * 12)
+ self.output.write(b'\x00' * 12)
self.mac = ''
def _rollback(self, where):
self.output.seek(where)
self.output.truncate()
keys_to_delete = []
- for k, v in self.compress.iteritems():
+ for k, v in self.compress.items():
if v >= where:
keys_to_delete.append(k)
for k in keys_to_delete:
"""
# make sure the EDNS version in ednsflags agrees with edns
- ednsflags &= 0xFF00FFFFL
+ ednsflags &= 0xFF00FFFF
ednsflags |= (edns << 16)
self._set_section(ADDITIONAL)
before = self.output.tell()
self.response = response
min_ttl = -1
rrset = None
- for count in xrange(0, 15):
+ for count in range(0, 15):
try:
rrset = response.find_rrset(response.answer, qname,
rdclass, rdtype)
now = time.time()
if self.next_cleaning <= now:
keys_to_delete = []
- for (k, v) in self.data.iteritems():
+ for (k, v) in self.data.items():
if v.expiration <= now:
keys_to_delete.append(k)
for k in keys_to_delete:
"""
if not key is None:
- if self.data.has_key(key):
+ if key in self.data:
del self.data[key]
else:
self.data = {}
"""Process f as a file in the /etc/resolv.conf format. If f is
a string, it is used as the name of the file to open; otherwise it
is treated as the file itself."""
- if isinstance(f, str) or isinstance(f, unicode):
+ if isinstance(f, str):
try:
f = open(f, 'r')
except IOError:
def _config_win32_nameservers(self, nameservers):
"""Configure a NameServer registry entry."""
- # we call str() on nameservers to convert it from unicode to ascii
- nameservers = str(nameservers)
split_char = self._determine_split_char(nameservers)
ns_list = nameservers.split(split_char)
for ns in ns_list:
def _config_win32_domain(self, domain):
"""Configure a Domain registry entry."""
- # we call str() on domain to convert it from unicode to ascii
- self.domain = dns.name.from_text(str(domain))
+ self.domain = dns.name.from_text(domain)
def _config_win32_search(self, search):
"""Configure a Search registry entry."""
- # we call str() on search to convert it from unicode to ascii
- search = str(search)
split_char = self._determine_split_char(search)
search_list = search.split(split_char)
for s in search_list:
@raises NoNameservers: no non-broken nameservers are available to
answer the question."""
- if isinstance(qname, (str, unicode)):
+ if isinstance(qname, str):
qname = dns.name.from_text(qname, None)
if isinstance(rdtype, str):
rdtype = dns.rdatatype.from_text(rdtype)
@type resolver: dns.resolver.Resolver object or None
@rtype: dns.name.Name"""
- if isinstance(name, (str, unicode)):
+ if isinstance(name, str):
name = dns.name.from_text(name, dns.name.root)
if resolver is None:
resolver = get_default_resolver()
@type ipv6_reverse_domain: dns.name.Name object
"""
+import base64
+
import dns.name
import dns.ipv6
import dns.ipv4
@rtype: dns.name.Name object
"""
try:
- parts = list(dns.ipv6.inet_aton(text).encode('hex_codec'))
+ parts = ['%x.%x' % (byte, byte >> 4) for byte in dns.ipv6.inet_aton(text)]
origin = ipv6_reverse_domain
except:
- parts = ['%d' % ord(byte) for byte in dns.ipv4.inet_aton(text)]
+ parts = ['%d' % byte for byte in dns.ipv4.inet_aton(text)]
origin = ipv4_reverse_domain
parts.reverse()
- return dns.name.from_text('.'.join(parts), origin=origin)
+ return dns.name.from_text('.'.join(parts).lower(), origin=origin)
def to_address(name):
"""Convert a reverse map domain name into textual address form.
name = name.relativize(ipv4_reverse_domain)
labels = list(name.labels)
labels.reverse()
- text = '.'.join(labels)
+ text = '.'.join([x.decode('ascii') for x in labels])
# run through inet_aton() to check syntax and make pretty.
return dns.ipv4.inet_ntoa(dns.ipv4.inet_aton(text))
elif name.is_subdomain(ipv6_reverse_domain):
i = 0
l = len(labels)
while i < l:
- parts.append(''.join(labels[i:i+4]))
+ parts.append(''.join([x.decode('ascii') for x in labels[i:i+4]]))
i += 4
text = ':'.join(parts)
# run through inet_aton() to check syntax and make pretty.
@rtype: dns.rrset.RRset object
"""
- if isinstance(name, (str, unicode)):
+ if isinstance(name, str):
name = dns.name.from_text(name, None)
if isinstance(rdclass, str):
rdclass = dns.rdataclass.from_text(rdclass)
class Set(object):
"""A simple set class.
- Sets are not in Python until 2.3, and rdata are not immutable so
- we cannot use sets.Set anyway. This class implements subset of
- the 2.3 Set interface using a list as the container.
+ This class implements a mutable set using a list as the container.
+ We don't use Python's set class because it's not indexable.
@ivar items: A list of the items which are in the set
@type items: list"""
"""Tokenize DNS master file format"""
-import cStringIO
+import io
import sys
import dns.exception
import dns.name
import dns.ttl
-_DELIMITERS = {
- ' ' : True,
- '\t' : True,
- '\n' : True,
- ';' : True,
- '(' : True,
- ')' : True,
- '"' : True }
-
-_QUOTING_DELIMITERS = { '"' : True }
+_DELIMITERS = frozenset(' \t\n;()"')
+_QUOTING_DELIMITERS = frozenset('"')
EOF = 0
EOL = 1
"""
if isinstance(f, str):
- f = cStringIO.StringIO(f)
+ f = io.StringIO(f)
if filename is None:
filename = '<string>'
else:
raise UngetBufferFull
self.ungotten_token = token
- def next(self):
+ def __next__(self):
"""Return the next item in an iteration.
@rtype: (int, string)
"""
raise dns.exception.SyntaxError('expecting an identifier')
if not token.value.isdigit():
raise dns.exception.SyntaxError('expecting an integer')
- value = long(token.value)
- if value < 0 or value > 4294967296L:
+ value = int(token.value)
+ if value < 0 or value > 4294967296:
raise dns.exception.SyntaxError('%d is not an unsigned 32-bit integer' % value)
return value
"""DNS TSIG support."""
+import hashlib
import hmac
import struct
ctx.update(keyname.to_digestable())
ctx.update(struct.pack('!H', dns.rdataclass.ANY))
ctx.update(struct.pack('!I', 0))
- long_time = time + 0L
- upper_time = (long_time >> 32) & 0xffffL
- lower_time = long_time & 0xffffffffL
+ upper_time = (time >> 32) & 0xffff
+ lower_time = time & 0xffffffff
time_mac = struct.pack('!HIH', upper_time, lower_time, fudge)
pre_mac = algorithm_name + time_mac
ol = len(other_data)
current = current + used
(upper_time, lower_time, fudge, mac_size) = \
struct.unpack("!HIHH", wire[current:current + 10])
- time = ((upper_time + 0L) << 32) + (lower_time + 0L)
+ time = ((upper_time + 0) << 32) + lower_time
current += 10
mac = wire[current:current + mac_size]
current += mac_size
"""
hashes = {}
- try:
- import hashlib
- hashes[dns.name.from_text('hmac-sha224')] = hashlib.sha224
- hashes[dns.name.from_text('hmac-sha256')] = hashlib.sha256
- hashes[dns.name.from_text('hmac-sha384')] = hashlib.sha384
- hashes[dns.name.from_text('hmac-sha512')] = hashlib.sha512
- hashes[dns.name.from_text('hmac-sha1')] = hashlib.sha1
- hashes[dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] = hashlib.md5
-
- import sys
- if sys.hexversion < 0x02050000:
- # hashlib doesn't conform to PEP 247: API for
- # Cryptographic Hash Functions, which hmac before python
- # 2.5 requires, so add the necessary items.
- class HashlibWrapper:
- def __init__(self, basehash):
- self.basehash = basehash
- self.digest_size = self.basehash().digest_size
-
- def new(self, *args, **kwargs):
- return self.basehash(*args, **kwargs)
-
- for name in hashes:
- hashes[name] = HashlibWrapper(hashes[name])
-
- except ImportError:
- import md5, sha
- hashes[dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] = md5.md5
- hashes[dns.name.from_text('hmac-sha1')] = sha.sha
+ hashes[dns.name.from_text('hmac-sha224')] = hashlib.sha224
+ hashes[dns.name.from_text('hmac-sha256')] = hashlib.sha256
+ hashes[dns.name.from_text('hmac-sha384')] = hashlib.sha384
+ hashes[dns.name.from_text('hmac-sha512')] = hashlib.sha512
+ hashes[dns.name.from_text('hmac-sha1')] = hashlib.sha1
+ hashes[dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] = hashlib.md5
if isinstance(algorithm, (str, unicode)):
algorithm = dns.name.from_text(algorithm)
"""
if text.isdigit():
- total = long(text)
+ total = int(text)
else:
if not text[0].isdigit():
raise BadTTL
- total = 0L
- current = 0L
+ total = 0
+ current = 0
for c in text:
if c.isdigit():
current *= 10
- current += long(c)
+ current += int(c)
else:
c = c.lower()
if c == 'w':
- total += current * 604800L
+ total += current * 604800
elif c == 'd':
- total += current * 86400L
+ total += current * 86400
elif c == 'h':
- total += current * 3600L
+ total += current * 3600
elif c == 'm':
- total += current * 60L
+ total += current * 60
elif c == 's':
total += current
else:
current = 0
if not current == 0:
raise BadTTL("trailing integer")
- if total < 0L or total > 2147483647L:
+ if total < 0 or total > 2147483647:
raise BadTTL("TTL should be between 0 and 2^31 - 1 (inclusive)")
return total
"""
super(Update, self).__init__()
self.flags |= dns.opcode.to_flags(dns.opcode.UPDATE)
- if isinstance(zone, (str, unicode)):
+ if isinstance(zone, str):
zone = dns.name.from_text(zone)
self.origin = zone
if isinstance(rdclass, str):
- ttl, rdtype, string..."""
- if isinstance(name, (str, unicode)):
+ if isinstance(name, str):
name = dns.name.from_text(name, None)
if isinstance(args[0], dns.rdataset.Rdataset):
for rds in args:
- rdtype, [string...]"""
- if isinstance(name, (str, unicode)):
+ if isinstance(name, str):
name = dns.name.from_text(name, None)
if len(args) == 0:
rrset = self.find_rrset(self.authority, name, dns.rdataclass.ANY,
- rdtype, string..."""
- if isinstance(name, (str, unicode)):
+ if isinstance(name, str):
name = dns.name.from_text(name, None)
if len(args) == 0:
rrset = self.find_rrset(self.answer, name,
"""Require that an owner name (and optionally an rdata type) does
not exist as a prerequisite to the execution of the update."""
- if isinstance(name, (str, unicode)):
+ if isinstance(name, str):
name = dns.name.from_text(name, None)
if rdtype is None:
rrset = self.find_rrset(self.answer, name,
--- /dev/null
+# Copyright (C) 2010 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Miscellaneous Implementation Helpers"""
+
+import struct
+
+def cmp(x, y):
+ if x > y:
+ return 1
+ elif x < y:
+ return -1
+ else:
+ return 0
+
+def write_uint8(bfile, value):
+ """Write an unsigned 8-bit integer to an io.BytesIO file
+ """
+
+ bfile.write(struct.pack('B', value))
+
+
+def write_uint16(bfile, value):
+ """Write an unsigned 16-bit integer to an io.BytesIO file
+ """
+
+ bfile.write(struct.pack('!H', value))
+
+def write_uint32(bfile, value):
+ """Write an unsigned 32-bit integer to an io.BytesIO file
+ """
+
+ bfile.write(struct.pack('!L', value))
+
+
+def write_uint64(bfile, value):
+ """Write an unsigned 64-bit integer to an io.BytesIO file
+ """
+
+ bfile.write(struct.pack('!Q', value))
return not self.__eq__(other)
def _validate_name(self, name):
- if isinstance(name, (str, unicode)):
+ if isinstance(name, str):
name = dns.name.from_text(name, None)
elif not isinstance(name, dns.name.Name):
raise KeyError("name parameter must be convertable to a DNS name")
"""
name = self._validate_name(name)
- if self.nodes.has_key(name):
+ if name in self.nodes:
del self.nodes[name]
def find_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE,
rdtype = dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
covers = dns.rdatatype.from_text(covers)
- for (name, node) in self.iteritems():
+ for (name, node) in self.items():
for rds in node:
if rdtype == dns.rdatatype.ANY or \
(rds.rdtype == rdtype and rds.covers == covers):
rdtype = dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
covers = dns.rdatatype.from_text(covers)
- for (name, node) in self.iteritems():
+ for (name, node) in self.items():
for rds in node:
if rdtype == dns.rdatatype.ANY or \
(rds.rdtype == rdtype and rds.covers == covers):
@type nl: string or None
"""
- if sys.hexversion >= 0x02030000:
- # allow Unicode filenames
- str_type = basestring
- else:
- str_type = str
if nl is None:
opts = 'w'
else:
opts = 'wb'
- if isinstance(f, str_type):
- f = file(f, opts)
+ if isinstance(f, str):
+ f = open(f, opts)
want_close = True
else:
want_close = False
try:
if sorted:
- names = self.keys()
+ names = list(self.keys())
names.sort()
else:
names = self.iterkeys()
l = self[n].to_text(n, origin=self.origin,
relativize=relativize)
if nl is None:
- print >> f, l
+ print(l, file=f)
else:
- f.write(l)
- f.write(nl)
+ f.write(l.encode('ascii'))
+ f.write(nl.encode('ascii'))
finally:
if want_close:
f.close()
def __init__(self, tok, origin, rdclass, relativize, zone_factory=Zone,
allow_include=False, check_origin=True):
- if isinstance(origin, (str, unicode)):
+ if isinstance(origin, str):
origin = dns.name.from_text(origin)
self.tok = tok
self.current_origin = origin
self.last_name,
self.current_file,
self.ttl))
- self.current_file = file(filename, 'r')
+ self.current_file = open(filename, 'r')
self.tok = dns.tokenizer.Tokenizer(self.current_file,
filename)
self.current_origin = new_origin
continue
self.tok.unget(token)
self._rr_line()
- except dns.exception.SyntaxError, detail:
+ except dns.exception.SyntaxError as detail:
(filename, line_number) = self.tok.where()
if detail is None:
detail = "syntax error"
@rtype: dns.zone.Zone object
"""
- if sys.hexversion >= 0x02030000:
- # allow Unicode filenames; turn on universal newline support
- str_type = basestring
- opts = 'rU'
- else:
- str_type = str
- opts = 'r'
- if isinstance(f, str_type):
+ if isinstance(f, str):
if filename is None:
filename = f
- f = file(f, opts)
+ f = open(f, 'rU')
want_close = True
else:
if filename is None:
import dns.e164
n = dns.e164.from_e164("+1 555 1212")
-print n
-print dns.e164.to_e164(n)
+print(n)
+print(dns.e164.to_e164(n))
answers = dns.resolver.query('nominum.com', 'MX')
for rdata in answers:
- print 'Host', rdata.exchange, 'has preference', rdata.preference
+ print('Host', rdata.exchange, 'has preference', rdata.preference)
+
n = dns.name.from_text('www.dnspython.org')
o = dns.name.from_text('dnspython.org')
-print n.is_subdomain(o) # True
-print n.is_superdomain(o) # False
-print n > o # True
-rel = n.relativize(o) # rel is the relative name www
+print(n.is_subdomain(o)) # True
+print(n.is_superdomain(o)) # False
+print(n > o) # True
+rel = n.relativize(o) # rel is the relative name www
n2 = rel + o
-print n2 == n # True
-print n.labels # ['www', 'dnspython', 'org', '']
+print(n2 == n) # True
+print(n.labels) # (b'www', b'dnspython', b'org', b'')
relativize=False)
for (name, ttl, rdata) in zone.iterate_rdatas('A'):
try:
- reverse_map[rdata.address].append(name.to_text())
- except KeyError:
- reverse_map[rdata.address] = [name.to_text()]
+ reverse_map[rdata.address].append(name.to_text())
+ except KeyError:
+ reverse_map[rdata.address] = [name.to_text()]
-keys = reverse_map.keys()
-keys.sort(lambda a1, a2: cmp(dns.ipv4.inet_aton(a1), dns.ipv4.inet_aton(a2)))
+keys = sorted(reverse_map.keys(), key=dns.ipv4.inet_aton)
for k in keys:
- v = reverse_map[k]
- v.sort()
- print k, v
+ v = sorted(reverse_map[k])
+ print(k, v)
import dns.reversename
n = dns.reversename.from_address("127.0.0.1")
-print n
-print dns.reversename.to_address(n)
+print(n)
+print(dns.reversename.to_address(n))
names = z.nodes.keys()
names.sort()
for n in names:
- print z[n].to_text(n)
+ print(z[n].to_text(n))
# $Id: Makefile,v 1.5 2004/03/19 00:17:27 halley Exp $
-PYTHON=python
+PYTHON=python3.1
check: test
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
import os
import unittest
goodhex = '04d201000001000000000001047777777709646e73707974686f6e' \
'036f726700000100010000291000000080000000'
-goodwire = goodhex.decode('hex_codec')
+goodwire = bytes.fromhex(goodhex)
answer_text = """id 1234
opcode QUERY
'c091 0001 0001 00000e10 0004 cc98ba96'
-goodwire2 = goodhex2.replace(' ', '').decode('hex_codec')
+goodwire2 = bytes.fromhex(goodhex2.replace(' ', ''))
query_text_2 = """id 1234
opcode QUERY
goodhex3 = '04d2010f0001000000000001047777777709646e73707974686f6e' \
'036f726700000100010000291000ff0080000000'
-goodwire3 = goodhex3.decode('hex_codec')
+goodwire3 = bytes.fromhex(goodhex3)
class MessageTestCase(unittest.TestCase):
def test_TooBig(self):
def bad():
q = dns.message.from_text(query_text)
- for i in xrange(0, 25):
+ for i in range(0, 25):
rrset = dns.rrset.from_text('foo%d.' % i, 3600,
dns.rdataclass.IN,
dns.rdatatype.A,
def test_TrailingJunk(self):
def bad():
- badwire = goodwire + '\x00'
+ badwire = goodwire + b'\x00'
m = dns.message.from_wire(badwire)
self.failUnlessRaises(dns.message.TrailingJunk, bad)
def test_ShortHeader(self):
def bad():
- badwire = '\x00' * 11
+ badwire = b'\x00' * 11
m = dns.message.from_wire(badwire)
self.failUnlessRaises(dns.message.ShortHeader, bad)
import unittest
-import cStringIO
+import io
import socket
+import sys
import dns.name
import dns.reversename
class NameTestCase(unittest.TestCase):
def setUp(self):
self.origin = dns.name.from_text('example.')
-
+
def testFromTextRel1(self):
n = dns.name.from_text('foo.bar')
- self.failUnless(n.labels == ('foo', 'bar', ''))
+ self.failUnless(n.labels == (b'foo', b'bar', b''))
def testFromTextRel2(self):
n = dns.name.from_text('foo.bar', origin=self.origin)
- self.failUnless(n.labels == ('foo', 'bar', 'example', ''))
+ self.failUnless(n.labels == (b'foo', b'bar', b'example', b''))
def testFromTextRel3(self):
n = dns.name.from_text('foo.bar', origin=None)
- self.failUnless(n.labels == ('foo', 'bar'))
+ self.failUnless(n.labels == (b'foo', b'bar'))
def testFromTextRel4(self):
n = dns.name.from_text('@', origin=None)
def testFromTextAbs1(self):
n = dns.name.from_text('foo.bar.')
- self.failUnless(n.labels == ('foo', 'bar', ''))
+ self.failUnless(n.labels == (b'foo', b'bar', b''))
def testTortureFromText(self):
good = [
try:
n = dns.name.from_text(t)
except:
+ info = sys.exc_info()
+ print(info[0])
+ print(info[2])
self.fail("good test '%s' raised an exception" % t)
for t in bad:
caught = False
def testImmutable2(self):
def bad():
- self.origin.labels[0] = 'foo'
+ self.origin.labels[0] = b'foo'
self.failUnlessRaises(TypeError, bad)
def testAbs1(self):
def testCompare4(self):
self.failUnless(dns.name.root != 1)
- def testCompare5(self):
- self.failUnless(dns.name.root < 1 or dns.name.root > 1)
-
def testSubdomain1(self):
self.failUnless(not dns.name.empty.is_subdomain(dns.name.root))
def testCanonicalize1(self):
n = dns.name.from_text('FOO.bar', origin=self.origin)
c = n.canonicalize()
- self.failUnless(c.labels == ('foo', 'bar', 'example', ''))
+ self.failUnless(c.labels == (b'foo', b'bar', b'example', b''))
def testToText1(self):
n = dns.name.from_text('FOO.bar', origin=self.origin)
def testSlice1(self):
n = dns.name.from_text(r'a.b.c.', origin=None)
s = n[:]
- self.failUnless(s == ('a', 'b', 'c', ''))
+ self.failUnless(s == (b'a', b'b', b'c', b''))
def testSlice2(self):
n = dns.name.from_text(r'a.b.c.', origin=None)
s = n[:2]
- self.failUnless(s == ('a', 'b'))
+ self.failUnless(s == (b'a', b'b'))
def testSlice3(self):
n = dns.name.from_text(r'a.b.c.', origin=None)
s = n[2:]
- self.failUnless(s == ('c', ''))
+ self.failUnless(s == (b'c', b''))
def testEmptyLabel1(self):
def bad():
def testBadEscape(self):
def bad():
n = dns.name.from_text(r'a.b\0q1.c.')
- print n
+ print(n)
self.failUnlessRaises(dns.name.BadEscape, bad)
def testDigestable1(self):
n = dns.name.from_text('FOO.bar')
d = n.to_digestable()
- self.failUnless(d == '\x03foo\x03bar\x00')
+ self.failUnless(d == b'\x03foo\x03bar\x00')
def testDigestable2(self):
n1 = dns.name.from_text('FOO.bar')
def testDigestable3(self):
d = dns.name.root.to_digestable()
- self.failUnless(d == '\x00')
+ self.failUnless(d == b'\x00')
def testDigestable4(self):
n = dns.name.from_text('FOO.bar', None)
d = n.to_digestable(dns.name.root)
- self.failUnless(d == '\x03foo\x03bar\x00')
-
+ self.failUnless(d == b'\x03foo\x03bar\x00')
+
def testBadDigestable(self):
def bad():
n = dns.name.from_text('FOO.bar', None)
def testToWire1(self):
n = dns.name.from_text('FOO.bar')
- f = cStringIO.StringIO()
+ f = io.BytesIO()
compress = {}
n.to_wire(f, compress)
- self.failUnless(f.getvalue() == '\x03FOO\x03bar\x00')
+ self.failUnless(f.getvalue() == b'\x03FOO\x03bar\x00')
def testToWire2(self):
n = dns.name.from_text('FOO.bar')
- f = cStringIO.StringIO()
+ f = io.BytesIO()
compress = {}
n.to_wire(f, compress)
n.to_wire(f, compress)
- self.failUnless(f.getvalue() == '\x03FOO\x03bar\x00\xc0\x00')
+ self.failUnless(f.getvalue() == b'\x03FOO\x03bar\x00\xc0\x00')
def testToWire3(self):
n1 = dns.name.from_text('FOO.bar')
n2 = dns.name.from_text('foo.bar')
- f = cStringIO.StringIO()
+ f = io.BytesIO()
compress = {}
n1.to_wire(f, compress)
n2.to_wire(f, compress)
- self.failUnless(f.getvalue() == '\x03FOO\x03bar\x00\xc0\x00')
+ self.failUnless(f.getvalue() == b'\x03FOO\x03bar\x00\xc0\x00')
def testToWire4(self):
n1 = dns.name.from_text('FOO.bar')
n2 = dns.name.from_text('a.foo.bar')
- f = cStringIO.StringIO()
+ f = io.BytesIO()
compress = {}
n1.to_wire(f, compress)
n2.to_wire(f, compress)
- self.failUnless(f.getvalue() == '\x03FOO\x03bar\x00\x01\x61\xc0\x00')
+ self.failUnless(f.getvalue() == b'\x03FOO\x03bar\x00\x01\x61\xc0\x00')
def testToWire5(self):
n1 = dns.name.from_text('FOO.bar')
n2 = dns.name.from_text('a.foo.bar')
- f = cStringIO.StringIO()
+ f = io.BytesIO()
compress = {}
n1.to_wire(f, compress)
n2.to_wire(f, None)
self.failUnless(f.getvalue() == \
- '\x03FOO\x03bar\x00\x01\x61\x03foo\x03bar\x00')
+ b'\x03FOO\x03bar\x00\x01\x61\x03foo\x03bar\x00')
def testToWire6(self):
n = dns.name.from_text('FOO.bar')
v = n.to_wire()
- self.failUnless(v == '\x03FOO\x03bar\x00')
+ self.failUnless(v == b'\x03FOO\x03bar\x00')
def testBadToWire(self):
def bad():
n = dns.name.from_text('FOO.bar', None)
- f = cStringIO.StringIO()
+ f = io.BytesIO()
compress = {}
n.to_wire(f, compress)
self.failUnlessRaises(dns.name.NeedAbsoluteNameOrOrigin, bad)
self.failUnless(n.choose_relativity(o, False) == e)
def testFromWire1(self):
- w = '\x03foo\x00\xc0\x00'
+ w = b'\x03foo\x00\xc0\x00'
(n1, cused1) = dns.name.from_wire(w, 0)
(n2, cused2) = dns.name.from_wire(w, cused1)
en1 = dns.name.from_text('foo.')
self.failUnless(n1 == en1 and cused1 == ecused1 and \
n2 == en2 and cused2 == ecused2)
- def testFromWire1(self):
- w = '\x03foo\x00\x01a\xc0\x00\x01b\xc0\x05'
+ def testFromWire2(self):
+ w = b'\x03foo\x00\x01a\xc0\x00\x01b\xc0\x05'
current = 0
(n1, cused1) = dns.name.from_wire(w, current)
current += cused1
def testBadFromWire1(self):
def bad():
- w = '\x03foo\xc0\x04'
+ w = b'\x03foo\xc0\x04'
(n, cused) = dns.name.from_wire(w, 0)
self.failUnlessRaises(dns.name.BadPointer, bad)
def testBadFromWire2(self):
def bad():
- w = '\x03foo\xc0\x05'
+ w = b'\x03foo\xc0\x05'
(n, cused) = dns.name.from_wire(w, 0)
self.failUnlessRaises(dns.name.BadPointer, bad)
def testBadFromWire3(self):
def bad():
- w = '\xbffoo'
+ w = b'\xbffoo'
(n, cused) = dns.name.from_wire(w, 0)
self.failUnlessRaises(dns.name.BadLabelType, bad)
def testBadFromWire4(self):
def bad():
- w = '\x41foo'
+ w = b'\x41foo'
(n, cused) = dns.name.from_wire(w, 0)
self.failUnlessRaises(dns.name.BadLabelType, bad)
self.failUnlessRaises(dns.name.NoParent, bad)
def testFromUnicode1(self):
- n = dns.name.from_text(u'foo.bar')
- self.failUnless(n.labels == ('foo', 'bar', ''))
+ n = dns.name.from_text('foo.bar')
+ self.failUnless(n.labels == (b'foo', b'bar', b''))
def testFromUnicode2(self):
- n = dns.name.from_text(u'foo\u1234bar.bar')
- self.failUnless(n.labels == ('xn--foobar-r5z', 'bar', ''))
+ n = dns.name.from_text('foo\u1234bar.bar')
+ self.failUnless(n.labels == (b'xn--foobar-r5z', b'bar', b''))
def testFromUnicodeAlternateDot1(self):
- n = dns.name.from_text(u'foo\u3002bar')
- self.failUnless(n.labels == ('foo', 'bar', ''))
+ n = dns.name.from_text('foo\u3002bar')
+ self.failUnless(n.labels == (b'foo', b'bar', b''))
def testFromUnicodeAlternateDot2(self):
- n = dns.name.from_text(u'foo\uff0ebar')
- self.failUnless(n.labels == ('foo', 'bar', ''))
+ n = dns.name.from_text('foo\uff0ebar')
+ self.failUnless(n.labels == (b'foo', b'bar', b''))
def testFromUnicodeAlternateDot3(self):
- n = dns.name.from_text(u'foo\uff61bar')
- self.failUnless(n.labels == ('foo', 'bar', ''))
+ n = dns.name.from_text('foo\uff61bar')
+ self.failUnless(n.labels == (b'foo', b'bar', b''))
def testToUnicode1(self):
- n = dns.name.from_text(u'foo.bar')
+ n = dns.name.from_text('foo.bar')
s = n.to_unicode()
- self.failUnless(s == u'foo.bar.')
+ self.failUnless(s == 'foo.bar.')
def testToUnicode2(self):
- n = dns.name.from_text(u'foo\u1234bar.bar')
+ n = dns.name.from_text('foo\u1234bar.bar')
s = n.to_unicode()
- self.failUnless(s == u'foo\u1234bar.bar.')
+ self.failUnless(s == 'foo\u1234bar.bar.')
def testToUnicode3(self):
n = dns.name.from_text('foo.bar')
s = n.to_unicode()
- self.failUnless(s == u'foo.bar.')
+ self.failUnless(s == 'foo.bar.')
def testReverseIPv4(self):
e = dns.name.from_text('1.0.0.127.in-addr.arpa.')
def test_aton1(self):
a = dns.ipv6.inet_aton('::')
- self.failUnless(a == '\x00' * 16)
+ self.failUnless(a == b'\x00' * 16)
def test_aton2(self):
a = dns.ipv6.inet_aton('::1')
- self.failUnless(a == '\x00' * 15 + '\x01')
+ self.failUnless(a == b'\x00' * 15 + b'\x01')
def test_aton3(self):
a = dns.ipv6.inet_aton('::10.0.0.1')
- self.failUnless(a == '\x00' * 12 + '\x0a\x00\x00\x01')
+ self.failUnless(a == b'\x00' * 12 + b'\x0a\x00\x00\x01')
def test_aton4(self):
a = dns.ipv6.inet_aton('abcd::dcba')
- self.failUnless(a == '\xab\xcd' + '\x00' * 12 + '\xdc\xba')
+ self.failUnless(a == b'\xab\xcd' + b'\x00' * 12 + b'\xdc\xba')
def test_aton5(self):
a = dns.ipv6.inet_aton('1:2:3:4:5:6:7:8')
self.failUnless(a == \
- '00010002000300040005000600070008'.decode('hex_codec'))
+ bytes.fromhex('00010002000300040005000600070008'))
def test_bad_aton1(self):
def bad():
def test_aton1(self):
a = dns.ipv6.inet_aton('::')
- self.failUnless(a == '\x00' * 16)
+ self.failUnless(a == b'\x00' * 16)
def test_aton2(self):
a = dns.ipv6.inet_aton('::1')
- self.failUnless(a == '\x00' * 15 + '\x01')
+ self.failUnless(a == b'\x00' * 15 + b'\x01')
def test_aton3(self):
a = dns.ipv6.inet_aton('::10.0.0.1')
- self.failUnless(a == '\x00' * 12 + '\x0a\x00\x00\x01')
+ self.failUnless(a == b'\x00' * 12 + b'\x0a\x00\x00\x01')
def test_aton4(self):
a = dns.ipv6.inet_aton('abcd::dcba')
- self.failUnless(a == '\xab\xcd' + '\x00' * 12 + '\xdc\xba')
+ self.failUnless(a == b'\xab\xcd' + b'\x00' * 12 + b'\xdc\xba')
def test_ntoa1(self):
- b = '00010002000300040005000600070008'.decode('hex_codec')
+ b = bytes.fromhex('00010002000300040005000600070008')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '1:2:3:4:5:6:7:8')
def test_ntoa2(self):
- b = '\x00' * 16
+ b = b'\x00' * 16
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '::')
def test_ntoa3(self):
- b = '\x00' * 15 + '\x01'
+ b = b'\x00' * 15 + b'\x01'
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '::1')
def test_ntoa4(self):
- b = '\x80' + '\x00' * 15
+ b = b'\x80' + b'\x00' * 15
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '8000::')
def test_ntoa5(self):
- b = '\x01\xcd' + '\x00' * 12 + '\x03\xef'
+ b = b'\x01\xcd' + b'\x00' * 12 + b'\x03\xef'
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '1cd::3ef')
def test_ntoa6(self):
- b = 'ffff00000000ffff000000000000ffff'.decode('hex_codec')
+ b = bytes.fromhex('ffff00000000ffff000000000000ffff')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == 'ffff:0:0:ffff::ffff')
def test_ntoa7(self):
- b = '00000000ffff000000000000ffffffff'.decode('hex_codec')
+ b = bytes.fromhex('00000000ffff000000000000ffffffff')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '0:0:ffff::ffff:ffff')
def test_ntoa8(self):
- b = 'ffff0000ffff00000000ffff00000000'.decode('hex_codec')
+ b = bytes.fromhex('ffff0000ffff00000000ffff00000000')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == 'ffff:0:ffff::ffff:0:0')
def test_ntoa9(self):
- b = '0000000000000000000000000a000001'.decode('hex_codec')
+ b = bytes.fromhex('0000000000000000000000000a000001')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '::10.0.0.1')
def test_ntoa10(self):
- b = '0000000000000000000000010a000001'.decode('hex_codec')
+ b = bytes.fromhex('0000000000000000000000010a000001')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '::1:a00:1')
def test_ntoa11(self):
- b = '00000000000000000000ffff0a000001'.decode('hex_codec')
+ b = bytes.fromhex('00000000000000000000ffff0a000001')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '::ffff:10.0.0.1')
def test_ntoa12(self):
- b = '000000000000000000000000ffffffff'.decode('hex_codec')
+ b = bytes.fromhex('000000000000000000000000ffffffff')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '::255.255.255.255')
def test_ntoa13(self):
- b = '00000000000000000000ffffffffffff'.decode('hex_codec')
+ b = bytes.fromhex('00000000000000000000ffffffffffff')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '::ffff:255.255.255.255')
def test_ntoa14(self):
- b = '0000000000000000000000000001ffff'.decode('hex_codec')
+ b = bytes.fromhex('0000000000000000000000000001ffff')
t = dns.ipv6.inet_ntoa(b)
self.failUnless(t == '::0.1.255.255')
def test_bad_ntoa2(self):
def bad():
- a = dns.ipv6.inet_ntoa('\x00' * 17)
+ a = dns.ipv6.inet_ntoa(b'\x00' * 17)
self.failUnlessRaises(ValueError, bad)
if __name__ == '__main__':
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import io
import sys
import time
import unittest
if sys.platform != 'win32':
def testRead(self):
- f = cStringIO.StringIO(resolv_conf)
+ f = io.StringIO(resolv_conf)
r = dns.resolver.Resolver(f)
self.failUnless(r.nameservers == ['10.0.0.1', '10.0.0.2'] and
r.domain == dns.name.from_text('foo'))
'04626c617ac00c 0001 00ff 00000000 0000' \
'c049 00ff 00ff 00000000 0000'
-goodwire = goodhex.replace(' ', '').decode('hex_codec')
+goodwire = bytes.fromhex(goodhex.replace(' ', ''))
update_text="""id 1
opcode UPDATE
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import cStringIO
+import io
import filecmp
import os
import unittest
def testFromText(self):
z = dns.zone.from_text(example_text, 'example.', relativize=True)
- f = cStringIO.StringIO()
- names = z.nodes.keys()
- names.sort()
+ f = io.StringIO()
+ names = sorted(z.nodes.keys())
for n in names:
- print >> f, z[n].to_text(n)
+ print(z[n].to_text(n), file=f)
self.failUnless(f.getvalue() == example_text_output)
-
+
def testTorture1(self):
#
# Read a zone containing all our supported RR types, and
# for each RR in the zone, convert the rdata into wire format
# and then back out, and see if we get equal rdatas.
#
- f = cStringIO.StringIO()
+ f = io.BytesIO()
o = dns.name.from_text('example.')
z = dns.zone.from_file('example', o)
- for (name, node) in z.iteritems():
+ for (name, node) in z.items():
for rds in node:
for rd in rds:
f.seek(0)
def testIterateRdatasets(self):
z = dns.zone.from_text(example_text, 'example.', relativize=True)
- ns = [n for n, r in z.iterate_rdatasets('A')]
- ns.sort()
+ ns = sorted([n for n, r in z.iterate_rdatasets('A')])
self.failUnless(ns == [dns.name.from_text('ns1', None),
dns.name.from_text('ns2', None)])
def testIterateAllRdatasets(self):
z = dns.zone.from_text(example_text, 'example.', relativize=True)
- ns = [n for n, r in z.iterate_rdatasets()]
- ns.sort()
+ ns = sorted([n for n, r in z.iterate_rdatasets()])
self.failUnless(ns == [dns.name.from_text('@', None),
dns.name.from_text('@', None),
dns.name.from_text('bar.foo', None),
def testIterateRdatas(self):
z = dns.zone.from_text(example_text, 'example.', relativize=True)
- l = list(z.iterate_rdatas('A'))
- l.sort()
+ l = sorted(z.iterate_rdatas('A'))
exl = [(dns.name.from_text('ns1', None),
3600,
dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.A,
def testIterateAllRdatas(self):
z = dns.zone.from_text(example_text, 'example.', relativize=True)
- l = list(z.iterate_rdatas())
- l.sort()
+ l = sorted(z.iterate_rdatas())
exl = [(dns.name.from_text('@', None),
3600,
dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.NS,