# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+import base64
import struct
import dns.exception
import dns.rdata
-# We don't implement from_text, and that's ok.
-# pylint: disable=abstract-method
-
class TSIG(dns.rdata.Rdata):
"""TSIG record"""
def to_text(self, origin=None, relativize=True, **kw):
algorithm = self.algorithm.choose_relativity(origin, relativize)
- return f"{algorithm} {self.fudge} {self.time_signed} " + \
+ error = dns.rcode.to_text(self.error, True)
+ text = f"{algorithm} {self.time_signed} {self.fudge} " + \
f"{len(self.mac)} {dns.rdata._base64ify(self.mac, 0)} " + \
- f"{self.original_id} {self.error} " + \
- f"{len(self.other)} {dns.rdata._base64ify(self.other, 0)}"
+ f"{self.original_id} {error} {len(self.other)}"
+ if self.other:
+ text += f" {dns.rdata._base64ify(self.other, 0)}"
+ return text
+
+ @classmethod
+ def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
+ relativize_to=None):
+ algorithm = tok.get_name(relativize=False)
+ time_signed = tok.get_uint48()
+ fudge = tok.get_uint16()
+ mac_len = tok.get_uint16()
+ mac = base64.b64decode(tok.get_string())
+ if len(mac) != mac_len:
+ raise SyntaxError('invalid MAC')
+ original_id = tok.get_uint16()
+ error = dns.rcode.from_text(tok.get_string())
+ other_len = tok.get_uint16()
+ if other_len > 0:
+ other = base64.b64decode(tok.get_string())
+ if len(other) != other_len:
+ raise SyntaxError('invalid other data')
+ else:
+ other = b''
+ return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac,
+ original_id, error, other)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.algorithm.to_wire(file, None, origin, False)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
algorithm = parser.get_name(origin)
- (time_hi, time_lo, fudge) = parser.get_struct('!HIH')
- time_signed = (time_hi << 32) + time_lo
+ time_signed = parser.get_uint48()
+ fudge = parser.get_uint16()
mac = parser.get_counted_bytes(2)
(original_id, error) = parser.get_struct('!HH')
other = parser.get_counted_bytes(2)
'%d is not an unsigned 32-bit integer' % value)
return value
+ def get_uint48(self, base=10):
+ """Read the next token and interpret it as a 48-bit unsigned
+ integer.
+
+ Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.
+
+ Returns an int.
+ """
+
+ value = self.get_int(base=base)
+ if value < 0 or value > 281474976710655:
+ raise dns.exception.SyntaxError(
+ '%d is not an unsigned 48-bit integer' % value)
+ return value
+
def get_string(self, max_length=None):
"""Read the next token and interpret it as a string.
ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi)
mac = ctx.sign()
- tsig = dns.rdtypes.ANY.TSIG.TSIG(dns.rdataclass.ANY, dns.rdatatype.TSIG,
- key.algorithm, time, rdata.fudge, mac,
- rdata.original_id, rdata.error,
- rdata.other)
+ tsig = rdata.replace(time_signed=time, mac=mac)
- return tsig, _maybe_start_digest(key, mac, multi)
+ return (tsig, _maybe_start_digest(key, mac, multi))
def validate(wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None,
def test_hmac_sha512_256(self):
self._test_truncated_algorithm(dns.tsig.HMAC_SHA512_256, 256)
+
+ def text_format(self):
+ key = dns.tsig.Key('foo', b'abcdefg', algorithm=alg)
+ q = dns.message.make_query('example', 'a')
+ q.use_tsig(key)
+ _ = q.to_wire()
+
+ text = q.tsig[0].to_text()
+ tsig2 = dns.rdata.from_text('ANY', 'TSIG', text)
+ self.assertEqual(tsig2, q.tsig[0])
+
+ q = dns.message.make_query('example', 'a')
+ q.use_tsig(key, other_data='abc')
+ q.use_tsig(key)
+ _ = q.to_wire()
+
+ text = q.tsig[0].to_text()
+ tsig2 = dns.rdata.from_text('ANY', 'TSIG', text)
+ self.assertEqual(tsig2, q.tsig[0])