From: Christian Merten Date: Thu, 15 Sep 2022 08:08:47 +0000 (+0200) Subject: samba-tool dsacl: Create helper functions to remove code duplication X-Git-Tag: talloc-2.4.0~798 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=c9902b0574fab9b3e10f440898d4980383dfe3c7;p=thirdparty%2Fsamba.git samba-tool dsacl: Create helper functions to remove code duplication Make multiple methods of dsacl command classes separate helper functions to avoid code duplication. Signed-off-by: Christian Merten Reviewed-by: Douglas Bagnall Reviewed-by: Jeremy Allison --- diff --git a/python/samba/netcmd/dsacl.py b/python/samba/netcmd/dsacl.py index b4d3ddb2d3a..7a9f960d6bf 100644 --- a/python/samba/netcmd/dsacl.py +++ b/python/samba/netcmd/dsacl.py @@ -42,6 +42,37 @@ from samba.netcmd import ( Option, ) +def find_trustee_sid(samdb, trusteedn): + res = samdb.search(base=trusteedn, expression="(objectClass=*)", + scope=SCOPE_BASE) + assert(len(res) == 1) + return ndr_unpack(security.dom_sid, res[0]["objectSid"][0]) + + +def modify_descriptor(samdb, object_dn, desc, controls=None): + assert(isinstance(desc, security.descriptor)) + m = ldb.Message() + m.dn = ldb.Dn(samdb, object_dn) + m["nTSecurityDescriptor"] = ldb.MessageElement( + (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE, + "nTSecurityDescriptor") + samdb.modify(m) + + +def read_descriptor(samdb, object_dn): + res = samdb.search(base=object_dn, scope=SCOPE_BASE, + attrs=["nTSecurityDescriptor"]) + # we should theoretically always have an SD + assert(len(res) == 1) + desc = res[0]["nTSecurityDescriptor"][0] + return ndr_unpack(security.descriptor, desc) + + +def get_domain_sid(samdb): + res = samdb.search(base=samdb.domain_dn(), + expression="(objectClass=*)", scope=SCOPE_BASE) + return ndr_unpack(security.dom_sid, res[0]["objectSid"][0]) + class cmd_dsacl_set(Command): """Modify access list on a directory object.""" @@ -82,41 +113,13 @@ class cmd_dsacl_set(Command): type="string"), ] - def find_trustee_sid(self, samdb, trusteedn): - res = samdb.search(base=trusteedn, expression="(objectClass=*)", - scope=SCOPE_BASE) - assert(len(res) == 1) - return ndr_unpack(security.dom_sid, res[0]["objectSid"][0]) - - def modify_descriptor(self, samdb, object_dn, desc, controls=None): - assert(isinstance(desc, security.descriptor)) - m = ldb.Message() - m.dn = ldb.Dn(samdb, object_dn) - m["nTSecurityDescriptor"] = ldb.MessageElement( - (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE, - "nTSecurityDescriptor") - samdb.modify(m) - - def read_descriptor(self, samdb, object_dn): - res = samdb.search(base=object_dn, scope=SCOPE_BASE, - attrs=["nTSecurityDescriptor"]) - # we should theoretically always have an SD - assert(len(res) == 1) - desc = res[0]["nTSecurityDescriptor"][0] - return ndr_unpack(security.descriptor, desc) - - def get_domain_sid(self, samdb): - res = samdb.search(base=samdb.domain_dn(), - expression="(objectClass=*)", scope=SCOPE_BASE) - return ndr_unpack(security.dom_sid, res[0]["objectSid"][0]) - def add_ace(self, samdb, object_dn, new_ace): """Add new ace explicitly.""" - desc = self.read_descriptor(samdb, object_dn) - new_ace = security.descriptor.from_sddl("D:" + new_ace,self.get_domain_sid(samdb)) + desc = read_descriptor(samdb, object_dn) + new_ace = security.descriptor.from_sddl("D:" + new_ace, get_domain_sid(samdb)) new_ace_list = re.findall(r"\(.*?\)",new_ace.as_sddl()) for new_ace in new_ace_list: - desc_sddl = desc.as_sddl(self.get_domain_sid(samdb)) + desc_sddl = desc.as_sddl(get_domain_sid(samdb)) # TODO add bindings for descriptor manipulation and get rid of this desc_aces = re.findall(r"\(.*?\)", desc_sddl) for ace in desc_aces: @@ -128,12 +131,12 @@ class cmd_dsacl_set(Command): desc_sddl = desc_sddl[:desc_sddl.index("(")] + new_ace + desc_sddl[desc_sddl.index("("):] else: desc_sddl = desc_sddl + new_ace - desc = security.descriptor.from_sddl(desc_sddl, self.get_domain_sid(samdb)) - self.modify_descriptor(samdb, object_dn, desc) + desc = security.descriptor.from_sddl(desc_sddl, get_domain_sid(samdb)) + modify_descriptor(samdb, object_dn, desc) def print_acl(self, samdb, object_dn, new=False): - desc = self.read_descriptor(samdb, object_dn) - desc_sddl = desc.as_sddl(self.get_domain_sid(samdb)) + desc = read_descriptor(samdb, object_dn) + desc_sddl = desc.as_sddl(get_domain_sid(samdb)) if new: self.outf.write("new descriptor for %s:\n" % object_dn) else: @@ -165,7 +168,7 @@ class cmd_dsacl_set(Command): 'repl-sync': GUID_DRS_REPL_SYNCRONIZE, 'ro-repl-secret-sync': GUID_DRS_RO_REPL_SECRET_SYNC, } - sid = self.find_trustee_sid(samdb, trusteedn) + sid = find_trustee_sid(samdb, trusteedn) if sddl: new_ace = sddl elif action == "allow": @@ -198,22 +201,9 @@ class cmd_dsacl_get(Command): type="string"), ] - def read_descriptor(self, samdb, object_dn): - res = samdb.search(base=object_dn, scope=SCOPE_BASE, - attrs=["nTSecurityDescriptor"]) - # we should theoretically always have an SD - assert(len(res) == 1) - desc = res[0]["nTSecurityDescriptor"][0] - return ndr_unpack(security.descriptor, desc) - - def get_domain_sid(self, samdb): - res = samdb.search(base=samdb.domain_dn(), - expression="(objectClass=*)", scope=SCOPE_BASE) - return ndr_unpack( security.dom_sid,res[0]["objectSid"][0]) - def print_acl(self, samdb, object_dn): - desc = self.read_descriptor(samdb, object_dn) - desc_sddl = desc.as_sddl(self.get_domain_sid(samdb)) + desc = read_descriptor(samdb, object_dn) + desc_sddl = desc.as_sddl(get_domain_sid(samdb)) self.outf.write("descriptor for %s:\n" % object_dn) self.outf.write(desc_sddl + "\n") @@ -248,36 +238,14 @@ class cmd_dsacl_delete(Command): ] def print_acl(self, samdb, object_dn, new=False): - desc = self.read_descriptor(samdb, object_dn) - desc_sddl = desc.as_sddl(self.get_domain_sid(samdb)) + desc = read_descriptor(samdb, object_dn) + desc_sddl = desc.as_sddl(get_domain_sid(samdb)) if new: self.outf.write("new descriptor for %s:\n" % object_dn) else: self.outf.write("old descriptor for %s:\n" % object_dn) self.outf.write(desc_sddl + "\n") - def modify_descriptor(self, samdb, object_dn, desc, controls=None): - assert(isinstance(desc, security.descriptor)) - m = ldb.Message() - m.dn = ldb.Dn(samdb, object_dn) - m["nTSecurityDescriptor"] = ldb.MessageElement( - (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE, - "nTSecurityDescriptor") - samdb.modify(m) - - def read_descriptor(self, samdb, object_dn): - res = samdb.search(base=object_dn, scope=SCOPE_BASE, - attrs=["nTSecurityDescriptor"]) - # we should theoretically always have an SD - assert(len(res) == 1) - desc = res[0]["nTSecurityDescriptor"][0] - return ndr_unpack(security.descriptor, desc) - - def get_domain_sid(self, samdb): - res = samdb.search(base=samdb.domain_dn(), - expression="(objectClass=*)", scope=SCOPE_BASE) - return ndr_unpack(security.dom_sid, res[0]["objectSid"][0]) - def run(self, objectdn, sddl, H=None, credopts=None, sambaopts=None, versionopts=None): lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp)