def __repr__(self):
return '<DNS message, ID ' + `self.id` + '>'
-
+
def __str__(self):
return self.to_text()
@rtype: string
"""
-
+
s = cStringIO.StringIO()
print >> s, 'id %d' % self.id
print >> s, 'opcode %s' % \
covers=dns.rdatatype.NONE, deleting=None, create=False,
force_unique=False):
"""Find the RRset with the given attributes in the specified section.
-
+
@param section: the section of the message to look in, e.g.
self.answer.
@type section: list of dns.rrset.RRset objects
"""Get the RRset with the given attributes in the specified section.
If the RRset is not found, None is returned.
-
+
@param section: the section of the message to look in, e.g.
self.answer.
@type section: list of dns.rrset.RRset objects
Additional keyword arguments are passed to the rrset to_wire()
method.
-
+
@param origin: The origin to be appended to any relative names.
@type origin: dns.name.Name object
@param max_size: The maximum size of the wire format output; default
tsig_error=0, other_data=''):
"""When sending, a TSIG signature using the specified keyring
and keyname should be added.
-
+
@param keyring: The TSIG keyring to use; defaults to None.
@type keyring: dict
@param keyname: The name of the TSIG key to use; defaults to None.
ednsflags = 0
payload = 0
request_payload = 0
+ else:
+ # make sure the EDNS version in ednsflags agrees with edns
+ ednsflags &= 0xFF00FFFFL
+ ednsflags |= (edns << 16)
self.edns = edns
self.ednsflags = ednsflags
self.payload = payload
(value, evalue) = dns.rcode.to_flags(rcode)
self.flags &= 0xFFF0
self.flags |= value
- self.ednsflags &= 0xFF000000L
+ self.ednsflags &= 0x00FFFFFFL
self.ednsflags |= evalue
if self.ednsflags != 0 and self.edns < 0:
self.edns = 0
DNS dynamic updates.
@type zone_rdclass: int
"""
-
+
def __init__(self, wire, message, question_only=False):
self.wire = wire
self.message = message
self.updating = False
self.zone_rdclass = dns.rdataclass.IN
self.question_only = question_only
-
+
def _get_question(self, qcount):
"""Read the next I{qcount} records from the wire data and add them to
the question section.
if self.updating and qcount > 1:
raise dns.exception.FormError
-
+
for i in xrange(0, qcount):
(qname, used) = dns.name.from_wire(self.wire, self.current)
if not self.message.origin is None:
force_unique=True)
if self.updating:
self.zone_rdclass = rdclass
-
+
def _get_section(self, section, count):
"""Read the next I{count} records from the wire data and add them to
the specified section.
@type section: list of dns.rrset.RRset objects
@param count: the number of records to read
@type count: int"""
-
+
if self.updating:
force_unique = True
else:
if secret is None:
raise UnknownTSIGKey, "key '%s' unknown" % name
self.message.tsig_ctx = \
- dns.tsig.validate(self.wire,
+ dns.tsig.validate(self.wire,
name,
secret,
int(time.time()),
def read(self):
"""Read a wire format DNS message and build a dns.message.Message
object."""
-
+
l = len(self.wire)
if l < 12:
raise ShortHeader
reader.read()
return m
-
+
class _TextReader(object):
"""Text format reader.
-
+
@ivar tok: the tokenizer
@type tok: dns.tokenizer.Tokenizer object
@ivar message: The message object being built
def _header_line(self, section):
"""Process one line from the text format header section."""
-
+
(ttype, what) = self.tok.get()
if what == 'id':
self.message.id = self.tok.get_int()
def _question_line(self, section):
"""Process one line from the text format question section."""
-
+
token = self.tok.get(want_leading = True)
if token[0] != dns.tokenizer.WHITESPACE:
self.last_name = dns.name.from_text(token[1], None)
"""Process one line from the text format answer, authority, or
additional data sections.
"""
-
+
deleting = None
# Name
token = self.tok.get(want_leading = True)
continue
self.tok.unget(token)
line_method(section)
-
+
def from_text(text):
"""Convert the text format message into a message object.
# 'text' can also be a file, but we don't publish that fact
# since it's an implementation detail. The official file
# interface is from_file().
-
+
m = Message()
reader = _TextReader(text, m)
reader.read()
-
+
return m
-
+
def from_file(f):
"""Read the next text format message from the specified file.
The query will have a randomly choosen query id, and its DNS flags
will be set to dns.flags.RD.
-
+
@param qname: The query name.
@type qname: dns.name.Name object or string
@param rdtype: The desired rdata type.
@param want_dnssec: Should the query indicate that DNSSEC is desired?
@type want_dnssec: bool
@rtype: dns.message.Message object"""
-
+
if isinstance(qname, (str, unicode)):
qname = dns.name.from_text(qname)
if isinstance(rdtype, str):
The response's question section is a shallow copy of the query's
question section, so the query's question RRsets should not be
changed.
-
+
@param query: the query to respond to
@type query: dns.message.Message object
@param recursion_available: should RA be set in the response?
class and its to_wire() method to generate wire-format messages.
This class is for those applications which need finer control
over the generation of messages.
-
+
Typical use::
r = dns.renderer.Renderer(id=1, flags=0x80, max_size=512)
@param origin: the origin to use when rendering relative names
@type origin: dns.name.Namem or None.
"""
-
+
self.output = cStringIO.StringIO()
if id is None:
self.id = random.randint(0, 65535)
@param where: the offset
@type where: int
"""
-
+
self.output.seek(where)
self.output.truncate()
keys_to_delete = []
@raises dns.exception.FormError: an attempt was made to set
a section value less than the current section.
"""
-
+
if self.section != section:
if self.section > section:
raise dns.exception.FormError
@param rdclass: the question rdata class
@type rdclass: int
"""
-
+
self._set_section(QUESTION)
before = self.output.tell()
qname.to_wire(self.output, self.compress, self.origin)
self._rollback(before)
raise dns.exception.TooBig
self.counts[QUESTION] += 1
-
+
def add_rrset(self, section, rrset, **kw):
"""Add the rrset to the specified section.
Any keyword arguments are passed on to the rdataset's to_wire()
routine.
-
+
@param section: the section
@type section: int
@param rrset: the rrset
@param rdataset: the rdataset
@type rdataset: dns.rdataset.Rdataset object
"""
-
+
self._set_section(section)
before = self.output.tell()
n = rdataset.to_wire(name, self.output, self.compress, self.origin,
@see: RFC 2671
"""
+ # make sure the EDNS version in ednsflags agrees with edns
+ ednsflags &= 0xFF00FFFFL
+ ednsflags |= (edns << 16)
self._set_section(ADDITIONAL)
before = self.output.tell()
self.output.write(struct.pack('!BHHIH', 0, dns.rdatatype.OPT, payload,
def add_tsig(self, keyname, secret, fudge, id, tsig_error, other_data,
request_mac):
"""Add a TSIG signature to the message.
-
+
@param keyname: the TSIG key name
@type keyname: dns.name.Name object
@param secret: the secret to use
had the specified MAC.
@type request_mac: string
"""
-
+
self._set_section(ADDITIONAL)
before = self.output.tell()
s = self.output.getvalue()
have been rendered, but before the optional TSIG signature
is added.
"""
-
+
self.output.seek(0)
self.output.write(struct.pack('!HHHHHH', self.id, self.flags,
self.counts[0], self.counts[1],
@rtype: string
"""
-
+
return self.output.getvalue()