UF_NO_AUTH_DATA_REQUIRED,
UF_NORMAL_ACCOUNT,
UF_NOT_DELEGATED,
+ UF_PARTIAL_SECRETS_ACCOUNT,
UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
)
+from samba.join import DCJoinContext
from samba.ndr import ndr_pack, ndr_unpack
from samba import net
from samba.samdb import SamDB, dsdb_Dn
cls.account_cache = {}
+ cls._rodc_ctx = None
+
cls.ldb_cleanups = []
@classmethod
for dn in cls.accounts:
delete_force(cls._ldb, dn)
+
+ if cls._rodc_ctx is not None:
+ cls._rodc_ctx.cleanup_old_join(force=True)
+
super().tearDownClass()
def setUp(self):
return dn
+ def get_mock_rodc_ctx(self):
+ if self._rodc_ctx is None:
+ admin_creds = self.get_admin_creds()
+ lp = self.get_lp()
+
+ rodc_name = 'KRB5RODC'
+ site_name = 'Default-First-Site-Name'
+
+ type(self)._rodc_ctx = DCJoinContext(server=self.dc_host,
+ creds=admin_creds,
+ lp=lp,
+ site=site_name,
+ netbios_name=rodc_name,
+ targetdir=None,
+ domain=None)
+ self.create_rodc(self._rodc_ctx)
+
+ return self._rodc_ctx
+
def get_domain_functional_level(self, ldb):
if self._functional_level is None:
res = ldb.search(base='',
return (creds, dn)
+ def create_rodc(self, ctx):
+ ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+ ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+ ctx.krbtgt_dn = f'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'
+
+ ctx.never_reveal_sid = [f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>',
+ f'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
+ f'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
+ f'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
+ f'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>']
+ ctx.reveal_sid = f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'
+
+ mysid = ctx.get_mysid()
+ admin_dn = f'<SID={mysid}>'
+ ctx.managedby = admin_dn
+
+ ctx.userAccountControl = (UF_WORKSTATION_TRUST_ACCOUNT |
+ UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
+ UF_PARTIAL_SECRETS_ACCOUNT)
+
+ ctx.connection_dn = f'CN=RODC Connection (FRS),{ctx.ntds_dn}'
+ ctx.secure_channel_type = misc.SEC_CHAN_RODC
+ ctx.RODC = True
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_GET_ANC |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+ drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+ ctx.domain_replica_flags = ctx.replica_flags | drsuapi.DRSUAPI_DRS_CRITICAL_ONLY
+
+ ctx.build_nc_lists()
+
+ ctx.cleanup_old_join()
+
+ try:
+ ctx.join_add_objects()
+ except Exception:
+ # cleanup the failed join (checking we still have a live LDB
+ # connection to the remote DC first)
+ ctx.refresh_ldb_connection()
+ ctx.cleanup_old_join()
+ raise
+
def replicate_account_to_rodc(self, dn):
samdb = self.get_samdb()
rodc_samdb = self.get_rodc_samdb()
fallback_creds_fn=download_rodc_krbtgt_creds)
return c
+ def get_mock_rodc_krbtgt_creds(self,
+ require_keys=True,
+ require_strongest_key=False):
+ if require_strongest_key:
+ self.assertTrue(require_keys)
+
+ def create_rodc_krbtgt_account():
+ samdb = self.get_samdb()
+
+ rodc_ctx = self.get_mock_rodc_ctx()
+
+ krbtgt_dn = rodc_ctx.new_krbtgt_dn
+
+ res = samdb.search(base=ldb.Dn(samdb, krbtgt_dn),
+ scope=ldb.SCOPE_BASE,
+ attrs=['msDS-KeyVersionNumber',
+ 'msDS-SecondaryKrbTgtNumber'])
+ dn = res[0].dn
+ username = str(rodc_ctx.krbtgt_name)
+
+ creds = KerberosCredentials()
+ creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
+ creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
+ creds.set_username(username)
+
+ kvno = int(res[0]['msDS-KeyVersionNumber'][0])
+ krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
+
+ rodc_kvno = krbtgt_number << 16 | kvno
+ creds.set_kvno(rodc_kvno)
+ creds.set_dn(dn)
+
+ keys = self.get_keys(samdb, dn)
+ self.creds_set_keys(creds, keys)
+
+ return creds
+
+ c = self._get_krb5_creds(prefix='MOCK_RODC_KRBTGT',
+ allow_missing_password=True,
+ allow_missing_keys=not require_keys,
+ require_strongest_key=require_strongest_key,
+ fallback_creds_fn=create_rodc_krbtgt_account)
+ return c
+
def get_krbtgt_creds(self,
require_keys=True,
require_strongest_key=False):