From: Nick Hall Date: Sun, 5 Jul 2020 23:40:36 +0000 (+0100) Subject: Add gss-tsig support to dnspython X-Git-Tag: v2.1.0rc1~101^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2d8309bdfbda1b24e64d1b0d03ced4408898dfdd;p=thirdparty%2Fdnspython.git Add gss-tsig support to dnspython --- diff --git a/dns/tsig.py b/dns/tsig.py index 8f34fe67..51fffaed 100644 --- a/dns/tsig.py +++ b/dns/tsig.py @@ -71,6 +71,32 @@ class PeerBadTruncation(PeerError): """The peer didn't like amount of truncation in the TSIG we sent""" + +class GSSTSig: + """ + GSS-TSIG TSIG implementation. This uses the GSS-API context established + in the TKEY message handshake to sign messages using GSS-API message + integrity codes, per the RFC. + + In order to avoid a direct GSSAPI dependency, the keyring holds a ref + to the GSSAPI object required, rather than the key itself. + """ + def __init__(self, gssapi_context): + self.gssapi_context = gssapi_context + self.data = b'' + self.name = 'gss-tsig' + + def update(self, data): + self.data += data + + def digest(self): + # defer to the GSSAPI function to sign + return self.gssapi_context.get_signature(self.data) + + def verify(self, mac): + # defer to the GSSAPI function to verify + return self.gssapi_context.verify_signature(self.data, mac) + # TSIG Algorithms HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT") @@ -79,12 +105,14 @@ HMAC_SHA224 = dns.name.from_text("hmac-sha224") HMAC_SHA256 = dns.name.from_text("hmac-sha256") HMAC_SHA384 = dns.name.from_text("hmac-sha384") HMAC_SHA512 = dns.name.from_text("hmac-sha512") +GSS_TSIG = dns.name.from_text("gss-tsig") _hashes = { HMAC_SHA224: hashlib.sha224, HMAC_SHA256: hashlib.sha256, HMAC_SHA384: hashlib.sha384, HMAC_SHA512: hashlib.sha512, + GSS_TSIG: GSSTSig, HMAC_SHA1: hashlib.sha1, HMAC_MD5: hashlib.md5, } @@ -142,6 +170,30 @@ def _maybe_start_digest(key, mac, multi): return None +def _verify_mac_for_context(ctx, key, expected): + """Verifies a MAC for the specified context and key. + + @raises BadSignature: I{expected} does not match expected TSIG + """ + + try: + digestmod = _hashes[key.algorithm] + except KeyError: + raise NotImplementedError(f"TSIG algorithm {key.algorithm} " + + "is not supported") + + if digestmod == GSSTSig: + try: + ctx.verify(expected) + except Exception: + # note the usage of a bare exception + raise BadSignature + else: + mac = ctx.digest() + if not hmac.compare_digest(mac, expected): + raise BadSignature + + def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False): """Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata for the input parameters, the HMAC MAC calculated by applying the @@ -194,14 +246,12 @@ def validate(wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None, if key.algorithm != rdata.algorithm: raise BadAlgorithm ctx = _digest(new_wire, key, rdata, None, request_mac, ctx, multi) - mac = ctx.digest() - if not hmac.compare_digest(mac, rdata.mac): - raise BadSignature - return _maybe_start_digest(key, mac, multi) + _verify_mac_for_context(ctx, key, rdata.mac) + return _maybe_start_digest(key, rdata.mac, multi) def get_context(key): - """Returns an HMAC context foe the specified key. + """Returns an HMAC context for the specified key. @rtype: HMAC context @raises NotImplementedError: I{algorithm} is not supported @@ -212,7 +262,11 @@ def get_context(key): except KeyError: raise NotImplementedError(f"TSIG algorithm {key.algorithm} " + "is not supported") - return hmac.new(key.secret, digestmod=digestmod) + + if digestmod == GSSTSig: + return GSSTSig(key.secret) + else: + return hmac.new(key.secret, digestmod=digestmod) class Key: diff --git a/dns/tsigkeyring.py b/dns/tsigkeyring.py index 4afee57f..47a1f79f 100644 --- a/dns/tsigkeyring.py +++ b/dns/tsigkeyring.py @@ -55,5 +55,10 @@ def to_text(keyring): if isinstance(key, bytes): textring[name] = b64encode(key) else: - textring[name] = (key.algorithm.to_text(), b64encode(key.secret)) + if isinstance(key.secret, bytes): + text_secret = b64encode(key.secret) + else: + text_secret = str(key.secret) + + textring[name] = (key.algorithm.to_text(), text_secret) return textring diff --git a/tests/test_tsig.py b/tests/test_tsig.py index f97e53b2..2d4d92b1 100644 --- a/tests/test_tsig.py +++ b/tests/test_tsig.py @@ -1,8 +1,7 @@ # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license -import hashlib import unittest -import time +from unittest.mock import Mock import dns.rcode import dns.tsig @@ -17,6 +16,7 @@ keyring = dns.tsigkeyring.from_text( keyname = dns.name.from_text('keyname') + class TSIGTestCase(unittest.TestCase): def test_get_context(self): @@ -43,6 +43,30 @@ class TSIGTestCase(unittest.TestCase): m.use_tsig(keyring, keyname, tsig_error=dns.rcode.BADKEY) self.assertEqual(m.tsig_error, dns.rcode.BADKEY) + def test_gssapi_context(self): + # mock out the gssapi context to return some dummy values + gssapi_context_mock = Mock() + gssapi_context_mock.get_signature.return_value = b'xxxxxxxxxxx' + gssapi_context_mock.verify_signature.return_value = None + + # create the key and add it to the keyring + key = dns.tsig.Key('gsstsigtest', gssapi_context_mock, 'gss-tsig') + ctx = dns.tsig.get_context(key) + self.assertEqual(ctx.name, 'gss-tsig') + gsskeyname = dns.name.from_text('gsstsigtest') + keyring[gsskeyname] = key + + # create example message and go to/from wire to simulate sign/verify + m = dns.message.make_query('example', 'a') + m.use_tsig(keyring, gsskeyname) + w = m.to_wire() + # not raising is passing + dns.message.from_wire(w, keyring) + + # assertions to make sure the "gssapi" functions were called + gssapi_context_mock.get_signature.assert_called_once() + gssapi_context_mock.verify_signature.assert_called_once() + def test_sign_and_validate(self): m = dns.message.make_query('example', 'a') m.use_tsig(keyring, keyname)