]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
python:tests/krb5: allow get_mock_rodc_krbtgt_creds(preserve=False) to create a tmp...
authorStefan Metzmacher <metze@samba.org>
Fri, 13 Dec 2024 14:42:37 +0000 (15:42 +0100)
committerStefan Metzmacher <metze@samba.org>
Wed, 8 Jan 2025 09:13:30 +0000 (09:13 +0000)
This also exposes credentials for the machine account for netlogon
testing.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jennifer Sutton <jennifersutton@catalyst.net.nz>
python/samba/tests/krb5/kdc_base_test.py
python/samba/tests/krb5/raw_testcase.py

index 093b6712e8eedc910db4c593f14dfe827052bb0d..83a91240c1f2df288722e8d9a8668b5120fe5e9f 100644 (file)
@@ -66,7 +66,7 @@ from samba.dcerpc import (
     samr,
     security,
 )
-from samba.dcerpc.misc import SEC_CHAN_BDC, SEC_CHAN_NULL, SEC_CHAN_WKSTA
+from samba.dcerpc.misc import SEC_CHAN_BDC, SEC_CHAN_NULL, SEC_CHAN_WKSTA, SEC_CHAN_RODC
 from samba.domain.models import AuthenticationPolicy, AuthenticationSilo
 from samba.drs_utils import drs_Replicate, drsuapi_connect
 from samba.dsdb import (
@@ -327,6 +327,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
             for dn in reversed(self.test_accounts):
                 delete_force(self._ldb, dn)
 
+        if self._test_rodc_ctx is not None:
+            self._test_rodc_ctx.cleanup_old_join(force=True)
+
         super().tearDown()
 
     @classmethod
@@ -357,6 +360,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         # A list containing DNs of accounts that should be removed when the
         # current test finishes.
         self.test_accounts = []
+        self._test_rodc_ctx = None
 
     def get_lp(self) -> LoadParm:
         if self._lp is None:
@@ -413,8 +417,13 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
 
         return dn
 
-    def get_mock_rodc_ctx(self):
-        if self._rodc_ctx is None:
+    def get_mock_rodc_ctx(self, preserve=True):
+        if preserve:
+            rodc_ctx = self._rodc_ctx
+        else:
+            rodc_ctx = self._test_rodc_ctx
+
+        if rodc_ctx is None:
             admin_creds = self.get_admin_creds()
             lp = self.get_lp()
 
@@ -430,9 +439,16 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                                      domain=None)
             self.create_rodc(rodc_ctx)
 
-            type(self)._rodc_ctx = rodc_ctx
+            if preserve:
+                # Mark this rodc for deletion in tearDownClass() after all the
+                # tests in this class finish.
+                type(self)._rodc_ctx = rodc_ctx
+            else:
+                # Mark this rodc for deletion in tearDown() after the current
+                # test finishes.
+                self._test_rodc_ctx = rodc_ctx
 
-        return self._rodc_ctx
+        return rodc_ctx
 
     def get_domain_functional_level(self, ldb=None):
         if self._functional_level is None:
@@ -2386,14 +2402,15 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
 
     def get_mock_rodc_krbtgt_creds(self,
                                    require_keys=True,
-                                   require_strongest_key=False):
+                                   require_strongest_key=False,
+                                   preserve=True):
         if require_strongest_key:
             self.assertTrue(require_keys)
 
         def create_rodc_krbtgt_account():
             samdb = self.get_samdb()
 
-            rodc_ctx = self.get_mock_rodc_ctx()
+            rodc_ctx = self.get_mock_rodc_ctx(preserve=preserve)
 
             krbtgt_dn = rodc_ctx.new_krbtgt_dn
 
@@ -2404,20 +2421,56 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
             dn = res[0].dn
             username = str(rodc_ctx.krbtgt_name)
 
-            creds = KerberosCredentials()
-            creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
-            creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
-            creds.set_username(username)
+            krbtgt_creds = KerberosCredentials()
+            krbtgt_creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
+            krbtgt_creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
+            krbtgt_creds.set_username(username)
 
             kvno = int(res[0]['msDS-KeyVersionNumber'][0])
             krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
 
             rodc_kvno = krbtgt_number << 16 | kvno
-            creds.set_kvno(rodc_kvno)
-            creds.set_dn(dn)
-
-            keys = self.get_keys(creds)
-            self.creds_set_keys(creds, keys)
+            krbtgt_creds.set_kvno(rodc_kvno)
+            krbtgt_creds.set_dn(dn)
+
+            krbtgt_keys = self.get_keys(krbtgt_creds)
+            self.creds_set_keys(krbtgt_creds, krbtgt_keys)
+
+            acct_res = samdb.search(base=rodc_ctx.acct_dn,
+                                    scope=ldb.SCOPE_BASE,
+                                    attrs=['msDS-KeyVersionNumber',
+                                           'objectSid',
+                                           'objectGUID'])
+
+            computer_creds = KerberosCredentials()
+            computer_creds.set_domain(krbtgt_creds.get_domain())
+            computer_creds.set_realm(krbtgt_creds.get_realm())
+            computer_creds.set_username(rodc_ctx.samname)
+            computer_creds.set_password(rodc_ctx.acct_pass)
+            computer_creds.set_workstation(rodc_ctx.myname)
+            computer_creds.set_secure_channel_type(misc.SEC_CHAN_RODC)
+            computer_creds.set_dn(acct_res[0].dn)
+            computer_creds.set_type(self.AccountType.RODC)
+            computer_creds.set_user_account_control(rodc_ctx.userAccountControl)
+
+            computer_kvno = int(acct_res[0]['msDS-KeyVersionNumber'][0])
+            computer_creds.set_kvno(computer_kvno)
+
+            sid = acct_res[0].get('objectSid', idx=0)
+            sid = samdb.schema_format_value('objectSID', sid)
+            sid = sid.decode('utf-8')
+            computer_creds.set_sid(sid)
+            guid = acct_res[0].get('objectGUID', idx=0)
+            guid = samdb.schema_format_value('objectGUID', guid)
+            guid = guid.decode('utf-8')
+            computer_creds.set_guid(guid)
+
+            computer_keys = self.get_keys(computer_creds)
+            # we just let wireshark see the keys via drsuapi
+            # but we don't force them on computer_creds
+            # as computer_creds.set_password() could be
+            # used by the caller...
+            # self.creds_set_keys(computer_creds, computer_keys)
 
             if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008:
                 extra_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
@@ -2426,11 +2479,19 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                 extra_bits = 0
             remove_bits = (security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK |
                            security.KERB_ENCTYPE_RC4_HMAC_MD5)
-            self.creds_set_enctypes(creds,
+            self.creds_set_enctypes(krbtgt_creds,
+                                    extra_bits=extra_bits,
+                                    remove_bits=remove_bits)
+            self.creds_set_enctypes(computer_creds,
                                     extra_bits=extra_bits,
                                     remove_bits=remove_bits)
 
-            return creds
+            krbtgt_creds.set_rodc_computer_creds(computer_creds)
+
+            return krbtgt_creds
+
+        if not preserve:
+            return create_rodc_krbtgt_account()
 
         c = self._get_krb5_creds(prefix='MOCK_RODC_KRBTGT',
                                  allow_missing_password=True,
index e31ff5b9b129e74456193bd7cd12d1864ed25afb..bc19a8ab2d659ccb343d4afa1f91f0c0dab5e361 100644 (file)
@@ -53,6 +53,7 @@ from samba.ndr import ndr_pack, ndr_unpack
 from samba.dcerpc.misc import (
     SEC_CHAN_WKSTA,
     SEC_CHAN_BDC,
+    SEC_CHAN_RODC,
 )
 from samba.dsdb import (
     UF_SMARTCARD_REQUIRED
@@ -408,6 +409,7 @@ class KerberosCredentials(Credentials):
         'kvno',
         'sid',
         'guid',
+        'rodc_computer_creds',
         'spn',
         'tgs_supported_enctypes',
         'upn',
@@ -448,6 +450,8 @@ class KerberosCredentials(Credentials):
 
         self._private_key = None
 
+        self.rodc_computer_creds = None
+
     def set_as_supported_enctypes(self, value):
         self.as_supported_enctypes = int(value)
 
@@ -554,7 +558,7 @@ class KerberosCredentials(Credentials):
             salt_name = self.get_username()
 
         secure_schannel_type = self.get_secure_channel_type()
-        if secure_schannel_type in [SEC_CHAN_WKSTA,SEC_CHAN_BDC]:
+        if secure_schannel_type in [SEC_CHAN_WKSTA,SEC_CHAN_BDC,SEC_CHAN_RODC]:
             salt_name = self.get_username().lower()
             if salt_name[-1] == '$':
                 salt_name = salt_name[:-1]
@@ -629,6 +633,11 @@ class KerberosCredentials(Credentials):
     def get_public_key(self):
         return self.get_private_key().public_key()
 
+    def set_rodc_computer_creds(self, computer_creds):
+        self.rodc_computer_creds = computer_creds
+
+    def get_rodc_computer_creds(self):
+        return self.rodc_computer_creds
 
 class KerberosTicketCreds:
     __slots__ = [