def _header_line(self, section):
"""Process one line from the text format header section."""
- (ttype, what) = self.tok.get()
+ token = self.tok.get()
+ what = token.value
if what == 'id':
self.message.id = self.tok.get_int()
elif what == 'flags':
while True:
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
self.tok.unget(token)
break
self.message.flags = self.message.flags | \
- dns.flags.from_text(token[1])
+ dns.flags.from_text(token.value)
if dns.opcode.is_update(self.message.flags):
self.updating = True
elif what == 'edns':
self.message.edns = 0
while True:
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
self.tok.unget(token)
break
self.message.ednsflags = self.message.ednsflags | \
- dns.flags.edns_from_text(token[1])
+ dns.flags.edns_from_text(token.value)
elif what == 'payload':
self.message.payload = self.tok.get_int()
if self.message.edns < 0:
"""Process one line from the text format question section."""
token = self.tok.get(want_leading = True)
- if token[0] != dns.tokenizer.WHITESPACE:
- self.last_name = dns.name.from_text(token[1], None)
+ if not token.is_whitespace():
+ self.last_name = dns.name.from_text(token.value, None)
name = self.last_name
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
# Class
try:
- rdclass = dns.rdataclass.from_text(token[1])
+ rdclass = dns.rdataclass.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
except dns.exception.SyntaxError:
raise dns.exception.SyntaxError
except:
rdclass = dns.rdataclass.IN
# Type
- rdtype = dns.rdatatype.from_text(token[1])
+ rdtype = dns.rdatatype.from_text(token.value)
self.message.find_rrset(self.message.question, name,
rdclass, rdtype, create=True,
force_unique=True)
deleting = None
# Name
token = self.tok.get(want_leading = True)
- if token[0] != dns.tokenizer.WHITESPACE:
- self.last_name = dns.name.from_text(token[1], None)
+ if not token.is_whitespace():
+ self.last_name = dns.name.from_text(token.value, None)
name = self.last_name
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
# TTL
try:
- ttl = int(token[1], 0)
+ ttl = int(token.value, 0)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
except dns.exception.SyntaxError:
raise dns.exception.SyntaxError
ttl = 0
# Class
try:
- rdclass = dns.rdataclass.from_text(token[1])
+ rdclass = dns.rdataclass.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
if rdclass == dns.rdataclass.ANY or rdclass == dns.rdataclass.NONE:
deleting = rdclass
except:
rdclass = dns.rdataclass.IN
# Type
- rdtype = dns.rdatatype.from_text(token[1])
+ rdtype = dns.rdatatype.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.EOL and token[0] != dns.tokenizer.EOF:
+ if not token.is_eol_or_eof():
self.tok.unget(token)
rd = dns.rdata.from_text(rdclass, rdtype, self.tok, None)
covers = rd.covers()
section = None
while 1:
token = self.tok.get(True, True)
- if token[0] == dns.tokenizer.EOL or token[0] == dns.tokenizer.EOF:
+ if token.is_eol_or_eof():
break
- if token[0] == dns.tokenizer.COMMENT:
- u = token[1].upper()
+ if token.is_comment():
+ u = token.value.upper()
if u == 'HEADER':
line_method = self._header_line
elif u == 'QUESTION' or u == 'ZONE':
length = tok.get_int()
chunks = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- chunks.append(value)
+ chunks.append(token.value)
hex = ''.join(chunks)
data = hex.decode('hex_codec')
if len(data) != length:
# peek at first token
token = tok.get()
tok.unget(token)
- if token[0] == dns.tokenizer.IDENTIFIER and \
- token[1] == r'\#':
+            if token.is_identifier() and \
+ token.value == r'\#':
#
# Known type using the generic syntax. Extract the
# wire form from the generic syntax, and then run
@see: RFC 2538"""
__slots__ = ['certificate_type', 'key_tag', 'algorithm', 'certificate']
-
+
def __init__(self, rdclass, rdtype, certificate_type, key_tag, algorithm,
certificate):
super(CERT, self).__init__(rdclass, rdtype)
return "%s %d %s %s" % (certificate_type, self.key_tag,
dns.dnssec.algorithm_to_text(self.algorithm),
dns.rdata._base64ify(self.certificate))
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
certificate_type = _ctype_from_text(tok.get_string())
key_tag = tok.get_uint16()
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
certificate = b64.decode('base64_codec')
return cls(rdclass, rdtype, certificate_type, key_tag,
algorithm, certificate)
-
+
from_text = classmethod(from_text)
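The collect-identifier-chunks-until-EOL pattern above recurs in several of the rdata types this patch touches (CERT here, and DHCID, KEY, SIG further down). Purely as an illustration of the new Token API -- not something the patch adds -- it could be factored into a helper along these lines; _collect_base64 is hypothetical and not part of dnspython:

# Hypothetical helper, not part of dnspython: gather base64 text with Token
# methods and decode it (Python 2 str codec, matching the surrounding code).
import dns.exception

def _collect_base64(tok):
    chunks = []
    while 1:
        t = tok.get()
        if t.is_eol_or_eof():
            break
        if not t.is_identifier():
            raise dns.exception.SyntaxError
        chunks.append(t.value)
    return ''.join(chunks).decode('base64_codec')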
def to_wire(self, file, compress = None, origin = None):
self.algorithm)
file.write(prefix)
file.write(self.certificate)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
prefix = wire[current : current + 5]
current += 5
other.to_wire(f)
wire2 = f.getvalue()
f.close()
-
+
return cmp(wire1, wire2)
key = tok.get_string().decode('base64-codec')
servers = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- server = dns.name.from_text(value, origin)
+ server = dns.name.from_text(token.value, origin)
server.choose_relativity(origin, relativize)
servers.append(server)
return cls(rdclass, rdtype, hit, algorithm, key, servers)
@see: RFC 1183"""
__slots__ = ['address', 'subaddress']
-
+
def __init__(self, rdclass, rdtype, address, subaddress):
super(ISDN, self).__init__(rdclass, rdtype)
self.address = address
dns.rdata._escapify(self.subaddress))
else:
return '"%s"' % dns.rdata._escapify(self.address)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
address = tok.get_string()
t = tok.get()
- if t[0] != dns.tokenizer.EOL and t[0] != dns.tokenizer.EOF:
+ if not t.is_eol_or_eof():
tok.unget(t)
subaddress = tok.get_string()
else:
subaddress = ''
tok.get_eol()
return cls(rdclass, rdtype, address, subaddress)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
byte = chr(l)
file.write(byte)
file.write(self.subaddress)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
l = ord(wire[current])
current += 1
__slots__ = ['latitude', 'longitude', 'altitude', 'size',
'horizontal_precision', 'vertical_precision']
-
+
def __init__(self, rdclass, rdtype, latitude, longitude, altitude,
size=1.0, hprec=10000.0, vprec=10.0):
"""Initialize a LOC record instance.
of integers specifying (degrees, minutes, seconds, milliseconds),
or they may be floating point values specifying the number of
degrees. The other parameters are floats."""
-
+
super(LOC, self).__init__(rdclass, rdtype)
if isinstance(latitude, int) or isinstance(latitude, long):
latitude = float(latitude)
self.vertical_precision / 100.0
)
return text
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
latitude = [0, 0, 0, 0]
longitude = [0, 0, 0, 0]
size = 1.0
hprec = 10000.0
vprec = 10.0
-
+
latitude[0] = tok.get_int()
t = tok.get_string()
if t.isdigit():
t = t[0 : -1]
altitude = float(t) * 100.0 # m -> cm
- (ttype, value) = tok.get()
- if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF:
+ token = tok.get()
+ if not token.is_eol_or_eof():
+ value = token.value
if value[-1] == 'm':
value = value[0 : -1]
size = float(value) * 100.0 # m -> cm
- (ttype, value) = tok.get()
- if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF:
+ token = tok.get()
+ if not token.is_eol_or_eof():
+ value = token.value
if value[-1] == 'm':
value = value[0 : -1]
hprec = float(value) * 100.0 # m -> cm
- (ttype, value) = tok.get()
- if ttype != dns.tokenizer.EOL and ttype != dns.tokenizer.EOF:
+ token = tok.get()
+ if not token.is_eol_or_eof():
+ value = token.value
if value[-1] == 'm':
value = value[0 : -1]
vprec = float(value) * 100.0 # m -> cm
- (ttype, value) = tok.get()
- if ttype != dns.tokenizer.EOL and \
- ttype != dns.tokenizer.EOF:
- raise dns.exception.SyntaxError, \
- "expected EOL or EOF"
+ tok.get_eol()
return cls(rdclass, rdtype, latitude, longitude, altitude,
size, hprec, vprec)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
else:
sign = 1
degrees = long(self.latitude[0])
- milliseconds = (degrees * 3600000 +
+ milliseconds = (degrees * 3600000 +
self.latitude[1] * 60000 +
self.latitude[2] * 1000 +
self.latitude[3]) * sign
else:
sign = 1
degrees = long(self.longitude[0])
- milliseconds = (degrees * 3600000 +
+ milliseconds = (degrees * 3600000 +
self.longitude[1] * 60000 +
self.longitude[2] * 1000 +
self.longitude[3]) * sign
wire = struct.pack("!BBBBIII", 0, size, hprec, vprec, latitude,
longitude, altitude)
file.write(wire)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(version, size, hprec, vprec, latitude, longitude, altitude) = \
struct.unpack("!BBBBIII", wire[current : current + rdlen])
other.to_wire(f)
wire2 = f.getvalue()
f.close()
-
+
return cmp(wire1, wire2)
def _get_float_latitude(self):
@type windows: list of (window number, string) tuples"""
__slots__ = ['next', 'windows']
-
+
def __init__(self, rdclass, rdtype, next, windows):
super(NSEC, self).__init__(rdclass, rdtype)
self.next = next
i * 8 + j))
text += (' ' + ' '.join(bits))
return '%s%s' % (next, text)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
next = tok.get_name()
next = next.choose_relativity(origin, relativize)
rdtypes = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- nrdtype = dns.rdatatype.from_text(value)
+ nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError, "NSEC with bit 0"
if nrdtype > 65535:
bitmap[byte] = chr(ord(bitmap[byte]) | (0x80 >> bit))
windows.append((window, ''.join(bitmap[0:octets])))
return cls(rdclass, rdtype, next, windows)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(chr(window))
file.write(chr(len(bitmap)))
file.write(bitmap)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
(next, cused) = dns.name.from_wire(wire[: current + rdlen], current)
current += cused
def choose_relativity(self, origin = None, relativize = True):
self.next = self.next.choose_relativity(origin, relativize)
-
+
def _cmp(self, other):
v = cmp(self.next, other.next)
if v == 0:
next = base64.b32decode(next)
rdtypes = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- nrdtype = dns.rdatatype.from_text(value)
+ nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError, "NSEC3 with bit 0"
if nrdtype > 65535:
'\x00', '\x00', '\x00', '\x00',
'\x00', '\x00', '\x00', '\x00' ]
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- if value.isdigit():
- nrdtype = int(value)
+ if token.value.isdigit():
+ nrdtype = int(token.value)
else:
- nrdtype = dns.rdatatype.from_text(value)
+ nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError, "NXT with bit 0"
if nrdtype > 127:
@type address: string (in the standard "dotted quad" format)"""
__slots__ = ['address']
-
+
def __init__(self, rdclass, rdtype, address):
super(A, self).__init__(rdclass, rdtype)
# check that it's OK
def to_text(self, origin=None, relativize=True, **kw):
return self.address
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
- (ttype, address) = tok.get()
- if ttype != dns.tokenizer.IDENTIFIER:
- raise dns.exception.SyntaxError
- t = tok.get_eol()
+ address = tok.get_identifier()
+ tok.get_eol()
return cls(rdclass, rdtype, address)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(dns.ipv4.inet_aton(self.address))
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
address = dns.ipv4.inet_ntoa(wire[current : current + rdlen])
return cls(rdclass, rdtype, address)
@type address: string (in the standard IPv6 format)"""
__slots__ = ['address']
-
+
def __init__(self, rdclass, rdtype, address):
super(AAAA, self).__init__(rdclass, rdtype)
# check that it's OK
def to_text(self, origin=None, relativize=True, **kw):
return self.address
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
- (ttype, address) = tok.get()
- if ttype != dns.tokenizer.IDENTIFIER:
- raise dns.exception.SyntaxError
- t = tok.get_eol()
+ address = tok.get_identifier()
+ tok.get_eol()
return cls(rdclass, rdtype, address)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(dns.inet.inet_pton(dns.inet.AF_INET6, self.address))
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
address = dns.inet.inet_ntop(dns.inet.AF_INET6,
wire[current : current + rdlen])
class APLItem(object):
"""An APL list item.
-
+
@ivar family: the address family (IANA address family registry)
@type family: int
@ivar negation: is this item negated?
"""
__slots__ = ['family', 'negation', 'address', 'prefix']
-
+
def __init__(self, family, negation, address, prefix):
self.family = family
self.negation = negation
@see: RFC 3123"""
__slots__ = ['items']
-
+
def __init__(self, rdclass, rdtype, items):
super(APL, self).__init__(rdclass, rdtype)
self.items = items
def to_text(self, origin=None, relativize=True, **kw):
return ' '.join(map(lambda x: str(x), self.items))
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
items = []
while 1:
- (ttype, item) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
+ item = token.value
if item[0] == '!':
negation = True
item = item[1:]
items.append(item)
return cls(rdclass, rdtype, items)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
for item in self.items:
item.to_wire(file)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
items = []
while 1:
other.to_wire(f)
wire2 = f.getvalue()
f.close()
-
+
return cmp(wire1, wire2)
@see: RFC 4701"""
__slots__ = ['data']
-
+
def __init__(self, rdclass, rdtype, data):
super(DHCID, self).__init__(rdclass, rdtype)
self.data = data
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
data = b64.decode('base64_codec')
return cls(rdclass, rdtype, data)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(self.data)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
data = wire[current : current + rdlen]
return cls(rdclass, rdtype, data)
from_wire = classmethod(from_wire)
def _cmp(self, other):
- return cmp(self.data, other.data)
+ return cmp(self.data, other.data)
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
key = b64.decode('base64_codec')
return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
@see: RFC 1035"""
__slots__ = ['address', 'protocol', 'bitmap']
-
+
def __init__(self, rdclass, rdtype, address, protocol, bitmap):
super(WKS, self).__init__(rdclass, rdtype)
self.address = address
bits.append(str(i * 8 + j))
text = ' '.join(bits)
return '%s %d %s' % (self.address, self.protocol, text)
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
address = tok.get_string()
protocol = tok.get_string()
protocol = socket.getprotobyname(protocol)
bitmap = []
while 1:
- (ttype, value) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- if value.isdigit():
- serv = int(value)
+ if token.value.isdigit():
+ serv = int(token.value)
else:
if protocol != _proto_udp and protocol != _proto_tcp:
raise NotImplementedError, "protocol must be TCP or UDP"
protocol_text = "udp"
else:
protocol_text = "tcp"
- serv = socket.getservbyname(value, protocol_text)
+ serv = socket.getservbyname(token.value, protocol_text)
i = serv // 8
l = len(bitmap)
if l < i + 1:
bitmap[i] = chr(ord(bitmap[i]) | (0x80 >> (serv % 8)))
bitmap = dns.rdata._truncate_bitmap(bitmap)
return cls(rdclass, rdtype, address, protocol, bitmap)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
protocol = struct.pack('!B', self.protocol)
file.write(protocol)
file.write(self.bitmap)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
address = dns.ipv4.inet_ntoa(wire[current : current + 4])
protocol, = struct.unpack('!B', wire[current + 4 : current + 5])
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
digest = ''.join(chunks)
digest = digest.decode('hex_codec')
return cls(rdclass, rdtype, key_tag, algorithm, digest_type,
'IPSEC' : 4,
'ALL' : 255,
}
-
+
class KEYBase(dns.rdata.Rdata):
"""KEY-like record base
@type key: string"""
__slots__ = ['flags', 'protocol', 'algorithm', 'key']
-
+
def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key):
super(KEYBase, self).__init__(rdclass, rdtype)
self.flags = flags
if protocol is None:
raise dns.exception.SyntaxError, \
'unknown protocol %s' % protocol
-
+
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
key = b64.decode('base64_codec')
return cls(rdclass, rdtype, flags, protocol, algorithm, key)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
header = struct.pack("!HBB", self.flags, self.protocol, self.algorithm)
file.write(header)
file.write(self.key)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
if rdlen < 4:
raise dns.exception.FormError
def posixtime_to_sigtime(what):
return time.strftime('%Y%m%d%H%M%S', time.gmtime(what))
-
+
class SIGBase(dns.rdata.Rdata):
"""SIG-like record base
__slots__ = ['type_covered', 'algorithm', 'labels', 'original_ttl',
'expiration', 'inception', 'key_tag', 'signer',
'signature']
-
+
def __init__(self, rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature):
def covers(self):
return self.type_covered
-
+
def to_text(self, origin=None, relativize=True, **kw):
return '%s %d %d %d %s %s %d %s %s' % (
dns.rdatatype.to_text(self.type_covered),
chunks = []
while 1:
t = tok.get()
- if t[0] == dns.tokenizer.EOL or t[0] == dns.tokenizer.EOF:
+ if t.is_eol_or_eof():
break
- if t[0] != dns.tokenizer.IDENTIFIER:
+ if not t.is_identifier():
raise dns.exception.SyntaxError
- chunks.append(t[1])
+ chunks.append(t.value)
b64 = ''.join(chunks)
signature = b64.decode('base64_codec')
return cls(rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(header)
self.signer.to_wire(file, None, origin)
file.write(self.signature)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
header = struct.unpack('!HBBIIIH', wire[current : current + 18])
current += 18
def choose_relativity(self, origin = None, relativize = True):
self.signer = self.signer.choose_relativity(origin, relativize)
-
+
def _cmp(self, other):
hs = struct.pack('!HBBIIIH', self.type_covered,
self.algorithm, self.labels,
@see: RFC 1035"""
__slots__ = ['strings']
-
+
def __init__(self, rdclass, rdtype, strings):
super(TXTBase, self).__init__(rdclass, rdtype)
if isinstance(strings, str):
txt += '%s"%s"' % (prefix, dns.rdata._escapify(s))
prefix = ' '
return txt
-
+
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
strings = []
while 1:
- (ttype, s) = tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = tok.get()
+ if token.is_eol_or_eof():
break
- if ttype != dns.tokenizer.QUOTED_STRING and \
- ttype != dns.tokenizer.IDENTIFIER:
+ if not (token.is_quoted_string() or token.is_identifier()):
raise dns.exception.SyntaxError, "expected a string"
- if len(s) > 255:
+ if len(token.value) > 255:
raise dns.exception.SyntaxError, "string too long"
- strings.append(s)
+ strings.append(token.value)
if len(strings) == 0:
raise dns.exception.UnexpectedEnd
return cls(rdclass, rdtype, strings)
-
+
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
byte = chr(l)
file.write(byte)
file.write(s)
-
+
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
strings = []
while rdlen > 0:
"""Raised when an attempt is made to unget a token when the unget
buffer is full."""
pass
-
+
+class Token(object):
+ """A DNS master file format token.
+
+ @ivar ttype: The token type
+ @type ttype: int
+ @ivar value: The token value
+ @type value: string
+ """
+
+ def __init__(self, ttype, value='', has_escape=False):
+ """Initialize a token instance.
+
+ @param ttype: The token type
+ @type ttype: int
+        @param value: The token value
+        @type value: string
+        @param has_escape: Does the token value contain escapes?
+ @type has_escape: bool
+ """
+ self.ttype = ttype
+ self.value = value
+ self.has_escape = has_escape
+
+ def is_eof(self):
+ return self.ttype == EOF
+
+ def is_eol(self):
+ return self.ttype == EOL
+
+ def is_whitespace(self):
+ return self.ttype == WHITESPACE
+
+ def is_identifier(self):
+ return self.ttype == IDENTIFIER
+
+ def is_quoted_string(self):
+ return self.ttype == QUOTED_STRING
+
+ def is_comment(self):
+ return self.ttype == COMMENT
+
+ def is_delimiter(self):
+ return self.ttype == DELIMITER
+
+ def is_eol_or_eof(self):
+ return (self.ttype == EOL or self.ttype == EOF)
+
+ def __eq__(self, other):
+ if not isinstance(other, Token):
+ return False
+ return (self.ttype == other.ttype and
+ self.value == other.value)
+
+ def __ne__(self, other):
+ if not isinstance(other, Token):
+ return True
+ return (self.ttype != other.ttype or
+ self.value != other.value)
+
+ def __str__(self):
+ return '%d "%s"' % (self.ttype, self.value)
+
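For orientation, a minimal sketch of the calling convention this class enables; it assumes only the Token class above and the existing module-level token-type constants, and is illustration rather than part of the patch:

# Sketch only (not part of the patch): tuple-style access vs. Token methods.
import dns.tokenizer

tok = dns.tokenizer.Tokenizer('foo bar\n')
token = tok.get()
# Old callers unpacked a tuple:
#     (ttype, value) = tok.get()
#     if ttype != dns.tokenizer.IDENTIFIER: ...
# With Token objects the same check reads:
if token.is_identifier():
    value = token.value        # 'foo'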
class Tokenizer(object):
"""A DNS master file format tokenizer.
-    A token is a (type, value) tuple, where I{type} is an int, and
-    I{value} is a string.  The valid types are EOF, EOL, WHITESPACE,
-    IDENTIFIER, QUOTED_STRING, COMMENT, and DELIMITER.
+    A token is a Token object with I{ttype} and I{value} attributes,
+    where I{ttype} is an int and I{value} is a string.  The valid types
+    are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING, COMMENT, and
+    DELIMITER.
-
+
@ivar file: The file to tokenize
@type file: file
@ivar ungotten_char: The most recently ungotten character, or None.
@ivar filename: A filename that will be returned by the L{where} method.
@type filename: string
"""
-
+
def __init__(self, f=sys.stdin, filename=None):
"""Initialize a tokenizer instance.
will return.
@type filename: string
"""
-
+
if isinstance(f, str):
f = cStringIO.StringIO(f)
if filename is None:
"""Read a character from input.
@rtype: string
"""
-
+
if self.ungotten_char is None:
if self.eof:
c = ''
@rtype: (string, int) tuple. The first item is the filename of
the input, the second is the current line number.
"""
-
+
return (self.filename, self.line_number)
-
+
def _unget_char(self, c):
"""Unget a character.
The unget buffer for characters is only one character large; it is
an error to try to unget a character when the unget buffer is not
empty.
-
+
@param c: the character to unget
@type c: string
@raises UngetBufferFull: there is already an ungotten char
"""
-
+
if not self.ungotten_char is None:
raise UngetBufferFull
self.ungotten_char = c
@rtype: int
"""
-
+
skipped = 0
while True:
c = self._get_char()
@param want_comment: If True, return a COMMENT token if the
first token read is a comment. The default is False.
@type want_comment: bool
- @rtype: (int, string) tuple
+ @rtype: Token object
@raises dns.exception.UnexpectedEnd: input ended prematurely
@raises dns.exception.SyntaxError: input was badly formed
"""
-
+
if not self.ungotten_token is None:
token = self.ungotten_token
self.ungotten_token = None
- if token[0] == WHITESPACE:
+ if token.is_whitespace():
if want_leading:
return token
- elif token[0] == COMMENT:
+ elif token.is_comment():
if want_comment:
return token
else:
return token
skipped = self.skip_whitespace()
if want_leading and skipped > 0:
- return (WHITESPACE, ' ')
+ return Token(WHITESPACE, ' ')
token = ''
ttype = IDENTIFIER
while True:
self.skip_whitespace()
continue
elif c == '\n':
- return (EOL, '\n')
+ return Token(EOL, '\n')
elif c == ';':
while 1:
c = self._get_char()
token += c
if want_comment:
self._unget_char(c)
- return (COMMENT, token)
+ return Token(COMMENT, token)
elif c == '':
if self.multiline:
raise dns.exception.SyntaxError, \
'unbalanced parentheses'
- return (EOF, '')
+ return Token(EOF)
elif self.multiline:
self.skip_whitespace()
token = ''
continue
else:
- return (EOL, '\n')
+ return Token(EOL, '\n')
else:
# This code exists in case we ever want a
# delimiter to be returned. It never produces
raise dns.exception.SyntaxError, 'newline in quoted string'
elif c == '\\':
#
- # Treat \ followed by a delimiter as the
+ # Treat \ followed by a delimiter as the
# delimiter, otherwise leave it alone.
#
c = self._get_char()
if self.multiline:
raise dns.exception.SyntaxError, 'unbalanced parentheses'
ttype = EOF
- return (ttype, token)
+ return Token(ttype, token)
def unget(self, token):
"""Unget a token.
The unget buffer for tokens is only one token large; it is
an error to try to unget a token when the unget buffer is not
empty.
-
+
@param token: the token to unget
- @type token: (int, string) token tuple
+ @type token: Token object
@raises UngetBufferFull: there is already an ungotten token
"""
"""Return the next item in an iteration.
-        @rtype: (int, string)
+        @rtype: Token object
"""
-
+
token = self.get()
- if token[0] == EOF:
+ if token.is_eof():
raise StopIteration
return token
def get_int(self):
"""Read the next token and interpret it as an integer.
-
+
@raises dns.exception.SyntaxError:
@rtype: int
"""
-
- (ttype, value) = self.get()
- if ttype != IDENTIFIER:
+
+ token = self.get()
+ if not token.is_identifier():
raise dns.exception.SyntaxError, 'expecting an identifier'
- if not value.isdigit():
+ if not token.value.isdigit():
raise dns.exception.SyntaxError, 'expecting an integer'
- return int(value)
+ return int(token.value)
def get_uint8(self):
"""Read the next token and interpret it as an 8-bit unsigned
integer.
-
+
@raises dns.exception.SyntaxError:
@rtype: int
"""
-
+
value = self.get_int()
if value < 0 or value > 255:
raise dns.exception.SyntaxError, \
def get_uint16(self):
"""Read the next token and interpret it as a 16-bit unsigned
integer.
-
+
@raises dns.exception.SyntaxError:
@rtype: int
"""
-
+
value = self.get_int()
if value < 0 or value > 65535:
raise dns.exception.SyntaxError, \
def get_uint32(self):
"""Read the next token and interpret it as a 32-bit unsigned
integer.
-
+
@raises dns.exception.SyntaxError:
@rtype: int
"""
-
- (ttype, value) = self.get()
- if ttype != IDENTIFIER:
+
+ token = self.get()
+ if not token.is_identifier():
raise dns.exception.SyntaxError, 'expecting an identifier'
- if not value.isdigit():
+ if not token.value.isdigit():
raise dns.exception.SyntaxError, 'expecting an integer'
- value = long(value)
+ value = long(token.value)
if value < 0 or value > 4294967296L:
raise dns.exception.SyntaxError, \
'%d is not an unsigned 32-bit integer' % value
def get_string(self, origin=None):
"""Read the next token and interpret it as a string.
-
+
@raises dns.exception.SyntaxError:
@rtype: string
"""
-
- (ttype, t) = self.get()
- if ttype != IDENTIFIER and ttype != QUOTED_STRING:
+
+ token = self.get()
+ if not (token.is_identifier() or token.is_quoted_string()):
raise dns.exception.SyntaxError, 'expecting a string'
- return t
+ return token.value
+
+ def get_identifier(self, origin=None):
+ """Read the next token and raise an exception if it is not an identifier.
+
+ @raises dns.exception.SyntaxError:
+ @rtype: string
+ """
+
+ token = self.get()
+ if not token.is_identifier():
+ raise dns.exception.SyntaxError, 'expecting an identifier'
+ return token.value
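A hedged usage sketch of the new helper, mirroring how the A and AAAA from_text changes earlier in this patch use it; the address literal is only an example:

# Sketch only: parsing an A-style rdata text with the new helpers.
import dns.tokenizer

tok = dns.tokenizer.Tokenizer('10.0.0.1\n')
address = tok.get_identifier()   # SyntaxError if the token is not an identifier
tok.get_eol()                    # SyntaxError unless EOL or EOF follows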
def get_name(self, origin=None):
"""Read the next token and interpret it as a DNS name.
-
+
@raises dns.exception.SyntaxError:
@rtype: dns.name.Name object"""
-
- (ttype, t) = self.get()
- if ttype != IDENTIFIER:
+
+ token = self.get()
+ if not token.is_identifier():
raise dns.exception.SyntaxError, 'expecting an identifier'
- return dns.name.from_text(t, origin)
+ return dns.name.from_text(token.value, origin)
def get_eol(self):
"""Read the next token and raise an exception if it isn't EOL or
@raises dns.exception.SyntaxError:
@rtype: string
"""
-
- (ttype, t) = self.get()
- if ttype != EOL and ttype != EOF:
+
+ token = self.get()
+ if not token.is_eol_or_eof():
raise dns.exception.SyntaxError, \
- 'expected EOL or EOF, got %d "%s"' % (ttype, t)
- return t
+ 'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value)
+ return token.value
def get_ttl(self):
- (ttype, t) = self.get()
- if ttype != IDENTIFIER:
+ token = self.get()
+ if not token.is_identifier():
raise dns.exception.SyntaxError, 'expecting an identifier'
- return dns.ttl.from_text(t)
+ return dns.ttl.from_text(token.value)
    dns.name.Name object, or it may be a string. In either case,
if the name is relative it is treated as relative to the origin of
the zone.
-
+
@ivar rdclass: The zone's rdata class; the default is class IN.
@type rdclass: int
@ivar origin: The origin of the zone.
node_factory = dns.node.Node
__slots__ = ['rdclass', 'origin', 'nodes', 'relativize']
-
+
def __init__(self, origin, rdclass=dns.rdataclass.IN, relativize=True):
"""Initialize a zone object.
nodes.
@rtype: bool
"""
-
+
if not isinstance(other, Zone):
return False
if self.rdclass != other.rdclass or \
"""Are two zones not equal?
@rtype: bool
"""
-
+
return not self.__eq__(other)
def _validate_name(self, name):
if self.relativize:
name = name.relativize(self.origin)
return name
-
+
def __getitem__(self, key):
key = self._validate_name(key)
return self.nodes[key]
@raises KeyError: the name is not known and create was not specified.
@rtype: dns.node.Node object
"""
-
+
name = self._validate_name(name)
node = self.nodes.get(name)
if node is None:
This method is like L{find_node}, except it returns None instead
of raising an exception if the node does not exist and creation
has not been requested.
-
+
@param name: the name of the node to find
@type name: dns.name.Name object or string
@param create: should the node be created if it doesn't exist?
It is not an error if the node does not exist.
"""
-
+
name = self._validate_name(name)
if self.nodes.has_key(name):
del self.nodes[name]
-
+
def find_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE,
create=False):
"""Look for rdata with the specified name and type in the zone,
The rdataset returned is not a copy; changes to it will change
the zone.
-
+
KeyError is raised if the name or type are not found.
Use L{get_rdataset} if you want to have None returned instead.
The rdataset returned is not a copy; changes to it will change
the zone.
-
+
None is returned if the name or type are not found.
Use L{find_rdataset} if you want to have KeyError raised instead.
def replace_rdataset(self, name, replacement):
"""Replace an rdataset at name.
-
+
It is not an error if there is no rdataset matching I{replacement}.
Ownership of the I{replacement} object is transferred to the zone;
The I{name}, I{rdtype}, and I{covers} parameters may be
strings, in which case they will be converted to their proper
type.
-
+
This method is less efficient than the similar
L{find_rdataset} because it creates an RRset instead of
returning the matching rdataset. It may be more convenient
The I{name}, I{rdtype}, and I{covers} parameters may be
strings, in which case they will be converted to their proper
type.
-
+
This method is less efficient than the similar L{get_rdataset}
because it creates an RRset instead of returning the matching
rdataset. It may be more convenient for some uses since it
@param covers: the covered type (defaults to None)
@type covers: int or string
"""
-
+
if isinstance(rdtype, str):
rdtype = dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
@param covers: the covered type (defaults to None)
@type covers: int or string
"""
-
+
if isinstance(rdtype, str):
rdtype = dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
def to_file(self, f, sorted=True, relativize=True, nl=None):
"""Write a zone to a file.
-
+
@param f: file or string. If I{f} is a string, it is treated
as the name of a file to open.
@param sorted: if True, the file will be written with the
def _eat_line(self):
while 1:
- (ttype, t) = self.tok.get()
- if ttype == dns.tokenizer.EOL or ttype == dns.tokenizer.EOF:
+ token = self.tok.get()
+ if token.is_eol_or_eof():
break
-
+
def _rr_line(self):
"""Process one line from a DNS master file."""
# Name
if self.current_origin is None:
raise UnknownOrigin
token = self.tok.get(want_leading = True)
- if token[0] != dns.tokenizer.WHITESPACE:
- self.last_name = dns.name.from_text(token[1], self.current_origin)
+ if not token.is_whitespace():
+ self.last_name = dns.name.from_text(token.value, self.current_origin)
else:
token = self.tok.get()
- if token[0] == dns.tokenizer.EOL or \
- token[0] == dns.tokenizer.EOF:
+ if token.is_eol_or_eof():
# treat leading WS followed by EOL/EOF as if they were EOL/EOF.
return
self.tok.unget(token)
if self.relativize:
name = name.relativize(self.zone.origin)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
# TTL
try:
- ttl = dns.ttl.from_text(token[1])
+ ttl = dns.ttl.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
except dns.ttl.BadTTL:
ttl = self.ttl
# Class
try:
- rdclass = dns.rdataclass.from_text(token[1])
+ rdclass = dns.rdataclass.from_text(token.value)
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError
except dns.exception.SyntaxError:
raise dns.exception.SyntaxError
raise dns.exception.SyntaxError, "RR class is not zone's class"
# Type
try:
- rdtype = dns.rdatatype.from_text(token[1])
+ rdtype = dns.rdatatype.from_text(token.value)
except:
raise dns.exception.SyntaxError, \
- "unknown rdatatype '%s'" % token[1]
+ "unknown rdatatype '%s'" % token.value
n = self.zone.nodes.get(name)
if n is None:
n = self.zone.node_factory()
# correct, but it is correct almost all of the time.
# We convert them to syntax errors so that we can emit
# helpful filename:line info.
-
(ty, va) = sys.exc_info()[:2]
raise dns.exception.SyntaxError, \
"caught exception %s: %s" % (str(ty), str(va))
try:
while 1:
token = self.tok.get(True, True)
- if token[0] == dns.tokenizer.EOF:
+ if token.is_eof():
if not self.current_file is None:
self.current_file.close()
if len(self.saved_state) > 0:
self.ttl) = self.saved_state.pop(-1)
continue
break
- elif token[0] == dns.tokenizer.EOL:
+ elif token.is_eol():
continue
- elif token[0] == dns.tokenizer.COMMENT:
+ elif token.is_comment():
self.tok.get_eol()
continue
- elif token[1][0] == '$':
- u = token[1].upper()
+ elif token.value[0] == '$':
+ u = token.value.upper()
if u == '$TTL':
token = self.tok.get()
- if token[0] != dns.tokenizer.IDENTIFIER:
+ if not token.is_identifier():
raise dns.exception.SyntaxError, "bad $TTL"
- self.ttl = dns.ttl.from_text(token[1])
+ self.ttl = dns.ttl.from_text(token.value)
self.tok.get_eol()
elif u == '$ORIGIN':
self.current_origin = self.tok.get_name()
self.zone.origin = self.current_origin
elif u == '$INCLUDE' and self.allow_include:
token = self.tok.get()
- if token[0] != dns.tokenizer.QUOTED_STRING:
+ if not token.is_quoted_string():
raise dns.exception.SyntaxError, \
"bad filename in $INCLUDE"
- filename = token[1]
+ filename = token.value
token = self.tok.get()
- if token[0] == dns.tokenizer.IDENTIFIER:
- new_origin = dns.name.from_text(token[1], \
- self.current_origin)
+ if token.is_identifier():
+ new_origin = dns.name.from_text(token.value, \
+ self.current_origin)
self.tok.get_eol()
- elif token[0] != dns.tokenizer.EOL and \
- token[0] != dns.tokenizer.EOF:
+ elif not token.is_eol_or_eof():
raise dns.exception.SyntaxError, \
"bad origin in $INCLUDE"
else:
detail = "syntax error"
raise dns.exception.SyntaxError, \
"%s:%d: %s" % (filename, line_number, detail)
-
+
# Now that we're done reading, do some basic checking of the zone.
if self.check_origin:
self.zone.check_origin()
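The reader loop above is what dns.zone.from_text drives. A small sketch, with an example zone text of my own (not from the patch), exercises the $TTL/$ORIGIN handling and the Token-based record parsing:

# Sketch only: a tiny master file run through the reader via the public API.
import dns.zone

text = """
$TTL 300
$ORIGIN example.
@ IN SOA ns hostmaster 1 7200 900 1209600 86400
@ IN NS ns
ns IN A 10.0.0.1
"""
zone = dns.zone.from_text(text, origin='example.', relativize=True)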
if filename is None:
filename = '<file>'
want_close = False
-
+
try:
z = from_text(f, origin, rdclass, relativize, zone_factory,
filename, allow_include, check_origin)
def from_xfr(xfr, zone_factory=Zone, relativize=True):
"""Convert the output of a zone transfer generator into a zone object.
-
+
@param xfr: The xfr generator
@type xfr: generator of dns.message.Message objects
@param relativize: should names be relativized? The default is True.
@raises dns.zone.NoNS: No NS RRset was found at the zone origin
@rtype: dns.zone.Zone object
"""
-
+
z = None
for r in xfr:
if z is None:
import dns.exception
import dns.tokenizer
+Token = dns.tokenizer.Token
+
class TokenizerTestCase(unittest.TestCase):
-
+
def testQuotedString1(self):
tok = dns.tokenizer.Tokenizer(r'"foo"')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and
- value == 'foo')
+ token = tok.get()
+ self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, 'foo'))
def testQuotedString2(self):
tok = dns.tokenizer.Tokenizer(r'""')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and
- value == '')
+ token = tok.get()
+ self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, ''))
def testQuotedString3(self):
tok = dns.tokenizer.Tokenizer(r'"\"foo\""')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and
- value == '"foo"')
+ token = tok.get()
+ self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, '"foo"'))
def testQuotedString4(self):
tok = dns.tokenizer.Tokenizer(r'"foo\010bar"')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.QUOTED_STRING and
- value == 'foo\x0abar')
+ token = tok.get()
+ self.failUnless(token == Token(dns.tokenizer.QUOTED_STRING, 'foo\x0abar'))
def testQuotedString5(self):
def bad():
tok = dns.tokenizer.Tokenizer(r'"foo')
- (ttype, value) = tok.get()
+ token = tok.get()
self.failUnlessRaises(dns.exception.UnexpectedEnd, bad)
def testQuotedString6(self):
def bad():
tok = dns.tokenizer.Tokenizer(r'"foo\01')
- (ttype, value) = tok.get()
+ token = tok.get()
self.failUnlessRaises(dns.exception.SyntaxError, bad)
def testQuotedString7(self):
def bad():
tok = dns.tokenizer.Tokenizer('"foo\nbar"')
- (ttype, value) = tok.get()
+ token = tok.get()
self.failUnlessRaises(dns.exception.SyntaxError, bad)
def testEmpty1(self):
tok = dns.tokenizer.Tokenizer('')
- (ttype, value) = tok.get()
- self.failUnless(ttype == dns.tokenizer.EOF)
+ token = tok.get()
+ self.failUnless(token.is_eof())
def testEmpty2(self):
tok = dns.tokenizer.Tokenizer('')
- (ttype1, value1) = tok.get()
- (ttype2, value2) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.EOF and
- ttype2 == dns.tokenizer.EOF)
+ token1 = tok.get()
+ token2 = tok.get()
+ self.failUnless(token1.is_eof() and token2.is_eof())
def testEOL(self):
tok = dns.tokenizer.Tokenizer('\n')
- (ttype1, value1) = tok.get()
- (ttype2, value2) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.EOL and
- ttype2 == dns.tokenizer.EOF)
+ token1 = tok.get()
+ token2 = tok.get()
+ self.failUnless(token1.is_eol() and token2.is_eof())
def testWS1(self):
tok = dns.tokenizer.Tokenizer(' \n')
- (ttype1, value1) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.EOL)
+ token1 = tok.get()
+ self.failUnless(token1.is_eol())
def testWS2(self):
tok = dns.tokenizer.Tokenizer(' \n')
- (ttype1, value1) = tok.get(want_leading=True)
- self.failUnless(ttype1 == dns.tokenizer.WHITESPACE)
+ token1 = tok.get(want_leading=True)
+ self.failUnless(token1.is_whitespace())
def testComment1(self):
tok = dns.tokenizer.Tokenizer(' ;foo\n')
- (ttype1, value1) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.EOL)
+ token1 = tok.get()
+ self.failUnless(token1.is_eol())
def testComment2(self):
tok = dns.tokenizer.Tokenizer(' ;foo\n')
- (ttype1, value1) = tok.get(want_comment = True)
- (ttype2, value2) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.COMMENT and
- value1 == 'foo' and
- ttype2 == dns.tokenizer.EOL)
+ token1 = tok.get(want_comment = True)
+ token2 = tok.get()
+ self.failUnless(token1 == Token(dns.tokenizer.COMMENT, 'foo') and
+ token2.is_eol())
def testComment3(self):
tok = dns.tokenizer.Tokenizer(' ;foo bar\n')
- (ttype1, value1) = tok.get(want_comment = True)
- (ttype2, value2) = tok.get()
- self.failUnless(ttype1 == dns.tokenizer.COMMENT and
- value1 == 'foo bar' and
- ttype2 == dns.tokenizer.EOL)
+ token1 = tok.get(want_comment = True)
+ token2 = tok.get()
+ self.failUnless(token1 == Token(dns.tokenizer.COMMENT, 'foo bar') and
+ token2.is_eol())
def testMultiline1(self):
tok = dns.tokenizer.Tokenizer('( foo\n\n bar\n)')
tokens = list(iter(tok))
- self.failUnless(tokens == [(dns.tokenizer.IDENTIFIER, 'foo'),
- (dns.tokenizer.IDENTIFIER, 'bar')])
+ self.failUnless(tokens == [Token(dns.tokenizer.IDENTIFIER, 'foo'),
+ Token(dns.tokenizer.IDENTIFIER, 'bar')])
def testMultiline2(self):
tok = dns.tokenizer.Tokenizer('( foo\n\n bar\n)\n')
tokens = list(iter(tok))
- self.failUnless(tokens == [(dns.tokenizer.IDENTIFIER, 'foo'),
- (dns.tokenizer.IDENTIFIER, 'bar'),
- (dns.tokenizer.EOL, '\n')])
+ self.failUnless(tokens == [Token(dns.tokenizer.IDENTIFIER, 'foo'),
+ Token(dns.tokenizer.IDENTIFIER, 'bar'),
+ Token(dns.tokenizer.EOL, '\n')])
def testMultiline3(self):
def bad():
tok = dns.tokenizer.Tokenizer('foo)')
t1 = tok.get()
tok.unget(t1)
t2 = tok.get()
- self.failUnless(t1 == t2 and t1 == (dns.tokenizer.IDENTIFIER, 'foo'))
+ self.failUnless(t1 == t2 and t1.ttype == dns.tokenizer.IDENTIFIER and \
+ t1.value == 'foo')
def testUnget2(self):
def bad():
def testEscapedDelimiter1(self):
tok = dns.tokenizer.Tokenizer(r'ch\ ld')
t = tok.get()
- self.failUnless(t == (dns.tokenizer.IDENTIFIER, r'ch ld'))
+ self.failUnless(t.ttype == dns.tokenizer.IDENTIFIER and t.value == r'ch ld')
def testEscapedDelimiter2(self):
tok = dns.tokenizer.Tokenizer(r'ch\0ld')
t = tok.get()
- self.failUnless(t == (dns.tokenizer.IDENTIFIER, r'ch\0ld'))
+ self.failUnless(t.ttype == dns.tokenizer.IDENTIFIER and t.value == r'ch\0ld')
if __name__ == '__main__':
unittest.main()
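A short note on the test changes above: they rely on the __eq__/__ne__ defined on Token, so tokens compare by (ttype, value) and can be matched against freshly constructed Token instances, or lists of them. A minimal sketch, assuming the patch is applied:

# Sketch only: Token equality compares ttype and value.
import dns.tokenizer
from dns.tokenizer import Token

t = dns.tokenizer.Tokenizer('foo\n').get()
assert t == Token(dns.tokenizer.IDENTIFIER, 'foo')
assert t != Token(dns.tokenizer.IDENTIFIER, 'bar')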