"""The peer didn't like amount of truncation in the TSIG we sent"""
+
+class GSSTSig:
+ """
+ GSS-TSIG TSIG implementation. This uses the GSS-API context established
+ in the TKEY message handshake to sign messages using GSS-API message
+ integrity codes, per the RFC.
+
+ In order to avoid a direct GSSAPI dependency, the keyring holds a ref
+ to the GSSAPI object required, rather than the key itself.
+ """
+ def __init__(self, gssapi_context):
+ self.gssapi_context = gssapi_context
+ self.data = b''
+ self.name = 'gss-tsig'
+
+ def update(self, data):
+ self.data += data
+
+ def digest(self):
+ # defer to the GSSAPI function to sign
+ return self.gssapi_context.get_signature(self.data)
+
+ def verify(self, mac):
+ # defer to the GSSAPI function to verify
+ return self.gssapi_context.verify_signature(self.data, mac)
+
# TSIG Algorithms
HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT")
HMAC_SHA256 = dns.name.from_text("hmac-sha256")
HMAC_SHA384 = dns.name.from_text("hmac-sha384")
HMAC_SHA512 = dns.name.from_text("hmac-sha512")
+GSS_TSIG = dns.name.from_text("gss-tsig")
_hashes = {
HMAC_SHA224: hashlib.sha224,
HMAC_SHA256: hashlib.sha256,
HMAC_SHA384: hashlib.sha384,
HMAC_SHA512: hashlib.sha512,
+ GSS_TSIG: GSSTSig,
HMAC_SHA1: hashlib.sha1,
HMAC_MD5: hashlib.md5,
}
return None
+def _verify_mac_for_context(ctx, key, expected):
+ """Verifies a MAC for the specified context and key.
+
+ @raises BadSignature: I{expected} does not match expected TSIG
+ """
+
+ try:
+ digestmod = _hashes[key.algorithm]
+ except KeyError:
+ raise NotImplementedError(f"TSIG algorithm {key.algorithm} " +
+ "is not supported")
+
+ if digestmod == GSSTSig:
+ try:
+ ctx.verify(expected)
+ except Exception:
+ # note the usage of a bare exception
+ raise BadSignature
+ else:
+ mac = ctx.digest()
+ if not hmac.compare_digest(mac, expected):
+ raise BadSignature
+
+
def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False):
"""Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata
for the input parameters, the HMAC MAC calculated by applying the
if key.algorithm != rdata.algorithm:
raise BadAlgorithm
ctx = _digest(new_wire, key, rdata, None, request_mac, ctx, multi)
- mac = ctx.digest()
- if not hmac.compare_digest(mac, rdata.mac):
- raise BadSignature
- return _maybe_start_digest(key, mac, multi)
+ _verify_mac_for_context(ctx, key, rdata.mac)
+ return _maybe_start_digest(key, rdata.mac, multi)
def get_context(key):
- """Returns an HMAC context foe the specified key.
+ """Returns an HMAC context for the specified key.
@rtype: HMAC context
@raises NotImplementedError: I{algorithm} is not supported
except KeyError:
raise NotImplementedError(f"TSIG algorithm {key.algorithm} " +
"is not supported")
- return hmac.new(key.secret, digestmod=digestmod)
+
+ if digestmod == GSSTSig:
+ return GSSTSig(key.secret)
+ else:
+ return hmac.new(key.secret, digestmod=digestmod)
class Key:
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
-import hashlib
import unittest
-import time
+from unittest.mock import Mock
import dns.rcode
import dns.tsig
keyname = dns.name.from_text('keyname')
+
class TSIGTestCase(unittest.TestCase):
def test_get_context(self):
m.use_tsig(keyring, keyname, tsig_error=dns.rcode.BADKEY)
self.assertEqual(m.tsig_error, dns.rcode.BADKEY)
+ def test_gssapi_context(self):
+ # mock out the gssapi context to return some dummy values
+ gssapi_context_mock = Mock()
+ gssapi_context_mock.get_signature.return_value = b'xxxxxxxxxxx'
+ gssapi_context_mock.verify_signature.return_value = None
+
+ # create the key and add it to the keyring
+ key = dns.tsig.Key('gsstsigtest', gssapi_context_mock, 'gss-tsig')
+ ctx = dns.tsig.get_context(key)
+ self.assertEqual(ctx.name, 'gss-tsig')
+ gsskeyname = dns.name.from_text('gsstsigtest')
+ keyring[gsskeyname] = key
+
+ # create example message and go to/from wire to simulate sign/verify
+ m = dns.message.make_query('example', 'a')
+ m.use_tsig(keyring, gsskeyname)
+ w = m.to_wire()
+ # not raising is passing
+ dns.message.from_wire(w, keyring)
+
+ # assertions to make sure the "gssapi" functions were called
+ gssapi_context_mock.get_signature.assert_called_once()
+ gssapi_context_mock.verify_signature.assert_called_once()
+
def test_sign_and_validate(self):
m = dns.message.make_query('example', 'a')
m.use_tsig(keyring, keyname)