]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
Add gss-tsig support to dnspython
authorNick Hall <nick.hall@deshaw.com>
Sun, 5 Jul 2020 23:40:36 +0000 (00:40 +0100)
committerNick Hall <nick.hall@deshaw.com>
Fri, 7 Aug 2020 23:25:22 +0000 (00:25 +0100)
dns/tsig.py
dns/tsigkeyring.py
tests/test_tsig.py

index 8f34fe67326179567a394903442bb430da9410a9..51fffaedfd95fc69a1d6501f38a3799aeba00a64 100644 (file)
@@ -71,6 +71,32 @@ class PeerBadTruncation(PeerError):
 
     """The peer didn't like amount of truncation in the TSIG we sent"""
 
+
+class GSSTSig:
+    """
+    GSS-TSIG TSIG implementation.  This uses the GSS-API context established
+    in the TKEY message handshake to sign messages using GSS-API message
+    integrity codes, per the RFC.
+
+    In order to avoid a direct GSSAPI dependency, the keyring holds a ref
+    to the GSSAPI object required, rather than the key itself.
+    """
+    def __init__(self, gssapi_context):
+        self.gssapi_context = gssapi_context
+        self.data = b''
+        self.name = 'gss-tsig'
+
+    def update(self, data):
+        self.data += data
+
+    def digest(self):
+        # defer to the GSSAPI function to sign
+        return self.gssapi_context.get_signature(self.data)
+
+    def verify(self, mac):
+        # defer to the GSSAPI function to verify
+        return self.gssapi_context.verify_signature(self.data, mac)
+
 # TSIG Algorithms
 
 HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT")
@@ -79,12 +105,14 @@ HMAC_SHA224 = dns.name.from_text("hmac-sha224")
 HMAC_SHA256 = dns.name.from_text("hmac-sha256")
 HMAC_SHA384 = dns.name.from_text("hmac-sha384")
 HMAC_SHA512 = dns.name.from_text("hmac-sha512")
+GSS_TSIG = dns.name.from_text("gss-tsig")
 
 _hashes = {
     HMAC_SHA224: hashlib.sha224,
     HMAC_SHA256: hashlib.sha256,
     HMAC_SHA384: hashlib.sha384,
     HMAC_SHA512: hashlib.sha512,
+    GSS_TSIG: GSSTSig,
     HMAC_SHA1: hashlib.sha1,
     HMAC_MD5: hashlib.md5,
 }
@@ -142,6 +170,30 @@ def _maybe_start_digest(key, mac, multi):
         return None
 
 
+def _verify_mac_for_context(ctx, key, expected):
+    """Verifies a MAC for the specified context and key.
+
+    @raises BadSignature: I{expected} does not match expected TSIG
+    """
+
+    try:
+        digestmod = _hashes[key.algorithm]
+    except KeyError:
+        raise NotImplementedError(f"TSIG algorithm {key.algorithm} " +
+                                  "is not supported")
+
+    if digestmod == GSSTSig:
+        try:
+            ctx.verify(expected)
+        except Exception:
+            # note the usage of a bare exception
+            raise BadSignature
+    else:
+        mac = ctx.digest()
+        if not hmac.compare_digest(mac, expected):
+            raise BadSignature
+
+
 def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False):
     """Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata
     for the input parameters, the HMAC MAC calculated by applying the
@@ -194,14 +246,12 @@ def validate(wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None,
     if key.algorithm != rdata.algorithm:
         raise BadAlgorithm
     ctx = _digest(new_wire, key, rdata, None, request_mac, ctx, multi)
-    mac = ctx.digest()
-    if not hmac.compare_digest(mac, rdata.mac):
-        raise BadSignature
-    return _maybe_start_digest(key, mac, multi)
+    _verify_mac_for_context(ctx, key, rdata.mac)
+    return _maybe_start_digest(key, rdata.mac, multi)
 
 
 def get_context(key):
-    """Returns an HMAC context foe the specified key.
+    """Returns an HMAC context for the specified key.
 
     @rtype: HMAC context
     @raises NotImplementedError: I{algorithm} is not supported
@@ -212,7 +262,11 @@ def get_context(key):
     except KeyError:
         raise NotImplementedError(f"TSIG algorithm {key.algorithm} " +
                                   "is not supported")
-    return hmac.new(key.secret, digestmod=digestmod)
+
+    if digestmod == GSSTSig:
+        return GSSTSig(key.secret)
+    else:
+        return hmac.new(key.secret, digestmod=digestmod)
 
 
 class Key:
index 4afee57fa378abae07e72e10aad62ab56763fc80..47a1f79facf69ef315c9101d4dc9fc54a878cc4f 100644 (file)
@@ -55,5 +55,10 @@ def to_text(keyring):
         if isinstance(key, bytes):
             textring[name] = b64encode(key)
         else:
-            textring[name] = (key.algorithm.to_text(), b64encode(key.secret))
+            if isinstance(key.secret, bytes):
+                text_secret = b64encode(key.secret)
+            else:
+                text_secret = str(key.secret)
+
+            textring[name] = (key.algorithm.to_text(), text_secret)
     return textring
index f97e53b2bc3a4eedfff4b8ef84ff0b7222c50297..2d4d92b18e2b31c64c0843b9bb072c5bb68df04c 100644 (file)
@@ -1,8 +1,7 @@
 # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
 
-import hashlib
 import unittest
-import time
+from unittest.mock import Mock
 
 import dns.rcode
 import dns.tsig
@@ -17,6 +16,7 @@ keyring = dns.tsigkeyring.from_text(
 
 keyname = dns.name.from_text('keyname')
 
+
 class TSIGTestCase(unittest.TestCase):
 
     def test_get_context(self):
@@ -43,6 +43,30 @@ class TSIGTestCase(unittest.TestCase):
         m.use_tsig(keyring, keyname, tsig_error=dns.rcode.BADKEY)
         self.assertEqual(m.tsig_error, dns.rcode.BADKEY)
 
+    def test_gssapi_context(self):
+        # mock out the gssapi context to return some dummy values
+        gssapi_context_mock = Mock()
+        gssapi_context_mock.get_signature.return_value = b'xxxxxxxxxxx'
+        gssapi_context_mock.verify_signature.return_value = None
+
+        # create the key and add it to the keyring
+        key = dns.tsig.Key('gsstsigtest', gssapi_context_mock, 'gss-tsig')
+        ctx = dns.tsig.get_context(key)
+        self.assertEqual(ctx.name, 'gss-tsig')
+        gsskeyname = dns.name.from_text('gsstsigtest')
+        keyring[gsskeyname] = key
+
+        # create example message and go to/from wire to simulate sign/verify
+        m = dns.message.make_query('example', 'a')
+        m.use_tsig(keyring, gsskeyname)
+        w = m.to_wire()
+        # not raising is passing
+        dns.message.from_wire(w, keyring)
+
+        # assertions to make sure the "gssapi" functions were called
+        gssapi_context_mock.get_signature.assert_called_once()
+        gssapi_context_mock.verify_signature.assert_called_once()
+
     def test_sign_and_validate(self):
         m = dns.message.make_query('example', 'a')
         m.use_tsig(keyring, keyname)