There is no point to the base class anymore.
And since the model layer has dramatically simplified the code in the
commands, ldb can just be a local variable.
Signed-off-by: Rob van der Linde <rob@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Joseph Sutton <josephsutton@catalyst.net.nz>
+++ /dev/null
-# Unix SMB/CIFS implementation.
-#
-# authentication silos - base class and common code
-#
-# Copyright (C) Catalyst.Net Ltd. 2023
-#
-# Written by Rob van der Linde <rob@catalyst.net.nz>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-
-from samba.netcmd import Command
-
-
-class SiloCommand(Command):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.ldb = None
import samba.getopt as options
from ldb import LdbError
-from samba.netcmd import CommandError, Option, SuperCommand
+from samba.netcmd import Command, CommandError, Option, SuperCommand
from samba.netcmd.domain.models import AuthenticationPolicy
from samba.netcmd.domain.models.auth_policy import MIN_TGT_LIFETIME,\
MAX_TGT_LIFETIME, StrongNTLMPolicy
from samba.netcmd.validators import Range
-from .base import SiloCommand
-
-class cmd_domain_auth_policy_list(SiloCommand):
+class cmd_domain_auth_policy_list(Command):
"""List authentication policies on the domain."""
synopsis = "%prog -H <URL> [options]"
def run(self, ldap_url=None, sambaopts=None, credopts=None,
output_format=None):
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Authentication policies grouped by cn.
policies = {policy.cn: policy.as_dict()
- for policy in AuthenticationPolicy.query(self.ldb)}
+ for policy in AuthenticationPolicy.query(ldb)}
# Using json output format gives more detail.
if output_format == "json":
self.outf.write(f"{policy}\n")
-class cmd_domain_auth_policy_view(SiloCommand):
+class cmd_domain_auth_policy_view(Command):
"""View an authentication policy on the domain."""
synopsis = "%prog -H <URL> [options]"
if not name:
raise CommandError("Argument --name is required.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if authentication policy exists first.
- policy = AuthenticationPolicy.get(self.ldb, cn=name)
+ policy = AuthenticationPolicy.get(ldb, cn=name)
if policy is None:
raise CommandError(f"Authentication policy {name} not found.")
self.print_json(policy.as_dict())
-class cmd_domain_auth_policy_create(SiloCommand):
+class cmd_domain_auth_policy_create(Command):
"""Create an authentication policy on the domain."""
synopsis = "%prog -H <URL> [options]"
if audit and enforce:
raise CommandError("--audit and --enforce cannot be used together.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Make sure authentication policy doesn't already exist.
- policy = AuthenticationPolicy.get(self.ldb, cn=name)
+ policy = AuthenticationPolicy.get(ldb, cn=name)
if policy is not None:
raise CommandError(f"Authentication policy {name} already exists.")
# Create policy.
try:
- policy.save(self.ldb)
+ policy.save(ldb)
if protect:
- policy.protect(self.ldb)
+ policy.protect(ldb)
except LdbError as e:
raise CommandError(e)
self.outf.write(f"Created authentication policy: {name}\n")
-class cmd_domain_auth_policy_modify(SiloCommand):
+class cmd_domain_auth_policy_modify(Command):
"""Modify authentication policies on the domain."""
synopsis = "%prog -H <URL> [options]"
if audit and enforce:
raise CommandError("--audit and --enforce cannot be used together.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if authentication policy exists.
- policy = AuthenticationPolicy.get(self.ldb, cn=name)
+ policy = AuthenticationPolicy.get(ldb, cn=name)
if policy is None:
raise CommandError(f"Authentication policy {name} not found.")
# Update policy.
try:
- policy.save(self.ldb)
+ policy.save(ldb)
if protect:
- policy.protect(self.ldb)
+ policy.protect(ldb)
elif unprotect:
- policy.unprotect(self.ldb)
+ policy.unprotect(ldb)
except LdbError as e:
raise CommandError(e)
self.outf.write(f"Updated authentication policy: {name}\n")
-class cmd_domain_auth_policy_delete(SiloCommand):
+class cmd_domain_auth_policy_delete(Command):
"""Delete authentication policies on the domain."""
synopsis = "%prog -H <URL> [options]"
if not name:
raise CommandError("Argument --name is required.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if authentication policy exists first.
- policy = AuthenticationPolicy.get(self.ldb, cn=name)
+ policy = AuthenticationPolicy.get(ldb, cn=name)
if policy is None:
raise CommandError(f"Authentication policy {name} not found.")
# Delete item, --force removes delete protection first.
try:
if force:
- policy.unprotect(self.ldb)
+ policy.unprotect(ldb)
- policy.delete(self.ldb)
+ policy.delete(ldb)
except LdbError as e:
if not force:
raise CommandError(
import samba.getopt as options
from ldb import LdbError
-from samba.netcmd import CommandError, Option, SuperCommand
+from samba.netcmd import Command, CommandError, Option, SuperCommand
from samba.netcmd.domain.models import AuthenticationPolicy, AuthenticationSilo
-from .base import SiloCommand
from .silo_member import cmd_domain_auth_silo_member
-class cmd_domain_auth_silo_list(SiloCommand):
+class cmd_domain_auth_silo_list(Command):
"""List authentication silos on the domain."""
synopsis = "%prog -H <URL> [options]"
def run(self, ldap_url=None, sambaopts=None, credopts=None,
output_format=None):
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Authentication silos grouped by cn.
silos = {silo.cn: silo.as_dict()
- for silo in AuthenticationSilo.query(self.ldb)}
+ for silo in AuthenticationSilo.query(ldb)}
# Using json output format gives more detail.
if output_format == "json":
self.outf.write(f"{silo}\n")
-class cmd_domain_auth_silo_view(SiloCommand):
+class cmd_domain_auth_silo_view(Command):
"""View an authentication silo on the domain."""
synopsis = "%prog -H <URL> [options]"
if not name:
raise CommandError("Argument --name is required.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if silo exists first.
- silo = AuthenticationSilo.get(self.ldb, cn=name)
+ silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
self.print_json(silo.as_dict())
-class cmd_domain_auth_silo_create(SiloCommand):
+class cmd_domain_auth_silo_create(Command):
"""Create a new authentication silo on the domain."""
synopsis = "%prog -H <URL> [options]"
service_policy = service_policy or policy
computer_policy = computer_policy or policy
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Make sure silo doesn't already exist.
- silo = AuthenticationSilo.get(self.ldb, cn=name)
+ silo = AuthenticationSilo.get(ldb, cn=name)
if silo is not None:
raise CommandError(f"Authentication silo {name} already exists.")
# Set user policy
if user_policy:
- silo.user_policy = self.get_policy(self.ldb, user_policy).dn
+ silo.user_policy = self.get_policy(ldb, user_policy).dn
# Set service policy
if service_policy:
- silo.service_policy = self.get_policy(self.ldb, service_policy).dn
+ silo.service_policy = self.get_policy(ldb, service_policy).dn
# Set computer policy
if computer_policy:
- silo.computer_policy = self.get_policy(self.ldb, computer_policy).dn
+ silo.computer_policy = self.get_policy(ldb, computer_policy).dn
# Either --enforce will be set or --audit but never both.
# The default if both are missing is enforce=True.
# Create silo
try:
- silo.save(self.ldb)
+ silo.save(ldb)
if protect:
- silo.protect(self.ldb)
+ silo.protect(ldb)
except LdbError as e:
raise CommandError(e)
self.outf.write(f"Created authentication silo: {name}\n")
-class cmd_domain_auth_silo_modify(SiloCommand):
+class cmd_domain_auth_silo_modify(Command):
"""Modify an authentication silo on the domain."""
synopsis = "%prog -H <URL> [options]"
service_policy = service_policy or policy
computer_policy = computer_policy or policy
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if silo exists first.
- silo = AuthenticationSilo.get(self.ldb, cn=name)
+ silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
if user_policy == "":
silo.user_policy = None
elif user_policy:
- silo.user_policy = self.get_policy(self.ldb, user_policy).dn
+ silo.user_policy = self.get_policy(ldb, user_policy).dn
# Set or unset service policy.
if service_policy == "":
silo.service_policy = None
elif service_policy:
- silo.service_policy = self.get_policy(self.ldb, service_policy).dn
+ silo.service_policy = self.get_policy(ldb, service_policy).dn
# Set or unset computer policy.
if computer_policy == "":
silo.computer_policy = None
elif computer_policy:
- silo.computer_policy = self.get_policy(self.ldb, computer_policy).dn
+ silo.computer_policy = self.get_policy(ldb, computer_policy).dn
# Update silo
try:
- silo.save(self.ldb)
+ silo.save(ldb)
if protect:
- silo.protect(self.ldb)
+ silo.protect(ldb)
elif unprotect:
- silo.unprotect(self.ldb)
+ silo.unprotect(ldb)
except LdbError as e:
raise CommandError(e)
self.outf.write(f"Updated authentication silo: {name}\n")
-class cmd_domain_auth_silo_delete(SiloCommand):
+class cmd_domain_auth_silo_delete(Command):
"""Delete an authentication silo on the domain."""
synopsis = "%prog -H <URL> [options]"
if not name:
raise CommandError("Argument --name is required.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if silo exists first.
- silo = AuthenticationSilo.get(self.ldb, cn=name)
+ silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
# Delete silo
try:
if force:
- silo.unprotect(self.ldb)
+ silo.unprotect(ldb)
- silo.delete(self.ldb)
+ silo.delete(ldb)
except LdbError as e:
if not force:
raise CommandError(
import samba.getopt as options
from ldb import Dn, LdbError
-from samba.netcmd import CommandError, Option, SuperCommand
+from samba.netcmd import Command, CommandError, Option, SuperCommand
from samba.netcmd.domain.models import AuthenticationSilo, User
-from .base import SiloCommand
-
-class cmd_domain_auth_silo_member_add(SiloCommand):
+class cmd_domain_auth_silo_member_add(Command):
"""Add a member to an authentication silo."""
synopsis = "%prog -H <URL> [options]"
if not member:
raise CommandError("Argument --member is required.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if authentication silo exists first.
- silo = AuthenticationSilo.get(self.ldb, cn=name)
+ silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
# Try a Dn first, then sAMAccountName.
try:
- user = User.get(self.ldb, dn=Dn(self.ldb, member))
+ user = User.get(ldb, dn=Dn(ldb, member))
except ValueError:
- user = User.get(self.ldb, username=member)
+ user = User.get(ldb, username=member)
# Ensure the user actually exists first.
if user is None:
# Check if user isn't already assigned to another silo.
if user.assigned_silo:
- assigned_silo = AuthenticationSilo.get(self.ldb,
- dn=user.assigned_silo)
+ assigned_silo = AuthenticationSilo.get(ldb, dn=user.assigned_silo)
raise CommandError(
f"Member '{member}' is already in the {assigned_silo} silo.")
user.assigned_silo = silo.dn
try:
- silo.save(self.ldb)
- user.save(self.ldb)
+ silo.save(ldb)
+ user.save(ldb)
except LdbError as e:
raise CommandError(e)
self.outf.write(f"User '{user.name}' added to the {name} silo.\n")
-class cmd_domain_auth_silo_member_list(SiloCommand):
+class cmd_domain_auth_silo_member_list(Command):
"""List all members in the authentication silo."""
synopsis = "%prog -H <URL> [options]"
if not name:
raise CommandError("Argument --name is required.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if authentication silo exists first.
- silo = AuthenticationSilo.get(self.ldb, cn=name)
+ silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
# Fetch all members.
- members = [User.get(self.ldb, dn=dn) for dn in silo.members]
+ members = [User.get(ldb, dn=dn) for dn in silo.members]
# Using json output format gives more detail.
if output_format == "json":
self.outf.write(f"{member.dn}\n")
-class cmd_domain_auth_silo_member_remove(SiloCommand):
+class cmd_domain_auth_silo_member_remove(Command):
"""Remove a member from an authentication silo."""
synopsis = "%prog -H <URL> [options]"
if not member:
raise CommandError("Argument --member is required.")
- self.ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
+ ldb = self.ldb_connect(ldap_url, sambaopts, credopts)
# Check if authentication silo exists first.
- silo = AuthenticationSilo.get(self.ldb, cn=name)
+ silo = AuthenticationSilo.get(ldb, cn=name)
if silo is None:
raise CommandError(f"Authentication silo {name} not found.")
# Try a Dn first, then sAMAccountName.
try:
- user = User.get(self.ldb, dn=Dn(self.ldb, member))
+ user = User.get(ldb, dn=Dn(ldb, member))
except ValueError:
- user = User.get(self.ldb, username=member)
+ user = User.get(ldb, username=member)
# Ensure the user actually exists first.
if user is None:
raise CommandError(f"User '{member}' is not in the {name} silo.")
try:
- silo.save(self.ldb)
- user.save(self.ldb)
+ silo.save(ldb)
+ user.save(ldb)
except LdbError as e:
raise CommandError(e)