]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
krb5: Add Python functions to create a credentials cache containing a service ticket
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Tue, 27 Apr 2021 23:02:47 +0000 (11:02 +1200)
committerJeremy Allison <jra@samba.org>
Wed, 19 May 2021 01:32:34 +0000 (01:32 +0000)
This is a FILE: format credentials cache readable by the MIT/Heimdal
Kerberos libraries. This allows us to glue the Python ASN1 Kerberos
system to the MIT/Heimdal one.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
python/samba/tests/krb5/kdc_base_test.py

index 1c7f05dda6d45dbed75b4ad3b2afe03d74a076ac..d8193ae9cdc303583196d7c44edb59a04e4549a2 100644 (file)
@@ -1,6 +1,6 @@
 # Unix SMB/CIFS implementation.
 # Copyright (C) Stefan Metzmacher 2020
-# Copyright (C) 2020 Catalyst.Net Ltd
+# Copyright (C) 2020-2021 Catalyst.Net Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -18,6 +18,8 @@
 
 import sys
 import os
+from datetime import datetime
+import tempfile
 
 sys.path.insert(0, "bin/python")
 os.environ["PYTHONUNBUFFERED"] = "1"
@@ -26,10 +28,10 @@ import ldb
 from ldb import SCOPE_BASE
 from samba import generate_random_password
 from samba.auth import system_session
-from samba.credentials import Credentials
-from samba.dcerpc import krb5pac
+from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
+from samba.dcerpc import krb5pac, krb5ccache
 from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_NORMAL_ACCOUNT
-from samba.ndr import ndr_unpack
+from samba.ndr import ndr_pack, ndr_unpack
 from samba.samdb import SamDB
 
 from samba.tests import delete_force
@@ -38,6 +40,8 @@ import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
 from samba.tests.krb5.rfc4120_constants import (
     AD_IF_RELEVANT,
     AD_WIN2K_PAC,
+    AES256_CTS_HMAC_SHA1_96,
+    ARCFOUR_HMAC_MD5,
     KDC_ERR_PREAUTH_REQUIRED,
     KRB_AS_REP,
     KRB_TGS_REP,
@@ -46,6 +50,8 @@ from samba.tests.krb5.rfc4120_constants import (
     KU_PA_ENC_TIMESTAMP,
     KU_TGS_REP_ENC_PART_SUB_KEY,
     KU_TICKET,
+    NT_PRINCIPAL,
+    NT_SRV_HST,
     PADATA_ENC_TIMESTAMP,
     PADATA_ETYPE_INFO2,
 )
@@ -445,3 +451,156 @@ class KDCBaseTest(RawKerberosTest):
         msg = ldb.Message(dn)
         msg[name] = ldb.MessageElement(values, flag, name)
         self.ldb.modify(msg)
+
+    def create_ccache(self, cname, ticket, enc_part):
+        """ Lay out a version 4 on-disk credentials cache, to be read using the
+            FILE: protocol.
+        """
+
+        field = krb5ccache.DELTATIME_TAG()
+        field.kdc_sec_offset = 0
+        field.kdc_usec_offset = 0
+
+        v4tag = krb5ccache.V4TAG()
+        v4tag.tag = 1
+        v4tag.field = field
+
+        v4tags = krb5ccache.V4TAGS()
+        v4tags.tag = v4tag
+        v4tags.further_tags = b''
+
+        optional_header = krb5ccache.V4HEADER()
+        optional_header.v4tags = v4tags
+
+        cname_string = cname['name-string']
+
+        cprincipal = krb5ccache.PRINCIPAL()
+        cprincipal.name_type = cname['name-type']
+        cprincipal.component_count = len(cname_string)
+        cprincipal.realm = ticket['realm']
+        cprincipal.components = cname_string
+
+        sname = ticket['sname']
+        sname_string = sname['name-string']
+
+        sprincipal = krb5ccache.PRINCIPAL()
+        sprincipal.name_type = sname['name-type']
+        sprincipal.component_count = len(sname_string)
+        sprincipal.realm = ticket['realm']
+        sprincipal.components = sname_string
+
+        key = self.EncryptionKey_import(enc_part['key'])
+
+        key_data = key.export_obj()
+        keyblock = krb5ccache.KEYBLOCK()
+        keyblock.enctype = key_data['keytype']
+        keyblock.data = key_data['keyvalue']
+
+        addresses = krb5ccache.ADDRESSES()
+        addresses.count = 0
+        addresses.data = []
+
+        authdata = krb5ccache.AUTHDATA()
+        authdata.count = 0
+        authdata.data = []
+
+        # Re-encode the ticket, since it was decoded by another layer.
+        ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
+
+        authtime = enc_part['authtime']
+        try:
+            starttime = enc_part['starttime']
+        except KeyError:
+            starttime = authtime
+        endtime = enc_part['endtime']
+
+        cred = krb5ccache.CREDENTIAL()
+        cred.client = cprincipal
+        cred.server = sprincipal
+        cred.keyblock = keyblock
+        cred.authtime = int(datetime.strptime(authtime.decode(),
+                                              "%Y%m%d%H%M%SZ").timestamp())
+        cred.starttime = int(datetime.strptime(starttime.decode(),
+                                               "%Y%m%d%H%M%SZ").timestamp())
+        cred.endtime = int(datetime.strptime(endtime.decode(),
+                                             "%Y%m%d%H%M%SZ").timestamp())
+        cred.renew_till = cred.endtime
+        cred.is_skey = 0
+        cred.ticket_flags = int(enc_part['flags'], 2)
+        cred.addresses = addresses
+        cred.authdata = authdata
+        cred.ticket = ticket_data
+        cred.second_ticket = b''
+
+        ccache = krb5ccache.CCACHE()
+        ccache.pvno = 5
+        ccache.version = 4
+        ccache.optional_header = optional_header
+        ccache.principal = cprincipal
+        ccache.cred = cred
+
+        # Serialise the credentials cache structure.
+        result = ndr_pack(ccache)
+
+        # Create a temporary file and write the credentials.
+        cachefile = tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False)
+        cachefile.write(result)
+        cachefile.close()
+
+        return cachefile
+
+    def create_ccache_with_user(self, user_credentials, mach_name,
+                                service="host"):
+        # Obtain a service ticket authorising the user and place it into a
+        # newly created credentials cache file.
+
+        user_name = user_credentials.get_username()
+        realm = user_credentials.get_realm()
+
+        # Do the initial AS-REQ, should get a pre-authentication required
+        # response
+        etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+        cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                          names=[user_name])
+        sname = self.PrincipalName_create(name_type=NT_SRV_HST,
+                                          names=["krbtgt", realm])
+
+        rep = self.as_req(cname, sname, realm, etype)
+        self.check_pre_authenication(rep)
+
+        # Do the next AS-REQ
+        padata = self.get_pa_data(user_credentials, rep)
+        key = self.get_as_rep_key(user_credentials, rep)
+        rep = self.as_req(cname, sname, realm, etype, padata=padata)
+        self.check_as_reply(rep)
+
+        # Request a ticket to the host service on the machine account
+        ticket = rep['ticket']
+        enc_part = self.get_as_rep_enc_data(key, rep)
+        key = self.EncryptionKey_import(enc_part['key'])
+        cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                          names=[user_name])
+        sname = self.PrincipalName_create(name_type=NT_SRV_HST,
+                                          names=[service, mach_name])
+
+        (rep, enc_part) = self.tgs_req(
+            cname, sname, realm, ticket, key, etype)
+        self.check_tgs_reply(rep)
+        key = self.EncryptionKey_import(enc_part['key'])
+
+        # Check the contents of the pac, and the ticket
+        ticket = rep['ticket']
+
+        # Write the ticket into a credentials cache file that can be ingested
+        # by the main credentials code.
+        cachefile = self.create_ccache(cname, ticket, enc_part)
+
+        # Create a credentials object to reference the credentials cache.
+        creds = Credentials()
+        creds.set_kerberos_state(MUST_USE_KERBEROS)
+        creds.set_username(user_name, SPECIFIED)
+        creds.set_realm(realm)
+        creds.set_named_ccache(cachefile.name, SPECIFIED, self.lp)
+
+        # Return the credentials along with the cache file.
+        return (creds, cachefile)