From: David Mulder Date: Tue, 25 Jul 2023 19:23:10 +0000 (-0600) Subject: gp: Ensure centrify crontab user policy performs proper cleanup X-Git-Tag: tevent-0.16.0~1309 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ab2cda79280e3a3ce4b806f96a5896d2a463b5ea;p=thirdparty%2Fsamba.git gp: Ensure centrify crontab user policy performs proper cleanup This resolves cleanup issues for user and group centrify compatible policies. It also ensures the crontab policies use functions from the scripts policy, to avoid code duplication and simplify cleanup. Signed-off-by: David Mulder Reviewed-by: Douglas Bagnall Reviewed-by: Andrew Bartlett --- diff --git a/python/samba/gp/gp_centrify_crontab_ext.py b/python/samba/gp/gp_centrify_crontab_ext.py index e8ffd8bec9f..e532416d224 100644 --- a/python/samba/gp/gp_centrify_crontab_ext.py +++ b/python/samba/gp/gp_centrify_crontab_ext.py @@ -16,9 +16,11 @@ import os, re from subprocess import Popen, PIPE -from samba.gp.gpclass import gp_pol_ext, drop_privileges, gp_file_applier -from hashlib import blake2b +from samba.gp.gpclass import gp_pol_ext, drop_privileges, gp_file_applier, \ + gp_misc_applier from tempfile import NamedTemporaryFile +from samba.gp.gp_scripts_ext import fetch_crontab, install_crontab, \ + install_user_crontab intro = ''' ### autogenerated by samba @@ -92,73 +94,44 @@ class gp_centrify_crontab_ext(gp_pol_ext, gp_file_applier): output[str(self)].append(e.data) return output -def fetch_crontab(username): - p = Popen(['crontab', '-l', '-u', username], stdout=PIPE, stderr=PIPE) - out, err = p.communicate() - if p.returncode != 0: - raise RuntimeError('Failed to read the crontab: %s' % err) - m = re.findall('%s(.*)%s' % (intro, end), out.decode(), re.DOTALL) - if len(m) == 1: - entries = m[0].strip().split('\n') - else: - entries = [] - m = re.findall('(.*)%s.*%s(.*)' % (intro, end), out.decode(), re.DOTALL) - if len(m) == 1: - others = '\n'.join([l.strip() for l in m[0]]) - else: - others = out.decode() - return others, entries +class gp_user_centrify_crontab_ext(gp_centrify_crontab_ext, gp_misc_applier): + def unapply(self, guid, attribute, entry): + others, entries = fetch_crontab(self.username) + if entry in entries: + entries.remove(entry) + install_user_crontab(self.username, others, entries) + self.cache_remove_attribute(guid, attribute) -def install_crontab(fname, username): - p = Popen(['crontab', fname, '-u', username], stdout=PIPE, stderr=PIPE) - _, err = p.communicate() - if p.returncode != 0: - raise RuntimeError('Failed to install crontab: %s' % err) + def apply(self, guid, attribute, entry): + old_val = self.cache_get_attribute_value(guid, attribute) + others, entries = fetch_crontab(self.username) + if not old_val or entry not in entries: + entries.append(entry) + install_user_crontab(self.username, others, entries) + self.cache_add_attribute(guid, attribute, entry) -class gp_user_centrify_crontab_ext(gp_centrify_crontab_ext): def process_group_policy(self, deleted_gpo_list, changed_gpo_list): for guid, settings in deleted_gpo_list: - self.gp_db.set_guid(guid) if str(self) in settings: - others, entries = fetch_crontab(self.username) for attribute, entry in settings[str(self)].items(): - if entry in entries: - entries.remove(entry) - self.gp_db.delete(str(self), attribute) - with NamedTemporaryFile() as f: - if len(entries) > 0: - f.write('\n'.join([others, intro, - '\n'.join(entries), end]).encode()) - else: - f.write(others.encode()) - f.flush() - install_crontab(f.name, self.username) - self.gp_db.commit() + self.unapply(guid, attribute, entry) for gpo in changed_gpo_list: if gpo.file_sys_path: section = \ 'Software\\Policies\\Centrify\\UnixSettings\\CrontabEntries' - self.gp_db.set_guid(gpo.name) pol_file = 'USER/Registry.pol' path = os.path.join(gpo.file_sys_path, pol_file) pol_conf = drop_privileges('root', self.parse, path) if not pol_conf: continue + attrs = [] for e in pol_conf.entries: if e.keyname == section and e.data.strip(): - attribute = blake2b(e.data.encode()).hexdigest() - old_val = self.gp_db.retrieve(str(self), attribute) - others, entries = fetch_crontab(self.username) - if not old_val or e.data not in entries: - entries.append(e.data) - with NamedTemporaryFile() as f: - f.write('\n'.join([others, intro, - '\n'.join(entries), end]).encode()) - f.flush() - install_crontab(f.name, self.username) - self.gp_db.store(str(self), attribute, e.data) - self.gp_db.commit() + attribute = self.generate_attribute(e.data) + attrs.append(attribute) + self.apply(gpo.name, attribute, e.data) + self.clean(gpo.name, keep=attrs) def rsop(self, gpo): return super().rsop(gpo, target='USER') diff --git a/selftest/knownfail.d/gpo b/selftest/knownfail.d/gpo deleted file mode 100644 index 94ba8c2b5e0..00000000000 --- a/selftest/knownfail.d/gpo +++ /dev/null @@ -1 +0,0 @@ -^samba.tests.gpo.samba.tests.gpo.GPOTests.test_gp_user_centrify_crontab_ext