target_name=None, till=None, rc4_support=True,
to_rodc=False, kdc_options=None,
expected_flags=None, unexpected_flags=None,
+ expect_client_claims=None,
+ expect_device_claims=None,
+ expected_client_claims=None,
+ unexpected_client_claims=None,
+ expected_device_claims=None,
+ unexpected_device_claims=None,
pac_request=True, expect_pac=True, fresh=False):
user_name = tgt.cname['name-string'][0]
ticket_sname = tgt.sname
pac_request, str(expected_flags), str(unexpected_flags),
till, rc4_support,
str(ticket_sname),
+ expect_client_claims, expect_device_claims,
+ str(expected_client_claims),
+ str(unexpected_client_claims),
+ str(expected_device_claims),
+ str(unexpected_device_claims),
expect_pac)
if not fresh:
expected_supported_etypes=target_creds.tgs_supported_enctypes,
expected_flags=expected_flags,
unexpected_flags=unexpected_flags,
+ expect_client_claims=expect_client_claims,
+ expect_device_claims=expect_device_claims,
+ expected_client_claims=expected_client_claims,
+ unexpected_client_claims=unexpected_client_claims,
+ expected_device_claims=expected_device_claims,
+ unexpected_device_claims=unexpected_device_claims,
ticket_decryption_key=decryption_key,
check_rep_fn=self.generic_check_kdc_rep,
check_kdc_private_fn=self.generic_check_kdc_private,
def get_tgt(self, creds, to_rodc=False, kdc_options=None,
client_account=None, client_name_type=NT_PRINCIPAL,
+ target_creds=None, ticket_etype=None,
expected_flags=None, unexpected_flags=None,
expected_account_name=None, expected_upn_name=None,
expected_cname=None,
expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
expect_requester_sid=None,
rc4_support=True,
+ expect_edata=None,
+ expect_client_claims=None, expect_device_claims=None,
+ expected_client_claims=None, unexpected_client_claims=None,
+ expected_device_claims=None, unexpected_device_claims=None,
fresh=False):
if client_account is not None:
user_name = client_account
cache_key = (user_name, to_rodc, kdc_options, pac_request,
client_name_type,
+ ticket_etype,
str(expected_flags), str(unexpected_flags),
expected_account_name, expected_upn_name, expected_sid,
str(sname), str(realm),
str(expected_cname),
rc4_support,
expect_pac, expect_pac_attrs,
- expect_pac_attrs_pac_request, expect_requester_sid)
+ expect_pac_attrs_pac_request, expect_requester_sid,
+ expect_client_claims, expect_device_claims,
+ str(expected_client_claims),
+ str(unexpected_client_claims),
+ str(expected_device_claims),
+ str(unexpected_device_claims))
if not fresh:
tgt = self.tkt_cache.get(cache_key)
till = self.get_KerberosTime(offset=36000)
- if to_rodc:
+ if target_creds is not None:
+ krbtgt_creds = target_creds
+ elif to_rodc:
krbtgt_creds = self.get_rodc_krbtgt_creds()
else:
krbtgt_creds = self.get_krbtgt_creds()
ticket_decryption_key = (
- self.TicketDecryptionKey_from_creds(krbtgt_creds))
+ self.TicketDecryptionKey_from_creds(krbtgt_creds,
+ etype=ticket_etype))
expected_etypes = krbtgt_creds.tgs_supported_enctypes
expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
expect_requester_sid=expect_requester_sid,
rc4_support=rc4_support,
+ expect_client_claims=expect_client_claims,
+ expect_device_claims=expect_device_claims,
+ expected_client_claims=expected_client_claims,
+ unexpected_client_claims=unexpected_client_claims,
+ expected_device_claims=expected_device_claims,
+ unexpected_device_claims=unexpected_device_claims,
to_rodc=to_rodc)
self.check_pre_authentication(rep)
expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
expect_requester_sid=expect_requester_sid,
rc4_support=rc4_support,
+ expect_client_claims=expect_client_claims,
+ expect_device_claims=expect_device_claims,
+ expected_client_claims=expected_client_claims,
+ unexpected_client_claims=unexpected_client_claims,
+ expected_device_claims=expected_device_claims,
+ unexpected_device_claims=unexpected_device_claims,
to_rodc=to_rodc)
self.check_as_reply(rep)
from pyasn1.error import PyAsn1Error
from samba.credentials import Credentials
-from samba.dcerpc import krb5pac, security
+from samba.dcerpc import claims, krb5pac, security
from samba.gensec import FEATURE_SEAL
from samba.ndr import ndr_pack, ndr_unpack
PADATA_REQ_ENC_PA_REP
)
import samba.tests.krb5.kcrypto as kcrypto
+from samba.tests.krb5 import xpress
def BitStringEncoder_encodeValue32(
def assertSequenceElementsEqual(self, expected, got, *,
require_strict=None,
+ unchecked=None,
require_ordered=True):
- if self.strict_checking and require_ordered:
+ if self.strict_checking and require_ordered and not unchecked:
self.assertEqual(expected, got)
else:
fail_msg = f'expected: {expected} got: {got}'
- if not self.strict_checking and require_strict is not None:
- fail_msg += f' (ignoring: {require_strict})'
- expected = (x for x in expected if x not in require_strict)
- got = (x for x in got if x not in require_strict)
+ ignored = set()
+ if unchecked:
+ ignored.update(unchecked)
+ if require_strict and not self.strict_checking:
+ ignored.update(require_strict)
+
+ if ignored:
+ fail_msg += f' (ignoring: {ignored})'
+ expected = (x for x in expected if x not in ignored)
+ got = (x for x in got if x not in ignored)
self.assertCountEqual(expected, got, fail_msg)
strict_edata_checking=True,
expect_edata=None,
expect_pac=True,
- expect_claims=True,
+ expect_client_claims=None,
+ expect_device_info=None,
+ expect_device_claims=None,
expect_upn_dns_info_ex=None,
expect_pac_attrs=None,
expect_pac_attrs_pac_request=None,
expect_requester_sid=None,
rc4_support=True,
+ expected_client_claims=None,
+ unexpected_client_claims=None,
+ expected_device_claims=None,
+ unexpected_device_claims=None,
to_rodc=False):
if expected_error_mode == 0:
expected_error_mode = ()
'strict_edata_checking': strict_edata_checking,
'expect_edata': expect_edata,
'expect_pac': expect_pac,
- 'expect_claims': expect_claims,
+ 'expect_client_claims': expect_client_claims,
+ 'expect_device_info': expect_device_info,
+ 'expect_device_claims': expect_device_claims,
'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
'expect_pac_attrs': expect_pac_attrs,
'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
'expect_requester_sid': expect_requester_sid,
'rc4_support': rc4_support,
+ 'expected_client_claims': expected_client_claims,
+ 'unexpected_client_claims': unexpected_client_claims,
+ 'expected_device_claims': expected_device_claims,
+ 'unexpected_device_claims': unexpected_device_claims,
'to_rodc': to_rodc
}
if callback_dict is None:
strict_edata_checking=True,
expect_edata=None,
expect_pac=True,
- expect_claims=True,
+ expect_client_claims=None,
+ expect_device_info=None,
+ expect_device_claims=None,
expect_upn_dns_info_ex=None,
expect_pac_attrs=None,
expect_pac_attrs_pac_request=None,
expected_proxy_target=None,
expected_transited_services=None,
rc4_support=True,
+ expected_client_claims=None,
+ unexpected_client_claims=None,
+ expected_device_claims=None,
+ unexpected_device_claims=None,
to_rodc=False):
if expected_error_mode == 0:
expected_error_mode = ()
'strict_edata_checking': strict_edata_checking,
'expect_edata': expect_edata,
'expect_pac': expect_pac,
- 'expect_claims': expect_claims,
+ 'expect_client_claims': expect_client_claims,
+ 'expect_device_info': expect_device_info,
+ 'expect_device_claims': expect_device_claims,
'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
'expect_pac_attrs': expect_pac_attrs,
'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
'expected_proxy_target': expected_proxy_target,
'expected_transited_services': expected_transited_services,
'rc4_support': rc4_support,
+ 'expected_client_claims': expected_client_claims,
+ 'unexpected_client_claims': unexpected_client_claims,
+ 'expected_device_claims': expected_device_claims,
+ 'unexpected_device_claims': unexpected_device_claims,
'to_rodc': to_rodc
}
if callback_dict is None:
rep_msg_type = kdc_exchange_dict['rep_msg_type']
armor_tgt = kdc_exchange_dict['armor_tgt']
+ compound_id = rep_msg_type == KRB_TGS_REP and armor_tgt is not None
+
expected_sname = kdc_exchange_dict['expected_sname']
- expect_claims = kdc_exchange_dict['expect_claims']
+ expect_client_claims = kdc_exchange_dict['expect_client_claims']
+ expect_device_info = kdc_exchange_dict['expect_device_info']
+ expect_device_claims = kdc_exchange_dict['expect_device_claims']
expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
krb5pac.PAC_TYPE_SRV_CHECKSUM,
if constrained_delegation:
expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)
- if self.kdc_fast_support:
- if expect_claims:
- expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
+ require_strict = set()
+ unchecked = set()
+ if not self.tkt_sig_support:
+ require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
+
+ expected_client_claims = kdc_exchange_dict['expected_client_claims']
+ unexpected_client_claims = kdc_exchange_dict[
+ 'unexpected_client_claims']
- if (rep_msg_type == KRB_TGS_REP
- and armor_tgt is not None):
- expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
- expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
+ if self.kdc_claims_support and expect_client_claims:
+ expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
+ else:
+ self.assertFalse(
+ expected_client_claims,
+ 'expected client claims, but client claims not expected in '
+ 'PAC')
+ self.assertFalse(
+ unexpected_client_claims,
+ 'unexpected client claims, but client claims not expected in '
+ 'PAC')
+
+ if expect_client_claims is None:
+ unchecked.add(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
+
+ expected_device_claims = kdc_exchange_dict['expected_device_claims']
+ unexpected_device_claims = kdc_exchange_dict['unexpected_device_claims']
+
+ if (self.kdc_claims_support and self.kdc_compound_id_support
+ and expect_device_claims and compound_id):
+ expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
+ else:
+ self.assertFalse(
+ expect_device_claims,
+ 'expected device claims buffer, but client claims not '
+ 'expected in PAC')
+ self.assertFalse(
+ expected_device_claims,
+ 'expected device claims, but device claims not expected in '
+ 'PAC')
+ self.assertFalse(
+ unexpected_device_claims,
+ 'unexpected device claims, but device claims not expected in '
+ 'PAC')
+
+ if expect_device_claims is None and compound_id:
+ unchecked.add(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
+
+ if self.kdc_compound_id_support and compound_id and expect_device_info:
+ expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
+ else:
+ self.assertFalse(expect_device_info,
+ 'expected device info with no armor TGT or '
+ 'for non-TGS request')
+
+ if expect_device_info is None and compound_id:
+ unchecked.add(krb5pac.PAC_TYPE_DEVICE_INFO)
if rep_msg_type == KRB_TGS_REP:
if not self.is_tgs_principal(expected_sname):
expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
- require_strict = {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO,
- krb5pac.PAC_TYPE_DEVICE_INFO,
- krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO}
- if not self.tkt_sig_support:
- require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
-
expect_extra_pac_buffers = self.is_tgs(expected_sname)
expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs']
self.assertSequenceElementsEqual(
expected_types, buffer_types,
require_ordered=False,
- require_strict=require_strict)
+ require_strict=require_strict,
+ unchecked=unchecked)
expected_account_name = kdc_exchange_dict['expected_account_name']
expected_groups = kdc_exchange_dict['expected_groups']
if expected_sid is not None:
self.assertEqual(expected_sid, str(requester_sid))
+ elif pac_buffer.type in {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO,
+ krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO}:
+ remaining = pac_buffer.info.remaining
+
+ if pac_buffer.type == krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO:
+ claims_type = 'client claims'
+ expected_claims = expected_client_claims
+ unexpected_claims = unexpected_client_claims
+ else:
+ claims_type = 'device claims'
+ expected_claims = expected_device_claims
+ unexpected_claims = unexpected_device_claims
+
+ if not remaining:
+ # Windows may produce an empty claims buffer.
+ self.assertFalse(expected_claims,
+ f'expected {claims_type}, but the PAC '
+ f'buffer was empty')
+ continue
+
+ if expected_claims:
+ empty_msg = ', and {claims_type} were expected'
+ else:
+ empty_msg = ' for {claims_type} (should be missing)'
+
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR,
+ remaining)
+ client_claims = client_claims.claims.metadata
+ self.assertIsNotNone(client_claims,
+ f'got empty CLAIMS_SET_METADATA_NDR '
+ f'inner structure {empty_msg}')
+
+ claims_data = bytes(client_claims.claims_set)
+ self.assertIsNotNone(claims_data,
+ f'got empty CLAIMS_SET_METADATA '
+ f'structure {empty_msg}')
+ self.assertGreater(len(claims_data), 0,
+ f'got empty encoded claims data '
+ f'{empty_msg}')
+ self.assertEqual(len(claims_data),
+ client_claims.claims_set_size,
+ f'encoded {claims_type} data size mismatch')
+
+ uncompressed_size = client_claims.uncompressed_claims_set_size
+ compression_format = client_claims.compression_format
+
+ if self.strict_checking:
+ if uncompressed_size < 384:
+ self.assertEqual(claims.CLAIMS_COMPRESSION_FORMAT_NONE,
+ compression_format,
+ f'{claims_type} unexpectedly '
+ f'compressed ({uncompressed_size} '
+ f'bytes uncompressed)')
+ else:
+ self.assertEqual(
+ claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF,
+ compression_format,
+ f'{claims_type} unexpectedly not compressed '
+ f'({uncompressed_size} bytes uncompressed)')
+
+ claims_data = xpress.decompress(claims_data,
+ compression_format,
+ uncompressed_size)
+
+ claims_set = ndr_unpack(claims.CLAIMS_SET_NDR,
+ claims_data)
+ claims_set = claims_set.claims.claims
+ self.assertIsNotNone(claims_set,
+ f'got empty CLAIMS_SET_NDR inner '
+ f'structure {empty_msg}')
+
+ claims_arrays = claims_set.claims_arrays
+ self.assertIsNotNone(claims_arrays,
+ f'got empty CLAIMS_SET structure '
+ f'{empty_msg}')
+ self.assertGreater(len(claims_arrays), 0,
+ f'got empty claims array {empty_msg}')
+ self.assertEqual(len(claims_arrays),
+ claims_set.claims_array_count,
+ f'{claims_type} arrays size mismatch')
+
+ got_claims = {}
+
+ for claims_array in claims_arrays:
+ claim_entries = claims_array.claim_entries
+ self.assertIsNotNone(claim_entries,
+ f'got empty CLAIMS_ARRAY structure '
+ f'{empty_msg}')
+ self.assertGreater(len(claim_entries), 0,
+ f'got empty claim entries array '
+ f'{empty_msg}')
+ self.assertEqual(len(claim_entries),
+ claims_array.claims_count,
+ f'{claims_type} entries array size '
+ f'mismatch')
+
+ for entry in claim_entries:
+ if unexpected_claims is not None:
+ self.assertNotIn(entry.id, unexpected_claims,
+ f'got unexpected {claims_type} '
+ f'in PAC')
+ if expected_claims is None:
+ continue
+
+ expected_claim = expected_claims.get(entry.id)
+ if expected_claim is None:
+ continue
+
+ self.assertNotIn(entry.id, got_claims,
+ f'got duplicate {claims_type}')
+
+ self.assertIsNotNone(entry.values.values,
+ f'got {claims_type} with no '
+ f'values')
+ self.assertGreater(len(entry.values.values), 0,
+ f'got empty {claims_type} values '
+ f'array')
+ self.assertEqual(len(entry.values.values),
+ entry.values.value_count,
+ f'{claims_type} values array size '
+ f'mismatch')
+
+ expected_claim_values = expected_claim.get('values')
+ self.assertIsNotNone(expected_claim_values,
+ f'got expected {claims_type} '
+ f'with no values')
+
+ values = type(expected_claim_values)(
+ entry.values.values)
+
+ got_claims[entry.id] = {
+ 'source_type': claims_array.claims_source_type,
+ 'type': entry.type,
+ 'values': values,
+ }
+
+ self.assertEqual(expected_claims, got_claims or None,
+ f'{claims_type} did not match expectations')
+
+ elif pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO:
+ device_info = pac_buffer.info.info
+
+ armor_auth_data = armor_tgt.ticket_private.get(
+ 'authorization-data')
+ self.assertIsNotNone(armor_auth_data,
+ 'missing authdata for armor TGT')
+ armor_pac_data = self.get_pac(armor_auth_data)
+ armor_pac = ndr_unpack(krb5pac.PAC_DATA, armor_pac_data)
+ for armor_pac_buffer in armor_pac.buffers:
+ if armor_pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
+ armor_info = armor_pac_buffer.info.info.info3
+ break
+ else:
+ self.fail('missing logon info for armor PAC')
+
+ self.assertEqual(armor_info.base.rid, device_info.rid)
+
+ self.assertEqual(armor_info.base.primary_gid,
+ device_info.primary_gid)
+ self.assertEqual(security.DOMAIN_RID_DOMAIN_MEMBERS,
+ device_info.primary_gid)
+
+ self.assertEqual(armor_info.base.domain_sid,
+ device_info.domain_sid)
+
+ def get_groups(groups):
+ return [(x.rid, x.attributes) for x in groups.rids]
+
+ self.assertEqual(get_groups(armor_info.base.groups),
+ get_groups(device_info.groups))
+
+ self.assertEqual(1, device_info.sid_count)
+ self.assertEqual(
+ security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
+ str(device_info.sids[0].sid))
+
+ claims_valid_sid, claims_valid_rid = (
+ security.SID_CLAIMS_VALID.rsplit('-', 1))
+
+ self.assertEqual(1, device_info.domain_group_count)
+ domain_group = device_info.domain_groups[0]
+ self.assertEqual(claims_valid_sid,
+ str(domain_group.domain_sid))
+
+ self.assertEqual(1, domain_group.groups.count)
+ self.assertEqual(int(claims_valid_rid),
+ domain_group.groups.rids[0].rid)
+
def generic_check_kdc_error(self,
kdc_exchange_dict,
callback_dict,
expect_pac_attrs=None,
expect_pac_attrs_pac_request=None,
expect_requester_sid=None,
+ expect_client_claims=None,
+ expect_device_claims=None,
+ expected_client_claims=None,
+ unexpected_client_claims=None,
+ expected_device_claims=None,
+ unexpected_device_claims=None,
expect_edata=None,
rc4_support=True,
to_rodc=False):
expect_pac_attrs=expect_pac_attrs,
expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
expect_requester_sid=expect_requester_sid,
+ expect_client_claims=expect_client_claims,
+ expect_device_claims=expect_device_claims,
+ expected_client_claims=expected_client_claims,
+ unexpected_client_claims=unexpected_client_claims,
+ expected_device_claims=expected_device_claims,
+ unexpected_device_claims=unexpected_device_claims,
expect_edata=expect_edata,
rc4_support=rc4_support,
to_rodc=to_rodc)