]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
netcmd: domain: add error handling to domain auth commands
authorRob van der Linde <rob@catalyst.net.nz>
Fri, 23 Jun 2023 00:26:38 +0000 (12:26 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Sun, 25 Jun 2023 23:29:32 +0000 (23:29 +0000)
Where we wre catching LdbError before we now catch ModelError, all
exceptions that are known and handled in the model layer will have a
user-friendly error message.

Signed-off-by: Rob van der Linde <rob@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Joseph Sutton <josephsutton@catalyst.net.nz>
python/samba/netcmd/domain/auth/policy.py
python/samba/netcmd/domain/auth/silo.py
python/samba/netcmd/domain/auth/silo_member.py
python/samba/tests/samba_tool/domain_auth_policy.py
python/samba/tests/samba_tool/domain_auth_silo.py

index ccf4139a3e720c0adb4922565527a73dd6444c50..07b21bdf81dc017d0f87443b2daa8d09e9879aa7 100644 (file)
 #
 
 import samba.getopt as options
-from ldb import LdbError
 from samba.netcmd import Command, CommandError, Option, SuperCommand
 from samba.netcmd.domain.models import AuthenticationPolicy
 from samba.netcmd.domain.models.auth_policy import MIN_TGT_LIFETIME,\
     MAX_TGT_LIFETIME, StrongNTLMPolicy
+from samba.netcmd.domain.models.exceptions import ModelError
 from samba.netcmd.validators import Range
 
 
@@ -52,8 +52,11 @@ class cmd_domain_auth_policy_list(Command):
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
         # Authentication policies grouped by cn.
-        policies = {policy.cn: policy.as_dict()
-                    for policy in AuthenticationPolicy.query(ldb)}
+        try:
+            policies = {policy.cn: policy.as_dict()
+                        for policy in AuthenticationPolicy.query(ldb)}
+        except ModelError as e:
+            raise CommandError(e)
 
         # Using json output format gives more detail.
         if output_format == "json":
@@ -88,8 +91,12 @@ class cmd_domain_auth_policy_view(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            policy = AuthenticationPolicy.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if authentication policy exists first.
-        policy = AuthenticationPolicy.get(ldb, cn=name)
         if policy is None:
             raise CommandError(f"Authentication policy {name} not found.")
 
@@ -171,8 +178,12 @@ class cmd_domain_auth_policy_create(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            policy = AuthenticationPolicy.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Make sure authentication policy doesn't already exist.
-        policy = AuthenticationPolicy.get(ldb, cn=name)
         if policy is not None:
             raise CommandError(f"Authentication policy {name} already exists.")
 
@@ -201,7 +212,7 @@ class cmd_domain_auth_policy_create(Command):
 
             if protect:
                 policy.protect(ldb)
-        except LdbError as e:
+        except ModelError as e:
             raise CommandError(e)
 
         # Authentication policy created successfully.
@@ -281,8 +292,12 @@ class cmd_domain_auth_policy_modify(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            policy = AuthenticationPolicy.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if authentication policy exists.
-        policy = AuthenticationPolicy.get(ldb, cn=name)
         if policy is None:
             raise CommandError(f"Authentication policy {name} not found.")
 
@@ -326,7 +341,7 @@ class cmd_domain_auth_policy_modify(Command):
                 policy.protect(ldb)
             elif unprotect:
                 policy.unprotect(ldb)
-        except LdbError as e:
+        except ModelError as e:
             raise CommandError(e)
 
         # Authentication policy updated successfully.
@@ -360,8 +375,12 @@ class cmd_domain_auth_policy_delete(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            policy = AuthenticationPolicy.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if authentication policy exists first.
-        policy = AuthenticationPolicy.get(ldb, cn=name)
         if policy is None:
             raise CommandError(f"Authentication policy {name} not found.")
 
@@ -371,7 +390,7 @@ class cmd_domain_auth_policy_delete(Command):
                 policy.unprotect(ldb)
 
             policy.delete(ldb)
-        except LdbError as e:
+        except ModelError as e:
             if not force:
                 raise CommandError(
                     f"{e}\nTry --force to delete protected authentication policies.")
index a5f76292042b625d3738bd3189e9e1fa8407f863..194a5b4f487bf1c01580338efba70e15f5fb26bc 100644 (file)
@@ -21,9 +21,9 @@
 #
 
 import samba.getopt as options
-from ldb import LdbError
 from samba.netcmd import Command, CommandError, Option, SuperCommand
 from samba.netcmd.domain.models import AuthenticationPolicy, AuthenticationSilo
+from samba.netcmd.domain.models.exceptions import ModelError
 
 from .silo_member import cmd_domain_auth_silo_member
 
@@ -51,8 +51,11 @@ class cmd_domain_auth_silo_list(Command):
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
         # Authentication silos grouped by cn.
-        silos = {silo.cn: silo.as_dict()
-                 for silo in AuthenticationSilo.query(ldb)}
+        try:
+            silos = {silo.cn: silo.as_dict()
+                     for silo in AuthenticationSilo.query(ldb)}
+        except ModelError as e:
+            raise CommandError(e)
 
         # Using json output format gives more detail.
         if output_format == "json":
@@ -87,8 +90,12 @@ class cmd_domain_auth_silo_view(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            silo = AuthenticationSilo.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if silo exists first.
-        silo = AuthenticationSilo.get(ldb, cn=name)
         if silo is None:
             raise CommandError(f"Authentication silo {name} not found.")
 
@@ -173,8 +180,12 @@ class cmd_domain_auth_silo_create(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            silo = AuthenticationSilo.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Make sure silo doesn't already exist.
-        silo = AuthenticationSilo.get(ldb, cn=name)
         if silo is not None:
             raise CommandError(f"Authentication silo {name} already exists.")
 
@@ -206,7 +217,7 @@ class cmd_domain_auth_silo_create(Command):
 
             if protect:
                 silo.protect(ldb)
-        except LdbError as e:
+        except ModelError as e:
             raise CommandError(e)
 
         # Authentication silo created successfully.
@@ -266,7 +277,7 @@ class cmd_domain_auth_silo_modify(Command):
         """
         try:
             return AuthenticationPolicy.lookup(ldb, name)
-        except (LookupError, ValueError) as e:
+        except (LookupError, ModelError, ValueError) as e:
             raise CommandError(e)
 
     def run(self, ldap_url=None, sambaopts=None, credopts=None, name=None,
@@ -290,8 +301,12 @@ class cmd_domain_auth_silo_modify(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            silo = AuthenticationSilo.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if silo exists first.
-        silo = AuthenticationSilo.get(ldb, cn=name)
         if silo is None:
             raise CommandError(f"Authentication silo {name} not found.")
 
@@ -331,7 +346,7 @@ class cmd_domain_auth_silo_modify(Command):
                 silo.protect(ldb)
             elif unprotect:
                 silo.unprotect(ldb)
-        except LdbError as e:
+        except ModelError as e:
             raise CommandError(e)
 
         # Silo updated successfully.
@@ -365,8 +380,12 @@ class cmd_domain_auth_silo_delete(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            silo = AuthenticationSilo.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if silo exists first.
-        silo = AuthenticationSilo.get(ldb, cn=name)
         if silo is None:
             raise CommandError(f"Authentication silo {name} not found.")
 
@@ -376,7 +395,7 @@ class cmd_domain_auth_silo_delete(Command):
                 silo.unprotect(ldb)
 
             silo.delete(ldb)
-        except LdbError as e:
+        except ModelError as e:
             if not force:
                 raise CommandError(
                     f"{e}\nTry --force to delete protected authentication silos.")
index 981904b948f2c9fbbb0646daedc1e98d01f743db..e637f43d18e67de3e85ecbc2f968ea1edb199913 100644 (file)
 #
 
 import samba.getopt as options
-from ldb import Dn, LdbError
+from ldb import Dn
 from samba.netcmd import Command, CommandError, Option, SuperCommand
 from samba.netcmd.domain.models import AuthenticationSilo, User
+from samba.netcmd.domain.models.exceptions import ModelError
 
 
 class cmd_domain_auth_silo_member_add(Command):
@@ -57,16 +58,25 @@ class cmd_domain_auth_silo_member_add(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            silo = AuthenticationSilo.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if authentication silo exists first.
-        silo = AuthenticationSilo.get(ldb, cn=name)
         if silo is None:
             raise CommandError(f"Authentication silo {name} not found.")
 
         # Try a Dn first, then sAMAccountName.
         try:
-            user = User.get(ldb, dn=Dn(ldb, member))
+            user_query = {"dn": Dn(ldb, member)}
         except ValueError:
-            user = User.get(ldb, username=member)
+            user_query = {"username": member}
+
+        try:
+            user = User.get(ldb, **user_query)
+        except ModelError as e:
+            raise CommandError(e)
 
         # Ensure the user actually exists first.
         if user is None:
@@ -79,7 +89,7 @@ class cmd_domain_auth_silo_member_add(Command):
         try:
             silo.add_member(ldb, user)
             user.save(ldb)
-        except LdbError as e:
+        except ModelError as e:
             raise CommandError(e)
 
         self.outf.write(f"User '{user.name}' added to the {name} silo.\n")
@@ -113,13 +123,20 @@ class cmd_domain_auth_silo_member_list(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            silo = AuthenticationSilo.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if authentication silo exists first.
-        silo = AuthenticationSilo.get(ldb, cn=name)
         if silo is None:
             raise CommandError(f"Authentication silo {name} not found.")
 
         # Fetch all members.
-        members = [User.get(ldb, dn=dn) for dn in silo.members]
+        try:
+            members = [User.get(ldb, dn=dn) for dn in silo.members]
+        except ModelError as e:
+            raise CommandError(e)
 
         # Using json output format gives more detail.
         if output_format == "json":
@@ -160,16 +177,25 @@ class cmd_domain_auth_silo_member_remove(Command):
 
         ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
 
+        try:
+            silo = AuthenticationSilo.get(ldb, cn=name)
+        except ModelError as e:
+            raise CommandError(e)
+
         # Check if authentication silo exists first.
-        silo = AuthenticationSilo.get(ldb, cn=name)
         if silo is None:
             raise CommandError(f"Authentication silo {name} not found.")
 
         # Try a Dn first, then sAMAccountName.
         try:
-            user = User.get(ldb, dn=Dn(ldb, member))
+            user_query = {"dn": Dn(ldb, member)}
         except ValueError:
-            user = User.get(ldb, username=member)
+            user_query = {"username": member}
+
+        try:
+            user = User.get(ldb, **user_query)
+        except ModelError as e:
+            raise CommandError(e)
 
         # Ensure the user actually exists first.
         if user is None:
@@ -182,7 +208,7 @@ class cmd_domain_auth_silo_member_remove(Command):
         try:
             silo.remove_member(ldb, user)
             user.save(ldb)
-        except LdbError as e:
+        except ModelError as e:
             raise CommandError(e)
 
         self.outf.write(f"User '{user.name}' removed from the {name} silo.\n")
index d73af183f0b4623a61aee77876be225da97207d5..acd62804cf1a0b1393f5d098b53f260c09f429f8 100644 (file)
@@ -24,8 +24,8 @@ import json
 from optparse import OptionValueError
 from unittest.mock import patch
 
-from ldb import LdbError
 from samba.netcmd import CommandError
+from samba.netcmd.domain.models.exceptions import ModelError
 from samba.samdb import SamDB
 from samba.sd_utils import SDUtils
 
@@ -262,9 +262,9 @@ class AuthPolicyCmdTestCase(BaseAuthCmdTest):
 
     def test_authentication_policy_create_fails(self):
         """Test creating an authentication policy, but it fails."""
-        # Raise LdbError when ldb.add() is called.
+        # Raise ModelError when ldb.add() is called.
         with patch.object(SamDB, "add") as add_mock:
-            add_mock.side_effect = LdbError("Custom error message")
+            add_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "policy", "create",
                                            "--name", "createFails")
             self.assertEqual(result, -1)
@@ -476,9 +476,9 @@ class AuthPolicyCmdTestCase(BaseAuthCmdTest):
 
     def test_authentication_policy_modify_fails(self):
         """Test modifying an authentication policy, but it fails."""
-        # Raise LdbError when ldb.add() is called.
+        # Raise ModelError when ldb.add() is called.
         with patch.object(SamDB, "modify") as modify_mock:
-            modify_mock.side_effect = LdbError("Custom error message")
+            modify_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "policy", "modify",
                                            "--name", "Single Policy",
                                            "--description", "New description")
@@ -555,9 +555,9 @@ class AuthPolicyCmdTestCase(BaseAuthCmdTest):
         self.assertIsNotNone(policy)
 
         # Try delete with --force.
-        # Patch SDUtils.dacl_delete_aces with a Mock that raises LdbError.
+        # Patch SDUtils.dacl_delete_aces with a Mock that raises ModelError.
         with patch.object(SDUtils, "dacl_delete_aces") as delete_mock:
-            delete_mock.side_effect = LdbError("Custom error message")
+            delete_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "policy", "delete",
                                            "--name", "deleteForceFail",
                                            "--force")
@@ -573,9 +573,9 @@ class AuthPolicyCmdTestCase(BaseAuthCmdTest):
         policy = self.get_authentication_policy("regularPolicy")
         self.assertIsNotNone(policy)
 
-        # Raise LdbError when ldb.delete() is called.
+        # Raise ModelError when ldb.delete() is called.
         with patch.object(SamDB, "delete") as delete_mock:
-            delete_mock.side_effect = LdbError("Custom error message")
+            delete_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "policy", "delete",
                                            "--name", "regularPolicy")
             self.assertEqual(result, -1)
@@ -594,9 +594,9 @@ class AuthPolicyCmdTestCase(BaseAuthCmdTest):
         policy = self.get_authentication_policy("protectedPolicy")
         self.assertIsNotNone(policy)
 
-        # Raise LdbError when ldb.delete() is called.
+        # Raise ModelError when ldb.delete() is called.
         with patch.object(SamDB, "delete") as delete_mock:
-            delete_mock.side_effect = LdbError("Custom error message")
+            delete_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "policy", "delete",
                                            "--name", "protectedPolicy",
                                            "--force")
index 47d0ea84fdedb10579aae62418a37d4c17730590..b9f008f779e49936a15ecbfe07792292f956f5dc 100644 (file)
@@ -24,7 +24,7 @@ import json
 from collections import defaultdict
 from unittest.mock import patch
 
-from ldb import LdbError
+from samba.netcmd.domain.models.exceptions import ModelError
 from samba.samdb import SamDB
 from samba.sd_utils import SDUtils
 
@@ -203,9 +203,9 @@ class AuthSiloCmdTestCase(BaseAuthCmdTest):
 
     def test_authentication_silo_create_fails(self):
         """Test creating an authentication silo, but it fails."""
-        # Raise LdbError when ldb.add() is called.
+        # Raise ModelError when ldb.add() is called.
         with patch.object(SamDB, "add") as add_mock:
-            add_mock.side_effect = LdbError("Custom error message")
+            add_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "silo", "create",
                                            "--name", "createFails",
                                            "--policy", "Single Policy")
@@ -302,9 +302,9 @@ class AuthSiloCmdTestCase(BaseAuthCmdTest):
 
     def test_authentication_silo_modify_fails(self):
         """Test modify authentication silo, but it fails."""
-        # Raise LdbError when ldb.modify() is called.
+        # Raise ModelError when ldb.modify() is called.
         with patch.object(SamDB, "modify") as add_mock:
-            add_mock.side_effect = LdbError("Custom error message")
+            add_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "silo", "modify",
                                            "--name", "developers",
                                            "--description", "Devs")
@@ -384,9 +384,9 @@ class AuthSiloCmdTestCase(BaseAuthCmdTest):
         self.assertIsNotNone(silo)
 
         # Try delete with --force.
-        # Patch SDUtils.dacl_delete_aces with a Mock that raises LdbError.
+        # Patch SDUtils.dacl_delete_aces with a Mock that raises ModelError.
         with patch.object(SDUtils, "dacl_delete_aces") as delete_mock:
-            delete_mock.side_effect = LdbError("Custom error message")
+            delete_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "silo", "delete",
                                            "--name", "deleteForceFail",
                                            "--force")
@@ -403,9 +403,9 @@ class AuthSiloCmdTestCase(BaseAuthCmdTest):
         silo = self.get_authentication_silo("regularSilo")
         self.assertIsNotNone(silo)
 
-        # Raise LdbError when ldb.delete() is called.
+        # Raise ModelError when ldb.delete() is called.
         with patch.object(SamDB, "delete") as delete_mock:
-            delete_mock.side_effect = LdbError("Custom error message")
+            delete_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "silo", "delete",
                                            "--name", "regularSilo")
             self.assertEqual(result, -1)
@@ -425,9 +425,9 @@ class AuthSiloCmdTestCase(BaseAuthCmdTest):
         silo = self.get_authentication_silo("protectedSilo")
         self.assertIsNotNone(silo)
 
-        # Raise LdbError when ldb.delete() is called.
+        # Raise ModelError when ldb.delete() is called.
         with patch.object(SamDB, "delete") as delete_mock:
-            delete_mock.side_effect = LdbError("Custom error message")
+            delete_mock.side_effect = ModelError("Custom error message")
             result, out, err = self.runcmd("domain", "auth", "silo", "delete",
                                            "--name", "protectedSilo",
                                            "--force")