multi: bool = False,
tsig_ctx: Optional[Any] = None,
prepend_length: bool = False,
+ prefer_truncation: bool = False,
**kw: Dict[str, Any],
) -> bytes:
"""Return a string containing the message in DNS compressed wire
wants the message length prepended to the message itself. This is
useful for messages sent over TCP, TLS (DoT), or QUIC (DoQ).
+ *prefer_truncation*, a ``bool``, should be set to ``True`` if the caller
+ wants the message to be truncated if it would otherwise exceed the
+ maximum length. If the truncation occurs before the additional section,
+ the TC bit will be set.
+
Raises ``dns.exception.TooBig`` if *max_size* was exceeded.
Returns a ``bytes``.
r.reserve(opt_reserve)
tsig_reserve = self._compute_tsig_reserve()
r.reserve(tsig_reserve)
- for rrset in self.question:
- r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
- for rrset in self.answer:
- r.add_rrset(dns.renderer.ANSWER, rrset, **kw)
- for rrset in self.authority:
- r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw)
- for rrset in self.additional:
- r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)
+ try:
+ for rrset in self.question:
+ r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
+ for rrset in self.answer:
+ r.add_rrset(dns.renderer.ANSWER, rrset, **kw)
+ for rrset in self.authority:
+ r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw)
+ for rrset in self.additional:
+ r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)
+ except dns.exception.TooBig:
+ if prefer_truncation:
+ if r.section < dns.renderer.ADDITIONAL:
+ r.flags |= dns.flags.TC
+ else:
+ raise
r.release_reserved()
if self.opt is not None:
r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)
self.assertIsNotNone(q2.tsig)
self.assertEqual(q, q2)
+ def test_prefer_truncation_answer(self):
+ q = dns.message.make_query("www.example", "a")
+ rrs = [
+ dns.rrset.from_text("www.example.", 3600, "in", "a", f"1.2.3.{n}")
+ for n in range(32)
+ ]
+ r = dns.message.make_response(q)
+ r.answer.extend(rrs)
+
+ # Normally, we get an exception
+ with self.assertRaises(dns.exception.TooBig):
+ w1 = r.to_wire(max_size=512)
+
+ # With prefer_truncation, we get a truncated response where 1 record
+ # doesn't fit, and TC is set.
+ w2 = r.to_wire(max_size=512, prefer_truncation=True)
+ r2 = dns.message.from_wire(w2, one_rr_per_rrset=True)
+ self.assertNotEqual(r2.flags & dns.flags.TC, 0)
+ self.assertEqual(len(r2.answer), 30)
+
+ def test_prefer_truncation_edns(self):
+ q = dns.message.make_query("www.example", "a", payload=512)
+ rrs = [
+ dns.rrset.from_text("www.example.", 3600, "in", "a", f"1.2.3.{n}")
+ for n in range(32)
+ ]
+ r = dns.message.make_response(q)
+ r.answer.extend(rrs)
+
+ # Normally, we get an exception
+ with self.assertRaises(dns.exception.TooBig):
+ w1 = r.to_wire(max_size=512)
+
+ # With prefer_truncation, we get a truncated response where 2 records
+ # don't fit, and TC is set.
+ w2 = r.to_wire(max_size=512, prefer_truncation=True)
+ r2 = dns.message.from_wire(w2, one_rr_per_rrset=True)
+ self.assertNotEqual(r2.flags & dns.flags.TC, 0)
+ self.assertEqual(len(r2.answer), 29)
+
+ def test_prefer_truncation_additional(self):
+ q = dns.message.make_query("www.example", "a")
+ rrs = [
+ dns.rrset.from_text("www.example.", 3600, "in", "a", f"1.2.3.{n}")
+ for n in range(32)
+ ]
+ r = dns.message.make_response(q)
+ r.additional.extend(rrs)
+
+ # Normally, we get an exception
+ with self.assertRaises(dns.exception.TooBig):
+ w1 = r.to_wire(max_size=512)
+
+ # With prefer_truncation, we get a truncated response where 1 record
+ # doesn't fit, and TC is not set.
+ w2 = r.to_wire(max_size=512, prefer_truncation=True)
+ r2 = dns.message.from_wire(w2, one_rr_per_rrset=True)
+ self.assertEqual(r2.flags & dns.flags.TC, 0)
+ self.assertEqual(len(r2.additional), 30)
+
if __name__ == "__main__":
unittest.main()