--- /dev/null
+import contextlib
+
+import dns.exception
+
+
+@contextlib.contextmanager
+def prefixed_length(output, length_length):
+ output.write(b"\00" * length_length)
+ start = output.tell()
+ yield
+ end = output.tell()
+ length = end - start
+ if length > 0:
+ try:
+ output.seek(start - length_length)
+ try:
+ output.write(length.to_bytes(length_length, "big"))
+ except OverflowError:
+ raise dns.exception.FormError
+ finally:
+ output.seek(end)
self.offset = offset
-DEFAULT_EDNS_PAYLOAD = 1232
+DEFAULT_EDNS_PAYLOAD = dns.renderer.DEFAULT_EDNS_PAYLOAD
MAX_CHAIN = 16
IndexKeyType = tuple[
def _make_tsig(
keyname, algorithm, time_signed, fudge, mac, original_id, error, other
):
- tsig = dns.rdtypes.ANY.TSIG.TSIG(
- dns.rdataclass.ANY,
- dns.rdatatype.TSIG,
- algorithm,
- time_signed,
- fudge,
- mac,
- original_id,
- error,
- other,
+ return dns.renderer._make_tsig(
+ keyname, algorithm, time_signed, fudge, mac, original_id, error, other
)
- return dns.rrset.from_rdata(keyname, 0, tsig)
def use_tsig(
self,
@staticmethod
def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None):
- opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ())
- return dns.rrset.from_rdata(dns.name.root, int(flags), opt)
+ return dns.renderer._make_opt(flags, payload, options)
def use_edns(
self,
Returns a ``dns.name.Name``.
"""
- if not isinstance(text, str):
- raise ValueError("input to from_unicode() must be a unicode string")
- if not (origin is None or isinstance(origin, Name)):
- raise ValueError("origin must be a Name or None")
labels = []
label = ""
escaping = False
# then it's still "all ASCII" even though the domain name has
# codepoints > 127.
text = text.encode("ascii")
- if not isinstance(text, bytes):
- raise ValueError("input to from_text() must be a string")
- if not (origin is None or isinstance(origin, Name)):
- raise ValueError("origin must be a Name or None")
labels = []
label = b""
escaping = False
import dns.rdata
import dns.rdataclass
import dns.rdatatype
-import dns.renderer
import dns.set
import dns.ttl
+from dns._render_util import prefixed_length
# define SimpleSet here for backwards compatibility
SimpleSet = dns.set.Set
for rd in l:
name.to_wire(file, compress, origin)
file.write(struct.pack("!HHI", self.rdtype, rdclass, self.ttl))
- with dns.renderer.prefixed_length(file, 2):
+ with prefixed_length(file, 2):
rd.to_wire(file, compress, origin)
return len(self)
import dns.edns
import dns.exception
+import dns.name
import dns.rdataclass
import dns.rdatatype
+import dns.rdtypes.ANY.OPT
+import dns.rdtypes.ANY.TSIG
+import dns.rrset
import dns.tsig
+from dns._render_util import prefixed_length as prefixed_length # type: ignore
-# Note we can't import dns.message for cicularity reasons
+DEFAULT_EDNS_PAYLOAD = 1232
QUESTION = 0
ANSWER = 1
ADDITIONAL = 3
-@contextlib.contextmanager
-def prefixed_length(output, length_length):
- output.write(b"\00" * length_length)
- start = output.tell()
- yield
- end = output.tell()
- length = end - start
- if length > 0:
- try:
- output.seek(start - length_length)
- try:
- output.write(length.to_bytes(length_length, "big"))
- except OverflowError:
- raise dns.exception.FormError
- finally:
- output.seek(end)
+def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None):
+ opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ())
+ return dns.rrset.from_rdata(dns.name.root, int(flags), opt)
+
+
+def _make_tsig(keyname, algorithm, time_signed, fudge, mac, original_id, error, other):
+ tsig = dns.rdtypes.ANY.TSIG.TSIG(
+ dns.rdataclass.ANY,
+ dns.rdatatype.TSIG,
+ algorithm,
+ time_signed,
+ fudge,
+ mac,
+ original_id,
+ error,
+ other,
+ )
+ return dns.rrset.from_rdata(keyname, 0, tsig)
class Renderer:
pad = b""
options = list(opt_rdata.options)
options.append(dns.edns.GenericOption(dns.edns.OptionType.PADDING, pad))
- opt = dns.message.Message._make_opt( # type: ignore
- ttl, opt_rdata.rdclass, options
- )
+ opt = _make_opt(ttl, opt_rdata.rdclass, options) # type: ignore
self.was_padded = True
self.add_rrset(ADDITIONAL, opt)
# make sure the EDNS version in ednsflags agrees with edns
ednsflags &= 0xFF00FFFF
ednsflags |= edns << 16
- opt = dns.message.Message._make_opt(ednsflags, payload, options) # type: ignore
+ opt = _make_opt(ednsflags, payload, options) # type: ignore
self.add_opt(opt)
def add_tsig(
key = secret
else:
key = dns.tsig.Key(keyname, secret, algorithm)
- tsig = dns.message.Message._make_tsig( # type: ignore
+ tsig = _make_tsig( # type: ignore
keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
)
(tsig, _) = dns.tsig.sign(s, key, tsig[0], int(time.time()), request_mac)
key = secret
else:
key = dns.tsig.Key(keyname, secret, algorithm)
- tsig = dns.message.Message._make_tsig( # type: ignore
+ tsig = _make_tsig( # type: ignore
keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
)
(tsig, ctx) = dns.tsig.sign(
self.assertRaises(dns.name.BadEscape, bad2)
- def testFromUnicodeNotString(self):
- def bad():
- dns.name.from_unicode(b"123") # type: ignore
-
- self.assertRaises(ValueError, bad)
-
- def testFromUnicodeBadOrigin(self):
- def bad():
- dns.name.from_unicode("example", 123) # type: ignore
-
- self.assertRaises(ValueError, bad)
-
def testFromUnicodeEmptyLabel(self):
def bad():
dns.name.from_unicode("a..b.example")
def testFromUnicodeEmptyName(self):
self.assertEqual(dns.name.from_unicode("@", None), dns.name.empty)
- def testFromTextNotString(self):
- def bad():
- dns.name.from_text(123) # type: ignore
-
- self.assertRaises(ValueError, bad)
-
- def testFromTextBadOrigin(self):
- def bad():
- dns.name.from_text("example", 123) # type: ignore
-
- self.assertRaises(ValueError, bad)
-
def testBadPunycode(self):
c = dns.name.IDNACodec()
with self.assertRaises(dns.name.IDNAException):