]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
netcmd: domain: silo member command tests
authorRob van der Linde <rob@catalyst.net.nz>
Tue, 6 Jun 2023 02:11:26 +0000 (14:11 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Sun, 25 Jun 2023 23:29:32 +0000 (23:29 +0000)
Signed-off-by: Rob van der Linde <rob@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Joseph Sutton <josephsutton@catalyst.net.nz>
python/samba/tests/samba_tool/domain_auth_base.py
python/samba/tests/samba_tool/domain_auth_silo.py

index 43795ddbd9ed05c9096e881cad17d2c1c0753222..66445e99c6a61a1087a182d42909f38ceffd1d33 100644 (file)
@@ -88,6 +88,23 @@ class BaseAuthCmdTest(SambaToolCmdTest):
         authn_policies_dn.add_child("CN=AuthN Policies")
         return authn_policies_dn
 
+    def get_users_dn(self):
+        """Returns Users DN."""
+        users_dn = self.samdb.get_root_basedn()
+        users_dn.add_child("CN=Users")
+        return users_dn
+
+    def get_user(self, username):
+        """Get a user by username."""
+        users_dn = self.get_users_dn()
+
+        result = self.samdb.search(base=users_dn,
+                                   scope=SCOPE_ONELEVEL,
+                                   expression=f"(sAMAccountName={username})")
+
+        if len(result) == 1:
+            return result[0]
+
     def _run(self, *argv):
         """Override _run, so we don't always have to pass host and creds."""
         args = list(argv)
index 96e617dfbe367ec1821adf21691689c20d0d6dac..47d0ea84fdedb10579aae62418a37d4c17730590 100644 (file)
@@ -21,6 +21,7 @@
 #
 
 import json
+from collections import defaultdict
 from unittest.mock import patch
 
 from ldb import LdbError
@@ -435,3 +436,132 @@ class AuthSiloCmdTestCase(BaseAuthCmdTest):
 
             # When using --force we don't get the hint.
             self.assertNotIn("Try --force", err)
+
+
+class AuthSiloMemberCmdTestCase(BaseAuthCmdTest):
+
+    def setUp(self):
+        super().setUp()
+        self.members = defaultdict(list)
+
+        # Create an organisational unit to test in.
+        self.ou = self.samdb.get_default_basedn()
+        self.ou.add_child("OU=Domain Auth Tests")
+        self.samdb.create_ou(self.ou)
+
+        # Assign members to silos
+        self.add_silo_member("Developers", "bob")
+        self.add_silo_member("Developers", "jane")
+        self.add_silo_member("Managers", "alice")
+
+    def tearDown(self):
+        # Remove organisational unit.
+        self.samdb.delete(self.ou, ["tree_delete:1"])
+
+        # Remove members from silos before deleting them in super.
+        for silo, members in self.members.items():
+            for member in members:
+                self.remove_silo_member(silo, member)
+
+        super().tearDown()
+
+    def create_computer(self, name):
+        """Create a Computer and return the dn."""
+        dn = f"CN={name},{self.ou}"
+        self.samdb.newcomputer(name, self.ou)
+        return dn
+
+    def add_silo_member(self, silo, member):
+        """Add a member to an authentication silo."""
+        result, out, err = self.runcmd("domain", "auth", "silo",
+                                       "member", "add",
+                                       "--name", silo, "--member", member)
+
+        self.assertIsNone(result, msg=err)
+        self.assertIn(f"User '{member}' added to the {silo} silo.", out)
+
+        # Ensure that tearDown cleans up the silo members.
+        self.members[silo].append(member)
+
+    def remove_silo_member(self, silo, member):
+        """Remove a member to an authentication silo."""
+        result, out, err = self.runcmd("domain", "auth", "silo",
+                                       "member", "remove",
+                                       "--name", silo, "--member", member)
+
+        self.assertIsNone(result, msg=err)
+
+    def test_authentication_silo_member_list(self):
+        """Test listing authentication policy members in list format."""
+        alice = self.get_user("alice")
+        jane = self.get_user("jane")
+        bob = self.get_user("bob")
+
+        result, out, err = self.runcmd("domain", "auth", "silo",
+                                       "member", "list",
+                                       "--name", "Developers")
+
+        self.assertIsNone(result, msg=err)
+        self.assertIn(str(bob.dn), out)
+        self.assertIn(str(jane.dn), out)
+        self.assertNotIn(str(alice.dn), out)
+
+    def test_authentication_silo_member_list_json(self):
+        """Test listing authentication policy members list in json format."""
+        alice = self.get_user("alice")
+        jane = self.get_user("jane")
+        bob = self.get_user("bob")
+
+        result, out, err = self.runcmd("domain", "auth", "silo",
+                                       "member", "list",
+                                       "--name", "Developers", "--json")
+
+        self.assertIsNone(result, msg=err)
+        members = json.loads(out)
+        members_dn = [member["dn"] for member in members]
+        self.assertIn(str(bob.dn), members_dn)
+        self.assertIn(str(jane.dn), members_dn)
+        self.assertNotIn(str(alice.dn), members_dn)
+
+    def test_authentication_silo_member_list_name_missing(self):
+        """Test list authentication policy members without the name argument."""
+        result, out, err = self.runcmd("domain", "auth", "silo",
+                                       "member", "list")
+
+        self.assertIsNotNone(result)
+        self.assertIn("Argument --name is required.", err)
+
+    def test_authentication_silo_member_add_user(self):
+        """Test adding a user to an authentication silo."""
+        self.add_silo_member("Developers", "joe")
+
+        # Check if member is in silo
+        user = self.get_user("joe")
+        silo = self.get_authentication_silo("Developers")
+        members = [str(member) for member in silo["msDS-AuthNPolicySiloMembers"]]
+        self.assertIn(str(user.dn), members)
+
+    def test_authentication_silo_member_add_computer(self):
+        """Test adding a computer to an authentication silo"""
+        name = "AUTH_SILO_CMP"
+        computer = self.create_computer(name)
+        silo = "Developers"
+
+        # Don't use self.add_silo_member as it will try to clean up the user.
+        result, out, err = self.runcmd("domain", "auth", "silo",
+                                       "member", "add",
+                                       "--name", silo,
+                                       "--member", computer)
+
+        self.assertIsNone(result, msg=err)
+        self.assertIn(f"User '{name}' added to the {silo} silo.", out)
+
+    def test_authentication_silo_member_add_unknown_user(self):
+        """Test adding an unknown user to an authentication silo."""
+        result, out, err = self.runcmd("domain", "auth", "silo",
+                                       "member", "add",
+                                       "--name", "Developers",
+                                       "--member", "does_not_exist")
+
+        self.assertIsNotNone(result)
+        self.assertIn("User 'does_not_exist' not found.", err)