This argument is actually an instance of SamDB (which inherits from Ldb).
This should have been called samdb.
Signed-off-by: Rob van der Linde <rob@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
"msDS-ComputerAllowedToAuthenticateTo")
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the AuthenticationPolicy model.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn object of container
"""
- base_dn = ldb.get_config_basedn()
+ base_dn = samdb.get_config_basedn()
base_dn.add_child(
"CN=AuthN Policies,CN=AuthN Policy Configuration,CN=Services")
return base_dn
return "msDS-AuthNPolicy"
@staticmethod
- def find(ldb, name):
+ def find(samdb, name):
"""Helper function to return auth policy or raise NotFound.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param name: Either DN or name of Authentication Policy
:raises: NotFound if not found
:raises: ValueError if name is not set
try:
# It's possible name is already a Dn.
- dn = name if isinstance(name, Dn) else Dn(ldb, name)
- policy = AuthenticationPolicy.get(ldb, dn=dn)
+ dn = name if isinstance(name, Dn) else Dn(samdb, name)
+ policy = AuthenticationPolicy.get(samdb, dn=dn)
except ValueError:
- policy = AuthenticationPolicy.get(ldb, cn=name)
+ policy = AuthenticationPolicy.get(samdb, cn=name)
if policy is None:
raise LookupError(f"Authentication policy {name} not found.")
members = DnField("msDS-AuthNPolicySiloMembers", many=True)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the AuthenticationSilo model.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn object of container
"""
- base_dn = ldb.get_config_basedn()
+ base_dn = samdb.get_config_basedn()
base_dn.add_child(
"CN=AuthN Silos,CN=AuthN Policy Configuration,CN=Services")
return base_dn
def get_object_class():
return "msDS-AuthNPolicySilo"
- def grant(self, ldb, member):
+ def grant(self, samdb, member):
"""Grant a member access to the Authentication Silo.
Rather than saving the silo object and writing the entire member
list out again, just add one member only.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param member: Member to grant access to silo
"""
# Create a message with only an add member operation.
# Update authentication silo.
try:
- ldb.modify(message)
+ samdb.modify(message)
except LdbError as e:
raise GrantMemberError(f"Failed to grant access to silo member: {e}")
# If the modify operation was successful refresh members field.
- self.refresh(ldb, fields=["members"])
+ self.refresh(samdb, fields=["members"])
- def revoke(self, ldb, member):
+ def revoke(self, samdb, member):
"""Revoke a member from the Authentication Silo.
Rather than saving the silo object and writing the entire member
list out again, just remove one member only.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param member: Member to revoke from silo
"""
# Create a message with only a remove member operation.
# Update authentication silo.
try:
- ldb.modify(message)
+ samdb.modify(message)
except LdbError as e:
raise RevokeMemberError(f"Failed to revoke silo member: {e}")
# If the modify operation was successful refresh members field.
- self.refresh(ldb, fields=["members"])
+ self.refresh(samdb, fields=["members"])
def get_authentication_sddl(self):
return ('O:SYG:SYD:(XA;OICI;CR;;;WD;(@USER.ad://ext/'
return str(self.display_name)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the ClaimType model.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn object of container
"""
- base_dn = ldb.get_config_basedn()
+ base_dn = samdb.get_config_basedn()
base_dn.add_child("CN=Claim Types,CN=Claims Configuration,CN=Services")
return base_dn
return "msDS-ClaimType"
@staticmethod
- def new_claim_type(ldb, attribute, applies_to, display_name=None,
+ def new_claim_type(samdb, attribute, applies_to, display_name=None,
description=None, enabled=True):
"""Creates a ClaimType but does not save the instance.
- :param ldb: SamDB database connection
+ :param samdb: SamDB database connection
:param attribute: AttributeSchema object to use for creating ClaimType
:param applies_to: List of ClassSchema objects ClaimType applies to
:param display_name: Optional display name to use or use attribute name
:param enabled: Create an enabled or disabled claim type (default True)
:raises NotFound: if the ValueType for this attribute doesn't exist
"""
- value_type = ValueType.find(ldb, attribute)
+ value_type = ValueType.find(samdb, attribute)
# Generate the new Claim Type cn.
# Windows creates a random number here containing 16 hex digits.
super().__init__(**kwargs)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return base Dn for Computers.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn to use for searching
"""
- return ldb.get_wellknown_dn(ldb.get_default_basedn(),
- DS_GUID_COMPUTERS_CONTAINER)
+ return samdb.get_wellknown_dn(samdb.get_default_basedn(),
+ DS_GUID_COMPUTERS_CONTAINER)
@staticmethod
def get_object_class():
return "computer"
@classmethod
- def find(cls, ldb, name):
+ def find(cls, samdb, name):
"""Helper function to find a computer, first by Dn then sAMAccountName.
If the Dn can't be parsed use sAMAccountName, automatically add the $.
"""
try:
- query = {"dn": Dn(ldb, name)}
+ query = {"dn": Dn(samdb, name)}
except ValueError:
if name.endswith("$"):
query = {"account_name": name}
else:
query = {"account_name": name + "$"}
- return cls.get(ldb, **query)
+ return cls.get(samdb, **query)
readonly=False):
"""Creates a new field, should be subclassed.
- :param name: Ldb field name.
+ :param name: SamDB field name.
:param many: If true always convert field to a list when loaded.
:param default: Default value or callback method (obj is first argument)
:param hidden: If this is True, exclude the field when calling as_dict()
self.default = default
@abstractmethod
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Converts value read from the database to Python value.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param value: MessageElement value from the database
:returns: Parsed value as Python type
"""
pass
@abstractmethod
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Converts value to database value.
This should return a MessageElement or None, where None means
the field will be unset on the next save.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param value: Input value from Python field
:param flags: MessageElement flags
:returns: MessageElement or None
class IntegerField(Field):
"""A simple integer field, can be an int or list of int."""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement to int or list of int."""
if value is None:
return
else:
return int(value[0])
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert int or list of int to MessageElement."""
if value is None:
return
This tends to be quite easy because a MessageElement already uses bytes.
"""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement to bytes or list of bytes.
The values on the MessageElement should already be bytes so the
else:
return bytes(value[0])
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert bytes or list of bytes to MessageElement."""
if value is None:
return
class StringField(Field):
"""A simple string field, may contain str or list of str."""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement to str or list of str."""
if value is None:
return
else:
return str(value)
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert str or list of str to MessageElement."""
if value is None:
return
else:
return self.enum(str(value))
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement to enum or list of enum."""
if value is None:
return
else:
return self.enum_from_value(value)
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert enum or list of enum to MessageElement."""
if value is None:
return
class DateTimeField(Field):
"""A field for parsing ldb timestamps into Python datetime."""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement to datetime or list of datetime."""
if value is None:
return
return datetime.fromtimestamp(string_to_time(str(value)),
tz=timezone.utc)
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert datetime or list of datetime to MessageElement."""
if value is None:
return
class NtTimeField(Field):
"""18-digit Active Directory timestamps."""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement to datetime or list of datetime."""
if value is None:
return
else:
return datetime_from_nt_time(int(value[0]))
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert datetime or list of datetime to MessageElement."""
if value is None:
return
self.model = model
super().__init__(name, many, default)
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert Message element to related object or list of objects.
Note that fetching related items is not using any sort of lazy
if value is None:
return
elif len(value) > 1 or self.many:
- return [self.model.get(ldb, dn=Dn(ldb, str(item))) for item in value]
+ return [self.model.get(samdb, dn=Dn(samdb, str(item))) for item in value]
else:
- return self.model.get(ldb, dn=Dn(ldb, str(value)))
+ return self.model.get(samdb, dn=Dn(samdb, str(value)))
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert related object or list of objects to MessageElement."""
if value is None:
return
class DnField(Field):
"""A Dn field parses the current field into a Dn object."""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement to a Dn object or list of Dn objects."""
if value is None:
return
elif isinstance(value, Dn):
return value
elif len(value) > 1 or self.many:
- return [Dn(ldb, str(item)) for item in value]
+ return [Dn(samdb, str(item)) for item in value]
else:
- return Dn(ldb, str(value))
+ return Dn(samdb, str(value))
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert Dn object or list of Dn objects into a MessageElement."""
if value is None:
return
class GUIDField(Field):
"""A GUID field decodes fields containing binary GUIDs."""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement with a GUID into a str or list of str."""
if value is None:
return
else:
return str(ndr_unpack(GUID, value[0]))
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert str with GUID into MessageElement."""
if value is None:
return
class SIDField(Field):
"""A SID field encodes and decodes SID data."""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement with a GUID into a str or list of str."""
if value is None:
return
else:
return str(ndr_unpack(security.dom_sid, value[0]))
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert str with GUID into MessageElement."""
if value is None:
return
self.allow_device_in_sddl = allow_device_in_sddl
super().__init__(name, many=many, default=default, hidden=hidden)
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
if value is None:
return
elif len(value) > 1 or self.many:
else:
return ndr_unpack(security.descriptor, value[0])
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
if value is None:
return
elif isinstance(value, list):
return MessageElement(
- [self.to_db_value(ldb, item, flags)[0] for item in value],
+ [self.to_db_value(samdb, item, flags)[0] for item in value],
flags,
self.name)
else:
- domain_sid = security.dom_sid(ldb.get_domain_sid())
+ domain_sid = security.dom_sid(samdb.get_domain_sid())
# If this is a SDDL string convert it to a descriptor.
if isinstance(value, str):
class BooleanField(Field):
"""A simple boolean field, can be a bool or list of bool."""
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Convert MessageElement into a bool or list of bool."""
if value is None:
return
else:
return str(value) == "TRUE"
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert bool or list of bool into a MessageElement."""
if value is None:
return
"": "http://schemas.microsoft.com/2010/08/ActiveDirectory/PossibleValues"
}
- def from_db_value(self, ldb, value):
+ def from_db_value(self, samdb, value):
"""Parse MessageElement with XML to list of dicts."""
if value is not None:
root = ElementTree.fromstring(str(value))
return values
- def to_db_value(self, ldb, value, flags):
+ def to_db_value(self, samdb, value, flags):
"""Convert list of dicts back to XML as a MessageElement."""
if value is None:
return
SupportedEncryptionTypes)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return base Dn for Managed Service Accounts.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn to use for searching
"""
- return ldb.get_wellknown_dn(ldb.get_default_basedn(),
- DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER)
+ return samdb.get_wellknown_dn(samdb.get_default_basedn(),
+ DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER)
@staticmethod
def get_object_class():
return self.as_dict(**kwargs)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the container of this model.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn to use for new objects
"""
- return ldb.get_default_basedn()
+ return samdb.get_default_basedn()
@classmethod
- def get_search_dn(cls, ldb):
+ def get_search_dn(cls, samdb):
"""Return the DN used for querying.
By default, this just calls get_base_dn, but it is possible to
return a different Dn for querying.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn to use for searching
"""
- return cls.get_base_dn(ldb)
+ return cls.get_base_dn(samdb)
@staticmethod
def get_object_class():
return "top"
@classmethod
- def _from_message(cls, ldb, message):
+ def _from_message(cls, samdb, message):
"""Create a new model instance from the Ldb Message object.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param message: Ldb Message object to create instance from
"""
obj = cls()
- obj._apply(ldb, message)
+ obj._apply(samdb, message)
return obj
- def _apply(self, ldb, message):
+ def _apply(self, samdb, message):
"""Internal method to apply Ldb Message to current object.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param message: Ldb Message object to apply
"""
# Store the ldb Message so that in save we can see what changed.
for attr, field in self.fields.items():
if field.name in message:
- setattr(self, attr, field.from_db_value(ldb, message[field.name]))
+ setattr(self, attr, field.from_db_value(samdb, message[field.name]))
- def refresh(self, ldb, fields=None):
+ def refresh(self, samdb, fields=None):
"""Refresh object from database.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param fields: Optional list of field names to refresh
"""
attrs = [self.fields[f].name for f in fields] if fields else None
# This shouldn't normally happen but in case the object refresh fails.
try:
- res = ldb.search(self.dn, scope=SCOPE_BASE, attrs=attrs)
+ res = samdb.search(self.dn, scope=SCOPE_BASE, attrs=attrs)
except LdbError as e:
if e.args[0] == ERR_NO_SUCH_OBJECT:
raise NotFound(f"Refresh failed, object gone: {self.dn}")
raise
- self._apply(ldb, res[0])
+ self._apply(samdb, res[0])
def as_dict(self, include_hidden=False, **kwargs):
"""Returns a dict representation of the model.
return expression
@classmethod
- def query(cls, ldb, polymorphic=False, base_dn=None, scope=SCOPE_SUBTREE,
+ def query(cls, samdb, polymorphic=False, base_dn=None, scope=SCOPE_SUBTREE,
**kwargs):
"""Returns a search query for this model.
By default, polymorphic querying is disabled, and querying User
will only return User instances.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param polymorphic: If true enables polymorphic querying (see note)
:param base_dn: Optional provide base dn for searching or use the model
:param scope: Ldb search scope (default SCOPE_SUBTREE)
:param kwargs: Search criteria as keyword args
"""
if base_dn is None:
- base_dn = cls.get_search_dn(ldb)
+ base_dn = cls.get_search_dn(samdb)
# If the container does not exist produce a friendly error message.
try:
- result = ldb.search(base_dn,
- scope=scope,
- expression=cls.build_expression(**kwargs))
+ result = samdb.search(base_dn,
+ scope=scope,
+ expression=cls.build_expression(**kwargs))
except LdbError as e:
if e.args[0] == ERR_NO_SUCH_OBJECT:
raise NotFound(f"Container does not exist: {base_dn}")
raise
- return Query(cls, ldb, result, polymorphic)
+ return Query(cls, samdb, result, polymorphic)
@classmethod
- def get(cls, ldb, **kwargs):
+ def get(cls, samdb, **kwargs):
"""Get one object, must always return one item.
Either find object by dn=, or any combination of attributes via kwargs.
If there are more than one result, MultipleObjectsReturned is raised.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param kwargs: Search criteria as keyword args
:returns: Model instance or None if not found
:raises: MultipleObjects returned if there are more than one results
# Handle LDAP error 32 LDAP_NO_SUCH_OBJECT, but raise for the rest.
# Return None if the User does not exist.
try:
- res = ldb.search(dn, scope=SCOPE_BASE)
+ res = samdb.search(dn, scope=SCOPE_BASE)
except LdbError as e:
if e.args[0] == ERR_NO_SUCH_OBJECT:
return None
else:
raise
- return cls._from_message(ldb, res[0])
+ return cls._from_message(samdb, res[0])
else:
- return cls.query(ldb, **kwargs).get()
+ return cls.query(samdb, **kwargs).get()
@classmethod
- def create(cls, ldb, **kwargs):
+ def create(cls, samdb, **kwargs):
"""Create object constructs object and calls save straight after.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param kwargs: Fields to populate object from
:returns: object
"""
obj = cls(**kwargs)
- obj.save(ldb)
+ obj.save(samdb)
return obj
@classmethod
- def get_or_create(cls, ldb, defaults=None, **kwargs):
+ def get_or_create(cls, samdb, defaults=None, **kwargs):
"""Retrieve object and if it doesn't exist create a new instance.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param defaults: Attributes only used for create but not search
:param kwargs: Attributes used for searching existing object
:returns: (object, bool created)
"""
- obj = cls.get(ldb, **kwargs)
+ obj = cls.get(samdb, **kwargs)
if obj is None:
attrs = dict(kwargs)
if defaults is not None:
attrs.update(defaults)
- return cls.create(ldb, **attrs), True
+ return cls.create(samdb, **attrs), True
else:
return obj, False
- def children(self, ldb):
+ def children(self, samdb):
"""Returns a Query of the current models children."""
return Model.query(
- ldb, base_dn=self.dn, scope=SCOPE_ONELEVEL, polymorphic=True)
+ samdb, base_dn=self.dn, scope=SCOPE_ONELEVEL, polymorphic=True)
- def save(self, ldb):
+ def save(self, samdb):
"""Save model to Ldb database.
The save operation will save all fields excluding fields that
After the save operation the object is refreshed from the server,
as often the server will populate some fields.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
"""
if self.dn is None:
- dn = self.get_base_dn(ldb)
+ dn = self.get_base_dn(samdb)
dn.add_child(f"CN={self.cn or self.name}")
self.dn = dn
if attr != "dn" and not field.readonly:
value = getattr(self, attr)
try:
- db_value = field.to_db_value(ldb, value, FLAG_MOD_ADD)
+ db_value = field.to_db_value(samdb, value, FLAG_MOD_ADD)
except ValueError as e:
raise FieldError(e, field=field)
message.add(db_value)
# Create object
- ldb.add(message)
+ samdb.add(message)
# Fetching object refreshes any automatically populated fields.
- res = ldb.search(dn, scope=SCOPE_BASE)
- self._apply(ldb, res[0])
+ res = samdb.search(dn, scope=SCOPE_BASE)
+ self._apply(samdb, res[0])
else:
# Existing Message was stored to work out what fields changed.
- existing_obj = self._from_message(ldb, self._message)
+ existing_obj = self._from_message(samdb, self._message)
# Only modify replace or modify fields that have changed.
# Any fields that are set to None or an empty list get unset.
if value != old_value:
try:
- db_value = field.to_db_value(ldb, value,
+ db_value = field.to_db_value(samdb, value,
FLAG_MOD_REPLACE)
except ValueError as e:
raise FieldError(e, field=field)
# Saving nothing only triggers an error.
if len(message):
- ldb.modify(message)
+ samdb.modify(message)
# Fetching object refreshes any automatically populated fields.
- self.refresh(ldb)
+ self.refresh(samdb)
- def delete(self, ldb):
+ def delete(self, samdb):
"""Delete item from Ldb database.
If self.dn is None then the object has not yet been saved.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
"""
if self.dn is None:
raise DeleteError("Cannot delete object that doesn't have a dn.")
try:
- ldb.delete(self.dn)
+ samdb.delete(self.dn)
except LdbError as e:
raise DeleteError(f"Delete failed: {e}")
- def protect(self, ldb):
+ def protect(self, samdb):
"""Protect object from accidental deletion.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
"""
- utils = SDUtils(ldb)
+ utils = SDUtils(samdb)
try:
utils.dacl_add_ace(self.dn, "(D;;DTSD;;;WD)")
except LdbError as e:
raise ProtectError(f"Failed to protect object: {e}")
- def unprotect(self, ldb):
+ def unprotect(self, samdb):
"""Unprotect object from accidental deletion.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
"""
- utils = SDUtils(ldb)
+ utils = SDUtils(samdb)
try:
utils.dacl_delete_aces(self.dn, "(D;;DTSD;;;WD)")
class Query:
"""Simple Query class used by the `Model.query` method."""
- def __init__(self, model, ldb, result, polymorphic):
+ def __init__(self, model, samdb, result, polymorphic):
self.model = model
- self.ldb = ldb
+ self.samdb = samdb
self.result = result
self.count = result.count
self.name = " ".join(RE_SPLIT_CAMELCASE.findall(model.__name__)).lower()
else:
model = self.model
- return model._from_message(self.ldb, message)
+ return model._from_message(self.samdb, message)
def first(self):
"""Returns the first item in the Query or None for no results."""
system_only = BooleanField("systemOnly", readonly=True)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the ClassSchema model.
This is the same as AttributeSchema, but the objectClass is different.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn object of container
"""
- return ldb.get_schema_basedn()
+ return samdb.get_schema_basedn()
@staticmethod
def get_object_class():
return "classSchema"
@classmethod
- def find(cls, ldb, name):
+ def find(cls, samdb, name):
"""Helper function to find class by name or raise NotFound.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param name: Class name
:raises: NotFound if not found
:raises: ValueError if name is not provided
if not name:
raise ValueError("Class name is required.")
- attr = cls.get(ldb, ldap_display_name=name)
+ attr = cls.get(samdb, ldap_display_name=name)
if attr is None:
raise NotFound(f"Could not locate {name} in class schema.")
system_only = BooleanField("systemOnly", readonly=True)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the AttributeSchema model.
This is the same as ClassSchema, but the objectClass is different.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn object of container
"""
- return ldb.get_schema_basedn()
+ return samdb.get_schema_basedn()
@staticmethod
def get_object_class():
return "attributeSchema"
@classmethod
- def find(cls, ldb, name):
+ def find(cls, samdb, name):
"""Helper function to find attribute by name or raise NotFound.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param name: Attribute name
:raises: NotFound if not found
:raises: ValueError if name is not provided
if not name:
raise ValueError("Attribute name is required.")
- attr = cls.get(ldb, ldap_display_name=name)
+ attr = cls.get(samdb, ldap_display_name=name)
if attr is None:
raise NotFound(f"Could not locate {name} in attribute schema.")
site_object_bl = DnField("siteObjectBL", readonly=True)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the Site model.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn to use for new objects
"""
- base_dn = ldb.get_config_basedn()
+ base_dn = samdb.get_config_basedn()
base_dn.add_child("CN=Sites")
return base_dn
system_flags = IntegerField("systemFlags", readonly=True)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the Subnet model.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn to use for new objects
"""
- base_dn = ldb.get_config_basedn()
+ base_dn = samdb.get_config_basedn()
base_dn.add_child("CN=Subnets,CN=Sites")
return base_dn
return self.account_name
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the User model.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn to use for new objects
"""
- return ldb.get_wellknown_dn(ldb.get_default_basedn(),
- DS_GUID_USERS_CONTAINER)
+ return samdb.get_wellknown_dn(samdb.get_default_basedn(),
+ DS_GUID_USERS_CONTAINER)
@classmethod
- def get_search_dn(cls, ldb):
+ def get_search_dn(cls, samdb):
"""Return Dn used for searching so Computers will also be found.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn to use for searching
"""
- return ldb.get_root_basedn()
+ return samdb.get_root_basedn()
@staticmethod
def get_object_class():
return "user"
@classmethod
- def find(cls, ldb, name):
+ def find(cls, samdb, name):
"""Helper function to find a user by Dn, objectSid, or sAMAccountName.
If the Dn or Sid can't be parsed, use sAMAccountName instead.
"""
try:
- query = {"dn": Dn(ldb, name)}
+ query = {"dn": Dn(samdb, name)}
except ValueError:
try:
query = {"object_sid": dom_sid(name)}
except ValueError:
query = {"account_name": name}
- return cls.get(ldb, **query)
+ return cls.get(samdb, **query)
@classmethod
- def get_sid_for_principal(cls, ldb, principal) -> str:
+ def get_sid_for_principal(cls, samdb, principal) -> str:
"""Return object_sid for the provided principal.
If principal is already an object sid then return without fetching,
try:
return str(dom_sid(principal))
except ValueError:
- user = cls.find(ldb, principal)
+ user = cls.find(samdb, principal)
if user:
return user.object_sid
else:
"msDS-ValueTypeReferenceBL", readonly=True)
@staticmethod
- def get_base_dn(ldb):
+ def get_base_dn(samdb):
"""Return the base DN for the ValueType model.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:return: Dn object of container
"""
- base_dn = ldb.get_config_basedn()
+ base_dn = samdb.get_config_basedn()
base_dn.add_child("CN=Value Types,CN=Claims Configuration,CN=Services")
return base_dn
return "msDS-ValueType"
@classmethod
- def find(cls, ldb, attribute):
+ def find(cls, samdb, attribute):
"""Helper function to get ValueType by attribute or raise NotFound.
- :param ldb: Ldb connection
+ :param samdb: SamDB connection
:param attribute: AttributeSchema object
:raises: NotFound if not found
:raises: ValueError for unknown attribute syntax
raise ValueError(f"Unable to process attribute syntax {syntax}")
# This should always return something but should still be handled.
- value_type = cls.get(ldb, cn=cn)
+ value_type = cls.get(samdb, cn=cn)
if value_type is None:
raise NotFound(f"Could not find claim value type for {attribute}.")