def normalise_int32(ivalue):
- '''normalise a ldap integer to signed 32 bit'''
+ """normalise a ldap integer to signed 32 bit"""
if int(ivalue) & 0x80000000 and int(ivalue) > 0:
return str(int(ivalue) - 0x100000000)
return str(ivalue)
def check_database(self, DN=None, scope=ldb.SCOPE_SUBTREE, controls=None,
attrs=None):
- '''perform a database check, returning the number of errors found'''
+ """perform a database check, returning the number of errors found"""
res = self.samdb.search(base=DN, scope=scope, attrs=['dn'], controls=controls)
self.report('Checking %u objects' % len(res))
error_count = 0
return error_count
def report(self, msg):
- '''print a message unless quiet is set'''
+ """print a message unless quiet is set"""
if self.quiet:
return
if self.colour:
print(msg)
def confirm(self, msg, allow_all=False, forced=False):
- '''confirm a change'''
+ """confirm a change"""
if not self.fix:
return False
if self.quiet:
################################################################
# a local confirm function with support for 'all'
def confirm_all(self, msg, all_attr):
- '''confirm a change with support for "all" '''
+ """confirm a change with support for "all" """
if not self.fix:
return False
if getattr(self, all_attr) == 'NONE':
return c
def do_delete(self, dn, controls, msg):
- '''delete dn with optional verbose output'''
+ """delete dn with optional verbose output"""
if self.verbose:
self.report("delete DN %s" % dn)
try:
return True
def do_modify(self, m, controls, msg, validate=True):
- '''perform a modify with optional verbose output'''
+ """perform a modify with optional verbose output"""
controls = controls + ["local_oid:%s:0" % dsdb.DSDB_CONTROL_DBCHECK]
if self.verbose:
self.report(self.samdb.write_ldif(m, ldb.CHANGETYPE_MODIFY))
return True
def do_rename(self, from_dn, to_rdn, to_base, controls, msg):
- '''perform a rename with optional verbose output'''
+ """perform a rename with optional verbose output"""
if self.verbose:
self.report("""dn: %s
changeType: modrdn
return linkID, revname
def err_empty_attribute(self, dn, attrname):
- '''fix empty attributes'''
+ """fix empty attributes"""
self.report("ERROR: Empty attribute %s in %s" % (attrname, dn))
if not self.confirm_all('Remove empty attribute %s from %s?' % (attrname, dn), 'remove_all_empty_attributes'):
self.report("Not fixing empty attribute %s" % attrname)
self.report("Removed empty attribute %s" % attrname)
def err_normalise_mismatch(self, dn, attrname, values):
- '''fix attribute normalisation errors, without altering sort order'''
+ """fix attribute normalisation errors, without altering sort order"""
self.report("ERROR: Normalisation error for attribute %s in %s" % (attrname, dn))
mod_list = []
for val in values:
self.report("Normalised attribute %s" % attrname)
def err_normalise_mismatch_replace(self, dn, attrname, values):
- '''fix attribute normalisation and/or sort errors'''
+ """fix attribute normalisation and/or sort errors"""
normalised = self.samdb.dsdb_normalise_attributes(self.samdb_schema, attrname, values)
if list(normalised) == values:
# how we got here is a mystery.
self.report("Normalised attribute %s" % attrname)
def err_duplicate_values(self, dn, attrname, dup_values, values):
- '''fix duplicate attribute values'''
+ """fix duplicate attribute values"""
self.report("ERROR: Duplicate values for attribute '%s' in '%s'" % (attrname, dn))
self.report("Values contain a duplicate: [%s]/[%s]!" %
(dump_attr_values(dup_values), dump_attr_values(values)))
self.report("Removed duplicate value on attribute %s" % attrname)
def is_deleted_objects_dn(self, dsdb_dn):
- '''see if a dsdb_Dn is the special Deleted Objects DN'''
+ """see if a dsdb_Dn is the special Deleted Objects DN"""
return dsdb_dn.prefix == "B:32:%s:" % dsdb.DS_GUID_DELETED_OBJECTS_CONTAINER
def err_missing_objectclass(self, dn):
self.report("Fixed missing DN SID on attribute %s" % (attrname))
def err_unknown_attribute(self, obj, attrname):
- '''handle an unknown attribute error'''
+ """handle an unknown attribute error"""
self.report("ERROR: unknown attribute '%s' in %s" % (attrname, obj.dn))
if not self.confirm_all('Remove unknown attribute %s' % attrname, 'remove_all_unknown_attributes'):
self.report("Not removing %s" % attrname)
self.report("Removed unknown attribute %s" % (attrname))
def err_undead_linked_attribute(self, obj, attrname, val):
- '''handle a link that should not be there on a deleted object'''
+ """handle a link that should not be there on a deleted object"""
self.report("ERROR: linked attribute '%s' to '%s' is present on "
"deleted object %s" % (attrname, val, obj.dn))
if not self.confirm_all('Remove linked attribute %s' % attrname, 'fix_undead_linked_attributes'):
self.report("Fixed undead forward link %s" % (attrname))
def err_missing_backlink(self, obj, attrname, val, backlink_name, target_dn):
- '''handle a missing backlink value'''
+ """handle a missing backlink value"""
self.report("ERROR: missing backlink attribute '%s' in %s for link %s in %s" % (backlink_name, target_dn, attrname, obj.dn))
if not self.confirm_all('Fix missing backlink %s' % backlink_name, 'fix_all_missing_backlinks'):
self.report("Not fixing missing backlink %s" % backlink_name)
self.report("Fixed missing backlink %s" % (backlink_name))
def err_incorrect_rmd_flags(self, obj, attrname, revealed_dn):
- '''handle a incorrect RMD_FLAGS value'''
+ """handle a incorrect RMD_FLAGS value"""
rmd_flags = int(revealed_dn.dn.get_extended_component("RMD_FLAGS"))
self.report("ERROR: incorrect RMD_FLAGS value %u for attribute '%s' in %s for link %s" % (rmd_flags, attrname, obj.dn, revealed_dn.dn.extended_str()))
if not self.confirm_all('Fix incorrect RMD_FLAGS %u' % rmd_flags, 'fix_rmd_flags'):
def err_orphaned_backlink(self, obj_dn, backlink_attr, backlink_val,
target_dn, forward_attr, forward_syntax,
check_duplicates=True):
- '''handle a orphaned backlink value'''
+ """handle a orphaned backlink value"""
if check_duplicates is True and self.has_duplicate_links(target_dn, forward_attr, forward_syntax):
self.report("WARNING: Keep orphaned backlink attribute " +
"'%s' in '%s' for link '%s' in '%s'" % (
self.report("Fixed orphaned backlink %s" % (backlink_attr))
def err_recover_forward_links(self, obj, forward_attr, forward_vals):
- '''handle a duplicate links value'''
+ """handle a duplicate links value"""
self.report("RECHECK: 'Missing/Duplicate/Correct link' lines above for attribute '%s' in '%s'" % (forward_attr, obj.dn))
self.duplicate_link_cache[duplicate_cache_key] = False
def err_no_fsmoRoleOwner(self, obj):
- '''handle a missing fSMORoleOwner'''
+ """handle a missing fSMORoleOwner"""
self.report("ERROR: fSMORoleOwner not found for role %s" % (obj.dn))
res = self.samdb.search("",
scope=ldb.SCOPE_BASE, attrs=["dsServiceName"])
self.report("Seized role %s onto current DC by adding fSMORoleOwner=%s" % (obj.dn, serviceName))
def err_missing_parent(self, obj):
- '''handle a missing parent'''
+ """handle a missing parent"""
self.report("ERROR: parent object not found for %s" % (obj.dn))
if not self.confirm_all('Move object %s into LostAndFound?' % (obj.dn), 'move_to_lost_and_found'):
self.report('Not moving object %s into LostAndFound' % (obj.dn))
self.samdb.transaction_cancel()
def err_wrong_dn(self, obj, new_dn, rdn_attr, rdn_val, name_val, controls):
- '''handle a wrong dn'''
+ """handle a wrong dn"""
new_rdn = ldb.Dn(self.samdb, str(new_dn))
new_rdn.remove_base_components(len(new_rdn) - 1)
self.report("Renamed %s into %s" % (obj.dn, new_dn))
def err_wrong_instancetype(self, obj, calculated_instancetype):
- '''handle a wrong instanceType'''
+ """handle a wrong instanceType"""
self.report("ERROR: wrong instanceType %s on %s, should be %d" % (obj["instanceType"], obj.dn, calculated_instancetype))
if not self.confirm_all('Change instanceType from %s to %d on %s?' % (obj["instanceType"], calculated_instancetype, obj.dn), 'fix_instancetype'):
self.report('Not changing instanceType from %s to %d on %s' % (obj["instanceType"], calculated_instancetype, obj.dn))
self.report("ERROR: incorrect userParameters value on object %s. If you have another working DC that does not give this warning, please run 'samba-tool drs replicate --full-sync --local <destinationDC> <sourceDC> %s'" % (obj.dn, self.samdb.get_nc_root(obj.dn)))
def err_base64_userParameters(self, obj, attrname, value):
- '''handle a userParameters that is wrongly base64 encoded'''
+ """handle a userParameters that is wrongly base64 encoded"""
self.report("ERROR: wrongly formatted userParameters %s on %s, should not be base64-encoded" % (value, obj.dn))
if not self.confirm_all('Convert userParameters from base64 encoding on %s?' % (obj.dn), 'fix_base64_userparameters'):
self.report('Not changing userParameters from base64 encoding on %s' % (obj.dn))
self.report("Corrected base64-encoded userParameters on %s by converting from base64" % (obj.dn))
def err_utf8_userParameters(self, obj, attrname, value):
- '''handle a userParameters that is wrongly utf-8 encoded'''
+ """handle a userParameters that is wrongly utf-8 encoded"""
self.report("ERROR: wrongly formatted userParameters on %s, "
"should not be pseudo-UTF8 encoded" % (obj.dn))
if not self.confirm_all('Convert userParameters from UTF8 encoding on %s?' % (obj.dn), 'fix_utf8_userparameters'):
self.report("Corrected psudo-UTF8 encoded userParameters on %s by converting from UTF8" % (obj.dn))
def err_doubled_userParameters(self, obj, attrname, value):
- '''handle a userParameters that has been utf-16 encoded twice'''
+ """handle a userParameters that has been utf-16 encoded twice"""
self.report("ERROR: wrongly formatted userParameters on %s, should not be double UTF16 encoded" % (obj.dn))
if not self.confirm_all('Convert userParameters from doubled UTF-16 encoding on %s?' % (obj.dn), 'fix_doubled_userparameters'):
self.report('Not changing userParameters from doubled UTF-16 encoding on %s' % (obj.dn))
self.report("ERROR: incorrect userParameters value on object %s (odd length). If you have another working DC that does not give this warning, please run 'samba-tool drs replicate --full-sync --local <destinationDC> <sourceDC> %s'" % (obj.dn, self.samdb.get_nc_root(obj.dn)))
def find_revealed_link(self, dn, attrname, guid):
- '''return a revealed link in an object'''
+ """return a revealed link in an object"""
res = self.samdb.search(base=dn, scope=ldb.SCOPE_BASE, attrs=[attrname],
controls=["show_deleted:0", "extended_dn:0", "reveal_internals:0"])
syntax_oid = self.samdb_schema.get_syntax_oid_from_lDAPDisplayName(attrname)
return None
def check_duplicate_links(self, obj, forward_attr, forward_syntax, forward_linkID, backlink_attr):
- '''check a linked values for duplicate forward links'''
+ """check a linked values for duplicate forward links"""
error_count = 0
duplicate_dict = dict()
return (error_count, duplicate_dict, unique_dict)
def has_duplicate_links(self, dn, forward_attr, forward_syntax):
- '''check a linked values for duplicate forward links'''
+ """check a linked values for duplicate forward links"""
error_count = 0
duplicate_cache_key = "%s:%s" % (str(dn), forward_attr)
forward_syntax,
backlink_attr,
forward_unique_dict):
- '''Find all backlinks linking to obj_guid_str not already in forward_unique_dict'''
+ """Find all backlinks linking to obj_guid_str not already in forward_unique_dict"""
missing_forward_links = []
error_count = 0
return (missing_forward_links, error_count)
def check_dn(self, obj, attrname, syntax_oid):
- '''check a DN attribute for correctness'''
+ """check a DN attribute for correctness"""
error_count = 0
obj_guid = obj['objectGUID'][0]
return None
def get_originating_time(self, val, attid):
- '''Read metadata properties and return the originating time for
+ """Read metadata properties and return the originating time for
a given attributeId.
:return: the originating time or 0 if not found
- '''
+ """
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, val)
o = self.find_repl_attid(repl, attid)
return 0
def process_metadata(self, dn, val):
- '''Read metadata properties and list attributes in it.
- raises KeyError if the attid is unknown.'''
+ """Read metadata properties and list attributes in it.
+ raises KeyError if the attid is unknown."""
set_att = set()
wrong_attids = set()
return (set_att, list_attid, wrong_attids)
def fix_metadata(self, obj, attr):
- '''re-write replPropertyMetaData elements for a single attribute for a
- object. This is used to fix missing replPropertyMetaData elements'''
+ """re-write replPropertyMetaData elements for a single attribute for a
+ object. This is used to fix missing replPropertyMetaData elements"""
guid_str = str(ndr_unpack(misc.GUID, obj['objectGUID'][0]))
dn = ldb.Dn(self.samdb, "<GUID=%s>" % guid_str)
res = self.samdb.search(base=dn, scope=ldb.SCOPE_BASE, attrs=[attr],
return (sd, None)
def err_wrong_sd(self, dn, sd, sd_broken):
- '''re-write the SD due to incorrect inherited ACEs'''
+ """re-write the SD due to incorrect inherited ACEs"""
sd_attr = "nTSecurityDescriptor"
sd_val = ndr_pack(sd)
sd_flags = security.SECINFO_DACL | security.SECINFO_SACL
self.report("Fixed attribute '%s' of '%s'\n" % (sd_attr, dn))
def err_wrong_default_sd(self, dn, sd, diff):
- '''re-write the SD due to not matching the default (optional mode for fixing an incorrect provision)'''
+ """re-write the SD due to not matching the default (optional mode for fixing an incorrect provision)"""
sd_attr = "nTSecurityDescriptor"
sd_val = ndr_pack(sd)
sd_flags = security.SECINFO_DACL | security.SECINFO_SACL
self.report("Fixed attribute '%s' of '%s'\n" % (sd_attr, dn))
def err_missing_sd_owner(self, dn, sd):
- '''re-write the SD due to a missing owner or group'''
+ """re-write the SD due to a missing owner or group"""
sd_attr = "nTSecurityDescriptor"
sd_val = ndr_pack(sd)
sd_flags = security.SECINFO_OWNER | security.SECINFO_GROUP
continue
found = True
- self.report('''ERROR: on replPropertyMetaData of %s, the instanceType on attribute 0x%08x,
+ self.report("""ERROR: on replPropertyMetaData of %s, the instanceType on attribute 0x%08x,
version %d changed at %s is 00000000-0000-0000-0000-000000000000,
- but should be non-zero. Proposed fix is to set to our invocationID (%s).'''
+ but should be non-zero. Proposed fix is to set to our invocationID (%s)."""
% (dn, o.attid, o.version,
time.ctime(samba.nttime2unix(o.originating_change_time)),
self.samdb.get_invocation_id()))
return attrs, lc_attrs
def check_object(self, dn, requested_attrs=None):
- '''check one object'''
+ """check one object"""
if self.verbose:
self.report("Checking object %s" % dn)
################################################################
# check special @ROOTDSE attributes
def check_rootdse(self):
- '''check the @ROOTDSE special object'''
+ """check the @ROOTDSE special object"""
dn = ldb.Dn(self.samdb, '@ROOTDSE')
if self.verbose:
self.report("Checking object %s" % dn)
# re-index the database
def reindex_database(self):
- '''re-index the whole database'''
+ """re-index the whole database"""
m = ldb.Message()
m.dn = ldb.Dn(self.samdb, "@ATTRIBUTES")
m['add'] = ldb.MessageElement('NONE', ldb.FLAG_MOD_ADD, 'force_reindex')
###############################################
# reset @MODULES
def reset_modules(self):
- '''reset @MODULES to that needed for current sam.ldb (to read a very old database)'''
+ """reset @MODULES to that needed for current sam.ldb (to read a very old database)"""
m = ldb.Message()
m.dn = ldb.Dn(self.samdb, "@MODULES")
m['@LIST'] = ldb.MessageElement('samba_dsdb', ldb.FLAG_MOD_REPLACE, '@LIST')
def drs_DsBind(drs):
- '''make a DsBind call, returning the binding handle'''
+ """make a DsBind call, returning the binding handle"""
bind_info = drsuapi.DsBindInfoCtr()
bind_info.length = 28
bind_info.info = drsuapi.DsBindInfo28()
def drs_get_rodc_partial_attribute_set(samdb):
- '''get a list of attributes for RODC replication'''
+ """get a list of attributes for RODC replication"""
partial_attribute_set = drsuapi.DsPartialAttributeSet()
partial_attribute_set.version = 1
class drs_Replicate(object):
- '''DRS replication calls'''
+ """DRS replication calls"""
def __init__(self, binding_string, lp, creds, samdb, invocation_id):
self.drs = drsuapi.drsuapi(binding_string, lp, creds)
def process_chunk(self, level, ctr, schema, req_level, req, first_chunk):
- '''Processes a single chunk of received replication data'''
+ """Processes a single chunk of received replication data"""
# pass the replication into the py_net.c python bindings for processing
self.net.replicate_chunk(self.replication_state, level, ctr,
schema=schema, req_level=req_level, req=req)
def replicate(self, dn, source_dsa_invocation_id, destination_dsa_guid,
schema=False, exop=drsuapi.DRSUAPI_EXOP_NONE, rodc=False,
replica_flags=None, full_sync=True, sync_forced=False, more_flags=0):
- '''replicate a single DN'''
+ """replicate a single DN"""
# setup for a GetNCChanges call
if self.supports_ext & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V10:
# Handles the special case of creating a new clone of a DB, while also renaming
# the entire DB's objects on the way through
class drs_ReplicateRenamer(drs_Replicate):
- '''Uses DRS replication to rename the entire DB'''
+ """Uses DRS replication to rename the entire DB"""
def __init__(self, binding_string, lp, creds, samdb, invocation_id,
old_base_dn, new_base_dn):
self.more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
def rename_dn(self, dn_str):
- '''Uses string substitution to replace the base DN'''
+ """Uses string substitution to replace the base DN"""
return re.sub('%s$' % self.old_base_dn, self.new_base_dn, dn_str)
def update_name_attr(self, base_obj):
- '''Updates the 'name' attribute for the base DN object'''
+ """Updates the 'name' attribute for the base DN object"""
for attr in base_obj.attribute_ctr.attributes:
if attr.attid == DRSUAPI_ATTID_name:
base_dn = ldb.Dn(self.samdb, base_obj.identifier.dn)
attr.value_ctr.values[0].blob = new_name.encode('utf-16-le')
def rename_top_level_object(self, first_obj):
- '''Renames the first/top-level object in a partition'''
+ """Renames the first/top-level object in a partition"""
old_dn = first_obj.identifier.dn
first_obj.identifier.dn = self.rename_dn(first_obj.identifier.dn)
print("Renaming partition %s --> %s" % (old_dn,
self.update_name_attr(first_obj)
def process_chunk(self, level, ctr, schema, req_level, req, first_chunk):
- '''Processes a single chunk of received replication data'''
+ """Processes a single chunk of received replication data"""
# we need to rename the NC in every chunk - this gets used in searches
# when applying the chunk
class gp_access_ext(gp_inf_ext):
- '''This class takes the .inf file parameter (essentially a GPO file mapped
+ """This class takes the .inf file parameter (essentially a GPO file mapped
to a GUID), hashmaps it to the Samba parameter, which then uses an ldb
object to update the parameter to Samba4. Not registry oriented whatsoever.
- '''
+ """
def load_ldb(self):
try:
self.ldb.set_pwdProperties(val)
def mapper(self):
- '''ldap value : samba setter'''
+ """ldap value : samba setter"""
return {"minPwdAge": (self.ch_minPwdAge, days2rel_nttime),
"maxPwdAge": (self.ch_maxPwdAge, days2rel_nttime),
# Could be none, but I like the method assignment in
class gp_log:
- ''' Log settings overwritten by gpo apply
+ """ Log settings overwritten by gpo apply
The gp_log is an xml file that stores a history of gpo changes (and the
original setting value).
The applylog keeps track of the order in which the GPOs were applied, so
that they can be rolled back in reverse, returning the machine to the state
prior to policy application.
- '''
+ """
def __init__(self, user, gpostore, db_log=None):
- ''' Initialize the gp_log
+ """ Initialize the gp_log
param user - the username (or machine name) that policies are
being applied to
param gpostore - the GPOStorage obj which references the tdb which
contains gp_logs
param db_log - (optional) a string to initialize the gp_log
- '''
+ """
self._state = GPOSTATE.APPLY
self.gpostore = gpostore
self.username = user
user_obj.attrib['name'] = user
def state(self, value):
- ''' Policy application state
+ """ Policy application state
param value - APPLY, ENFORCE, or UNAPPLY
The behavior of the gp_log depends on whether we are applying policy,
but the gp_log does not change. During an unapply, additions to the log
should be ignored (since function calls to apply settings are actually
reverting policy), but removals from the log are allowed.
- '''
+ """
# If we're enforcing, but we've unapplied, apply instead
if value == GPOSTATE.ENFORCE:
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
self._state = value
def get_state(self):
- '''Check the GPOSTATE
- '''
+ """Check the GPOSTATE
+ """
return self._state
def set_guid(self, guid):
- ''' Log to a different GPO guid
+ """ Log to a different GPO guid
param guid - guid value of the GPO from which we're applying
policy
- '''
+ """
self.guid = guid
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
obj = user_obj.find('guid[@value="%s"]' % guid)
item.attrib['value'] = guid
def store(self, gp_ext_name, attribute, old_val):
- ''' Store an attribute in the gp_log
+ """ Store an attribute in the gp_log
param gp_ext_name - Name of the extension applying policy
param attribute - The attribute being modified
param old_val - The value of the attribute prior to policy
application
- '''
+ """
if self._state == GPOSTATE.UNAPPLY or self._state == GPOSTATE.ENFORCE:
return None
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
attr.text = old_val
def retrieve(self, gp_ext_name, attribute):
- ''' Retrieve a stored attribute from the gp_log
+ """ Retrieve a stored attribute from the gp_log
param gp_ext_name - Name of the extension which applied policy
param attribute - The attribute being retrieved
return - The value of the attribute prior to policy
application
- '''
+ """
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
guid_obj = user_obj.find('guid[@value="%s"]' % self.guid)
assert guid_obj is not None, "gpo guid was not set"
return None
def retrieve_all(self, gp_ext_name):
- ''' Retrieve all stored attributes for this user, GPO guid, and CSE
+ """ Retrieve all stored attributes for this user, GPO guid, and CSE
param gp_ext_name - Name of the extension which applied policy
return - The values of the attributes prior to policy
application
- '''
+ """
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
guid_obj = user_obj.find('guid[@value="%s"]' % self.guid)
assert guid_obj is not None, "gpo guid was not set"
return {}
def get_applied_guids(self):
- ''' Return a list of applied ext guids
+ """ Return a list of applied ext guids
return - List of guids for gpos that have applied settings
to the system.
- '''
+ """
guids = []
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
if user_obj is not None:
return guids
def get_applied_settings(self, guids):
- ''' Return a list of applied ext guids
+ """ Return a list of applied ext guids
return - List of tuples containing the guid of a gpo, then
a dictionary of policies and their values prior
policy application. These are sorted so that the
most recently applied settings are removed first.
- '''
+ """
ret = []
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
for guid in guids:
return ret
def delete(self, gp_ext_name, attribute):
- ''' Remove an attribute from the gp_log
+ """ Remove an attribute from the gp_log
param gp_ext_name - name of extension from which to remove the
attribute
param attribute - attribute to remove
- '''
+ """
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
guid_obj = user_obj.find('guid[@value="%s"]' % self.guid)
assert guid_obj is not None, "gpo guid was not set"
guid_obj.remove(ext)
def commit(self):
- ''' Write gp_log changes to disk '''
+ """ Write gp_log changes to disk """
self.gpostore.store(self.username, etree.tostring(self.gpdb, 'utf-8'))
class gp_applier(object):
- '''Group Policy Applier/Unapplier/Modifier
+ """Group Policy Applier/Unapplier/Modifier
The applier defines functions for monitoring policy application,
removal, and modification. It must be a multi-derived class paired
with a subclass of gp_ext.
- '''
+ """
__metaclass__ = ABCMeta
def cache_add_attribute(self, guid, attribute, value):
- '''Add an attribute and value to the Group Policy cache
+ """Add an attribute and value to the Group Policy cache
guid - The GPO guid which applies this policy
attribute - The attribute name of the policy being applied
value - The value of the policy being applied
Normally called by the subclass apply() function after applying policy.
- '''
+ """
self.gp_db.set_guid(guid)
self.gp_db.store(str(self), attribute, value)
self.gp_db.commit()
def cache_remove_attribute(self, guid, attribute):
- '''Remove an attribute from the Group Policy cache
+ """Remove an attribute from the Group Policy cache
guid - The GPO guid which applies this policy
attribute - The attribute name of the policy being unapplied
Normally called by the subclass unapply() function when removing old
policy.
- '''
+ """
self.gp_db.set_guid(guid)
self.gp_db.delete(str(self), attribute)
self.gp_db.commit()
def cache_get_attribute_value(self, guid, attribute):
- '''Retrieve the value stored in the cache for the given attribute
+ """Retrieve the value stored in the cache for the given attribute
guid - The GPO guid which applies this policy
attribute - The attribute name of the policy
- '''
+ """
self.gp_db.set_guid(guid)
return self.gp_db.retrieve(str(self), attribute)
def cache_get_all_attribute_values(self, guid):
- '''Retrieve all attribute/values currently stored for this gpo+policy
+ """Retrieve all attribute/values currently stored for this gpo+policy
guid - The GPO guid which applies this policy
- '''
+ """
self.gp_db.set_guid(guid)
return self.gp_db.retrieve_all(str(self))
def cache_get_apply_state(self):
- '''Return the current apply state
+ """Return the current apply state
return - APPLY|ENFORCE|UNAPPLY
- '''
+ """
return self.gp_db.get_state()
def generate_attribute(self, name, *args):
- '''Generate an attribute name from arbitrary data
+ """Generate an attribute name from arbitrary data
name - A name to ensure uniqueness
args - Any arbitrary set of args, str or bytes
return - A blake2b digest of the data, the attribute
reproducible and uniquely identifies it. Hashing the name with
the data ensures we don't falsely identify a match which is the same
text in a different file. Using this attribute generator is optional.
- '''
+ """
data = b''.join([get_bytes(arg) for arg in [*args]])
return blake2b(get_bytes(name)+data).hexdigest()
def generate_value_hash(self, *args):
- '''Generate a unique value which identifies value changes
+ """Generate a unique value which identifies value changes
args - Any arbitrary set of args, str or bytes
return - A blake2b digest of the data, the value represented
- '''
+ """
data = b''.join([get_bytes(arg) for arg in [*args]])
return blake2b(data).hexdigest()
@abstractmethod
def unapply(self, guid, attribute, value):
- '''Group Policy Unapply
+ """Group Policy Unapply
guid - The GPO guid which applies this policy
attribute - The attribute name of the policy being unapplied
value - The value of the policy being unapplied
- '''
+ """
pass
@abstractmethod
def apply(self, guid, attribute, applier_func, *args):
- '''Group Policy Apply
+ """Group Policy Apply
guid - The GPO guid which applies this policy
attribute - The attribute name of the policy being applied
applier_func - An applier function which takes variable args
first unapply any changed policy. See for example calls to
`cache_get_all_attribute_values()` which searches for all policies
applied by this GPO for this Client Side Extension (CSE).
- '''
+ """
pass
def clean(self, guid, keep=None, remove=None, **kwargs):
- '''Cleanup old removed attributes
+ """Cleanup old removed attributes
keep - A list of attributes to keep
remove - A single attribute to remove, or a list of attributes to
remove
function
This is only necessary for CSEs which provide multiple attributes.
- '''
+ """
# Clean syntax is, either provide a single remove attribute,
# or a list of either removal attributes or keep attributes.
if keep is None:
class gp_misc_applier(gp_applier):
- '''Group Policy Miscellaneous Applier/Unapplier/Modifier
- '''
+ """Group Policy Miscellaneous Applier/Unapplier/Modifier
+ """
def generate_value(self, **kwargs):
data = etree.Element('data')
class gp_file_applier(gp_applier):
- '''Group Policy File Applier/Unapplier/Modifier
+ """Group Policy File Applier/Unapplier/Modifier
Subclass of abstract class gp_applier for monitoring policy applied
via a file.
- '''
+ """
def __generate_value(self, value_hash, files, sep):
data = [value_hash]
return sep.join(data)
def __parse_value(self, value, sep):
- '''Parse a value
+ """Parse a value
return - A unique HASH, followed by the file list
- '''
+ """
if value is None:
return None, []
data = value.split(sep)
self.cache_remove_attribute(guid, attribute)
def apply(self, guid, attribute, value_hash, applier_func, *args, sep=':'):
- '''
+ """
applier_func MUST return a list of files created by the applier.
This applier is for policies which only apply to a single file (with
a couple small exceptions). This applier will remove any policy applied
by this GPO which doesn't match the new policy.
- '''
+ """
# If the policy has changed, unapply, then apply new policy
old_val = self.cache_get_attribute_value(guid, attribute)
# Ignore removal if this policy is applied and hasn't changed
self.cache_add_attribute(guid, attribute, new_value)
-''' Fetch the hostname of a writable DC '''
+""" Fetch the hostname of a writable DC """
def get_dc_hostname(creds, lp):
return cldap_ret.pdc_name
-''' Fetch a list of GUIDs for applicable GPOs '''
+""" Fetch a list of GUIDs for applicable GPOs """
def get_gpo(samdb, gpo_dn):
return site_dn
def get_gpo_list(dc_hostname, creds, lp, username):
- '''Get the full list of GROUP_POLICY_OBJECTs for a given username.
+ """Get the full list of GROUP_POLICY_OBJECTs for a given username.
Push GPOs to gpo_list so that the traversal order of the list matches
the order of application:
(L)ocal (S)ite (D)omain (O)rganizational(U)nit
pushed in the opposite order of application (OUs first, local last,
child-to-parent).
Forced GPOs are appended in the end since they override all others.
- '''
+ """
gpo_list = []
forced_gpo_list = []
url = 'ldap://' + dc_hostname
def set_privileges(username, uid, gid):
- '''
+ """
Set current process privileges
- '''
+ """
os.setegid(gid)
os.seteuid(uid)
def drop_privileges(username, func, *args):
- '''
+ """
Run supplied function with privileges for specified username.
- '''
+ """
current_uid = os.getuid()
if not current_uid == 0:
logger.setLevel(logging.DEBUG)
class slogm(object):
- '''
+ """
Structured log message class
- '''
+ """
def __init__(self, message, kwargs=None):
if kwargs is None:
kwargs = {}
return str(res[0]["dnsHostName"][0])
def get_domain_name(ctx):
- '''get netbios name of the domain from the partitions record'''
+ """get netbios name of the domain from the partitions record"""
partitions_dn = ctx.samdb.get_partitions_dn()
res = ctx.samdb.search(base=partitions_dn, scope=ldb.SCOPE_ONELEVEL, attrs=["nETBIOSName"],
expression='ncName=%s' % ldb.binary_encode(str(ctx.samdb.get_default_basedn())))
return str(res[0]["nETBIOSName"][0])
def get_forest_domain_name(ctx):
- '''get netbios name of the domain from the partitions record'''
+ """get netbios name of the domain from the partitions record"""
partitions_dn = ctx.samdb.get_partitions_dn()
res = ctx.samdb.search(base=partitions_dn, scope=ldb.SCOPE_ONELEVEL, attrs=["nETBIOSName"],
expression='ncName=%s' % ldb.binary_encode(str(ctx.samdb.get_root_basedn())))
return str(res[0]["nETBIOSName"][0])
def get_parent_partition_dn(ctx):
- '''get the parent domain partition DN from parent DNS name'''
+ """get the parent domain partition DN from parent DNS name"""
res = ctx.samdb.search(base=ctx.config_dn, attrs=[],
expression='(&(objectclass=crossRef)(dnsRoot=%s)(systemFlags:%s:=%u))' %
(ldb.binary_encode(ctx.parent_dnsdomain),
return str(res[0].dn)
def get_mysid(ctx):
- '''get the SID of the connected user. Only works with w2k8 and later,
- so only used for RODC join'''
+ """get the SID of the connected user. Only works with w2k8 and later,
+ so only used for RODC join"""
res = ctx.samdb.search(base="", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
binsid = res[0]["tokenGroups"][0]
return get_string(ctx.samdb.schema_format_value("objectSID", binsid))
def dn_exists(ctx, dn):
- '''check if a DN exists'''
+ """check if a DN exists"""
try:
res = ctx.samdb.search(base=dn, scope=ldb.SCOPE_BASE, attrs=[])
except ldb.LdbError as e5:
return True
def add_krbtgt_account(ctx):
- '''RODCs need a special krbtgt account'''
+ """RODCs need a special krbtgt account"""
print("Adding %s" % ctx.krbtgt_dn)
rec = {
"dn": ctx.krbtgt_dn,
ctx.samdb.rename(ctx.krbtgt_dn, ctx.new_krbtgt_dn)
def drsuapi_connect(ctx):
- '''make a DRSUAPI connection to the naming master'''
+ """make a DRSUAPI connection to the naming master"""
binding_options = "seal"
if ctx.lp.log_level() >= 9:
binding_options += ",print"
(ctx.drsuapi_handle, ctx.bind_supported_extensions) = drs_utils.drs_DsBind(ctx.drsuapi)
def create_tmp_samdb(ctx):
- '''create a temporary samdb object for schema queries'''
+ """create a temporary samdb object for schema queries"""
ctx.tmp_schema = Schema(ctx.domsid,
schemadn=ctx.schema_dn)
ctx.tmp_samdb = SamDB(session_info=system_session(), url=None, auto_connect=False,
ctx.tmp_samdb.set_schema(ctx.tmp_schema)
def DsAddEntry(ctx, recs):
- '''add a record via the DRSUAPI DsAddEntry call'''
+ """add a record via the DRSUAPI DsAddEntry call"""
if ctx.drsuapi is None:
ctx.drsuapi_connect()
if ctx.tmp_samdb is None:
return ctr.objects
def join_ntdsdsa_obj(ctx):
- '''return the ntdsdsa object to add'''
+ """return the ntdsdsa object to add"""
print("Adding %s" % ctx.ntds_dn)
return rec
def join_add_ntdsdsa(ctx):
- '''add the ntdsdsa object'''
+ """add the ntdsdsa object"""
rec = ctx.join_ntdsdsa_obj()
if ctx.forced_local_samdb:
ctx.ntds_guid = misc.GUID(ctx.samdb.schema_format_value("objectGUID", res[0]["objectGUID"][0]))
def join_add_objects(ctx, specified_sid=None):
- '''add the various objects needed for the join'''
+ """add the various objects needed for the join"""
if ctx.acct_dn:
print("Adding %s" % ctx.acct_dn)
rec = {
print("Provision OK for domain %s" % ctx.names.dnsdomain)
def create_replicator(ctx, repl_creds, binding_options):
- '''Creates a new DRS object for managing replications'''
+ """Creates a new DRS object for managing replications"""
return drs_utils.drs_Replicate(
"ncacn_ip_tcp:%s[%s]" % (ctx.server, binding_options),
ctx.lp, repl_creds, ctx.local_samdb, ctx.invocation_id)
ctx.base_dn, ctx.new_base_dn)
def create_non_global_lp(ctx, global_lp):
- '''Creates a non-global LoadParm based on the global LP's settings'''
+ """Creates a non-global LoadParm based on the global LP's settings"""
# the samba code shares a global LoadParm by default. Here we create a
# new LoadParm that retains the global settings, but any changes we
return local_lp
def rename_dn(ctx, dn_str):
- '''Uses string substitution to replace the base DN'''
+ """Uses string substitution to replace the base DN"""
old_base_dn = ctx.base_dn
return re.sub('%s$' % old_base_dn, ctx.new_base_dn, dn_str)
self.nc_type = NCType.unknown
def __str__(self):
- '''Debug dump string output of class'''
+ """Debug dump string output of class"""
text = "%s:" % (self.__class__.__name__,) +\
"\n\tnc_dnstr=%s" % self.nc_dnstr +\
"\n\tnc_guid=%s" % str(self.nc_guid)
assert self.nc_guid is not None
def is_config(self):
- '''Return True if NC is config'''
+ """Return True if NC is config"""
assert self.nc_type != NCType.unknown
return self.nc_type == NCType.config
NamingContext.__init__(self, nc_dnstr)
def __str__(self):
- '''Debug dump string output of class'''
+ """Debug dump string output of class"""
text = "%s:" % self.__class__.__name__ +\
"\n\tdsa_dnstr=%s" % self.rep_dsa_dnstr +\
"\n\tdsa_guid=%s" % self.rep_dsa_guid +\
return "%s\n%s" % (NamingContext.__str__(self), text)
def set_instantiated_flags(self, flags=0):
- '''Set or clear NC replica instantiated flags'''
+ """Set or clear NC replica instantiated flags"""
self.rep_instantiated_flags = flags
def identify_by_dsa_attr(self, samdb, attr):
return self.rep_default
def is_ro(self):
- '''Return True if NC replica is read only'''
+ """Return True if NC replica is read only"""
return self.rep_ro
def is_partial(self):
- '''Return True if NC replica is partial'''
+ """Return True if NC replica is partial"""
return self.rep_partial
def is_present(self):
self.connect_table = {}
def __str__(self):
- '''Debug dump string output of class'''
+ """Debug dump string output of class"""
text = "%s:" % self.__class__.__name__
if self.dsa_dnstr is not None:
return self.current_rep_table.get(nc_dnstr)
def is_istg(self):
- '''Returns True if dsa is intersite topology generator for it's site'''
+ """Returns True if dsa is intersite topology generator for its site"""
# The KCC on an RODC always acts as an ISTG for itself
return self.dsa_is_istg or self.dsa_is_ro
def is_ro(self):
- '''Returns True if dsa a read only domain controller'''
+ """Returns True if dsa is a read only domain controller"""
return self.dsa_is_ro
def is_gc(self):
- '''Returns True if dsa hosts a global catalog'''
+ """Returns True if dsa hosts a global catalog"""
if (self.options & dsdb.DS_NTDSDSA_OPT_IS_GC) != 0:
return True
return False
return answer
def dumpstr_current_replica_table(self):
- '''Debug dump string output of current replica table'''
+ """Debug dump string output of current replica table"""
return '\n'.join(str(x) for x in self.current_rep_table)
def dumpstr_needed_replica_table(self):
- '''Debug dump string output of needed replica table'''
+ """Debug dump string output of needed replica table"""
return '\n'.join(str(x) for x in self.needed_rep_table)
def dumpstr_connect_table(self):
- '''Debug dump string output of connect table'''
+ """Debug dump string output of connect table"""
return '\n'.join(str(x) for x in self.connect_table)
def new_connection(self, options, system_flags, transport, from_dnstr,
self.schedule = None
def __str__(self):
- '''Debug dump string output of NTDSConnection object'''
+ """Debug dump string output of NTDSConnection object"""
text = "%s:\n\tdn=%s" % (self.__class__.__name__, self.dnstr) +\
"\n\tenabled=%s" % self.enabled +\
return self.enabled
def get_from_dnstr(self):
- '''Return fromServer dn string attribute'''
+ """Return fromServer dn string attribute"""
return self.from_dnstr
return needed, ro, partial
def __str__(self):
- '''Debug dump string output of class'''
+ """Debug dump string output of class"""
text = "%s" % NamingContext.__str__(self) +\
"\n\tpartdn=%s" % self.partstr +\
"".join("\n\tmsDS-NC-Replica-Locations=%s" % k for k in self.rw_location_list) +\
return True
def is_intrasite_topology_disabled(self):
- '''Returns True if intra-site topology is disabled for site'''
+ """Returns True if intra-site topology is disabled for site"""
return (self.site_options &
dsdb.DS_NTDSSETTINGS_OPT_IS_AUTO_TOPOLOGY_DISABLED) != 0
def is_intersite_topology_disabled(self):
- '''Returns True if inter-site topology is disabled for site'''
+ """Returns True if inter-site topology is disabled for site"""
return ((self.site_options &
dsdb.DS_NTDSSETTINGS_OPT_IS_INTER_SITE_AUTO_TOPOLOGY_DISABLED)
!= 0)
def is_random_bridgehead_disabled(self):
- '''Returns True if selection of random bridgehead is disabled'''
+ """Returns True if selection of random bridgehead is disabled"""
return (self.site_options &
dsdb.DS_NTDSSETTINGS_OPT_IS_RAND_BH_SELECTION_DISABLED) != 0
def is_detect_stale_disabled(self):
- '''Returns True if detect stale is disabled for site'''
+ """Returns True if detect stale is disabled for site"""
return (self.site_options &
dsdb.DS_NTDSSETTINGS_OPT_IS_TOPL_DETECT_STALE_DISABLED) != 0
def is_cleanup_ntdsconn_disabled(self):
- '''Returns True if NTDS Connection cleanup is disabled for site'''
+ """Returns True if NTDS Connection cleanup is disabled for site"""
return (self.site_options &
dsdb.DS_NTDSSETTINGS_OPT_IS_TOPL_CLEANUP_DISABLED) != 0
def same_site(self, dsa):
- '''Return True if dsa is in this site'''
+ """Return True if dsa is in this site"""
if self.get_dsa(dsa.dsa_dnstr):
return True
return False
return False
def __str__(self):
- '''Debug dump string output of class'''
+ """Debug dump string output of class"""
text = "%s:" % self.__class__.__name__ +\
"\n\tdn=%s" % self.site_dnstr +\
"\n\toptions=0x%X" % self.site_options +\
dsa.new_connection(opt, flags, transport, edge_dnstr, None)
def has_sufficient_edges(self):
- '''Return True if we have met the maximum "from edges" criteria'''
+ """Return True if we have met the maximum "from edges" criteria"""
if len(self.edge_from) >= self.max_edges:
return True
return False
self.bridgehead_list = []
def __str__(self):
- '''Debug dump string output of Transport object'''
+ """Debug dump string output of Transport object"""
text = "%s:\n\tdn=%s" % (self.__class__.__name__, self.dnstr) +\
"\n\tguid=%s" % str(self.guid) +\
self.__dict__['dns_name2'] = ndr_blob.ctr.other_info.dns_name2
def __str__(self):
- '''Debug dump string output of class'''
+ """Debug dump string output of class"""
text = "%s:" % self.__class__.__name__ +\
"\n\tdnstr=%s" % self.nc_dnstr +\
self.site_list = []
def __str__(self):
- '''Debug dump string output of Transport object'''
+ """Debug dump string output of Transport object"""
text = "%s:\n\tdn=%s" % (self.__class__.__name__, self.dnstr) +\
"\n\toptions=%d" % self.options +\
def netcmd_dnsname(lp):
- '''return the full DNS name of our own host. Used as a default
- for hostname when running status queries'''
+ """return the full DNS name of our own host. Used as a default
+ for hostname when running status queries"""
return lp.get('netbios name').lower() + "." + lp.get('realm').lower()
def netcmd_finddc(lp, creds, realm=None):
- '''Return domain-name of a writable/ldap-capable DC for the default
+ """Return domain-name of a writable/ldap-capable DC for the default
domain (parameter "realm" in smb.conf) unless another realm has been
- specified as argument'''
+ specified as argument"""
net = Net(creds=creds, lp=lp)
if realm is None:
realm = lp.get('realm')
def netcmd_get_domain_infos_via_cldap(lp, creds, address=None):
- '''Return domain information (CLDAP record) of the ldap-capable
- DC with the specified address'''
+ """Return domain information (CLDAP record) of the ldap-capable
+ DC with the specified address"""
net = Net(creds=creds, lp=lp)
cldap_ret = net.finddc(address=address,
flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_DS)
def attr_default(msg, attrname, default):
- '''get an attribute from a ldap msg with a default'''
+ """get an attribute from a ldap msg with a default"""
if attrname in msg:
return msg[attrname][0]
return default
class cmd_domain_backup_online(samba.netcmd.Command):
- '''Copy a running DC's current DB into a backup tar file.
+ """Copy a running DC's current DB into a backup tar file.
Takes a backup copy of the current domain from a running DC. If the domain
were to undergo a catastrophic failure, then the backup file can be used to
- all the domain's secrets are included in the backup file.
- although the DB contents can be untarred and examined manually, you need
to run 'samba-tool domain backup restore' before you can start a Samba DC
- from the backup file.'''
+ from the backup file."""
synopsis = "%prog --server=<DC-to-backup> --targetdir=<output-dir>"
takes_optiongroups = {
class cmd_domain_backup_restore(cmd_fsmo_seize):
- '''Restore the domain's DB from a backup-file.
+ """Restore the domain's DB from a backup-file.
This restores a previously backed up copy of the domain's DB on a new DC.
be joined to the new DC to recover the network.
Note that this command should be run as the root user - it will fail
- otherwise.'''
+ otherwise."""
synopsis = ("%prog --backup-file=<tar-file> --targetdir=<output-dir> "
"--newservername=<DC-name>")
def register_dns_zone(self, logger, samdb, lp, ntdsguid, host_ip,
host_ip6, site):
- '''
+ """
Registers the new realm's DNS objects when a renamed domain backup
is restored.
- '''
+ """
names = guess_names(lp)
domaindn = names.domaindn
forestdn = samdb.get_root_basedn().get_linearized()
dnsadmins_sid, add_root=False)
def fix_old_dc_references(self, samdb):
- '''Fixes attributes that reference the old/removed DCs'''
+ """Fixes attributes that reference the old/removed DCs"""
# we just want to fix up DB problems here that were introduced by us
# removing the old DCs. We restrict what we fix up so that the restored
samdb.transaction_commit()
def create_default_site(self, samdb, logger):
- '''Creates the default site, if it doesn't already exist'''
+ """Creates the default site, if it doesn't already exist"""
sitename = DEFAULTSITE
search_expr = "(&(cn={0})(objectclass=site))".format(sitename)
class cmd_domain_backup_rename(samba.netcmd.Command):
- '''Copy a running DC's DB to backup file, renaming the domain in the process.
+ """Copy a running DC's DB to backup file, renaming the domain in the process.
Where <new-domain> is the new domain's NetBIOS name, and <new-dnsrealm> is
the new domain's realm in DNS form.
in order to work (they will still refer to the old DC's IP instead of the
new DC's address).
- we recommend that you only use this option if you know what you're doing.
- '''
+ """
synopsis = ("%prog <new-domain> <new-dnsrealm> --server=<DC-to-backup> "
"--targetdir=<output-dir>")
takes_args = ["new_domain_name", "new_dns_realm"]
def update_dns_root(self, logger, samdb, old_realm, delete_old_dns):
- '''Updates dnsRoot for the partition objects to reflect the rename'''
+ """Updates dnsRoot for the partition objects to reflect the rename"""
# lookup the crossRef objects that hold the old realm's dnsRoot
partitions_dn = samdb.get_partitions_dn()
# Updates the CN=<domain>,CN=Partitions,CN=Configuration,... object to
# reflect the domain rename
def rename_domain_partition(self, logger, samdb, new_netbios_name):
- '''Renames the domain partition object and updates its nETBIOSName'''
+ """Renames the domain partition object and updates its nETBIOSName"""
# lookup the crossRef object that holds the nETBIOSName (nCName has
# already been updated by this point, but the netBIOS hasn't)
samdb.delete(dn, ["tree_delete:1"])
def fix_old_dn_attributes(self, samdb):
- '''Fixes attributes (i.e. objectCategory) that still use the old DN'''
+ """Fixes attributes (i.e. objectCategory) that still use the old DN"""
samdb.transaction_start()
# Just fix any mismatches in DN detected (leave any other errors)
class cmd_domain_backup_offline(samba.netcmd.Command):
- '''Backup the local domain directories safely into a tar file.
+ """Backup the local domain directories safely into a tar file.
Takes a backup copy of the current domain from the local files on disk,
with proper locking of the DB to ensure consistency. If the domain were to
- a backup can be created even if the DC isn't currently running.
- includes non-replicated attributes that an online backup wouldn't store.
- takes a copy of the raw database files, which has the risk that any
- hidden problems in the DB are preserved in the backup.'''
+ hidden problems in the DB are preserved in the backup."""
synopsis = "%prog [options]"
takes_optiongroups = {
class cmd_domain_backup(samba.netcmd.SuperCommand):
- '''Create or restore a backup of the domain.'''
+ """Create or restore a backup of the domain."""
subcommands = {'offline': cmd_domain_backup_offline(),
'online': cmd_domain_backup_online(),
'rename': cmd_domain_backup_rename(),
from samba.samdb import get_default_backend_store
def drsuapi_connect(ctx):
- '''make a DRSUAPI connection to the server'''
+ """make a DRSUAPI connection to the server"""
try:
(ctx.drsuapi, ctx.drsuapi_handle, ctx.bind_supported_extensions) = drs_utils.drsuapi_connect(ctx.server, ctx.lp, ctx.creds)
except Exception as e:
def samdb_connect(ctx):
- '''make a ldap connection to the server'''
+ """make a ldap connection to the server"""
try:
ctx.samdb = SamDB(url="ldap://%s" % ctx.server,
session_info=system_session(),
def drs_errmsg(werr):
- '''return "was successful" or an error string'''
+ """return "was successful" or an error string"""
(ecode, estring) = werr
if ecode == 0:
return "was successful"
def drs_parse_ntds_dn(ntds_dn):
- '''parse a NTDS DN returning a site and server'''
+ """parse a NTDS DN returning a site and server"""
a = ntds_dn.split(',')
if a[0] != "CN=NTDS Settings" or a[2] != "CN=Servers" or a[4] != 'CN=Sites':
raise RuntimeError("bad NTDS DN %s" % ntds_dn)
return d
def print_neighbour(self, d):
- '''print one set of neighbour information'''
+ """print one set of neighbour information"""
self.message("%s" % d['NC dn'])
if 'DSA' in d:
self.message("\t%s via RPC" % d['DSA'])
def drs_local_replicate(self, SOURCE_DC, NC, full_sync=False,
single_object=False,
sync_forced=False):
- '''replicate from a source DC to the local SAM'''
+ """replicate from a source DC to the local SAM"""
self.server = SOURCE_DC
drsuapi_connect(self)
from samba.netcmd import CommandError
def get_gpo_dn(samdb, gpo):
- '''Construct the DN for gpo'''
+ """Construct the DN for gpo"""
dn = samdb.get_default_basedn()
dn.add_child(ldb.Dn(samdb, "CN=Policies,CN=System"))
def gpo_flags_string(value):
- '''return gpo flags string'''
+ """return gpo flags string"""
flags = policy.get_gpo_flags(value)
if not flags:
ret = 'NONE'
def gplink_options_string(value):
- '''return gplink options string'''
+ """return gplink options string"""
options = policy.get_gplink_options(value)
if not options:
ret = 'NONE'
def parse_gplink(gplink):
- '''parse a gPLink into an array of dn and options'''
+ """parse a gPLink into an array of dn and options"""
ret = []
if gplink.strip() == '':
def encode_gplink(gplist):
- '''Encode an array of dn and options into gPLink string'''
+ """Encode an array of dn and options into gPLink string"""
ret = "".join("[LDAP://%s;%d]" % (g['dn'], g['options']) for g in gplist)
return ret
def dc_url(lp, creds, url=None, dc=None):
- '''If URL is not specified, return URL for writable DC.
- If dc is provided, use that to construct ldap URL'''
+ """If URL is not specified, return URL for writable DC.
+ If dc is provided, use that to construct ldap URL"""
if url is None:
if dc is None:
security.SECINFO_GROUP |
security.SECINFO_DACL |
security.SECINFO_SACL)):
- '''Get GPO information using gpo, displayname or dn'''
+ """Get GPO information using gpo, displayname or dn"""
policies_dn = samdb.get_default_basedn()
policies_dn.add_child(ldb.Dn(samdb, "CN=Policies,CN=System"))
def get_gpo_containers(samdb, gpo):
- '''lists dn of containers for a GPO'''
+ """lists dn of containers for a GPO"""
search_expr = "(&(objectClass=*)(gPLink=*%s*))" % gpo
try:
def del_gpo_link(samdb, container_dn, gpo):
- '''delete GPO link for the container'''
+ """delete GPO link for the container"""
# Check if valid Container DN and get existing GPlinks
try:
msg = samdb.search(base=container_dn, scope=ldb.SCOPE_BASE,
def parse_unc(unc):
- '''Parse UNC string into a hostname, a service, and a filepath'''
+ """Parse UNC string into a hostname, a service, and a filepath"""
tmp = []
if unc.startswith('\\\\'):
tmp = unc[2:].split('\\', 2)
return tmpdir, gpodir
def samdb_connect(self):
- '''make a ldap connection to the server'''
+ """make a ldap connection to the server"""
try:
self.samdb = SamDB(url=self.url,
session_info=system_session(),
takes_args = ["account*"]
def get_dn(self, samdb, account):
- '''work out what DN they meant'''
+ """work out what DN they meant"""
# we accept the account in SID, accountname or DN form
if account[0:2] == 'S-':
def checkset_backend(lp, backend, eadbfile):
- '''return the path to the eadb, or None'''
+ """return the path to the eadb, or None"""
if backend is None:
xattr_tdb = lp.get("xattr_tdb:file")
if xattr_tdb is not None:
self.samdb.modify(m)
def remove_s(self, json_input):
- '''remove_s
+ """remove_s
json_input: JSON list of entries to remove from GPO
Example json_input:
"class": "USER",
},
]
- '''
+ """
self.__validate_json(json_input, remove=True)
user_pol_data = self.__load_registry_pol(self.pol_file % 'User')
machine_pol_data = self.__load_registry_pol(self.pol_file % 'Machine')
self.increment_gpt_ini(machine_changed, user_changed)
def merge_s(self, json_input):
- '''merge_s
+ """merge_s
json_input: JSON list of entries to merge into GPO
Example json_input:
"data": "google.com"
},
]
- '''
+ """
self.__validate_json(json_input)
user_pol_data = self.__load_registry_pol(self.pol_file % 'User')
machine_pol_data = self.__load_registry_pol(self.pol_file % 'Machine')
self.increment_gpt_ini(machine_changed, user_changed)
def replace_s(self, json_input):
- '''replace_s
+ """replace_s
json_input: JSON list of entries to replace entries in GPO
Example json_input:
"data": "google.com"
},
]
- '''
+ """
self.__validate_json(json_input)
user_pol_data = preg.file()
machine_pol_data = preg.file()
dsdb._dsdb_set_am_rodc(self, am_rodc)
def connect(self, url=None, flags=0, options=None):
- '''connect to the database'''
+ """connect to the database"""
if self.lp is not None and not os.path.exists(url):
url = self.lp.private_path(url)
self.url = url
options=options)
def am_rodc(self):
- '''return True if we are an RODC'''
+ """return True if we are an RODC"""
return dsdb._am_rodc(self)
def am_pdc(self):
- '''return True if we are an PDC emulator'''
+ """return True if we are a PDC emulator"""
return dsdb._am_pdc(self)
def domain_dn(self):
- '''return the domain DN'''
+ """return the domain DN"""
return str(self.get_default_basedn())
def schema_dn(self):
- '''return the schema partition dn'''
+ """return the schema partition dn"""
return str(self.get_schema_basedn())
def disable_account(self, search_filter):
def get_attid_from_lDAPDisplayName(self, ldap_display_name,
is_schema_nc=False):
- '''return the attribute ID for a LDAP attribute as an integer as found in DRSUAPI'''
+ """return the attribute ID for a LDAP attribute as an integer as found in DRSUAPI"""
return dsdb._dsdb_get_attid_from_lDAPDisplayName(self,
ldap_display_name, is_schema_nc)
def get_syntax_oid_from_lDAPDisplayName(self, ldap_display_name):
- '''return the syntax OID for a LDAP attribute as a string'''
+ """return the syntax OID for a LDAP attribute as a string"""
return dsdb._dsdb_get_syntax_oid_from_lDAPDisplayName(self, ldap_display_name)
def get_systemFlags_from_lDAPDisplayName(self, ldap_display_name):
- '''return the systemFlags for a LDAP attribute as a integer'''
+ """return the systemFlags for a LDAP attribute as an integer"""
return dsdb._dsdb_get_systemFlags_from_lDAPDisplayName(self, ldap_display_name)
def get_linkId_from_lDAPDisplayName(self, ldap_display_name):
- '''return the linkID for a LDAP attribute as a integer'''
+ """return the linkID for a LDAP attribute as a integer"""
return dsdb._dsdb_get_linkId_from_lDAPDisplayName(self, ldap_display_name)
def get_lDAPDisplayName_by_attid(self, attid):
- '''return the lDAPDisplayName from an integer DRS attribute ID'''
+ """return the lDAPDisplayName from an integer DRS attribute ID"""
return dsdb._dsdb_get_lDAPDisplayName_by_attid(self, attid)
def get_backlink_from_lDAPDisplayName(self, ldap_display_name):
- '''return the attribute name of the corresponding backlink from the name
- of a forward link attribute. If there is no backlink return None'''
+ """return the attribute name of the corresponding backlink from the name
+ of a forward link attribute. If there is no backlink return None"""
return dsdb._dsdb_get_backlink_from_lDAPDisplayName(self, ldap_display_name)
def set_ntds_settings_dn(self, ntds_settings_dn):
self.modify_ldif(ldif)
def dsdb_DsReplicaAttribute(self, ldb, ldap_display_name, ldif_elements):
- '''convert a list of attribute values to a DRSUAPI DsReplicaAttribute'''
+ """convert a list of attribute values to a DRSUAPI DsReplicaAttribute"""
return dsdb._dsdb_DsReplicaAttribute(ldb, ldap_display_name, ldif_elements)
def dsdb_normalise_attributes(self, ldb, ldap_display_name, ldif_elements):
- '''normalise a list of attribute values'''
+ """normalise a list of attribute values"""
return dsdb._dsdb_normalise_attributes(ldb, ldap_display_name, ldif_elements)
def get_attribute_from_attid(self, attid):
return seq
def get_dsServiceName(self):
- '''get the NTDS DN from the rootDSE'''
+ """get the NTDS DN from the rootDSE"""
res = self.search(base="", scope=ldb.SCOPE_BASE, attrs=["dsServiceName"])
return str(res[0]["dsServiceName"][0])
def get_serverName(self):
- '''get the server DN from the rootDSE'''
+ """get the server DN from the rootDSE"""
res = self.search(base="", scope=ldb.SCOPE_BASE, attrs=["serverName"])
return str(res[0]["serverName"][0])
def dns_lookup(self, dns_name, dns_partition=None):
- '''Do a DNS lookup in the database, returns the NDR database structures'''
+ """Do a DNS lookup in the database, returns the NDR database structures"""
if dns_partition is None:
return dsdb_dns.lookup(self, dns_name)
else:
dns_partition=dns_partition)
def dns_extract(self, el):
- '''Return the NDR database structures from a dnsRecord element'''
+ """Return the NDR database structures from a dnsRecord element"""
return dsdb_dns.extract(self, el)
def dns_replace(self, dns_name, new_records):
- '''Do a DNS modification on the database, sets the NDR database
+ """Do a DNS modification on the database, sets the NDR database
structures on a DNS name
- '''
+ """
return dsdb_dns.replace(self, dns_name, new_records)
def dns_replace_by_dn(self, dn, new_records):
- '''Do a DNS modification on the database, sets the NDR database
+ """Do a DNS modification on the database, sets the NDR database
structures on a LDB DN
This routine is important because if the last record on the DN
is removed, this routine will put a tombstone in the record.
- '''
+ """
return dsdb_dns.replace_by_dn(self, dn, new_records)
def garbage_collect_tombstones(self, dn, current_time,
tombstone_lifetime=None):
- '''garbage_collect_tombstones(lp, samdb, [dn], current_time, tombstone_lifetime)
- -> (num_objects_expunged, num_links_expunged)'''
+ """garbage_collect_tombstones(lp, samdb, [dn], current_time, tombstone_lifetime)
+ -> (num_objects_expunged, num_links_expunged)"""
if not is_ad_dc_built():
raise SamDBError('Cannot garbage collect tombstones: '
tombstone_lifetime)
def create_own_rid_set(self):
- '''create a RID set for this DSA'''
+ """create a RID set for this DSA"""
return dsdb._dsdb_create_own_rid_set(self)
def allocate_rid(self):
- '''return a new RID from the RID Pool on this DSA'''
+ """return a new RID from the RID Pool on this DSA"""
return dsdb._dsdb_allocate_rid(self)
def next_free_rid(self):
- '''return the next free RID from the RID Pool on this DSA.
+ """return the next free RID from the RID Pool on this DSA.
:note: This function is not intended for general use, and care must be
taken if it is used to generate objectSIDs. The returned RID is not
formally reserved for use, creating the possibility of duplicate
objectSIDs.
- '''
+ """
rid, _ = self.free_rid_bounds()
return rid
def free_rid_bounds(self):
- '''return the low and high bounds (inclusive) of RIDs that are
+ """return the low and high bounds (inclusive) of RIDs that are
available for use in this DSA's current RID pool.
:note: This function is not intended for general use, and care must be
taken if it is used to generate objectSIDs. The returned range of
RIDs is not formally reserved for use, creating the possibility of
duplicate objectSIDs.
- '''
+ """
# Get DN of this server's RID Set
server_name_dn = ldb.Dn(self, self.get_serverName())
res = self.search(base=server_name_dn,
return next_rid, prev_pool_hi
def normalize_dn_in_domain(self, dn):
- '''return a new DN expanded by adding the domain DN
+ """return a new DN expanded by adding the domain DN
If the dn is already a child of the domain DN, just
return it as-is.
:param dn: relative dn
- '''
+ """
domain_dn = ldb.Dn(self, self.domain_dn())
if isinstance(dn, ldb.Dn):
return full_dn
class dsdb_Dn(object):
- '''a class for binary DN'''
+ """a class for binary DN"""
def __init__(self, samdb, dnstring, syntax_oid=None):
- '''create a dsdb_Dn'''
+ """create a dsdb_Dn"""
if syntax_oid is None:
# auto-detect based on string
if dnstring.startswith("B:"):
return self.prefix + str(self.dn.extended_str(mode=1))
def __cmp__(self, other):
- ''' compare dsdb_Dn values similar to parsed_dn_compare()'''
+ """compare dsdb_Dn values similar to parsed_dn_compare()"""
dn1 = self
dn2 = other
guid1 = dn1.dn.get_extended_component("GUID")
return self.__cmp__(other) >= 0
def get_binary_integer(self):
- '''return binary part of a dsdb_Dn as an integer, or None'''
+ """return binary part of a dsdb_Dn as an integer, or None"""
if self.prefix == '':
return None
return int(self.binary, 16)
def get_bytes(self):
- '''return binary as a byte string'''
+ """return binary as a byte string"""
return binascii.unhexlify(self.binary)