from pyasn1.error import PyAsn1Error
from samba.credentials import Credentials
-from samba.dcerpc import claims, krb5pac, security
+from samba.dcerpc import claims, krb5pac, netlogon, security
from samba.gensec import FEATURE_SEAL
from samba.ndr import ndr_pack, ndr_unpack
SET = object()
CHANGE = object()
+ # The location of a SID within the PAC
+ class SidType(Enum):
+ BASE_SID = object() # in info3.base.groups
+ EXTRA_SID = object() # in info3.sids
+ RESOURCE_SID = object() # in resource_groups
+
pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
krb5pac.PAC_TYPE_KDC_CHECKSUM,
krb5pac.PAC_TYPE_TICKET_CHECKSUM}
kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
+ # Check the SIDs in a LOGON_INFO PAC buffer.
+ def check_logon_info_sids(self, logon_info_buffer, kdc_exchange_dict):
+ info3 = logon_info_buffer.info.info.info3
+ logon_info = info3.base
+ resource_groups = logon_info_buffer.info.info.resource_groups
+
+ expected_groups = kdc_exchange_dict['expected_groups']
+ unexpected_groups = kdc_exchange_dict['unexpected_groups']
+ expected_sid = kdc_exchange_dict['expected_sid']
+
+ domain_sid = logon_info.domain_sid
+
+ if expected_sid is not None:
+ got_sid = f'{domain_sid}-{logon_info.rid}'
+ self.assertEqual(expected_sid, got_sid)
+
+ if expected_groups is None and unexpected_groups is None:
+ # Nothing more to do.
+ return
+
+ # Check the SIDs in the PAC.
+
+ # A representation of the PAC.
+ pac_sids = set()
+
+ # Collect the Extra SIDs.
+ if info3.sids is not None:
+ self.assertTrue(logon_info.user_flags & (
+ netlogon.NETLOGON_EXTRA_SIDS),
+ 'extra SIDs present, but EXTRA_SIDS flag not set')
+ self.assertTrue(info3.sids, 'got empty SIDs')
+
+ for sid_attr in info3.sids:
+ got_sid = str(sid_attr.sid)
+ if unexpected_groups is not None:
+ self.assertNotIn(got_sid, unexpected_groups)
+
+ pac_sid = (got_sid,
+ self.SidType.EXTRA_SID,
+ sid_attr.attributes)
+ self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
+ pac_sids.add(pac_sid)
+ else:
+ self.assertFalse(logon_info.user_flags & (
+ netlogon.NETLOGON_EXTRA_SIDS),
+ 'no extra SIDs present, but EXTRA_SIDS flag set')
+
+ # Collect the Base RIDs.
+ if logon_info.groups.rids is not None:
+ self.assertTrue(logon_info.groups.rids, 'got empty RIDs')
+
+ for group in logon_info.groups.rids:
+ got_sid = f'{domain_sid}-{group.rid}'
+ if unexpected_groups is not None:
+ self.assertNotIn(got_sid, unexpected_groups)
+
+ pac_sid = (got_sid, self.SidType.BASE_SID, group.attributes)
+ self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
+ pac_sids.add(pac_sid)
+
+ # Collect the Resource SIDs.
+ if resource_groups.groups.rids is not None:
+ self.assertTrue(logon_info.user_flags & (
+ netlogon.NETLOGON_RESOURCE_GROUPS),
+ 'resource groups present, but RESOURCE_GROUPS '
+ 'flag not set')
+ self.assertTrue(resource_groups.groups.rids, 'got empty RIDs')
+
+ resource_group_sid = resource_groups.domain_sid
+ for resource_group in resource_groups.groups.rids:
+ got_sid = f'{resource_group_sid}-{resource_group.rid}'
+ if unexpected_groups is not None:
+ self.assertNotIn(got_sid, unexpected_groups)
+
+ pac_sid = (got_sid,
+ self.SidType.RESOURCE_SID,
+ resource_group.attributes)
+ self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
+ pac_sids.add(pac_sid)
+ else:
+ self.assertFalse(logon_info.user_flags & (
+ netlogon.NETLOGON_RESOURCE_GROUPS),
+ 'no resource groups present, but RESOURCE_GROUPS '
+ 'flag set')
+
+ # Compare the aggregated SIDs against the set of expected SIDs.
+ if expected_groups is not None:
+ if ... in expected_groups:
+ # The caller is only interested in asserting the
+ # presence of particular groups, and doesn't mind if
+ # other groups are present as well.
+ pac_sids.add(...)
+ self.assertLessEqual(expected_groups, pac_sids,
+ 'expected groups')
+ else:
+ # The caller wants to make sure the groups match
+ # exactly.
+ self.assertEqual(expected_groups, pac_sids,
+ 'expected != got')
+
def check_pac_buffers(self, pac_data, kdc_exchange_dict):
pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
unchecked=unchecked)
expected_account_name = kdc_exchange_dict['expected_account_name']
- expected_groups = kdc_exchange_dict['expected_groups']
- unexpected_groups = kdc_exchange_dict['unexpected_groups']
expected_sid = kdc_exchange_dict['expected_sid']
expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
self.assertEqual(expected_account_name,
str(logon_info.account_name))
- if expected_sid is not None:
- expected_rid = int(expected_sid.rsplit('-', 1)[1])
- self.assertEqual(expected_rid, logon_info.rid)
-
- if expected_groups is not None:
- self.assertIsNotNone(info3.sids)
- got_sids = {str(sid_attr.sid) for sid_attr in info3.sids}
- self.assertEqual(info3.sidcount,
- len(got_sids),
- 'Found duplicate SIDs')
-
- match_count = 0
- for g in expected_groups:
- for sid_attr in info3.sids:
- if g == str(sid_attr.sid):
- match_count += 1
- self.assertEqual(match_count, len(expected_groups))
-
- if unexpected_groups is not None:
- match_count = 0
-
- for g in unexpected_groups:
- self.assertIsNotNone(info3.sids)
- for sid_attr in info3.sids:
- if g == str(sid_attr.sid):
- match_count += 1
- self.assertEqual(match_count, 0)
+ self.check_logon_info_sids(pac_buffer, kdc_exchange_dict)
elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
upn_dns_info = pac_buffer.info
from samba.tests.krb5.kcrypto import Cksumtype, Enctype
from samba.tests.krb5.kdc_base_test import KDCBaseTest
from samba.tests.krb5.raw_testcase import (
+ RawKerberosTest,
RodcPacEncryptionKey,
ZeroedChecksumKey
)
)
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
+SidType = RawKerberosTest.SidType
+
global_asn1_print = False
global_hexdump = False
class S4UKerberosTests(KDCBaseTest):
+ default_attrs = (security.SE_GROUP_MANDATORY |
+ security.SE_GROUP_ENABLED_BY_DEFAULT |
+ security.SE_GROUP_ENABLED)
+
def setUp(self):
super(S4UKerberosTests, self).setUp()
self.do_asn1_print = global_asn1_print
'client_opts': {
'not_delegated': False
},
- 'expected_groups': [security.SID_SERVICE_ASSERTED_IDENTITY],
- 'unexpected_groups': [security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY]
+ 'expected_groups': {
+ (security.SID_SERVICE_ASSERTED_IDENTITY,
+ SidType.EXTRA_SID,
+ self.default_attrs),
+ ...
+ },
+ 'unexpected_groups': {
+ security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
+ },
})
def _run_delegation_test(self, kdc_dict):
{
'expected_error_mode': 0,
'allow_delegation': True,
- 'expected_groups': [security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY],
- 'unexpected_groups': [security.SID_SERVICE_ASSERTED_IDENTITY]
+ 'expected_groups': {
+ (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
+ SidType.EXTRA_SID,
+ self.default_attrs),
+ ...
+ },
+ 'unexpected_groups': {
+ security.SID_SERVICE_ASSERTED_IDENTITY,
+ },
})
def test_constrained_delegation_service_asserted_identity(self):
'service1_opts': {
'trusted_to_auth_for_delegation': True,
},
- 'expected_groups': [security.SID_SERVICE_ASSERTED_IDENTITY],
- 'unexpected_groups': [security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY]
+ 'expected_groups': {
+ (security.SID_SERVICE_ASSERTED_IDENTITY,
+ SidType.EXTRA_SID,
+ self.default_attrs),
+ ...
+ },
+ 'unexpected_groups': {
+ security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
+ },
})
def test_constrained_delegation_no_auth_data_required(self):