"""A TSIG with an unknown key was received."""
+class Truncated(dns.exception.DNSException):
+ """The truncated flag is set."""
+
+
#: The question section number
QUESTION = 0
self.current = 12
if dns.opcode.is_update(self.message.flags):
self.updating = True
+ if self.message.flags & dns.flags.TC:
+ raise Truncated
self._get_question(qcount)
if self.question_only:
return
Raises ``dns.message.BadTSIG`` if a TSIG record was not the last
record of the additional data section.
+ Raises ``dns.message.Truncated`` if the TC flag is set.
+
Returns a ``dns.message.Message``.
"""
source=source,
source_port=source_port)
else:
- response = dns.query.udp(request, nameserver,
- timeout, port,
- source=source,
- source_port=source_port)
- if response.flags & dns.flags.TC:
+ try:
+ response = dns.query.udp(request, nameserver,
+ timeout, port,
+ source=source,
+ source_port=\
+ source_port)
+ except dns.message.Truncated:
# Response truncated; retry with TCP.
tcp_attempt = True
timeout = self._compute_timeout(start, lifetime)
dns.rdatatype.SOA)
self.failUnless(rrs1 == rrs2)
+ def test_CleanTruncated(self):
+ def bad():
+ a = dns.message.from_text(answer_text)
+ a.flags |= dns.flags.TC
+ wire = a.to_wire(want_shuffle=False)
+ dns.message.from_wire(wire)
+ self.failUnlessRaises(dns.message.Truncated, bad)
+
+ def test_MessyTruncated(self):
+ def bad():
+ a = dns.message.from_text(answer_text)
+ a.flags |= dns.flags.TC
+ wire = a.to_wire(want_shuffle=False)
+ dns.message.from_wire(wire[:-3])
+ self.failUnlessRaises(dns.message.Truncated, bad)
+
if __name__ == '__main__':
unittest.main()