From 0cdfab7c1ea9470faf015493a6274f0637789dc8 Mon Sep 17 00:00:00 2001 From: Brian Wellington Date: Mon, 10 Aug 2020 14:27:44 -0700 Subject: [PATCH] Adds support for reading TSIG text format. Implements from_text for the TSIG record type, and clean up some other things. Fixes the text format to emit fields in the right order; fudge and time_signed were reversed. This also matches BIND's output format now. Add get_uint48() to the tokenizer, so that from_text() can use it. Add get_uint48() to the wire parser, and use it in from_wire, for consistency. Change dns.tsig.sign() to use rdata.replace() rather than constructing a new TSIG record manually; this couldn't be done before, because replace() uses text format for validation. --- dns/rdtypes/ANY/TSIG.py | 39 +++++++++++++++++++++++++++++++-------- dns/tokenizer.py | 15 +++++++++++++++ dns/tsig.py | 7 ++----- dns/wire.py | 3 +++ tests/test_tokenizer.py | 3 +++ tests/test_tsig.py | 19 +++++++++++++++++++ 6 files changed, 73 insertions(+), 13 deletions(-) diff --git a/dns/rdtypes/ANY/TSIG.py b/dns/rdtypes/ANY/TSIG.py index 537f9c34..85b80be1 100644 --- a/dns/rdtypes/ANY/TSIG.py +++ b/dns/rdtypes/ANY/TSIG.py @@ -15,15 +15,13 @@ # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +import base64 import struct import dns.exception import dns.rdata -# We don't implement from_text, and that's ok. -# pylint: disable=abstract-method - class TSIG(dns.rdata.Rdata): """TSIG record""" @@ -65,10 +63,35 @@ class TSIG(dns.rdata.Rdata): def to_text(self, origin=None, relativize=True, **kw): algorithm = self.algorithm.choose_relativity(origin, relativize) - return f"{algorithm} {self.fudge} {self.time_signed} " + \ + error = dns.rcode.to_text(self.error, True) + text = f"{algorithm} {self.time_signed} {self.fudge} " + \ f"{len(self.mac)} {dns.rdata._base64ify(self.mac, 0)} " + \ - f"{self.original_id} {self.error} " + \ - f"{len(self.other)} {dns.rdata._base64ify(self.other, 0)}" + f"{self.original_id} {error} {len(self.other)}" + if self.other: + text += f" {dns.rdata._base64ify(self.other, 0)}" + return text + + @classmethod + def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True, + relativize_to=None): + algorithm = tok.get_name(relativize=False) + time_signed = tok.get_uint48() + fudge = tok.get_uint16() + mac_len = tok.get_uint16() + mac = base64.b64decode(tok.get_string()) + if len(mac) != mac_len: + raise SyntaxError('invalid MAC') + original_id = tok.get_uint16() + error = dns.rcode.from_text(tok.get_string()) + other_len = tok.get_uint16() + if other_len > 0: + other = base64.b64decode(tok.get_string()) + if len(other) != other_len: + raise SyntaxError('invalid other data') + else: + other = b'' + return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac, + original_id, error, other) def _to_wire(self, file, compress=None, origin=None, canonicalize=False): self.algorithm.to_wire(file, None, origin, False) @@ -85,8 +108,8 @@ class TSIG(dns.rdata.Rdata): @classmethod def from_wire_parser(cls, rdclass, rdtype, parser, origin=None): algorithm = parser.get_name(origin) - (time_hi, time_lo, fudge) = parser.get_struct('!HIH') - time_signed = (time_hi << 32) + time_lo + time_signed = parser.get_uint48() + fudge = parser.get_uint16() mac = parser.get_counted_bytes(2) (original_id, error) = parser.get_struct('!HH') other = parser.get_counted_bytes(2) diff --git a/dns/tokenizer.py b/dns/tokenizer.py index bef720b6..7d698eae 100644 --- a/dns/tokenizer.py +++ b/dns/tokenizer.py @@ -530,6 +530,21 @@ class Tokenizer: '%d is not an unsigned 32-bit integer' % value) return value + def get_uint48(self, base=10): + """Read the next token and interpret it as a 48-bit unsigned + integer. + + Raises dns.exception.SyntaxError if not a 48-bit unsigned integer. + + Returns an int. + """ + + value = self.get_int(base=base) + if value < 0 or value > 281474976710655: + raise dns.exception.SyntaxError( + '%d is not an unsigned 48-bit integer' % value) + return value + def get_string(self, max_length=None): """Read the next token and interpret it as a string. diff --git a/dns/tsig.py b/dns/tsig.py index d9d3c244..117b5e5e 100644 --- a/dns/tsig.py +++ b/dns/tsig.py @@ -264,12 +264,9 @@ def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False): ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi) mac = ctx.sign() - tsig = dns.rdtypes.ANY.TSIG.TSIG(dns.rdataclass.ANY, dns.rdatatype.TSIG, - key.algorithm, time, rdata.fudge, mac, - rdata.original_id, rdata.error, - rdata.other) + tsig = rdata.replace(time_signed=time, mac=mac) - return tsig, _maybe_start_digest(key, mac, multi) + return (tsig, _maybe_start_digest(key, mac, multi)) def validate(wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None, diff --git a/dns/wire.py b/dns/wire.py index a3149605..572e27e7 100644 --- a/dns/wire.py +++ b/dns/wire.py @@ -42,6 +42,9 @@ class Parser: def get_uint32(self): return struct.unpack('!I', self.get_bytes(4))[0] + def get_uint48(self): + return int.from_bytes(self.get_bytes(6), 'big') + def get_struct(self, format): return struct.unpack(format, self.get_bytes(struct.calcsize(format))) diff --git a/tests/test_tokenizer.py b/tests/test_tokenizer.py index 8340f462..e4797a5e 100644 --- a/tests/test_tokenizer.py +++ b/tests/test_tokenizer.py @@ -206,6 +206,9 @@ class TokenizerTestCase(unittest.TestCase): with self.assertRaises(dns.exception.SyntaxError): tok = dns.tokenizer.Tokenizer('q1234') tok.get_int() + with self.assertRaises(dns.exception.SyntaxError): + tok = dns.tokenizer.Tokenizer('281474976710656') + tok.get_uint48() with self.assertRaises(dns.exception.SyntaxError): tok = dns.tokenizer.Tokenizer('4294967296') tok.get_uint32() diff --git a/tests/test_tsig.py b/tests/test_tsig.py index 6e3993bf..b8a38b48 100644 --- a/tests/test_tsig.py +++ b/tests/test_tsig.py @@ -245,3 +245,22 @@ class TSIGTestCase(unittest.TestCase): def test_hmac_sha512_256(self): self._test_truncated_algorithm(dns.tsig.HMAC_SHA512_256, 256) + + def text_format(self): + key = dns.tsig.Key('foo', b'abcdefg', algorithm=alg) + q = dns.message.make_query('example', 'a') + q.use_tsig(key) + _ = q.to_wire() + + text = q.tsig[0].to_text() + tsig2 = dns.rdata.from_text('ANY', 'TSIG', text) + self.assertEqual(tsig2, q.tsig[0]) + + q = dns.message.make_query('example', 'a') + q.use_tsig(key, other_data='abc') + q.use_tsig(key) + _ = q.to_wire() + + text = q.tsig[0].to_text() + tsig2 = dns.rdata.from_text('ANY', 'TSIG', text) + self.assertEqual(tsig2, q.tsig[0]) -- 2.47.3