samr,
security,
)
-from samba.dcerpc.misc import SEC_CHAN_BDC, SEC_CHAN_NULL, SEC_CHAN_WKSTA
+from samba.dcerpc.misc import SEC_CHAN_BDC, SEC_CHAN_NULL, SEC_CHAN_WKSTA, SEC_CHAN_RODC
from samba.domain.models import AuthenticationPolicy, AuthenticationSilo
from samba.drs_utils import drs_Replicate, drsuapi_connect
from samba.dsdb import (
for dn in reversed(self.test_accounts):
delete_force(self._ldb, dn)
+ if self._test_rodc_ctx is not None:
+ self._test_rodc_ctx.cleanup_old_join(force=True)
+
super().tearDown()
@classmethod
# A list containing DNs of accounts that should be removed when the
# current test finishes.
self.test_accounts = []
+ self._test_rodc_ctx = None
def get_lp(self) -> LoadParm:
if self._lp is None:
return dn
- def get_mock_rodc_ctx(self):
- if self._rodc_ctx is None:
+ def get_mock_rodc_ctx(self, preserve=True):
+ if preserve:
+ rodc_ctx = self._rodc_ctx
+ else:
+ rodc_ctx = self._test_rodc_ctx
+
+ if rodc_ctx is None:
admin_creds = self.get_admin_creds()
lp = self.get_lp()
domain=None)
self.create_rodc(rodc_ctx)
- type(self)._rodc_ctx = rodc_ctx
+ if preserve:
+ # Mark this rodc for deletion in tearDownClass() after all the
+ # tests in this class finish.
+ type(self)._rodc_ctx = rodc_ctx
+ else:
+ # Mark this rodc for deletion in tearDown() after the current
+ # test finishes.
+ self._test_rodc_ctx = rodc_ctx
- return self._rodc_ctx
+ return rodc_ctx
def get_domain_functional_level(self, ldb=None):
if self._functional_level is None:
def get_mock_rodc_krbtgt_creds(self,
require_keys=True,
- require_strongest_key=False):
+ require_strongest_key=False,
+ preserve=True):
if require_strongest_key:
self.assertTrue(require_keys)
def create_rodc_krbtgt_account():
samdb = self.get_samdb()
- rodc_ctx = self.get_mock_rodc_ctx()
+ rodc_ctx = self.get_mock_rodc_ctx(preserve=preserve)
krbtgt_dn = rodc_ctx.new_krbtgt_dn
dn = res[0].dn
username = str(rodc_ctx.krbtgt_name)
- creds = KerberosCredentials()
- creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
- creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
- creds.set_username(username)
+ krbtgt_creds = KerberosCredentials()
+ krbtgt_creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
+ krbtgt_creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
+ krbtgt_creds.set_username(username)
kvno = int(res[0]['msDS-KeyVersionNumber'][0])
krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
rodc_kvno = krbtgt_number << 16 | kvno
- creds.set_kvno(rodc_kvno)
- creds.set_dn(dn)
-
- keys = self.get_keys(creds)
- self.creds_set_keys(creds, keys)
+ krbtgt_creds.set_kvno(rodc_kvno)
+ krbtgt_creds.set_dn(dn)
+
+ krbtgt_keys = self.get_keys(krbtgt_creds)
+ self.creds_set_keys(krbtgt_creds, krbtgt_keys)
+
+ acct_res = samdb.search(base=rodc_ctx.acct_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['msDS-KeyVersionNumber',
+ 'objectSid',
+ 'objectGUID'])
+
+ computer_creds = KerberosCredentials()
+ computer_creds.set_domain(krbtgt_creds.get_domain())
+ computer_creds.set_realm(krbtgt_creds.get_realm())
+ computer_creds.set_username(rodc_ctx.samname)
+ computer_creds.set_password(rodc_ctx.acct_pass)
+ computer_creds.set_workstation(rodc_ctx.myname)
+ computer_creds.set_secure_channel_type(misc.SEC_CHAN_RODC)
+ computer_creds.set_dn(acct_res[0].dn)
+ computer_creds.set_type(self.AccountType.RODC)
+ computer_creds.set_user_account_control(rodc_ctx.userAccountControl)
+
+ computer_kvno = int(acct_res[0]['msDS-KeyVersionNumber'][0])
+ computer_creds.set_kvno(computer_kvno)
+
+ sid = acct_res[0].get('objectSid', idx=0)
+ sid = samdb.schema_format_value('objectSID', sid)
+ sid = sid.decode('utf-8')
+ computer_creds.set_sid(sid)
+ guid = acct_res[0].get('objectGUID', idx=0)
+ guid = samdb.schema_format_value('objectGUID', guid)
+ guid = guid.decode('utf-8')
+ computer_creds.set_guid(guid)
+
+ computer_keys = self.get_keys(computer_creds)
+ # we just let wireshark see the keys via drsuapi
+ # but we don't force them on computer_creds
+ # as computer_creds.set_password() could be
+ # used by the caller...
+ # self.creds_set_keys(computer_creds, computer_keys)
if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008:
extra_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
extra_bits = 0
remove_bits = (security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK |
security.KERB_ENCTYPE_RC4_HMAC_MD5)
- self.creds_set_enctypes(creds,
+ self.creds_set_enctypes(krbtgt_creds,
+ extra_bits=extra_bits,
+ remove_bits=remove_bits)
+ self.creds_set_enctypes(computer_creds,
extra_bits=extra_bits,
remove_bits=remove_bits)
- return creds
+ krbtgt_creds.set_rodc_computer_creds(computer_creds)
+
+ return krbtgt_creds
+
+ if not preserve:
+ return create_rodc_krbtgt_account()
c = self._get_krb5_creds(prefix='MOCK_RODC_KRBTGT',
allow_missing_password=True,