spn=None, upn=None, additional_details=None,
ou=None, account_control=0, add_dollar=None,
expired_password=False, force_nt4_hash=False,
+ export_to_keytab=True,
preserve=True):
"""Create an account for testing.
The dn of the created account is added to self.accounts,
guid = guid.decode('utf-8')
creds.set_guid(guid)
+ if export_to_keytab:
+ self.remember_creds_for_keytab_export(creds)
+
return (creds, dn)
def get_security_descriptor(self, dn):
add_dollar=add_dollar,
force_nt4_hash=force_nt4_hash,
expired_password=expired_password,
+ export_to_keytab=False, # explicit below
preserve=use_cache)
expected_etypes = None
if secure_channel_type is not None:
creds.set_secure_channel_type(secure_channel_type)
+ self.remember_creds_for_keytab_export(creds)
+
return creds
def get_new_username(self):
claims_support=self.kdc_claims_support,
compound_id_support=self.kdc_compound_id_support)
+ if type(self).export_existing_creds:
+ self.remember_creds_for_keytab_export(creds)
+
return creds
c = self._get_krb5_creds(prefix='RODC_KRBTGT',
krbtgt_keys = self.get_keys(krbtgt_creds)
self.creds_set_keys(krbtgt_creds, krbtgt_keys)
+ self.remember_creds_for_keytab_export(krbtgt_creds)
+
acct_res = samdb.search(base=rodc_ctx.acct_dn,
scope=ldb.SCOPE_BASE,
attrs=['msDS-KeyVersionNumber',
# used by the caller...
# self.creds_set_keys(computer_creds, computer_keys)
+ self.remember_creds_for_keytab_export(computer_creds)
+
if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008:
extra_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)
claims_support=self.kdc_claims_support,
compound_id_support=self.kdc_compound_id_support)
+ if type(self).export_existing_creds:
+ self.remember_creds_for_keytab_export(creds)
+
return creds
c = self._get_krb5_creds(prefix='KRBTGT',
extra_bits=extra_bits,
remove_bits=remove_bits)
+ if type(self).export_existing_creds:
+ self.remember_creds_for_keytab_export(creds)
+
return creds
c = self._get_krb5_creds(prefix='DC',
extra_bits=extra_bits,
remove_bits=remove_bits)
+ if type(self).export_existing_creds:
+ self.remember_creds_for_keytab_export(creds)
+
return creds
c = self._get_krb5_creds(prefix='SERVER',
#
import sys
+import os
import socket
import struct
import time
from samba import unix2nttime
from samba.credentials import Credentials
-from samba.dcerpc import claims, krb5pac, netlogon, samr, security
+from samba.dcerpc import claims, krb5pac, netlogon, samr, security, krb5ccache
from samba.gensec import FEATURE_SEAL
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc.misc import (
SEC_CHAN_WKSTA,
SEC_CHAN_BDC,
SEC_CHAN_RODC,
+ SEC_CHAN_DOMAIN,
+ SEC_CHAN_DNS_DOMAIN,
)
from samba.dsdb import (
UF_SMARTCARD_REQUIRED
KU_TGS_REQ_AUTH_DAT_SESSION,
KU_TGS_REQ_AUTH_DAT_SUBKEY,
KU_TICKET,
+ NT_UNKNOWN,
NT_PRINCIPAL,
NT_SRV_INST,
NT_WELLKNOWN,
crash_windows = '1'
cls.crash_windows = bool(int(crash_windows))
+ export_keytab_file = samba.tests.env_get_var_value('EXPORT_KEYTAB_FILE',
+ allow_missing=True)
+ cls.export_keytab_file = export_keytab_file
+ cls.keytab_entries = []
+ export_keytab_append = samba.tests.env_get_var_value('EXPORT_KEYTAB_APPEND',
+ allow_missing=True)
+ if export_keytab_append is None:
+ export_keytab_append = '0'
+ cls.export_keytab_append = bool(int(export_keytab_append))
+ export_existing_creds = samba.tests.env_get_var_value('EXPORT_EXISTING_CREDS_TO_KEYTAB',
+ allow_missing=True)
+ if export_existing_creds is None:
+ export_existing_creds = '0'
+ cls.export_existing_creds = bool(int(export_existing_creds))
+ export_given_creds = samba.tests.env_get_var_value('EXPORT_GIVEN_CREDS_TO_KEYTAB',
+ allow_missing=True)
+ if export_given_creds is None:
+ export_given_creds = '0'
+ cls.export_given_creds = bool(int(export_given_creds))
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.export_keytab()
+ super().tearDownClass()
+
def setUp(self):
super().setUp()
self.do_asn1_print = False
self.do_hexdump = False
+ cls = type(self)
+ if cls.export_keytab_file and \
+ not cls.export_keytab_append and \
+ os.path.exists(cls.export_keytab_file):
+ self.fail("export_keytab_file[%s] already exists" % cls.export_keytab_file)
+
strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
allow_missing=True)
if strict_checking is None:
allow_missing=allow_missing)
return val
+ def remember_keytab_entry(self, princ, realm, enctype, key, kvno=None):
+ cls = type(self)
+ if cls.export_keytab_file is None:
+ return
+
+ if kvno is None:
+ kvno = 0
+
+ keyb = krb5ccache.KEYTAB_KEYBLOCK()
+ keyb.data = list(key)
+ keyb.length = len(key)
+
+ comps = princ.split('/')
+ keyp = krb5ccache.KEYTAB_PRINCIPAL()
+ keyp.realm = realm
+ keyp.component_count = len(comps)
+ keyp.components = comps
+ keyp.name_type = NT_UNKNOWN
+
+ keye = krb5ccache.KEYTAB_ENTRY()
+ keye.principal = keyp
+ keye.timestamp = 0
+ # key_version is only 1 byte
+ # full_key_version below is the full 32-bit value
+ keye.key_version = kvno & 0xff
+ keye.enctype = enctype
+ keye.key = keyb
+ keye.full_key_version = kvno
+
+ cls.keytab_entries.append(keye)
+
+ def remember_creds_for_keytab_export(self, creds):
+ princ = creds.get_username()
+ realm = creds.get_realm()
+ kvno = creds.get_kvno()
+
+ sec_chan = creds.get_secure_channel_type()
+ if sec_chan in [SEC_CHAN_DOMAIN, SEC_CHAN_DNS_DOMAIN]:
+ incoming_creds = creds.get_trust_incoming_creds()
+ outgoing_creds = creds.get_trust_outgoing_creds()
+ if incoming_creds and outgoing_creds:
+ # creds is the account_creds
+ pass
+ elif incoming_creds:
+ # creds is the outgoing_creds
+ princ = "krbtgt/%s" % incoming_creds.get_realm()
+ elif outgoing_creds:
+ # creds is the incoming_creds
+ princ = "krbtgt/%s" % outgoing_creds.get_realm()
+
+ need_rc4 = False
+ need_aes256_sha1 = False
+ if creds.get_password() is not None:
+ need_aes256_sha1 = True
+ need_rc4 = True
+ elif creds.get_nt_hash() is not None:
+ need_rc4 = True
+ etypes = creds.get_tgs_krb5_etypes()
+ for etype in etypes:
+ if etype == kcrypto.Enctype.AES256:
+ need_aes256_sha1 = False
+ if etype == kcrypto.Enctype.RC4:
+ need_rc4 = False
+ try:
+ key = self.TicketDecryptionKey_from_creds(creds, etype=etype)
+ except ValueError:
+ pass
+ except AssertionError:
+ pass
+ else:
+ key = key.export_obj()['keyvalue']
+ self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno)
+
+ if need_aes256_sha1:
+ etype = kcrypto.Enctype.AES256
+ try:
+ key = self.TicketDecryptionKey_from_creds(creds, etype=etype)
+ except ValueError:
+ pass
+ except AssertionError:
+ pass
+ else:
+ key = key.export_obj()['keyvalue']
+ self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno)
+ if need_rc4:
+ etype = kcrypto.Enctype.RC4
+ try:
+ key = self.TicketDecryptionKey_from_creds(creds, etype=etype)
+ except ValueError:
+ pass
+ except AssertionError:
+ pass
+ else:
+ key = key.export_obj()['keyvalue']
+ self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno)
+
+ @classmethod
+ def export_keytab(cls):
+ if cls.export_keytab_file is None:
+ return
+
+ last_mke = None
+
+ if os.path.exists(cls.export_keytab_file):
+ if not cls.export_keytab_append:
+ return
+ with open(cls.export_keytab_file, 'rb') as f:
+ blob = f.read()
+ ke = ndr_unpack(krb5ccache.KEYTAB, blob)
+ tmp_mke = krb5ccache.MULTIPLE_KEYTAB_ENTRIES()
+ tmp_mke.entry = ke.entry
+ tmp_mke.further_entry = ke.further_entry
+ last_mke = tmp_mke
+
+ for keye in cls.keytab_entries:
+ if last_mke:
+ further_entry = ndr_pack(last_mke)
+ else:
+ further_entry = b''
+ tmp_mke = krb5ccache.MULTIPLE_KEYTAB_ENTRIES()
+ tmp_mke.entry = keye
+ tmp_mke.further_entry = further_entry
+ last_mke = tmp_mke
+
+ if last_mke is None:
+ return
+
+ ke = krb5ccache.KEYTAB()
+ ke.entry = last_mke.entry
+ ke.further_entry = last_mke.further_entry
+ blob = ndr_pack(ke)
+
+ with open(cls.export_keytab_file, 'wb') as f:
+ f.write(blob)
+
def _get_krb5_creds_from_env(self, prefix,
default_username=None,
allow_missing_password=False,
'Please supply %s encryption keys '
'in environment' % prefix)
+ if type(self).export_given_creds:
+ self.remember_creds_for_keytab_export(c)
+
return c
def _get_krb5_creds(self,