]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
samba-tool user disable: add new --remove-supplemental-groups option
authorBjörn Baumbach <bb@sernet.de>
Wed, 20 Nov 2024 16:54:17 +0000 (17:54 +0100)
committerBjörn Baumbach <bb@sernet.de>
Thu, 23 Jan 2025 19:51:05 +0000 (19:51 +0000)
Removes all supplemental groups from a user, what is commonly
wanted when a user is disabled.

Pair-programmed-with: Stefan Metzmacher <metze@samba.org>
Signed-off-by: Björn Baumbach <bb@sernet.de>
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jule Anger <janger@samba.org>
Autobuild-User(master): Björn Baumbach <bb@sernet.de>
Autobuild-Date(master): Thu Jan 23 19:51:05 UTC 2025 on atb-devel-224

docs-xml/manpages/samba-tool.8.xml
python/samba/netcmd/user/disable.py
python/samba/samdb.py
python/samba/tests/samba_tool/user.py

index 62ce4e690d451b0f4cde089a67dfa4838d2be5e4..3bd7b75a73e06a52de1c6293f981bfd862c29b3a 100644 (file)
 <refsect3>
        <title>user disable <replaceable>username</replaceable></title>
        <para>Disable a user account.</para>
+       <variablelist>
+       <varlistentry>
+       <term>--remove-supplemental-groups</term>
+       <listitem><para>
+       Remove user from all groups, but keep the primary group.
+       </para></listitem>
+       </varlistentry>
+       </variablelist>
 </refsect3>
 
 <refsect3>
index 37310e08880e775d6a7361dca677246445677627..d959f6d45f833ec9aff44e10949b26b5928c3407 100644 (file)
@@ -24,6 +24,8 @@ from samba import ldb
 from samba.auth import system_session
 from samba.netcmd import Command, CommandError, Option
 from samba.samdb import SamDB
+from samba.dcerpc import security
+from samba.ndr import ndr_unpack
 
 
 class cmd_user_disable(Command):
@@ -38,6 +40,9 @@ class cmd_user_disable(Command):
                help="LDAP filter to select user",
                type=str,
                dest="search_filter"),
+        Option("--remove-supplemental-groups",
+               help="Remove user's supplemental groups",
+               action="store_true"),
     ]
 
     takes_args = ["username?"]
@@ -49,7 +54,8 @@ class cmd_user_disable(Command):
     }
 
     def run(self, username=None, sambaopts=None, credopts=None,
-            versionopts=None, search_filter=None, H=None):
+            versionopts=None, search_filter=None, H=None,
+            remove_supplemental_groups=False):
         if username is None and search_filter is None:
             raise CommandError("Either the username or '--filter' must be specified!")
 
@@ -63,18 +69,46 @@ class cmd_user_disable(Command):
         samdb = SamDB(url=H, session_info=system_session(),
                       credentials=creds, lp=lp)
 
-        res = samdb.search(base=samdb.domain_dn(),
-                           expression=search_filter,
-                           scope=ldb.SCOPE_SUBTREE)
-        if len(res) < 1:
-            raise CommandError("Unable to find user for '%s'" % (
+        samdb.transaction_start()
+        try:
+            res = samdb.search(base=samdb.domain_dn(),
+                               expression=search_filter,
+                               scope=ldb.SCOPE_SUBTREE,
+                               controls=["extended_dn:1:1"],
+                               attrs=["objectSid", "memberOf"])
+            user_groups = res[0].get("memberOf")
+            if user_groups is None:
+                user_groups = []
+            user_binary_sid = res[0].get("objectSid", idx=0)
+            user_sid = ndr_unpack(security.dom_sid, user_binary_sid)
+        except IndexError:
+            samdb.transaction_cancel()
+            raise CommandError("Unable to find user '%s'" % (
                                username or search_filter))
+        except Exception as msg:
+            samdb.transaction_cancel()
+            raise CommandError("Failed to find user '%s': '%s'" % (
+                               username or search_filter, msg))
         if len(res) > 1:
+            samdb.transaction_cancel()
             raise CommandError("Found more than one user '%s'" % (
                                username or search_filter))
 
+        if remove_supplemental_groups:
+            for user_group in user_groups:
+                try:
+                    samdb.add_remove_group_members(str(user_group),
+                                                   [str(user_sid)],
+                                                   add_members_operation=False)
+                except Exception as msg:
+                    samdb.transaction_cancel()
+                    raise CommandError("Failed to remove user from group "
+                                       "'%s': %s" % (user_group, msg))
+
         try:
             samdb.disable_account(search_filter)
         except Exception as msg:
+            samdb.transaction_cancel()
             raise CommandError("Failed to disable user '%s': %s" % (
                 username or search_filter, msg))
+        samdb.transaction_commit()
index 7cfb3459e3c6faaf2d13e9ad74fd1cc6b811f688..0545aed98ebac2fec13d2c1196181fe7af553d81 100644 (file)
@@ -35,6 +35,7 @@ from samba.common import normalise_int32
 from samba.common import get_bytes, cmp
 from samba.dcerpc import security
 from samba import is_ad_dc_built
+from samba import NTSTATUSError, ntstatus
 import binascii
 
 __docformat__ = "restructuredText"
@@ -365,13 +366,13 @@ lockoutTime: 0
 
         return filter
 
-    def add_remove_group_members(self, groupname, members,
+    def add_remove_group_members(self, group, members,
                                  add_members_operation=True,
                                  member_types=None,
                                  member_base_dn=None):
         """Adds or removes group members
 
-        :param groupname: Name of the target group
+        :param group: sAMAccountName, DN, SID or GUID of the target group
         :param members: list of group members
         :param add_members_operation: Defines if its an add or remove
             operation
@@ -385,26 +386,92 @@ lockoutTime: 0
         if member_base_dn is None:
             member_base_dn = self.domain_dn()
 
-        groupfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (
-            ldb.binary_encode(groupname), "CN=Group,CN=Schema,CN=Configuration", self.domain_dn())
+        partial_groupfilter = None
+
+        group_sid = None
+        try:
+            group_sid = security.dom_sid(group)
+        except ValueError:
+            pass
+        if group_sid is not None:
+            partial_groupfilter = "(objectClass=*)"
+
+        group_guid = None
+        if partial_groupfilter is None:
+            try:
+                group_guid = misc.GUID(group)
+            except NTSTATUSError as e:
+                (status, _) = e.args
+                if status != ntstatus.NT_STATUS_INVALID_PARAMETER:
+                    raise e
+            if group_guid is not None:
+                partial_groupfilter = "(objectClass=*)"
+
+        if partial_groupfilter is None:
+            group_dn = None
+            try:
+                if isinstance(group, ldb.Dn):
+                    group_dn = ldb.Dn(self, group.extended_str(1))
+                else:
+                    group_dn = ldb.Dn(self, str(group))
+            except ValueError:
+                pass
+            if group_dn is not None:
+                group_b_sid = group_dn.get_extended_component("SID")
+                group_b_guid = group_dn.get_extended_component("GUID")
+                if group_b_sid is not None:
+                    group_sid = ndr_unpack(security.dom_sid, group_b_sid)
+                    partial_groupfilter = "(objectClass=*)"
+                elif group_b_guid is not None:
+                    group_guid = ndr_unpack(misc.GUID, group_b_guid)
+                    partial_groupfilter = "(objectClass=*)"
+                else:
+                    search_base = str(group_dn)
+                    search_scope = ldb.SCOPE_BASE
+
+        if group_sid is not None:
+            search_base = '<SID=%s>' % group_sid
+            search_scope = ldb.SCOPE_BASE
+
+        if group_guid is not None:
+            search_base = '<GUID=%s>' % group_guid
+            search_scope = ldb.SCOPE_BASE
+
+        if partial_groupfilter is None:
+            search_base = self.domain_dn()
+            search_scope = ldb.SCOPE_SUBTREE
+            partial_groupfilter = "(sAMAccountName=%s)" % (
+                ldb.binary_encode(group))
+
+        groupfilter = "(&%s(objectCategory=%s,%s))" % (
+            partial_groupfilter,
+            "CN=Group,CN=Schema,CN=Configuration",
+            self.domain_dn())
 
         self.transaction_start()
         try:
-            targetgroup = self.search(base=self.domain_dn(),
-                                      scope=ldb.SCOPE_SUBTREE,
+            targetgroup = self.search(base=search_base,
+                                      scope=search_scope,
                                       expression=groupfilter,
                                       controls=["extended_dn:1:1"],
                                       attrs=['member'])
             if len(targetgroup) == 0:
-                raise Exception('Unable to find group "%s"' % groupname)
+                raise Exception('Unable to find group "%s"' % group)
             assert(len(targetgroup) == 1)
 
             modified = False
 
+            if group_sid is not None:
+                targetgroup_dn = '<SID=%s>' % group_sid
+            elif group_guid is not None:
+                targetgroup_dn = '<GUID=%s>' % group_guid
+            else:
+                targetgroup_dn = str(targetgroup[0].dn)
+
             addtargettogroup = """
 dn: %s
 changetype: modify
-""" % (str(targetgroup[0].dn))
+""" % (targetgroup_dn)
 
             for member in members:
                 targetmember_dn = None
index 290d5daebe182b629d01f3e3f1bd7eb07881d2dc..caef93407bf5425f21e03ce2f6ebfea98812fd8c 100644 (file)
@@ -1126,6 +1126,41 @@ sAMAccountName: %s
             self.assertCmdSuccess(result, out, err, "Error running user unlock")
             self.assertEqual(err, "", "Shouldn't be any error messages")
 
+    def test_disable_remove_supplemental_groups(self):
+        """disable user and remove supplemental groups"""
+        username = "userRemoveGroups"
+        user = self._randomUser({"name": username})
+        self._create_user(user)
+
+        usergroups = self._get_groups(username)
+        self.assertTrue(len(usergroups) == 1, "exactly one membership expected")
+        self.assertEqual(usergroups[0],
+                         "Domain Users",
+                         "Unexpected groupmembership")
+
+        self._add_groupmember("Domain Admins", username)
+        self._add_groupmember("Print Operators", username)
+
+        usergroups = self._get_groups(username)
+        self.assertTrue(len(usergroups) == 3, "exactly 3 memberships expected")
+
+        (result, out, err) = self.runsubcmd(
+            "user", "disable", username,
+            "--remove-supplemental-groups",
+            "-H", "ldap://%s" % os.environ["DC_SERVER"],
+            "-U%s%%%s" % (os.environ["DC_USERNAME"],
+            os.environ["DC_PASSWORD"]))
+        self.assertCmdSuccess(
+            result, out, err,
+            "Error running user disable --remove-supplemental-groups")
+        self.assertEqual(err, "",
+                         "Shouldn't be any error messages from user disable")
+
+        usergroups = self._get_groups(username)
+        self.assertTrue(len(usergroups) == 1, "exactly one membership expected")
+        self.assertEqual(usergroups[0], "Domain Users",
+                         "Unexpected groupmembership")
+
     def _randomUser(self, base=None):
         """create a user with random attribute values, you can specify base attributes"""
         if base is None:
@@ -1271,3 +1306,46 @@ template """
             return userlist[0]
         else:
             return None
+
+    def _add_groupmember(self, group, user):
+        (result, out, err) =  self.runsubcmd(
+            "group", "addmembers", group, user,
+            "-H", "ldap://%s" % os.environ["DC_SERVER"],
+            "-U%s%%%s" % (os.environ["DC_USERNAME"],
+                          os.environ["DC_PASSWORD"]))
+        self.assertCmdSuccess(
+            result, out, err, "Error running group addmembers")
+        self.assertEqual(
+            err,
+            "",
+            "Shouldn't be any error messages from group addmembers")
+
+        return out.rstrip().split("\n")
+
+    def _remove_groupmember(self, group, user):
+        (result, out, err) = self.runsubcmd(
+            "group", "removemembers", group, user,
+            "-H", "ldap://%s" % os.environ["DC_SERVER"],
+            "-U%s%%%s" % (os.environ["DC_USERNAME"],
+                          os.environ["DC_PASSWORD"]))
+        self.assertCmdSuccess(
+            result, out, err, "Error running group removemembers")
+        self.assertEqual(
+            err,
+            "",
+            "Shouldn't be any error messages from group removemembers")
+
+        return out.rstrip().split("\n")
+
+    def _get_groups(self, user):
+        (result, out, err) = self.runsubcmd(
+            "user", "getgroups", user,
+            "-H", "ldap://%s" % os.environ["DC_SERVER"],
+            "-U%s%%%s" % (os.environ["DC_USERNAME"],
+            os.environ["DC_PASSWORD"]))
+        self.assertCmdSuccess(result, out, err, "Error running user getgroups")
+        self.assertEqual(err,
+                         "",
+                         "Shouldn't be any error messages from user getgroups")
+
+        return out.rstrip().split("\n")