From: Stefan Metzmacher Date: Wed, 18 Dec 2024 10:44:27 +0000 (+0100) Subject: python:tests/krb5: allow exporting a keytab file of the accounts used by the tests X-Git-Tag: tdb-1.4.13~199 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9520aea8b0f2cc56a165650fa470ae5e650ad9a9;p=thirdparty%2Fsamba.git python:tests/krb5: allow exporting a keytab file of the accounts used by the tests EXPORT_KEYTAB_FILE=/dev/shm/export.keytab EXPORT_KEYTAB_APPEND=0 or 1 EXPORT_EXISTING_CREDS_TO_KEYTAB=0 or 1 EXPORT_GIVEN_CREDS_TO_KEYTAB=0 or 1 Signed-off-by: Stefan Metzmacher Reviewed-by: Jennifer Sutton --- diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index 83a91240c1f..2dc696af35d 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -809,6 +809,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): spn=None, upn=None, additional_details=None, ou=None, account_control=0, add_dollar=None, expired_password=False, force_nt4_hash=False, + export_to_keytab=True, preserve=True): """Create an account for testing. The dn of the created account is added to self.accounts, @@ -962,6 +963,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): guid = guid.decode('utf-8') creds.set_guid(guid) + if export_to_keytab: + self.remember_creds_for_keytab_export(creds) + return (creds, dn) def get_security_descriptor(self, dn): @@ -2181,6 +2185,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): add_dollar=add_dollar, force_nt4_hash=force_nt4_hash, expired_password=expired_password, + export_to_keytab=False, # explicit below preserve=use_cache) expected_etypes = None @@ -2277,6 +2282,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): if secure_channel_type is not None: creds.set_secure_channel_type(secure_channel_type) + self.remember_creds_for_keytab_export(creds) + return creds def get_new_username(self): @@ -2391,6 +2398,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): claims_support=self.kdc_claims_support, compound_id_support=self.kdc_compound_id_support) + if type(self).export_existing_creds: + self.remember_creds_for_keytab_export(creds) + return creds c = self._get_krb5_creds(prefix='RODC_KRBTGT', @@ -2436,6 +2446,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): krbtgt_keys = self.get_keys(krbtgt_creds) self.creds_set_keys(krbtgt_creds, krbtgt_keys) + self.remember_creds_for_keytab_export(krbtgt_creds) + acct_res = samdb.search(base=rodc_ctx.acct_dn, scope=ldb.SCOPE_BASE, attrs=['msDS-KeyVersionNumber', @@ -2472,6 +2484,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): # used by the caller... # self.creds_set_keys(computer_creds, computer_keys) + self.remember_creds_for_keytab_export(computer_creds) + if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008: extra_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 | security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96) @@ -2540,6 +2554,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): claims_support=self.kdc_claims_support, compound_id_support=self.kdc_compound_id_support) + if type(self).export_existing_creds: + self.remember_creds_for_keytab_export(creds) + return creds c = self._get_krb5_creds(prefix='KRBTGT', @@ -2592,6 +2609,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): extra_bits=extra_bits, remove_bits=remove_bits) + if type(self).export_existing_creds: + self.remember_creds_for_keytab_export(creds) + return creds c = self._get_krb5_creds(prefix='DC', @@ -2642,6 +2662,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): extra_bits=extra_bits, remove_bits=remove_bits) + if type(self).export_existing_creds: + self.remember_creds_for_keytab_export(creds) + return creds c = self._get_krb5_creds(prefix='SERVER', diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py index 1f3c555a5bd..e899723f660 100644 --- a/python/samba/tests/krb5/raw_testcase.py +++ b/python/samba/tests/krb5/raw_testcase.py @@ -17,6 +17,7 @@ # import sys +import os import socket import struct import time @@ -47,13 +48,15 @@ from pyasn1.error import PyAsn1Error from samba import unix2nttime from samba.credentials import Credentials -from samba.dcerpc import claims, krb5pac, netlogon, samr, security +from samba.dcerpc import claims, krb5pac, netlogon, samr, security, krb5ccache from samba.gensec import FEATURE_SEAL from samba.ndr import ndr_pack, ndr_unpack from samba.dcerpc.misc import ( SEC_CHAN_WKSTA, SEC_CHAN_BDC, SEC_CHAN_RODC, + SEC_CHAN_DOMAIN, + SEC_CHAN_DNS_DOMAIN, ) from samba.dsdb import ( UF_SMARTCARD_REQUIRED @@ -103,6 +106,7 @@ from samba.tests.krb5.rfc4120_constants import ( KU_TGS_REQ_AUTH_DAT_SESSION, KU_TGS_REQ_AUTH_DAT_SUBKEY, KU_TICKET, + NT_UNKNOWN, NT_PRINCIPAL, NT_SRV_INST, NT_WELLKNOWN, @@ -887,11 +891,42 @@ class RawKerberosTest(TestCase): crash_windows = '1' cls.crash_windows = bool(int(crash_windows)) + export_keytab_file = samba.tests.env_get_var_value('EXPORT_KEYTAB_FILE', + allow_missing=True) + cls.export_keytab_file = export_keytab_file + cls.keytab_entries = [] + export_keytab_append = samba.tests.env_get_var_value('EXPORT_KEYTAB_APPEND', + allow_missing=True) + if export_keytab_append is None: + export_keytab_append = '0' + cls.export_keytab_append = bool(int(export_keytab_append)) + export_existing_creds = samba.tests.env_get_var_value('EXPORT_EXISTING_CREDS_TO_KEYTAB', + allow_missing=True) + if export_existing_creds is None: + export_existing_creds = '0' + cls.export_existing_creds = bool(int(export_existing_creds)) + export_given_creds = samba.tests.env_get_var_value('EXPORT_GIVEN_CREDS_TO_KEYTAB', + allow_missing=True) + if export_given_creds is None: + export_given_creds = '0' + cls.export_given_creds = bool(int(export_given_creds)) + + @classmethod + def tearDownClass(cls): + cls.export_keytab() + super().tearDownClass() + def setUp(self): super().setUp() self.do_asn1_print = False self.do_hexdump = False + cls = type(self) + if cls.export_keytab_file and \ + not cls.export_keytab_append and \ + os.path.exists(cls.export_keytab_file): + self.fail("export_keytab_file[%s] already exists" % cls.export_keytab_file) + strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True) if strict_checking is None: @@ -950,6 +985,141 @@ class RawKerberosTest(TestCase): allow_missing=allow_missing) return val + def remember_keytab_entry(self, princ, realm, enctype, key, kvno=None): + cls = type(self) + if cls.export_keytab_file is None: + return + + if kvno is None: + kvno = 0 + + keyb = krb5ccache.KEYTAB_KEYBLOCK() + keyb.data = list(key) + keyb.length = len(key) + + comps = princ.split('/') + keyp = krb5ccache.KEYTAB_PRINCIPAL() + keyp.realm = realm + keyp.component_count = len(comps) + keyp.components = comps + keyp.name_type = NT_UNKNOWN + + keye = krb5ccache.KEYTAB_ENTRY() + keye.principal = keyp + keye.timestamp = 0 + # key_version is only 1 byte + # full_key_version below is the full 32-bit value + keye.key_version = kvno & 0xff + keye.enctype = enctype + keye.key = keyb + keye.full_key_version = kvno + + cls.keytab_entries.append(keye) + + def remember_creds_for_keytab_export(self, creds): + princ = creds.get_username() + realm = creds.get_realm() + kvno = creds.get_kvno() + + sec_chan = creds.get_secure_channel_type() + if sec_chan in [SEC_CHAN_DOMAIN, SEC_CHAN_DNS_DOMAIN]: + incoming_creds = creds.get_trust_incoming_creds() + outgoing_creds = creds.get_trust_outgoing_creds() + if incoming_creds and outgoing_creds: + # creds is the account_creds + pass + elif incoming_creds: + # creds is the outgoing_creds + princ = "krbtgt/%s" % incoming_creds.get_realm() + elif outgoing_creds: + # creds is the incoming_creds + princ = "krbtgt/%s" % outgoing_creds.get_realm() + + need_rc4 = False + need_aes256_sha1 = False + if creds.get_password() is not None: + need_aes256_sha1 = True + need_rc4 = True + elif creds.get_nt_hash() is not None: + need_rc4 = True + etypes = creds.get_tgs_krb5_etypes() + for etype in etypes: + if etype == kcrypto.Enctype.AES256: + need_aes256_sha1 = False + if etype == kcrypto.Enctype.RC4: + need_rc4 = False + try: + key = self.TicketDecryptionKey_from_creds(creds, etype=etype) + except ValueError: + pass + except AssertionError: + pass + else: + key = key.export_obj()['keyvalue'] + self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno) + + if need_aes256_sha1: + etype = kcrypto.Enctype.AES256 + try: + key = self.TicketDecryptionKey_from_creds(creds, etype=etype) + except ValueError: + pass + except AssertionError: + pass + else: + key = key.export_obj()['keyvalue'] + self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno) + if need_rc4: + etype = kcrypto.Enctype.RC4 + try: + key = self.TicketDecryptionKey_from_creds(creds, etype=etype) + except ValueError: + pass + except AssertionError: + pass + else: + key = key.export_obj()['keyvalue'] + self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno) + + @classmethod + def export_keytab(cls): + if cls.export_keytab_file is None: + return + + last_mke = None + + if os.path.exists(cls.export_keytab_file): + if not cls.export_keytab_append: + return + with open(cls.export_keytab_file, 'rb') as f: + blob = f.read() + ke = ndr_unpack(krb5ccache.KEYTAB, blob) + tmp_mke = krb5ccache.MULTIPLE_KEYTAB_ENTRIES() + tmp_mke.entry = ke.entry + tmp_mke.further_entry = ke.further_entry + last_mke = tmp_mke + + for keye in cls.keytab_entries: + if last_mke: + further_entry = ndr_pack(last_mke) + else: + further_entry = b'' + tmp_mke = krb5ccache.MULTIPLE_KEYTAB_ENTRIES() + tmp_mke.entry = keye + tmp_mke.further_entry = further_entry + last_mke = tmp_mke + + if last_mke is None: + return + + ke = krb5ccache.KEYTAB() + ke.entry = last_mke.entry + ke.further_entry = last_mke.further_entry + blob = ndr_pack(ke) + + with open(cls.export_keytab_file, 'wb') as f: + f.write(blob) + def _get_krb5_creds_from_env(self, prefix, default_username=None, allow_missing_password=False, @@ -1021,6 +1191,9 @@ class RawKerberosTest(TestCase): 'Please supply %s encryption keys ' 'in environment' % prefix) + if type(self).export_given_creds: + self.remember_creds_for_keytab_export(c) + return c def _get_krb5_creds(self,