]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
Adds support for reading TSIG text format. 571/head
authorBrian Wellington <bwelling@xbill.org>
Mon, 10 Aug 2020 21:27:44 +0000 (14:27 -0700)
committerBrian Wellington <bwelling@xbill.org>
Mon, 10 Aug 2020 21:27:44 +0000 (14:27 -0700)
Implements from_text for the TSIG record type, and clean up some other
things.

Fixes the text format to emit fields in the right order; fudge and
time_signed were reversed.  This also matches BIND's output format now.

Add get_uint48() to the tokenizer, so that from_text() can use it.  Add
get_uint48() to the wire parser, and use it in from_wire, for
consistency.

Change dns.tsig.sign() to use rdata.replace() rather than constructing a
new TSIG record manually; this couldn't be done before, because
replace() uses text format for validation.

dns/rdtypes/ANY/TSIG.py
dns/tokenizer.py
dns/tsig.py
dns/wire.py
tests/test_tokenizer.py
tests/test_tsig.py

index 537f9c347c03ce12d00b44b38a4e626f4ebb8beb..85b80be1c478151e39c59112d5be04464b792e2a 100644 (file)
 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 
+import base64
 import struct
 
 import dns.exception
 import dns.rdata
 
 
-# We don't implement from_text, and that's ok.
-# pylint: disable=abstract-method
-
 class TSIG(dns.rdata.Rdata):
 
     """TSIG record"""
@@ -65,10 +63,35 @@ class TSIG(dns.rdata.Rdata):
 
     def to_text(self, origin=None, relativize=True, **kw):
         algorithm = self.algorithm.choose_relativity(origin, relativize)
-        return f"{algorithm} {self.fudge} {self.time_signed} " + \
+        error = dns.rcode.to_text(self.error, True)
+        text = f"{algorithm} {self.time_signed} {self.fudge} " + \
                f"{len(self.mac)} {dns.rdata._base64ify(self.mac, 0)} " + \
-               f"{self.original_id} {self.error} " + \
-               f"{len(self.other)} {dns.rdata._base64ify(self.other, 0)}"
+               f"{self.original_id} {error} {len(self.other)}"
+        if self.other:
+            text += f" {dns.rdata._base64ify(self.other, 0)}"
+        return text
+
+    @classmethod
+    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
+                  relativize_to=None):
+        algorithm = tok.get_name(relativize=False)
+        time_signed = tok.get_uint48()
+        fudge = tok.get_uint16()
+        mac_len = tok.get_uint16()
+        mac = base64.b64decode(tok.get_string())
+        if len(mac) != mac_len:
+            raise SyntaxError('invalid MAC')
+        original_id = tok.get_uint16()
+        error = dns.rcode.from_text(tok.get_string())
+        other_len = tok.get_uint16()
+        if other_len > 0:
+            other = base64.b64decode(tok.get_string())
+            if len(other) != other_len:
+                raise SyntaxError('invalid other data')
+        else:
+            other = b''
+        return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac,
+                   original_id, error, other)
 
     def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
         self.algorithm.to_wire(file, None, origin, False)
@@ -85,8 +108,8 @@ class TSIG(dns.rdata.Rdata):
     @classmethod
     def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
         algorithm = parser.get_name(origin)
-        (time_hi, time_lo, fudge) = parser.get_struct('!HIH')
-        time_signed = (time_hi << 32) + time_lo
+        time_signed = parser.get_uint48()
+        fudge = parser.get_uint16()
         mac = parser.get_counted_bytes(2)
         (original_id, error) = parser.get_struct('!HH')
         other = parser.get_counted_bytes(2)
index bef720b64548bc1221cd27fc0bdcebbbc7e2037a..7d698eae644bc7a9f965dc8deb3ce6de4dbf535c 100644 (file)
@@ -530,6 +530,21 @@ class Tokenizer:
                 '%d is not an unsigned 32-bit integer' % value)
         return value
 
+    def get_uint48(self, base=10):
+        """Read the next token and interpret it as a 48-bit unsigned
+        integer.
+
+        Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.
+
+        Returns an int.
+        """
+
+        value = self.get_int(base=base)
+        if value < 0 or value > 281474976710655:
+            raise dns.exception.SyntaxError(
+                '%d is not an unsigned 48-bit integer' % value)
+        return value
+
     def get_string(self, max_length=None):
         """Read the next token and interpret it as a string.
 
index d9d3c244b3d9345a0b91fa5eb7622c10654395d8..117b5e5e83a5eb979db1f77f1308734e681cfcae 100644 (file)
@@ -264,12 +264,9 @@ def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False):
 
     ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi)
     mac = ctx.sign()
-    tsig = dns.rdtypes.ANY.TSIG.TSIG(dns.rdataclass.ANY, dns.rdatatype.TSIG,
-                                     key.algorithm, time, rdata.fudge, mac,
-                                     rdata.original_id, rdata.error,
-                                     rdata.other)
+    tsig = rdata.replace(time_signed=time, mac=mac)
 
-    return tsig, _maybe_start_digest(key, mac, multi)
+    return (tsig, _maybe_start_digest(key, mac, multi))
 
 
 def validate(wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None,
index a31496052b19178af5f07400448f17e2880b5db3..572e27e708d59aa9c75e9ab92d90deace8e07616 100644 (file)
@@ -42,6 +42,9 @@ class Parser:
     def get_uint32(self):
         return struct.unpack('!I', self.get_bytes(4))[0]
 
+    def get_uint48(self):
+        return int.from_bytes(self.get_bytes(6), 'big')
+
     def get_struct(self, format):
         return struct.unpack(format, self.get_bytes(struct.calcsize(format)))
 
index 8340f46250eb14dfbbbfdd96750c7a03d7666827..e4797a5e196d0dea1a0a5562f11ed4fd65d9e24c 100644 (file)
@@ -206,6 +206,9 @@ class TokenizerTestCase(unittest.TestCase):
         with self.assertRaises(dns.exception.SyntaxError):
             tok = dns.tokenizer.Tokenizer('q1234')
             tok.get_int()
+        with self.assertRaises(dns.exception.SyntaxError):
+            tok = dns.tokenizer.Tokenizer('281474976710656')
+            tok.get_uint48()
         with self.assertRaises(dns.exception.SyntaxError):
             tok = dns.tokenizer.Tokenizer('4294967296')
             tok.get_uint32()
index 6e3993bfd68c8ca8398e1529332f07f71440eb91..b8a38b480c6bb8cb9047f7e92f2b0353d7320b1b 100644 (file)
@@ -245,3 +245,22 @@ class TSIGTestCase(unittest.TestCase):
 
     def test_hmac_sha512_256(self):
         self._test_truncated_algorithm(dns.tsig.HMAC_SHA512_256, 256)
+
+    def text_format(self):
+        key = dns.tsig.Key('foo', b'abcdefg', algorithm=alg)
+        q = dns.message.make_query('example', 'a')
+        q.use_tsig(key)
+        _ = q.to_wire()
+
+        text = q.tsig[0].to_text()
+        tsig2 = dns.rdata.from_text('ANY', 'TSIG', text)
+        self.assertEqual(tsig2, q.tsig[0])
+
+        q = dns.message.make_query('example', 'a')
+        q.use_tsig(key, other_data='abc')
+        q.use_tsig(key)
+        _ = q.to_wire()
+
+        text = q.tsig[0].to_text()
+        tsig2 = dns.rdata.from_text('ANY', 'TSIG', text)
+        self.assertEqual(tsig2, q.tsig[0])