# Unix SMB/CIFS implementation.
# Copyright (C) Stefan Metzmacher 2020
-# Copyright (C) 2020 Catalyst.Net Ltd
+# Copyright (C) 2020-2021 Catalyst.Net Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import sys
import os
+from datetime import datetime
+import tempfile
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
from ldb import SCOPE_BASE
from samba import generate_random_password
from samba.auth import system_session
-from samba.credentials import Credentials
-from samba.dcerpc import krb5pac
+from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
+from samba.dcerpc import krb5pac, krb5ccache
from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_NORMAL_ACCOUNT
-from samba.ndr import ndr_unpack
+from samba.ndr import ndr_pack, ndr_unpack
from samba.samdb import SamDB
from samba.tests import delete_force
from samba.tests.krb5.rfc4120_constants import (
AD_IF_RELEVANT,
AD_WIN2K_PAC,
+ AES256_CTS_HMAC_SHA1_96,
+ ARCFOUR_HMAC_MD5,
KDC_ERR_PREAUTH_REQUIRED,
KRB_AS_REP,
KRB_TGS_REP,
KU_PA_ENC_TIMESTAMP,
KU_TGS_REP_ENC_PART_SUB_KEY,
KU_TICKET,
+ NT_PRINCIPAL,
+ NT_SRV_HST,
PADATA_ENC_TIMESTAMP,
PADATA_ETYPE_INFO2,
)
msg = ldb.Message(dn)
msg[name] = ldb.MessageElement(values, flag, name)
self.ldb.modify(msg)
+
+ def create_ccache(self, cname, ticket, enc_part):
+ """ Lay out a version 4 on-disk credentials cache, to be read using the
+ FILE: protocol.
+ """
+
+ field = krb5ccache.DELTATIME_TAG()
+ field.kdc_sec_offset = 0
+ field.kdc_usec_offset = 0
+
+ v4tag = krb5ccache.V4TAG()
+ v4tag.tag = 1
+ v4tag.field = field
+
+ v4tags = krb5ccache.V4TAGS()
+ v4tags.tag = v4tag
+ v4tags.further_tags = b''
+
+ optional_header = krb5ccache.V4HEADER()
+ optional_header.v4tags = v4tags
+
+ cname_string = cname['name-string']
+
+ cprincipal = krb5ccache.PRINCIPAL()
+ cprincipal.name_type = cname['name-type']
+ cprincipal.component_count = len(cname_string)
+ cprincipal.realm = ticket['realm']
+ cprincipal.components = cname_string
+
+ sname = ticket['sname']
+ sname_string = sname['name-string']
+
+ sprincipal = krb5ccache.PRINCIPAL()
+ sprincipal.name_type = sname['name-type']
+ sprincipal.component_count = len(sname_string)
+ sprincipal.realm = ticket['realm']
+ sprincipal.components = sname_string
+
+ key = self.EncryptionKey_import(enc_part['key'])
+
+ key_data = key.export_obj()
+ keyblock = krb5ccache.KEYBLOCK()
+ keyblock.enctype = key_data['keytype']
+ keyblock.data = key_data['keyvalue']
+
+ addresses = krb5ccache.ADDRESSES()
+ addresses.count = 0
+ addresses.data = []
+
+ authdata = krb5ccache.AUTHDATA()
+ authdata.count = 0
+ authdata.data = []
+
+ # Re-encode the ticket, since it was decoded by another layer.
+ ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
+
+ authtime = enc_part['authtime']
+ try:
+ starttime = enc_part['starttime']
+ except KeyError:
+ starttime = authtime
+ endtime = enc_part['endtime']
+
+ cred = krb5ccache.CREDENTIAL()
+ cred.client = cprincipal
+ cred.server = sprincipal
+ cred.keyblock = keyblock
+ cred.authtime = int(datetime.strptime(authtime.decode(),
+ "%Y%m%d%H%M%SZ").timestamp())
+ cred.starttime = int(datetime.strptime(starttime.decode(),
+ "%Y%m%d%H%M%SZ").timestamp())
+ cred.endtime = int(datetime.strptime(endtime.decode(),
+ "%Y%m%d%H%M%SZ").timestamp())
+ cred.renew_till = cred.endtime
+ cred.is_skey = 0
+ cred.ticket_flags = int(enc_part['flags'], 2)
+ cred.addresses = addresses
+ cred.authdata = authdata
+ cred.ticket = ticket_data
+ cred.second_ticket = b''
+
+ ccache = krb5ccache.CCACHE()
+ ccache.pvno = 5
+ ccache.version = 4
+ ccache.optional_header = optional_header
+ ccache.principal = cprincipal
+ ccache.cred = cred
+
+ # Serialise the credentials cache structure.
+ result = ndr_pack(ccache)
+
+ # Create a temporary file and write the credentials.
+ cachefile = tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False)
+ cachefile.write(result)
+ cachefile.close()
+
+ return cachefile
+
+ def create_ccache_with_user(self, user_credentials, mach_name,
+ service="host"):
+ # Obtain a service ticket authorising the user and place it into a
+ # newly created credentials cache file.
+
+ user_name = user_credentials.get_username()
+ realm = user_credentials.get_realm()
+
+ # Do the initial AS-REQ, should get a pre-authentication required
+ # response
+ etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+ cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[user_name])
+ sname = self.PrincipalName_create(name_type=NT_SRV_HST,
+ names=["krbtgt", realm])
+
+ rep = self.as_req(cname, sname, realm, etype)
+ self.check_pre_authenication(rep)
+
+ # Do the next AS-REQ
+ padata = self.get_pa_data(user_credentials, rep)
+ key = self.get_as_rep_key(user_credentials, rep)
+ rep = self.as_req(cname, sname, realm, etype, padata=padata)
+ self.check_as_reply(rep)
+
+ # Request a ticket to the host service on the machine account
+ ticket = rep['ticket']
+ enc_part = self.get_as_rep_enc_data(key, rep)
+ key = self.EncryptionKey_import(enc_part['key'])
+ cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[user_name])
+ sname = self.PrincipalName_create(name_type=NT_SRV_HST,
+ names=[service, mach_name])
+
+ (rep, enc_part) = self.tgs_req(
+ cname, sname, realm, ticket, key, etype)
+ self.check_tgs_reply(rep)
+ key = self.EncryptionKey_import(enc_part['key'])
+
+ # Check the contents of the pac, and the ticket
+ ticket = rep['ticket']
+
+ # Write the ticket into a credentials cache file that can be ingested
+ # by the main credentials code.
+ cachefile = self.create_ccache(cname, ticket, enc_part)
+
+ # Create a credentials object to reference the credentials cache.
+ creds = Credentials()
+ creds.set_kerberos_state(MUST_USE_KERBEROS)
+ creds.set_username(user_name, SPECIFIED)
+ creds.set_realm(realm)
+ creds.set_named_ccache(cachefile.name, SPECIFIED, self.lp)
+
+ # Return the credentials along with the cache file.
+ return (creds, cachefile)