From: Joseph Sutton Date: Mon, 21 Feb 2022 01:58:30 +0000 (+1300) Subject: samba-tool delegation: Add commands to add/remove principals for RBCD X-Git-Tag: tevent-0.12.0~231 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9a480f274b62d8e491bcd54bfd189099729ff57a;p=thirdparty%2Fsamba.git samba-tool delegation: Add commands to add/remove principals for RBCD These commands allow updating the msDS-AllowedToActOnBehalfOfOtherIdentity attribute with principals allowed to delegate to an account. BUG: https://bugzilla.samba.org/show_bug.cgi?id=14954 Signed-off-by: Joseph Sutton Reviewed-by: Andrew Bartlett --- diff --git a/python/samba/netcmd/delegation.py b/python/samba/netcmd/delegation.py index 7942f96ea84..a8c36b8e96a 100644 --- a/python/samba/netcmd/delegation.py +++ b/python/samba/netcmd/delegation.py @@ -25,7 +25,7 @@ from samba import dsdb from samba.samdb import SamDB from samba.auth import system_session from samba.dcerpc import security -from samba.ndr import ndr_unpack +from samba.ndr import ndr_pack, ndr_unpack from samba.netcmd.common import _get_user_realm_domain from samba.netcmd import ( Command, @@ -400,6 +400,278 @@ class cmd_delegation_del_service(Command): raise CommandError(err) +class cmd_delegation_add_principal(Command): + """Add a principal to msDS-AllowedToActOnBehalfOfOtherIdentity that may delegate to an account.""" + + synopsis = "%prog [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "versionopts": options.VersionOptions, + } + + takes_options = [ + Option("-H", "--URL", help="LDB URL for database or target server", + type=str, metavar="URL", dest="H"), + ] + + takes_args = ["accountname", "principal"] + + def run(self, accountname, principal, H=None, credopts=None, sambaopts=None, + versionopts=None): + + lp = sambaopts.get_loadparm() + creds = credopts.get_credentials(lp) + paths = provision.provision_paths_from_lp(lp, lp.get("realm")) + if H is None: + path = paths.samdb + else: + path = H + + sam = SamDB(path, session_info=system_session(), + credentials=creds, lp=lp) + # TODO once I understand how, use the domain info to naildown + # to the correct domain + cleanedaccount, _, _ = _get_user_realm_domain(accountname) + + account_res = sam.search( + expression="sAMAccountName=%s" % + ldb.binary_encode(cleanedaccount), + scope=ldb.SCOPE_SUBTREE, + attrs=["msDS-AllowedToActOnBehalfOfOtherIdentity"]) + if len(account_res) == 0: + raise CommandError(f"Unable to find account name '{accountname}'") + assert(len(account_res) == 1) + + data = account_res[0].get( + "msDS-AllowedToActOnBehalfOfOtherIdentity", idx=0) + if data is None: + # Create the security descriptor if it is not present. + owner_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + + security_desc = security.descriptor() + security_desc.revision = security.SD_REVISION + security_desc.type = (security.SEC_DESC_DACL_PRESENT | + security.SEC_DESC_SELF_RELATIVE) + security_desc.owner_sid = owner_sid + + dacl = None + else: + try: + security_desc = ndr_unpack(security.descriptor, data) + except RuntimeError: + raise CommandError(f"Security Descriptor of attribute " + f"msDS-AllowedToActOnBehalfOfOtherIdentity " + f"for account '{accountname}' could not be " + f"unmarshalled!") + + dacl = security_desc.dacl + + if dacl is None: + # Create the DACL if it is not present. + dacl = security.acl() + dacl.revision = security.SECURITY_ACL_REVISION_ADS + dacl.num_aces = 0 + + # TODO once I understand how, use the domain info to naildown + # to the correct domain + cleanedprinc, _, _ = _get_user_realm_domain(principal) + + princ_res = sam.search(expression="sAMAccountName=%s" % + ldb.binary_encode(cleanedprinc), + scope=ldb.SCOPE_SUBTREE, + attrs=["objectSid"]) + if len(princ_res) == 0: + raise CommandError(f"Unable to find principal name '{principal}'") + assert(len(princ_res) == 1) + + princ_sid = security.dom_sid( + sam.schema_format_value( + "objectSID", + princ_res[0].get("objectSID", idx=0)).decode("utf-8")) + + aces = dacl.aces + + # Check that there is no existing ACE for this principal. + if any(ace.trustee == princ_sid for ace in aces): + raise CommandError( + f"ACE for principal '{principal}' already present in Security " + f"Descriptor of attribute " + f"msDS-AllowedToActOnBehalfOfOtherIdentity for account " + f"'{accountname}'.") + + # Create the new ACE. + ace = security.ace() + ace.type = security.SEC_ACE_TYPE_ACCESS_ALLOWED + ace.flags = 0 + ace.access_mask = security.SEC_ADS_GENERIC_ALL + ace.trustee = princ_sid + + aces.append(ace) + + dacl.aces = aces + dacl.num_aces += 1 + + security_desc.dacl = dacl + + new_data = ndr_pack(security_desc) + + # Set the new security descriptor. First, delete the original value to + # detect a race condition if someone else updates the attribute at the + # same time. + msg = ldb.Message() + msg.dn = account_res[0].dn + if data is not None: + msg["0"] = ldb.MessageElement( + data, ldb.FLAG_MOD_DELETE, + "msDS-AllowedToActOnBehalfOfOtherIdentity") + msg["1"] = ldb.MessageElement( + new_data, ldb.FLAG_MOD_ADD, + "msDS-AllowedToActOnBehalfOfOtherIdentity") + try: + sam.modify(msg) + except ldb.LdbError as err: + num, _ = err.args + if num == ldb.ERR_NO_SUCH_ATTRIBUTE: + raise CommandError( + f"Refused to update attribute " + f"msDS-AllowedToActOnBehalfOfOtherIdentity for account " + f"'{accountname}': a conflicting attribute update " + f"occurred simultaneously.") + else: + raise CommandError(err) + + +class cmd_delegation_del_principal(Command): + """Delete a principal from msDS-AllowedToActOnBehalfOfOtherIdentity that may no longer delegate to an account.""" + + synopsis = "%prog [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "versionopts": options.VersionOptions, + } + + takes_options = [ + Option("-H", "--URL", help="LDB URL for database or target server", + type=str, metavar="URL", dest="H"), + ] + + takes_args = ["accountname", "principal"] + + def run(self, accountname, principal, H=None, credopts=None, sambaopts=None, + versionopts=None): + + lp = sambaopts.get_loadparm() + creds = credopts.get_credentials(lp) + paths = provision.provision_paths_from_lp(lp, lp.get("realm")) + if H is None: + path = paths.samdb + else: + path = H + + sam = SamDB(path, session_info=system_session(), + credentials=creds, lp=lp) + # TODO once I understand how, use the domain info to naildown + # to the correct domain + cleanedaccount, _, _ = _get_user_realm_domain(accountname) + + account_res = sam.search( + expression="sAMAccountName=%s" % + ldb.binary_encode(cleanedaccount), + scope=ldb.SCOPE_SUBTREE, + attrs=["msDS-AllowedToActOnBehalfOfOtherIdentity"]) + if len(account_res) == 0: + raise CommandError("Unable to find account name '%s'" % accountname) + assert(len(account_res) == 1) + + data = account_res[0].get( + "msDS-AllowedToActOnBehalfOfOtherIdentity", idx=0) + if data is None: + raise CommandError(f"Attribute " + f"msDS-AllowedToActOnBehalfOfOtherIdentity for " + f"account '{accountname}' not present!") + + try: + security_desc = ndr_unpack(security.descriptor, data) + except RuntimeError: + raise CommandError(f"Security Descriptor of attribute " + f"msDS-AllowedToActOnBehalfOfOtherIdentity for " + f"account '{accountname}' could not be " + f"unmarshalled!") + + dacl = security_desc.dacl + if dacl is None: + raise CommandError(f"DACL not present on Security Descriptor of " + f"attribute " + f"msDS-AllowedToActOnBehalfOfOtherIdentity for " + f"account '{accountname}'!") + + # TODO once I understand how, use the domain info to naildown + # to the correct domain + cleanedprinc, _, _ = _get_user_realm_domain( + principal) + + princ_res = sam.search(expression="sAMAccountName=%s" % + ldb.binary_encode(cleanedprinc), + scope=ldb.SCOPE_SUBTREE, + attrs=["objectSid"]) + if len(princ_res) == 0: + raise CommandError(f"Unable to find principal name '{principal}'") + assert(len(princ_res) == 1) + + princ_sid = security.dom_sid( + sam.schema_format_value( + "objectSID", + princ_res[0].get("objectSID", idx=0)).decode("utf-8")) + + old_aces = dacl.aces + + # Remove any ACEs relating to the specified principal. + aces = [ace for ace in old_aces if ace.trustee != princ_sid] + + # Raise an error if we didn't find any. + if len(aces) == len(old_aces): + raise CommandError(f"Unable to find ACE for principal " + f"'{principal}' in Security Descriptor of " + f"attribute " + f"msDS-AllowedToActOnBehalfOfOtherIdentity for " + f"account '{accountname}'.") + + dacl.num_aces = len(aces) + dacl.aces = aces + + security_desc.dacl = dacl + + new_data = ndr_pack(security_desc) + + # Set the new security descriptor. First, delete the original value to + # detect a race condition if someone else updates the attribute at the + # same time. + msg = ldb.Message() + msg.dn = account_res[0].dn + msg["0"] = ldb.MessageElement( + data, ldb.FLAG_MOD_DELETE, + "msDS-AllowedToActOnBehalfOfOtherIdentity") + msg["1"] = ldb.MessageElement( + new_data, ldb.FLAG_MOD_ADD, + "msDS-AllowedToActOnBehalfOfOtherIdentity") + try: + sam.modify(msg) + except ldb.LdbError as err: + num, _ = err.args + if num == ldb.ERR_NO_SUCH_ATTRIBUTE: + raise CommandError( + f"Refused to update attribute " + f"msDS-AllowedToActOnBehalfOfOtherIdentity for account " + f"'{accountname}': a conflicting attribute update " + f"occurred simultaneously.") + else: + raise CommandError(err) + + class cmd_delegation(SuperCommand): """Delegation management.""" @@ -409,3 +681,5 @@ class cmd_delegation(SuperCommand): subcommands["for-any-protocol"] = cmd_delegation_for_any_protocol() subcommands["add-service"] = cmd_delegation_add_service() subcommands["del-service"] = cmd_delegation_del_service() + subcommands["add-principal"] = cmd_delegation_add_principal() + subcommands["del-principal"] = cmd_delegation_del_principal()