import random
import re
-from functools import partial
-
import ldb
from samba import werror
-from samba.dcerpc import krb5pac, netlogon, samr, security
+from samba.dcerpc import netlogon, security
from samba.tests import DynamicTestCase, env_get_var_value
from samba.tests.krb5 import kcrypto
from samba.tests.krb5.kdc_base_test import GroupType, KDCBaseTest, Principal
self.assertEqual(domain_local_sid, str(sids[0].sid))
self.assertEqual(self.resource_attrs, sids[0].attributes)
- # Get a ticket with the SIDs in the PAC replaced with ones we specify. This
- # is useful for creating arbitrary tickets that can be used to perform a
- # TGS-REQ.
- def ticket_with_sids(self,
- ticket,
- new_sids,
- domain_sid,
- user_rid,
- set_user_flags=0,
- reset_user_flags=0):
- krbtgt_creds = self.get_krbtgt_creds()
- krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
-
- checksum_keys = {
- krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
- }
-
- modify_pac_fn = partial(self.set_pac_sids,
- new_sids=new_sids,
- domain_sid=domain_sid,
- user_rid=user_rid,
- set_user_flags=set_user_flags,
- reset_user_flags=reset_user_flags)
-
- return self.modified_ticket(ticket,
- modify_pac_fn=modify_pac_fn,
- checksum_keys=checksum_keys)
-
- # Replace the SIDs in a PAC with 'new_sids'.
- def set_pac_sids(self,
- pac,
- new_sids,
- domain_sid,
- user_rid,
- set_user_flags=0,
- reset_user_flags=0):
- base_sids = []
- extra_sids = []
- resource_sids = []
-
- resource_domain = None
-
- primary_gid = None
-
- # Filter our SIDs into three arrays depending on their ultimate
- # location in the PAC.
- for sid, sid_type, attrs in new_sids:
- if sid_type is self.SidType.BASE_SID:
- domain, rid = sid.rsplit('-', 1)
- self.assertEqual(domain_sid, domain,
- f'base SID {sid} must be in our domain')
-
- base_sid = samr.RidWithAttribute()
- base_sid.rid = int(rid)
- base_sid.attributes = attrs
-
- base_sids.append(base_sid)
- elif sid_type is self.SidType.EXTRA_SID:
- extra_sid = netlogon.netr_SidAttr()
- extra_sid.sid = security.dom_sid(sid)
- extra_sid.attributes = attrs
-
- extra_sids.append(extra_sid)
- elif sid_type is self.SidType.RESOURCE_SID:
- domain, rid = sid.rsplit('-', 1)
- if resource_domain is None:
- resource_domain = domain
- else:
- self.assertEqual(resource_domain, domain,
- 'resource SIDs must share the same '
- 'domain')
-
- resource_sid = samr.RidWithAttribute()
- resource_sid.rid = int(rid)
- resource_sid.attributes = attrs
-
- resource_sids.append(resource_sid)
- elif sid_type is self.SidType.PRIMARY_GID:
- self.assertIsNone(primary_gid,
- f'must not specify a second primary GID '
- f'{sid}')
- self.assertIsNone(attrs, 'cannot specify primary GID attrs')
-
- domain, primary_gid = sid.rsplit('-', 1)
- self.assertEqual(domain_sid, domain,
- f'primary GID {sid} must be in our domain')
- else:
- self.fail(f'invalid SID type {sid_type}')
-
- found_logon_info = True
-
- user_sid = security.dom_sid(f'{domain_sid}-{user_rid}')
-
- pac_buffers = pac.buffers
- for pac_buffer in pac_buffers:
- # Find the LOGON_INFO PAC buffer.
- if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
- logon_info = pac_buffer.info.info
-
- # Add Extra SIDs and set the EXTRA_SIDS flag as needed.
- logon_info.info3.sidcount = len(extra_sids)
- if extra_sids:
- logon_info.info3.sids = extra_sids
- logon_info.info3.base.user_flags |= (
- netlogon.NETLOGON_EXTRA_SIDS)
- else:
- logon_info.info3.sids = None
- logon_info.info3.base.user_flags &= ~(
- netlogon.NETLOGON_EXTRA_SIDS)
-
- # Add Base SIDs.
- logon_info.info3.base.groups.count = len(base_sids)
- if base_sids:
- logon_info.info3.base.groups.rids = base_sids
- else:
- logon_info.info3.base.groups.rids = None
-
- logon_info.info3.base.domain_sid = security.dom_sid(domain_sid)
- logon_info.info3.base.rid = int(user_rid)
-
- if primary_gid is not None:
- logon_info.info3.base.primary_gid = int(primary_gid)
-
- # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed.
- logon_info.resource_groups.groups.count = len(resource_sids)
- if resource_sids:
- resource_domain = security.dom_sid(resource_domain)
- logon_info.resource_groups.domain_sid = resource_domain
- logon_info.resource_groups.groups.rids = resource_sids
- logon_info.info3.base.user_flags |= (
- netlogon.NETLOGON_RESOURCE_GROUPS)
- else:
- logon_info.resource_groups.domain_sid = None
- logon_info.resource_groups.groups.rids = None
- logon_info.info3.base.user_flags &= ~(
- netlogon.NETLOGON_RESOURCE_GROUPS)
-
- logon_info.info3.base.user_flags |= set_user_flags
- logon_info.info3.base.user_flags &= ~reset_user_flags
-
- found_logon_info = True
-
- # Also replace the user's SID in the UPN DNS buffer.
- elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
- upn_dns_info_ex = pac_buffer.info.ex
-
- upn_dns_info_ex.objectsid = user_sid
-
- # But don't replace the user's SID in the Requester SID buffer, or
- # we'll get a SID mismatch.
-
- self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer')
-
- pac.buffers = pac_buffers
-
- return pac
-
# A list of test cases.
cases = [
# AS-REQ tests.
os.environ["PYTHONUNBUFFERED"] = "1"
from datetime import datetime, timezone
+from functools import partial
import tempfile
import binascii
import collections
return mapped_sids
+ # Get a ticket with the SIDs in the PAC replaced with ones we specify. This
+ # is useful for creating arbitrary tickets that can be used to perform a
+ # TGS-REQ.
+ def ticket_with_sids(self,
+ ticket,
+ new_sids,
+ domain_sid,
+ user_rid,
+ set_user_flags=0,
+ reset_user_flags=0):
+ krbtgt_creds = self.get_krbtgt_creds()
+ krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
+
+ checksum_keys = {
+ krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
+ }
+
+ modify_pac_fn = partial(self.set_pac_sids,
+ new_sids=new_sids,
+ domain_sid=domain_sid,
+ user_rid=user_rid,
+ set_user_flags=set_user_flags,
+ reset_user_flags=reset_user_flags)
+
+ return self.modified_ticket(ticket,
+ modify_pac_fn=modify_pac_fn,
+ checksum_keys=checksum_keys)
+
+ # Replace the SIDs in a PAC with 'new_sids'.
+ def set_pac_sids(self,
+ pac,
+ new_sids,
+ domain_sid,
+ user_rid,
+ set_user_flags=0,
+ reset_user_flags=0):
+ base_sids = []
+ extra_sids = []
+ resource_sids = []
+
+ resource_domain = None
+
+ primary_gid = None
+
+ # Filter our SIDs into three arrays depending on their ultimate
+ # location in the PAC.
+ for sid, sid_type, attrs in new_sids:
+ if sid_type is self.SidType.BASE_SID:
+ domain, rid = sid.rsplit('-', 1)
+ self.assertEqual(domain_sid, domain,
+ f'base SID {sid} must be in our domain')
+
+ base_sid = samr.RidWithAttribute()
+ base_sid.rid = int(rid)
+ base_sid.attributes = attrs
+
+ base_sids.append(base_sid)
+ elif sid_type is self.SidType.EXTRA_SID:
+ extra_sid = netlogon.netr_SidAttr()
+ extra_sid.sid = security.dom_sid(sid)
+ extra_sid.attributes = attrs
+
+ extra_sids.append(extra_sid)
+ elif sid_type is self.SidType.RESOURCE_SID:
+ domain, rid = sid.rsplit('-', 1)
+ if resource_domain is None:
+ resource_domain = domain
+ else:
+ self.assertEqual(resource_domain, domain,
+ 'resource SIDs must share the same '
+ 'domain')
+
+ resource_sid = samr.RidWithAttribute()
+ resource_sid.rid = int(rid)
+ resource_sid.attributes = attrs
+
+ resource_sids.append(resource_sid)
+ elif sid_type is self.SidType.PRIMARY_GID:
+ self.assertIsNone(primary_gid,
+ f'must not specify a second primary GID '
+ f'{sid}')
+ self.assertIsNone(attrs, 'cannot specify primary GID attrs')
+
+ domain, primary_gid = sid.rsplit('-', 1)
+ self.assertEqual(domain_sid, domain,
+ f'primary GID {sid} must be in our domain')
+ else:
+ self.fail(f'invalid SID type {sid_type}')
+
+ found_logon_info = True
+
+ user_sid = security.dom_sid(f'{domain_sid}-{user_rid}')
+
+ pac_buffers = pac.buffers
+ for pac_buffer in pac_buffers:
+ # Find the LOGON_INFO PAC buffer.
+ if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
+ logon_info = pac_buffer.info.info
+
+ # Add Extra SIDs and set the EXTRA_SIDS flag as needed.
+ logon_info.info3.sidcount = len(extra_sids)
+ if extra_sids:
+ logon_info.info3.sids = extra_sids
+ logon_info.info3.base.user_flags |= (
+ netlogon.NETLOGON_EXTRA_SIDS)
+ else:
+ logon_info.info3.sids = None
+ logon_info.info3.base.user_flags &= ~(
+ netlogon.NETLOGON_EXTRA_SIDS)
+
+ # Add Base SIDs.
+ logon_info.info3.base.groups.count = len(base_sids)
+ if base_sids:
+ logon_info.info3.base.groups.rids = base_sids
+ else:
+ logon_info.info3.base.groups.rids = None
+
+ logon_info.info3.base.domain_sid = security.dom_sid(domain_sid)
+ logon_info.info3.base.rid = int(user_rid)
+
+ if primary_gid is not None:
+ logon_info.info3.base.primary_gid = int(primary_gid)
+
+ # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed.
+ logon_info.resource_groups.groups.count = len(resource_sids)
+ if resource_sids:
+ resource_domain = security.dom_sid(resource_domain)
+ logon_info.resource_groups.domain_sid = resource_domain
+ logon_info.resource_groups.groups.rids = resource_sids
+ logon_info.info3.base.user_flags |= (
+ netlogon.NETLOGON_RESOURCE_GROUPS)
+ else:
+ logon_info.resource_groups.domain_sid = None
+ logon_info.resource_groups.groups.rids = None
+ logon_info.info3.base.user_flags &= ~(
+ netlogon.NETLOGON_RESOURCE_GROUPS)
+
+ logon_info.info3.base.user_flags |= set_user_flags
+ logon_info.info3.base.user_flags &= ~reset_user_flags
+
+ found_logon_info = True
+
+ # Also replace the user's SID in the UPN DNS buffer.
+ elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
+ upn_dns_info_ex = pac_buffer.info.ex
+
+ upn_dns_info_ex.objectsid = user_sid
+
+ # But don't replace the user's SID in the Requester SID buffer, or
+ # we'll get a SID mismatch.
+
+ self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer')
+
+ pac.buffers = pac_buffers
+
+ return pac
+
def get_cached_creds(self, *,
account_type,
opts=None,