#
import samba.getopt as options
-from ldb import LdbError
from samba.netcmd import Command, CommandError, Option, SuperCommand
from samba.netcmd.domain.models import AuthenticationPolicy
from samba.netcmd.domain.models.auth_policy import MIN_TGT_LIFETIME,\
MAX_TGT_LIFETIME, StrongNTLMPolicy
+from samba.netcmd.domain.models.exceptions import ModelError
from samba.netcmd.validators import Range
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Authentication policies grouped by cn.
- policies = {policy.cn: policy.as_dict()
- for policy in AuthenticationPolicy.query(ldb)}
+ try:
+ policies = {policy.cn: policy.as_dict()
+ for policy in AuthenticationPolicy.query(ldb)}
+ except ModelError as e:
+ raise CommandError(e)
# Using json output format gives more detail.
if output_format == "json":
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ policy = AuthenticationPolicy.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if authentication policy exists first.
- policy = AuthenticationPolicy.get(ldb, cn=name)
if policy is None:
raise CommandError(f"Authentication policy {name} not found.")
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ policy = AuthenticationPolicy.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Make sure authentication policy doesn't already exist.
- policy = AuthenticationPolicy.get(ldb, cn=name)
if policy is not None:
raise CommandError(f"Authentication policy {name} already exists.")
if protect:
policy.protect(ldb)
- except LdbError as e:
+ except ModelError as e:
raise CommandError(e)
# Authentication policy created successfully.
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ policy = AuthenticationPolicy.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if authentication policy exists.
- policy = AuthenticationPolicy.get(ldb, cn=name)
if policy is None:
raise CommandError(f"Authentication policy {name} not found.")
policy.protect(ldb)
elif unprotect:
policy.unprotect(ldb)
- except LdbError as e:
+ except ModelError as e:
raise CommandError(e)
# Authentication policy updated successfully.
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ policy = AuthenticationPolicy.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if authentication policy exists first.
- policy = AuthenticationPolicy.get(ldb, cn=name)
if policy is None:
raise CommandError(f"Authentication policy {name} not found.")
policy.unprotect(ldb)
policy.delete(ldb)
- except LdbError as e:
+ except ModelError as e:
if not force:
raise CommandError(
f"{e}\nTry --force to delete protected authentication policies.")
#
import samba.getopt as options
-from ldb import LdbError
from samba.netcmd import Command, CommandError, Option, SuperCommand
from samba.netcmd.domain.models import AuthenticationPolicy, AuthenticationSilo
+from samba.netcmd.domain.models.exceptions import ModelError
from .silo_member import cmd_domain_auth_silo_member
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Authentication silos grouped by cn.
- silos = {silo.cn: silo.as_dict()
- for silo in AuthenticationSilo.query(ldb)}
+ try:
+ silos = {silo.cn: silo.as_dict()
+ for silo in AuthenticationSilo.query(ldb)}
+ except ModelError as e:
+ raise CommandError(e)
# Using json output format gives more detail.
if output_format == "json":
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ silo = AuthenticationSilo.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if silo exists first.
- silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ silo = AuthenticationSilo.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Make sure silo doesn't already exist.
- silo = AuthenticationSilo.get(ldb, cn=name)
if silo is not None:
raise CommandError(f"Authentication silo {name} already exists.")
if protect:
silo.protect(ldb)
- except LdbError as e:
+ except ModelError as e:
raise CommandError(e)
# Authentication silo created successfully.
"""
try:
return AuthenticationPolicy.lookup(ldb, name)
- except (LookupError, ValueError) as e:
+ except (LookupError, ModelError, ValueError) as e:
raise CommandError(e)
def run(self, ldap_url=None, sambaopts=None, credopts=None, name=None,
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ silo = AuthenticationSilo.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if silo exists first.
- silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
silo.protect(ldb)
elif unprotect:
silo.unprotect(ldb)
- except LdbError as e:
+ except ModelError as e:
raise CommandError(e)
# Silo updated successfully.
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ silo = AuthenticationSilo.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if silo exists first.
- silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
silo.unprotect(ldb)
silo.delete(ldb)
- except LdbError as e:
+ except ModelError as e:
if not force:
raise CommandError(
f"{e}\nTry --force to delete protected authentication silos.")
#
import samba.getopt as options
-from ldb import Dn, LdbError
+from ldb import Dn
from samba.netcmd import Command, CommandError, Option, SuperCommand
from samba.netcmd.domain.models import AuthenticationSilo, User
+from samba.netcmd.domain.models.exceptions import ModelError
class cmd_domain_auth_silo_member_add(Command):
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ silo = AuthenticationSilo.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if authentication silo exists first.
- silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
# Try a Dn first, then sAMAccountName.
try:
- user = User.get(ldb, dn=Dn(ldb, member))
+ user_query = {"dn": Dn(ldb, member)}
except ValueError:
- user = User.get(ldb, username=member)
+ user_query = {"username": member}
+
+ try:
+ user = User.get(ldb, **user_query)
+ except ModelError as e:
+ raise CommandError(e)
# Ensure the user actually exists first.
if user is None:
try:
silo.add_member(ldb, user)
user.save(ldb)
- except LdbError as e:
+ except ModelError as e:
raise CommandError(e)
self.outf.write(f"User '{user.name}' added to the {name} silo.\n")
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ silo = AuthenticationSilo.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if authentication silo exists first.
- silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
# Fetch all members.
- members = [User.get(ldb, dn=dn) for dn in silo.members]
+ try:
+ members = [User.get(ldb, dn=dn) for dn in silo.members]
+ except ModelError as e:
+ raise CommandError(e)
# Using json output format gives more detail.
if output_format == "json":
ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ try:
+ silo = AuthenticationSilo.get(ldb, cn=name)
+ except ModelError as e:
+ raise CommandError(e)
+
# Check if authentication silo exists first.
- silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
# Try a Dn first, then sAMAccountName.
try:
- user = User.get(ldb, dn=Dn(ldb, member))
+ user_query = {"dn": Dn(ldb, member)}
except ValueError:
- user = User.get(ldb, username=member)
+ user_query = {"username": member}
+
+ try:
+ user = User.get(ldb, **user_query)
+ except ModelError as e:
+ raise CommandError(e)
# Ensure the user actually exists first.
if user is None:
try:
silo.remove_member(ldb, user)
user.save(ldb)
- except LdbError as e:
+ except ModelError as e:
raise CommandError(e)
self.outf.write(f"User '{user.name}' removed from the {name} silo.\n")
from optparse import OptionValueError
from unittest.mock import patch
-from ldb import LdbError
from samba.netcmd import CommandError
+from samba.netcmd.domain.models.exceptions import ModelError
from samba.samdb import SamDB
from samba.sd_utils import SDUtils
def test_authentication_policy_create_fails(self):
"""Test creating an authentication policy, but it fails."""
- # Raise LdbError when ldb.add() is called.
+ # Raise ModelError when ldb.add() is called.
with patch.object(SamDB, "add") as add_mock:
- add_mock.side_effect = LdbError("Custom error message")
+ add_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "policy", "create",
"--name", "createFails")
self.assertEqual(result, -1)
def test_authentication_policy_modify_fails(self):
"""Test modifying an authentication policy, but it fails."""
- # Raise LdbError when ldb.add() is called.
+ # Raise ModelError when ldb.add() is called.
with patch.object(SamDB, "modify") as modify_mock:
- modify_mock.side_effect = LdbError("Custom error message")
+ modify_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "policy", "modify",
"--name", "Single Policy",
"--description", "New description")
self.assertIsNotNone(policy)
# Try delete with --force.
- # Patch SDUtils.dacl_delete_aces with a Mock that raises LdbError.
+ # Patch SDUtils.dacl_delete_aces with a Mock that raises ModelError.
with patch.object(SDUtils, "dacl_delete_aces") as delete_mock:
- delete_mock.side_effect = LdbError("Custom error message")
+ delete_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "policy", "delete",
"--name", "deleteForceFail",
"--force")
policy = self.get_authentication_policy("regularPolicy")
self.assertIsNotNone(policy)
- # Raise LdbError when ldb.delete() is called.
+ # Raise ModelError when ldb.delete() is called.
with patch.object(SamDB, "delete") as delete_mock:
- delete_mock.side_effect = LdbError("Custom error message")
+ delete_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "policy", "delete",
"--name", "regularPolicy")
self.assertEqual(result, -1)
policy = self.get_authentication_policy("protectedPolicy")
self.assertIsNotNone(policy)
- # Raise LdbError when ldb.delete() is called.
+ # Raise ModelError when ldb.delete() is called.
with patch.object(SamDB, "delete") as delete_mock:
- delete_mock.side_effect = LdbError("Custom error message")
+ delete_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "policy", "delete",
"--name", "protectedPolicy",
"--force")
from collections import defaultdict
from unittest.mock import patch
-from ldb import LdbError
+from samba.netcmd.domain.models.exceptions import ModelError
from samba.samdb import SamDB
from samba.sd_utils import SDUtils
def test_authentication_silo_create_fails(self):
"""Test creating an authentication silo, but it fails."""
- # Raise LdbError when ldb.add() is called.
+ # Raise ModelError when ldb.add() is called.
with patch.object(SamDB, "add") as add_mock:
- add_mock.side_effect = LdbError("Custom error message")
+ add_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "silo", "create",
"--name", "createFails",
"--policy", "Single Policy")
def test_authentication_silo_modify_fails(self):
"""Test modify authentication silo, but it fails."""
- # Raise LdbError when ldb.modify() is called.
+ # Raise ModelError when ldb.modify() is called.
with patch.object(SamDB, "modify") as add_mock:
- add_mock.side_effect = LdbError("Custom error message")
+ add_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "silo", "modify",
"--name", "developers",
"--description", "Devs")
self.assertIsNotNone(silo)
# Try delete with --force.
- # Patch SDUtils.dacl_delete_aces with a Mock that raises LdbError.
+ # Patch SDUtils.dacl_delete_aces with a Mock that raises ModelError.
with patch.object(SDUtils, "dacl_delete_aces") as delete_mock:
- delete_mock.side_effect = LdbError("Custom error message")
+ delete_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "silo", "delete",
"--name", "deleteForceFail",
"--force")
silo = self.get_authentication_silo("regularSilo")
self.assertIsNotNone(silo)
- # Raise LdbError when ldb.delete() is called.
+ # Raise ModelError when ldb.delete() is called.
with patch.object(SamDB, "delete") as delete_mock:
- delete_mock.side_effect = LdbError("Custom error message")
+ delete_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "silo", "delete",
"--name", "regularSilo")
self.assertEqual(result, -1)
silo = self.get_authentication_silo("protectedSilo")
self.assertIsNotNone(silo)
- # Raise LdbError when ldb.delete() is called.
+ # Raise ModelError when ldb.delete() is called.
with patch.object(SamDB, "delete") as delete_mock:
- delete_mock.side_effect = LdbError("Custom error message")
+ delete_mock.side_effect = ModelError("Custom error message")
result, out, err = self.runcmd("domain", "auth", "silo", "delete",
"--name", "protectedSilo",
"--force")