]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/krb5: Create RODC account for testing
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Mon, 13 Sep 2021 09:24:05 +0000 (21:24 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 15 Sep 2021 07:59:31 +0000 (07:59 +0000)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14642

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
python/samba/tests/krb5/kdc_base_test.py

index 56102cfc4ead62d26ce3e6aeec239e5f8f0607b4..892d3aaf41fcd62d68d09325ed1232a377b38370 100644 (file)
@@ -42,8 +42,10 @@ from samba.dsdb import (
     UF_NO_AUTH_DATA_REQUIRED,
     UF_NORMAL_ACCOUNT,
     UF_NOT_DELEGATED,
+    UF_PARTIAL_SECRETS_ACCOUNT,
     UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
 )
+from samba.join import DCJoinContext
 from samba.ndr import ndr_pack, ndr_unpack
 from samba import net
 from samba.samdb import SamDB, dsdb_Dn
@@ -105,6 +107,8 @@ class KDCBaseTest(RawKerberosTest):
 
         cls.account_cache = {}
 
+        cls._rodc_ctx = None
+
         cls.ldb_cleanups = []
 
     @classmethod
@@ -121,6 +125,10 @@ class KDCBaseTest(RawKerberosTest):
 
             for dn in cls.accounts:
                 delete_force(cls._ldb, dn)
+
+        if cls._rodc_ctx is not None:
+            cls._rodc_ctx.cleanup_old_join(force=True)
+
         super().tearDownClass()
 
     def setUp(self):
@@ -171,6 +179,25 @@ class KDCBaseTest(RawKerberosTest):
 
         return dn
 
+    def get_mock_rodc_ctx(self):
+        if self._rodc_ctx is None:
+            admin_creds = self.get_admin_creds()
+            lp = self.get_lp()
+
+            rodc_name = 'KRB5RODC'
+            site_name = 'Default-First-Site-Name'
+
+            type(self)._rodc_ctx = DCJoinContext(server=self.dc_host,
+                                                 creds=admin_creds,
+                                                 lp=lp,
+                                                 site=site_name,
+                                                 netbios_name=rodc_name,
+                                                 targetdir=None,
+                                                 domain=None)
+            self.create_rodc(self._rodc_ctx)
+
+        return self._rodc_ctx
+
     def get_domain_functional_level(self, ldb):
         if self._functional_level is None:
             res = ldb.search(base='',
@@ -259,6 +286,49 @@ class KDCBaseTest(RawKerberosTest):
 
         return (creds, dn)
 
+    def create_rodc(self, ctx):
+        ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+        ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+        ctx.krbtgt_dn = f'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'
+
+        ctx.never_reveal_sid = [f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>',
+                                f'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
+                                f'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
+                                f'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
+                                f'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>']
+        ctx.reveal_sid = f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'
+
+        mysid = ctx.get_mysid()
+        admin_dn = f'<SID={mysid}>'
+        ctx.managedby = admin_dn
+
+        ctx.userAccountControl = (UF_WORKSTATION_TRUST_ACCOUNT |
+                                  UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
+                                  UF_PARTIAL_SECRETS_ACCOUNT)
+
+        ctx.connection_dn = f'CN=RODC Connection (FRS),{ctx.ntds_dn}'
+        ctx.secure_channel_type = misc.SEC_CHAN_RODC
+        ctx.RODC = True
+        ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+                             drsuapi.DRSUAPI_DRS_PER_SYNC |
+                             drsuapi.DRSUAPI_DRS_GET_ANC |
+                             drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+                             drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+        ctx.domain_replica_flags = ctx.replica_flags | drsuapi.DRSUAPI_DRS_CRITICAL_ONLY
+
+        ctx.build_nc_lists()
+
+        ctx.cleanup_old_join()
+
+        try:
+            ctx.join_add_objects()
+        except Exception:
+            # cleanup the failed join (checking we still have a live LDB
+            # connection to the remote DC first)
+            ctx.refresh_ldb_connection()
+            ctx.cleanup_old_join()
+            raise
+
     def replicate_account_to_rodc(self, dn):
         samdb = self.get_samdb()
         rodc_samdb = self.get_rodc_samdb()
@@ -705,6 +775,50 @@ class KDCBaseTest(RawKerberosTest):
                                  fallback_creds_fn=download_rodc_krbtgt_creds)
         return c
 
+    def get_mock_rodc_krbtgt_creds(self,
+                                   require_keys=True,
+                                   require_strongest_key=False):
+        if require_strongest_key:
+            self.assertTrue(require_keys)
+
+        def create_rodc_krbtgt_account():
+            samdb = self.get_samdb()
+
+            rodc_ctx = self.get_mock_rodc_ctx()
+
+            krbtgt_dn = rodc_ctx.new_krbtgt_dn
+
+            res = samdb.search(base=ldb.Dn(samdb, krbtgt_dn),
+                               scope=ldb.SCOPE_BASE,
+                               attrs=['msDS-KeyVersionNumber',
+                                      'msDS-SecondaryKrbTgtNumber'])
+            dn = res[0].dn
+            username = str(rodc_ctx.krbtgt_name)
+
+            creds = KerberosCredentials()
+            creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
+            creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
+            creds.set_username(username)
+
+            kvno = int(res[0]['msDS-KeyVersionNumber'][0])
+            krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
+
+            rodc_kvno = krbtgt_number << 16 | kvno
+            creds.set_kvno(rodc_kvno)
+            creds.set_dn(dn)
+
+            keys = self.get_keys(samdb, dn)
+            self.creds_set_keys(creds, keys)
+
+            return creds
+
+        c = self._get_krb5_creds(prefix='MOCK_RODC_KRBTGT',
+                                 allow_missing_password=True,
+                                 allow_missing_keys=not require_keys,
+                                 require_strongest_key=require_strongest_key,
+                                 fallback_creds_fn=create_rodc_krbtgt_account)
+        return c
+
     def get_krbtgt_creds(self,
                          require_keys=True,
                          require_strongest_key=False):