self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
self.opt: Optional[dns.rrset.RRset] = None
self.request_payload = 0
+ self.pad = 0
self.keyring: Any = None
self.tsig: Optional[dns.rrset.RRset] = None
self.request_mac = b""
rrset = None
return rrset
+ def _compute_opt_reserve(self) -> int:
+ """Compute the size required for the OPT RR, padding excluded"""
+ if not self.opt:
+ return 0
+ # 1 byte for the root name, 10 for the standard RR fields
+ size = 11
+ # This would be more efficient if options had a size() method, but we won't
+ # worry about that for now. We also don't worry if there is an existing padding
+ # option, as it is unlikely and probably harmless, as the worst case is that we
+ # may add another, and this seems to be legal.
+ for option in self.opt[0].options:
+ wire = option.to_wire()
+ # We add 4 here to account for the option type and length
+ size += len(wire) + 4
+ if self.pad:
+ # Padding will be added, so again add the option type and length.
+ size += 4
+ return size
+
+ def _compute_tsig_reserve(self) -> int:
+ """Compute the size required for the TSIG RR"""
+ # This would be more efficient if TSIGs had a size method, but we won't
+ # worry about for now. Also, we can't really cope with the potential
+ # compressibility of the TSIG owner name, so we estimate with the uncompressed
+ # size. We will disable compression when TSIG and padding are both is active
+ # so that the padding comes out right.
+ if not self.tsig:
+ return 0
+ f = io.BytesIO()
+ self.tsig.to_wire(f)
+ return len(f.getvalue())
+
def to_wire(
self,
origin: Optional[dns.name.Name] = None,
elif max_size > 65535:
max_size = 65535
r = dns.renderer.Renderer(self.id, self.flags, max_size, origin)
+ opt_reserve = self._compute_opt_reserve()
+ r.reserve(opt_reserve)
+ tsig_reserve = self._compute_tsig_reserve()
+ r.reserve(tsig_reserve)
for rrset in self.question:
r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
for rrset in self.answer:
r.add_rrset(dns.renderer.ANSWER, rrset, **kw)
for rrset in self.authority:
r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw)
- if self.opt is not None:
- r.add_rrset(dns.renderer.ADDITIONAL, self.opt)
for rrset in self.additional:
r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)
+ r.release_reserved()
+ if self.opt is not None:
+ r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)
r.write_header()
if self.tsig is not None:
(new_tsig, ctx) = dns.tsig.sign(
self.keyring.algorithm,
0,
fudge,
- b"",
+ b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm],
original_id,
tsig_error,
other_data,
payload: int = DEFAULT_EDNS_PAYLOAD,
request_payload: Optional[int] = None,
options: Optional[List[dns.edns.Option]] = None,
+ pad: int = 0,
) -> None:
"""Configure EDNS behavior.
- *edns*, an ``int``, is the EDNS level to use. Specifying
- ``None``, ``False``, or ``-1`` means "do not use EDNS", and in this case
- the other parameters are ignored. Specifying ``True`` is
- equivalent to specifying 0, i.e. "use EDNS0".
+ *edns*, an ``int``, is the EDNS level to use. Specifying ``None``, ``False``,
+ or ``-1`` means "do not use EDNS", and in this case the other parameters are
+ ignored. Specifying ``True`` is equivalent to specifying 0, i.e. "use EDNS0".
*ednsflags*, an ``int``, the EDNS flag values.
- *payload*, an ``int``, is the EDNS sender's payload field, which is the
- maximum size of UDP datagram the sender can handle. I.e. how big
- a response to this message can be.
+ *payload*, an ``int``, is the EDNS sender's payload field, which is the maximum
+ size of UDP datagram the sender can handle. I.e. how big a response to this
+ message can be.
+
+ *request_payload*, an ``int``, is the EDNS payload size to use when sending this
+ message. If not specified, defaults to the value of *payload*.
- *request_payload*, an ``int``, is the EDNS payload size to use when
- sending this message. If not specified, defaults to the value of
- *payload*.
+ *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS options.
- *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
- options.
+ *pad*, a non-negative ``int``. If 0, the default, do not pad; otherwise add
+ padding bytes to make the message size a multiple of *pad*. Note that if
+ padding is non-zero, an EDNS PADDING option will always be added to the
+ message.
"""
if edns is None or edns is False:
if request_payload is None:
request_payload = payload
self.request_payload = request_payload
+ self.pad = pad
@property
def edns(self) -> int:
idna_codec: Optional[dns.name.IDNACodec] = None,
id: Optional[int] = None,
flags: int = dns.flags.RD,
+ pad: int = 0,
) -> QueryMessage:
"""Make a query message.
*flags*, an ``int``, the desired query flags. The default is
``dns.flags.RD``.
+ *pad*, a non-negative ``int``. If 0, the default, do not pad; otherwise add
+ padding bytes to make the message size a multiple of *pad*. Note that if
+ padding is non-zero, an EDNS PADDING option will always be added to the
+ message.
+
Returns a ``dns.message.QueryMessage``
"""
if kwargs and use_edns is None:
use_edns = 0
kwargs["edns"] = use_edns
+ kwargs["pad"] = pad
m.use_edns(**kwargs)
m.want_dnssec(want_dnssec)
return m
*q*, a ``dns.message.Message``, the query to send.
- *where*, a ``str``, the nameserver IP address or the full URL. If an IP
- address is given, the URL will be constructed using the following schema:
+ *where*, a ``str``, the nameserver IP address or the full URL. If an IP address is
+ given, the URL will be constructed using the following schema:
https://<IP-address>:<port>/<path>.
- *timeout*, a ``float`` or ``None``, the number of seconds to
- wait before the query times out. If ``None``, the default, wait forever.
+ *timeout*, a ``float`` or ``None``, the number of seconds to wait before the query
+ times out. If ``None``, the default, wait forever.
*port*, a ``int``, the port to send the query to. The default is 443.
- *source*, a ``str`` containing an IPv4 or IPv6 address, specifying
- the source address. The default is the wildcard address.
+ *source*, a ``str`` containing an IPv4 or IPv6 address, specifying the source
+ address. The default is the wildcard address.
- *source_port*, an ``int``, the port from which to send the message.
- The default is 0.
+ *source_port*, an ``int``, the port from which to send the message. The default is
+ 0.
- *one_rr_per_rrset*, a ``bool``. If ``True``, put each RR into its own
- RRset.
+ *one_rr_per_rrset*, a ``bool``. If ``True``, put each RR into its own RRset.
- *ignore_trailing*, a ``bool``. If ``True``, ignore trailing
- junk at end of the received message.
+ *ignore_trailing*, a ``bool``. If ``True``, ignore trailing junk at end of the
+ received message.
- *session*, an ``httpx.Client`` or ``requests.session.Session``. If
- provided, the client/session to use to send the queries.
+ *session*, an ``httpx.Client`` or ``requests.session.Session``. If provided, the
+ client/session to use to send the queries.
*path*, a ``str``. If *where* is an IP address, then *path* will be used to
construct the URL to send the DNS query to.
*post*, a ``bool``. If ``True``, the default, POST method will be used.
- *bootstrap_address*, a ``str``, the IP address to use to bypass the
- system's DNS resolver.
+ *bootstrap_address*, a ``str``, the IP address to use to bypass the system's DNS
+ resolver.
*verify*, a ``str``, containing a path to a certificate file or directory.
r.add_rrset(dns.renderer.ANSWER, rrset_1)
r.add_rrset(dns.renderer.ANSWER, rrset_2)
r.add_rrset(dns.renderer.AUTHORITY, ns_rrset)
- r.add_edns(0, 0, 4096)
r.add_rrset(dns.renderer.ADDITIONAL, ad_rrset_1)
r.add_rrset(dns.renderer.ADDITIONAL, ad_rrset_2)
+ r.add_edns(0, 0, 4096)
r.write_header()
r.add_tsig(keyname, secret, 300, 1, 0, '', request_mac)
wire = r.get_wire()
+ If padding is going to be used, then the OPT record MUST be
+ written after everything else in the additional section except for
+ the TSIG (if any).
+
output, an io.BytesIO, where rendering is written
id: the message id
self.counts = [0, 0, 0, 0]
self.output.write(b"\x00" * 12)
self.mac = ""
+ self.reserved = 0
+ self.was_padded = False
def _rollback(self, where):
"""Truncate the output buffer at offset *where*, and remove any
n = rdataset.to_wire(name, self.output, self.compress, self.origin, **kw)
self.counts[section] += n
+ def add_opt(self, opt, pad=0, opt_size=0, tsig_size=0):
+ """Add *opt* to the additional section, applying padding if desired. The
+ padding will take the specified precomputed OPT size and TSIG size into
+ account.
+
+ Note that we don't have reliable way of knowing how big a GSS-TSIG digest
+ might be, so we we might not get an even multiple of the pad in that case."""
+ if pad:
+ ttl = opt.ttl
+ assert opt_size >= 11
+ opt_rdata = opt[0]
+ size_without_padding = self.output.tell() + opt_size + tsig_size
+ remainder = size_without_padding % pad
+ if remainder:
+ pad = b"\x00" * (pad - remainder)
+ else:
+ pad = b""
+ options = list(opt_rdata.options)
+ options.append(dns.edns.GenericOption(dns.edns.OptionType.PADDING, pad))
+ opt = dns.message.Message._make_opt(ttl, opt_rdata.rdclass, options)
+ self.was_padded = True
+ self.add_rrset(ADDITIONAL, opt)
+
def add_edns(self, edns, ednsflags, payload, options=None):
"""Add an EDNS OPT record to the message."""
ednsflags &= 0xFF00FFFF
ednsflags |= edns << 16
opt = dns.message.Message._make_opt(ednsflags, payload, options)
- self.add_rrset(ADDITIONAL, opt)
+ self.add_opt(opt)
def add_tsig(
self,
return ctx
def _write_tsig(self, tsig, keyname):
+ if self.was_padded:
+ compress = None
+ else:
+ compress = self.compress
self._set_section(ADDITIONAL)
with self._track_size():
- keyname.to_wire(self.output, self.compress, self.origin)
+ keyname.to_wire(self.output, compress, self.origin)
self.output.write(
struct.pack("!HHIH", dns.rdatatype.TSIG, dns.rdataclass.ANY, 0, 0)
)
"""Return the wire format message."""
return self.output.getvalue()
+
+ def reserve(self, size: int) -> None:
+ """Reserve *size* bytes."""
+ if size < 0:
+ raise ValueError(f"reserved amount must be non-negative")
+ if size > self.max_size:
+ raise ValueError(f"cannot reserve more than the maximum size")
+ self.reserved += size
+ self.max_size -= size
+
+ def release_reserved(self) -> None:
+ """Release the reserved bytes."""
+ self.max_size += self.reserved
+ self.reserved = 0
default_algorithm = HMAC_SHA256
+mac_sizes = {
+ HMAC_SHA1: 20,
+ HMAC_SHA224: 28,
+ HMAC_SHA256: 32,
+ HMAC_SHA256_128: 16,
+ HMAC_SHA384: 48,
+ HMAC_SHA384_192: 24,
+ HMAC_SHA512: 64,
+ HMAC_SHA512_256: 32,
+ HMAC_MD5: 16,
+ GSS_TSIG: 128, # This is what we assume to be the worst case!
+}
+
class GSSTSig:
"""
import binascii
import dns.exception
+import dns.edns
import dns.flags
import dns.message
import dns.name
import dns.update
import dns.rdtypes.ANY.OPT
import dns.rdtypes.ANY.TSIG
+import dns.tsigkeyring
from tests.util import here
)
self.assertEqual(m, expected_message)
+ def test_padding_basic(self):
+ q = dns.message.make_query("www.example", "a", use_edns=0, pad=0)
+ w = q.to_wire()
+ self.assertEqual(len(w), 40)
+ q = dns.message.make_query("www.example", "a", use_edns=0, pad=128)
+ w = q.to_wire()
+ self.assertEqual(len(w), 128)
+ q2 = dns.message.from_wire(w)
+ self.assertEqual(q, q2)
+
+ def test_padding_various(self):
+ q = dns.message.make_query("www.example", "a", use_edns=0, pad=1)
+ w = q.to_wire()
+ self.assertEqual(len(w), 44)
+ q = dns.message.make_query("www.example", "a", use_edns=0, pad=2)
+ w = q.to_wire()
+ self.assertEqual(len(w), 44)
+ q = dns.message.make_query("www.example", "a", use_edns=0, pad=3)
+ w = q.to_wire()
+ self.assertEqual(len(w), 45)
+ q = dns.message.make_query("www.example", "a", use_edns=0, pad=44)
+ w = q.to_wire()
+ self.assertEqual(len(w), 44)
+ q = dns.message.make_query("www.example", "a", use_edns=0, pad=67)
+ w = q.to_wire()
+ self.assertEqual(len(w), 67)
+
+ def test_padding_with_option(self):
+ options = [dns.edns.ECSOption("1.2.3.0", 24)]
+ q = dns.message.make_query(
+ "www.example", "a", use_edns=0, pad=128, options=options
+ )
+ w = q.to_wire()
+ self.assertEqual(len(w), 128)
+ q2 = dns.message.from_wire(w)
+ self.assertEqual(q, q2)
+
+ def test_padding_with_tsig_and_option(self):
+ keyring = dns.tsigkeyring.from_text({"keyname.": "NjHwPsMKjdN++dOfE5iAiQ=="})
+ options = [dns.edns.ECSOption("1.2.3.0", 24)]
+ q = dns.message.make_query(
+ "www.example", "a", use_edns=0, options=options, pad=128
+ )
+ q.use_tsig(keyring)
+ w = q.to_wire()
+ self.assertEqual(len(w), 256)
+ q2 = dns.message.from_wire(w, keyring=keyring)
+ self.assertIsNotNone(q2.tsig)
+ self.assertEqual(q, q2)
+
if __name__ == "__main__":
unittest.main()
r.add_rdataset(dns.renderer.ANSWER, qname, rds)
self.assertRaises(dns.exception.FormError, bad)
+
+ def test_reservation(self):
+ r = dns.renderer.Renderer(flags=dns.flags.QR, max_size=512)
+ r.reserve(100)
+ assert r.max_size == 412
+ r.release_reserved()
+ assert r.max_size == 512
+ with self.assertRaises(ValueError):
+ r.reserve(-1)
+ with self.assertRaises(ValueError):
+ r.reserve(513)