]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
python:tests/krb5: allow exporting a keytab file of the accounts used by the tests
authorStefan Metzmacher <metze@samba.org>
Wed, 18 Dec 2024 10:44:27 +0000 (11:44 +0100)
committerStefan Metzmacher <metze@samba.org>
Wed, 8 Jan 2025 09:13:30 +0000 (09:13 +0000)
EXPORT_KEYTAB_FILE=/dev/shm/export.keytab
EXPORT_KEYTAB_APPEND=0 or 1
EXPORT_EXISTING_CREDS_TO_KEYTAB=0 or 1
EXPORT_GIVEN_CREDS_TO_KEYTAB=0 or 1

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jennifer Sutton <jennifersutton@catalyst.net.nz>
python/samba/tests/krb5/kdc_base_test.py
python/samba/tests/krb5/raw_testcase.py

index 83a91240c1f2df288722e8d9a8668b5120fe5e9f..2dc696af35db03f7e77b25b20253c64271a0a0f5 100644 (file)
@@ -809,6 +809,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                        spn=None, upn=None, additional_details=None,
                        ou=None, account_control=0, add_dollar=None,
                        expired_password=False, force_nt4_hash=False,
+                       export_to_keytab=True,
                        preserve=True):
         """Create an account for testing.
            The dn of the created account is added to self.accounts,
@@ -962,6 +963,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         guid = guid.decode('utf-8')
         creds.set_guid(guid)
 
+        if export_to_keytab:
+            self.remember_creds_for_keytab_export(creds)
+
         return (creds, dn)
 
     def get_security_descriptor(self, dn):
@@ -2181,6 +2185,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                                         add_dollar=add_dollar,
                                         force_nt4_hash=force_nt4_hash,
                                         expired_password=expired_password,
+                                        export_to_keytab=False, # explicit below
                                         preserve=use_cache)
 
         expected_etypes = None
@@ -2277,6 +2282,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         if secure_channel_type is not None:
             creds.set_secure_channel_type(secure_channel_type)
 
+        self.remember_creds_for_keytab_export(creds)
+
         return creds
 
     def get_new_username(self):
@@ -2391,6 +2398,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                 claims_support=self.kdc_claims_support,
                 compound_id_support=self.kdc_compound_id_support)
 
+            if type(self).export_existing_creds:
+                self.remember_creds_for_keytab_export(creds)
+
             return creds
 
         c = self._get_krb5_creds(prefix='RODC_KRBTGT',
@@ -2436,6 +2446,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
             krbtgt_keys = self.get_keys(krbtgt_creds)
             self.creds_set_keys(krbtgt_creds, krbtgt_keys)
 
+            self.remember_creds_for_keytab_export(krbtgt_creds)
+
             acct_res = samdb.search(base=rodc_ctx.acct_dn,
                                     scope=ldb.SCOPE_BASE,
                                     attrs=['msDS-KeyVersionNumber',
@@ -2472,6 +2484,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
             # used by the caller...
             # self.creds_set_keys(computer_creds, computer_keys)
 
+            self.remember_creds_for_keytab_export(computer_creds)
+
             if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008:
                 extra_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
                               security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)
@@ -2540,6 +2554,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                 claims_support=self.kdc_claims_support,
                 compound_id_support=self.kdc_compound_id_support)
 
+            if type(self).export_existing_creds:
+                self.remember_creds_for_keytab_export(creds)
+
             return creds
 
         c = self._get_krb5_creds(prefix='KRBTGT',
@@ -2592,6 +2609,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                                     extra_bits=extra_bits,
                                     remove_bits=remove_bits)
 
+            if type(self).export_existing_creds:
+                self.remember_creds_for_keytab_export(creds)
+
             return creds
 
         c = self._get_krb5_creds(prefix='DC',
@@ -2642,6 +2662,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                                     extra_bits=extra_bits,
                                     remove_bits=remove_bits)
 
+            if type(self).export_existing_creds:
+                self.remember_creds_for_keytab_export(creds)
+
             return creds
 
         c = self._get_krb5_creds(prefix='SERVER',
index 1f3c555a5bd4b934fd74d7a0429ebc09bdc7f145..e899723f660357def4e4ce3f7857d18156bcc487 100644 (file)
@@ -17,6 +17,7 @@
 #
 
 import sys
+import os
 import socket
 import struct
 import time
@@ -47,13 +48,15 @@ from pyasn1.error import PyAsn1Error
 
 from samba import unix2nttime
 from samba.credentials import Credentials
-from samba.dcerpc import claims, krb5pac, netlogon, samr, security
+from samba.dcerpc import claims, krb5pac, netlogon, samr, security, krb5ccache
 from samba.gensec import FEATURE_SEAL
 from samba.ndr import ndr_pack, ndr_unpack
 from samba.dcerpc.misc import (
     SEC_CHAN_WKSTA,
     SEC_CHAN_BDC,
     SEC_CHAN_RODC,
+    SEC_CHAN_DOMAIN,
+    SEC_CHAN_DNS_DOMAIN,
 )
 from samba.dsdb import (
     UF_SMARTCARD_REQUIRED
@@ -103,6 +106,7 @@ from samba.tests.krb5.rfc4120_constants import (
     KU_TGS_REQ_AUTH_DAT_SESSION,
     KU_TGS_REQ_AUTH_DAT_SUBKEY,
     KU_TICKET,
+    NT_UNKNOWN,
     NT_PRINCIPAL,
     NT_SRV_INST,
     NT_WELLKNOWN,
@@ -887,11 +891,42 @@ class RawKerberosTest(TestCase):
             crash_windows = '1'
         cls.crash_windows = bool(int(crash_windows))
 
+        export_keytab_file = samba.tests.env_get_var_value('EXPORT_KEYTAB_FILE',
+                                                           allow_missing=True)
+        cls.export_keytab_file = export_keytab_file
+        cls.keytab_entries = []
+        export_keytab_append = samba.tests.env_get_var_value('EXPORT_KEYTAB_APPEND',
+                                                           allow_missing=True)
+        if export_keytab_append is None:
+            export_keytab_append = '0'
+        cls.export_keytab_append = bool(int(export_keytab_append))
+        export_existing_creds = samba.tests.env_get_var_value('EXPORT_EXISTING_CREDS_TO_KEYTAB',
+                                                              allow_missing=True)
+        if export_existing_creds is None:
+            export_existing_creds = '0'
+        cls.export_existing_creds = bool(int(export_existing_creds))
+        export_given_creds = samba.tests.env_get_var_value('EXPORT_GIVEN_CREDS_TO_KEYTAB',
+                                                           allow_missing=True)
+        if export_given_creds is None:
+            export_given_creds = '0'
+        cls.export_given_creds = bool(int(export_given_creds))
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.export_keytab()
+        super().tearDownClass()
+
     def setUp(self):
         super().setUp()
         self.do_asn1_print = False
         self.do_hexdump = False
 
+        cls = type(self)
+        if cls.export_keytab_file and \
+           not cls.export_keytab_append and \
+           os.path.exists(cls.export_keytab_file):
+            self.fail("export_keytab_file[%s] already exists" % cls.export_keytab_file)
+
         strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
                                                         allow_missing=True)
         if strict_checking is None:
@@ -950,6 +985,141 @@ class RawKerberosTest(TestCase):
                                                 allow_missing=allow_missing)
         return val
 
+    def remember_keytab_entry(self, princ, realm, enctype, key, kvno=None):
+        cls = type(self)
+        if cls.export_keytab_file is None:
+            return
+
+        if kvno is None:
+            kvno = 0
+
+        keyb = krb5ccache.KEYTAB_KEYBLOCK()
+        keyb.data = list(key)
+        keyb.length = len(key)
+
+        comps = princ.split('/')
+        keyp = krb5ccache.KEYTAB_PRINCIPAL()
+        keyp.realm = realm
+        keyp.component_count = len(comps)
+        keyp.components = comps
+        keyp.name_type = NT_UNKNOWN
+
+        keye = krb5ccache.KEYTAB_ENTRY()
+        keye.principal = keyp
+        keye.timestamp = 0
+        # key_version is only 1 byte
+        # full_key_version below is the full 32-bit value
+        keye.key_version = kvno & 0xff
+        keye.enctype = enctype
+        keye.key = keyb
+        keye.full_key_version = kvno
+
+        cls.keytab_entries.append(keye)
+
+    def remember_creds_for_keytab_export(self, creds):
+        princ = creds.get_username()
+        realm = creds.get_realm()
+        kvno = creds.get_kvno()
+
+        sec_chan = creds.get_secure_channel_type()
+        if sec_chan in [SEC_CHAN_DOMAIN, SEC_CHAN_DNS_DOMAIN]:
+            incoming_creds = creds.get_trust_incoming_creds()
+            outgoing_creds = creds.get_trust_outgoing_creds()
+            if incoming_creds and outgoing_creds:
+                # creds is the account_creds
+                pass
+            elif incoming_creds:
+                # creds is the outgoing_creds
+                princ = "krbtgt/%s" % incoming_creds.get_realm()
+            elif outgoing_creds:
+                # creds is the incoming_creds
+                princ = "krbtgt/%s" % outgoing_creds.get_realm()
+
+        need_rc4 = False
+        need_aes256_sha1 = False
+        if creds.get_password() is not None:
+            need_aes256_sha1 = True
+            need_rc4 = True
+        elif creds.get_nt_hash() is not None:
+            need_rc4 = True
+        etypes = creds.get_tgs_krb5_etypes()
+        for etype in etypes:
+            if etype == kcrypto.Enctype.AES256:
+                need_aes256_sha1 = False
+            if etype == kcrypto.Enctype.RC4:
+                need_rc4 = False
+            try:
+                key = self.TicketDecryptionKey_from_creds(creds, etype=etype)
+            except ValueError:
+                pass
+            except AssertionError:
+                pass
+            else:
+                key = key.export_obj()['keyvalue']
+                self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno)
+
+        if need_aes256_sha1:
+            etype = kcrypto.Enctype.AES256
+            try:
+                key = self.TicketDecryptionKey_from_creds(creds, etype=etype)
+            except ValueError:
+                pass
+            except AssertionError:
+                pass
+            else:
+                key = key.export_obj()['keyvalue']
+                self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno)
+        if need_rc4:
+            etype = kcrypto.Enctype.RC4
+            try:
+                key = self.TicketDecryptionKey_from_creds(creds, etype=etype)
+            except ValueError:
+                pass
+            except AssertionError:
+                pass
+            else:
+                key = key.export_obj()['keyvalue']
+                self.remember_keytab_entry(princ, realm, etype, key, kvno=kvno)
+
+    @classmethod
+    def export_keytab(cls):
+        if cls.export_keytab_file is None:
+            return
+
+        last_mke = None
+
+        if os.path.exists(cls.export_keytab_file):
+            if not cls.export_keytab_append:
+                return
+            with open(cls.export_keytab_file, 'rb') as f:
+                blob = f.read()
+                ke = ndr_unpack(krb5ccache.KEYTAB, blob)
+                tmp_mke = krb5ccache.MULTIPLE_KEYTAB_ENTRIES()
+                tmp_mke.entry = ke.entry
+                tmp_mke.further_entry = ke.further_entry
+                last_mke = tmp_mke
+
+        for keye in cls.keytab_entries:
+            if last_mke:
+                further_entry = ndr_pack(last_mke)
+            else:
+                further_entry = b''
+            tmp_mke = krb5ccache.MULTIPLE_KEYTAB_ENTRIES()
+            tmp_mke.entry = keye
+            tmp_mke.further_entry = further_entry
+            last_mke = tmp_mke
+
+        if last_mke is None:
+            return
+
+        ke = krb5ccache.KEYTAB()
+        ke.entry = last_mke.entry
+        ke.further_entry = last_mke.further_entry
+        blob = ndr_pack(ke)
+
+        with open(cls.export_keytab_file, 'wb') as f:
+            f.write(blob)
+
     def _get_krb5_creds_from_env(self, prefix,
                                  default_username=None,
                                  allow_missing_password=False,
@@ -1021,6 +1191,9 @@ class RawKerberosTest(TestCase):
                             'Please supply %s encryption keys '
                             'in environment' % prefix)
 
+        if type(self).export_given_creds:
+            self.remember_creds_for_keytab_export(c)
+
         return c
 
     def _get_krb5_creds(self,