def machine_account_creds(self):
if self.machine_creds is None:
samdb = self.get_samdb()
- self.machine_creds, _ = self.create_account(samdb,
- MACHINE_NAME,
- machine_account=True)
+ self.machine_creds, _ = self.create_account(
+ samdb,
+ MACHINE_NAME,
+ account_type=self.AccountType.COMPUTER)
self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
import binascii
import collections
import secrets
+from enum import Enum, auto
from collections import namedtuple
import ldb
""" Base class for KDC tests.
"""
+ class AccountType(Enum):
+ USER = auto()
+ COMPUTER = auto()
+
@classmethod
def setUpClass(cls):
super().setUpClass()
return default_enctypes
- def create_account(self, samdb, name, machine_account=False,
+ def create_account(self, samdb, name, account_type=AccountType.USER,
spn=None, upn=None, additional_details=None,
ou=None, account_control=0):
'''Create an account for testing.
which is used by tearDownClass to clean up the created accounts.
'''
if ou is None:
- guid = (DS_GUID_COMPUTERS_CONTAINER if machine_account
- else DS_GUID_USERS_CONTAINER)
+ if account_type is account_type.COMPUTER:
+ guid = DS_GUID_COMPUTERS_CONTAINER
+ else:
+ guid = DS_GUID_USERS_CONTAINER
ou = samdb.get_wellknown_dn(samdb.get_default_basedn(), guid)
# remove the account if it exists, this will happen if a previous test
# run failed
delete_force(samdb, dn)
- if machine_account:
- object_class = "computer"
- account_name = "%s$" % name
- account_control |= UF_WORKSTATION_TRUST_ACCOUNT
- else:
+ if account_type is self.AccountType.USER:
object_class = "user"
account_name = name
account_control |= UF_NORMAL_ACCOUNT
+ else:
+ object_class = "computer"
+ account_name = "%s$" % name
+ if account_type is self.AccountType.COMPUTER:
+ account_control |= UF_WORKSTATION_TRUST_ACCOUNT
+ else:
+ self.fail()
password = generate_random_password(32, 32)
utf16pw = ('"%s"' % password).encode('utf-16-le')
"userAccountControl": str(account_control),
"unicodePwd": utf16pw}
if spn is not None:
+ if isinstance(spn, str):
+ spn = spn.format(account=account_name)
+ else:
+ spn = tuple(s.format(account=account_name) for s in spn)
details["servicePrincipalName"] = spn
if upn is not None:
details["userPrincipalName"] = upn
creds.set_domain(samdb.domain_netbios_name().upper())
creds.set_password(password)
creds.set_username(account_name)
- if machine_account:
- creds.set_workstation(name)
- else:
+ if account_type is self.AccountType.USER:
creds.set_workstation('')
+ else:
+ creds.set_workstation(name)
creds.set_dn(ldb.Dn(samdb, dn))
creds.set_spn(spn)
#
return cleanup
def get_cached_creds(self, *,
- machine_account,
+ account_type,
opts=None,
use_cache=True):
if opts is None:
opts = {}
opts_default = {
+ 'spn': None,
'allowed_replication': False,
'allowed_replication_mock': False,
'denied_replication': False,
}
account_opts = {
- 'machine_account': machine_account,
+ 'account_type': account_type,
**opts_default,
**opts
}
return creds
def create_account_opts(self, *,
- machine_account,
+ account_type,
+ spn,
allowed_replication,
allowed_replication_mock,
denied_replication,
delegation_from_dn,
trusted_to_auth_for_delegation,
fast_support):
- if machine_account:
- self.assertFalse(not_delegated)
- else:
+ if account_type is self.AccountType.USER:
+ self.assertIsNone(spn)
self.assertIsNone(delegation_to_spn)
self.assertIsNone(delegation_from_dn)
self.assertFalse(trusted_to_auth_for_delegation)
+ else:
+ self.assertFalse(not_delegated)
samdb = self.get_samdb()
rodc_samdb = self.get_rodc_samdb()
details['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
security_descriptor)
- if machine_account:
+ if spn is None and account_type is not self.AccountType.USER:
spn = 'host/' + user_name
- else:
- spn = None
creds, dn = self.create_account(samdb, user_name,
- machine_account=machine_account,
+ account_type=account_type,
spn=spn,
additional_details=details,
account_control=user_account_control)
allow_missing_password=False,
allow_missing_keys=True):
def create_client_account():
- return self.get_cached_creds(machine_account=False)
+ return self.get_cached_creds(account_type=self.AccountType.USER)
c = self._get_krb5_creds(prefix='CLIENT',
allow_missing_password=allow_missing_password,
allow_missing_password=False,
allow_missing_keys=True):
def create_mach_account():
- return self.get_cached_creds(machine_account=True,
+ return self.get_cached_creds(account_type=self.AccountType.COMPUTER,
opts={'fast_support': True})
c = self._get_krb5_creds(prefix='MAC',
allow_missing_keys=True):
def create_service_account():
return self.get_cached_creds(
- machine_account=True,
+ account_type=self.AccountType.COMPUTER,
opts={
'trusted_to_auth_for_delegation': True,
'fast_support': True
samdb = self.get_samdb()
user_name = "tsttktusr"
(uc, dn) = self.create_account(samdb, user_name)
- (mc, _) = self.create_account(samdb, "tsttktmac", machine_account=True)
+ (mc, _) = self.create_account(samdb, "tsttktmac",
+ account_type=self.AccountType.COMPUTER)
realm = uc.get_realm().lower()
# Do the initial AS-REQ, should get a pre-authentication required
def test_client_no_auth_data_required(self):
client_creds = self.get_cached_creds(
- machine_account=False,
+ account_type=self.AccountType.USER,
opts={'no_auth_data_required': True})
service_creds = self.get_service_creds()
def test_service_no_auth_data_required(self):
client_creds = self.get_client_creds()
service_creds = self.get_cached_creds(
- machine_account=True,
+ account_type=self.AccountType.COMPUTER,
opts={'no_auth_data_required': True})
tgt = self.get_tgt(client_creds)
realm = uc.get_realm().lower()
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
#
samdb = self.get_samdb()
mach_name = "mskilemac"
- (mc, dn) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, dn) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
realm = mc.get_realm().lower()
# Do the initial AS-REQ, should get a pre-authentication required
realm = uc.get_realm().lower()
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, as we've set UF_DONT_REQUIRE_PREAUTH
# we should get a valid AS-RESP
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
realm = uc.get_realm().lower()
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
ename = user_name + "@" + realm
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
realm = uc.get_realm().lower()
mach_name = "mskilemac"
- (mc, dn) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, dn) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
ename = mach_name + "@" + realm
uname = mach_name + "$@" + realm
ename = alt_name + "@" + realm
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, as we've set UF_DONT_REQUIRE_PREAUTH
# we should get a valid AS-RESP
uname = user_name + "@" + realm
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
ename = alt_name + "@" + realm
mach_name = "mskilemac"
- (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+ (mc, _) = self.create_account(samdb, mach_name,
+ account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
# and including the RODCIdentifier.
def test_rodc_ticket_signature(self):
user_creds = self.get_cached_creds(
- machine_account=False,
+ account_type=self.AccountType.USER,
opts={
'revealed_to_rodc': True
})
target_creds = self.get_cached_creds(
- machine_account=True,
+ account_type=self.AccountType.COMPUTER,
opts={
'revealed_to_rodc': True
})
def _run_s4u2self_test(self, kdc_dict):
client_opts = kdc_dict.pop('client_opts', None)
- client_creds = self.get_cached_creds(machine_account=False,
- opts=client_opts)
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ opts=client_opts)
service_opts = kdc_dict.pop('service_opts', None)
- service_creds = self.get_cached_creds(machine_account=True,
- opts=service_opts)
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts=service_opts)
service_tgt = self.get_tgt(service_creds)
modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
def _run_delegation_test(self, kdc_dict):
client_opts = kdc_dict.pop('client_opts', None)
- client_creds = self.get_cached_creds(machine_account=False,
- opts=client_opts)
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ opts=client_opts)
service1_opts = kdc_dict.pop('service1_opts', {})
service2_opts = kdc_dict.pop('service2_opts', {})
self.assertFalse(allow_delegation and allow_rbcd)
if allow_rbcd:
- service1_creds = self.get_cached_creds(machine_account=True,
- opts=service1_opts)
+ service1_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts=service1_opts)
self.assertNotIn('delegation_from_dn', service2_opts)
service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())
- service2_creds = self.get_cached_creds(machine_account=True,
- opts=service2_opts)
+ service2_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts=service2_opts)
else:
- service2_creds = self.get_cached_creds(machine_account=True,
- opts=service2_opts)
+ service2_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts=service2_opts)
if allow_delegation:
self.assertNotIn('delegation_to_spn', service1_opts)
service1_opts['delegation_to_spn'] = service2_creds.get_spn()
- service1_creds = self.get_cached_creds(machine_account=True,
- opts=service1_opts)
+ service1_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts=service1_opts)
client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
(user_credentials, _) = self.create_account(samdb, user_name)
# Create the machine account.
- (mach_credentials, _) = self.create_account(samdb,
- mach_name,
- machine_account=True,
- spn="%s/%s" % (service,
- mach_name))
+ (mach_credentials, _) = self.create_account(
+ samdb,
+ mach_name,
+ account_type=self.AccountType.COMPUTER,
+ spn="%s/%s" % (service,
+ mach_name))
# Talk to the KDC to obtain the service ticket, which gets placed into
# the cache. The machine account name has to match the name in the