--- /dev/null
+#!/usr/bin/python3
+import optparse
+import os, sys
+from shutil import copy
+
+sys.path.insert(0, "bin/python")
+
+if __name__ == "__main__":
+ parser = optparse.OptionParser('crontab <file> [options]')
+ parser.add_option('-l', action="store_true")
+ parser.add_option('-u')
+
+ (opts, args) = parser.parse_args()
+
+ # Use a dir we can write to in the testenv
+ if 'LOCAL_PATH' in os.environ:
+ data_dir = os.path.realpath(os.environ.get('LOCAL_PATH'))
+ else:
+ data_dir = os.path.dirname(os.path.realpath(__file__))
+ dump_file = os.path.join(data_dir, 'crontab.dump')
+ if opts.u:
+ assert opts.u == os.environ.get('DC_USERNAME')
+ if len(args) == 1:
+ assert os.path.exists(args[0])
+ copy(args[0], dump_file)
+ elif opts.l:
+ if os.path.exists(dump_file):
+ with open(dump_file, 'r') as r:
+ print(r.read())
check_guid, parse_gpext_conf, atomic_write_conf, get_deleted_gpos_list
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile, TemporaryDirectory
+from samba import gpclass
+# Disable privilege dropping for testing
+gpclass.drop_privileges = lambda _, func, *args : func(*args)
from samba.gp_sec_ext import gp_krb_ext, gp_access_ext
-from samba.gp_scripts_ext import gp_scripts_ext
+from samba.gp_scripts_ext import gp_scripts_ext, gp_user_scripts_ext
from samba.gp_sudoers_ext import gp_sudoers_ext
from samba.vgp_sudoers_ext import vgp_sudoers_ext
from samba.vgp_symlink_ext import vgp_symlink_ext
# Unstage the Registry.pol file
unstage_file(reg_pol)
+
+ def test_gp_user_scripts_ext(self):
+ local_path = self.lp.cache_path('gpo_cache')
+ guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
+ reg_pol = os.path.join(local_path, policies, guid,
+ 'USER/REGISTRY.POL')
+ logger = logging.getLogger('gpo_tests')
+ cache_dir = self.lp.get('cache directory')
+ store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+ machine_creds = Credentials()
+ machine_creds.guess(self.lp)
+ machine_creds.set_machine_account()
+
+ # Initialize the group policy extension
+ ext = gp_user_scripts_ext(logger, self.lp, machine_creds,
+ os.environ.get('DC_USERNAME'), store)
+
+ ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+ if ads.connect():
+ gpos = ads.get_gpo_list(machine_creds.get_username())
+
+ reg_key = b'Software\\Policies\\Samba\\Unix Settings'
+ sections = { b'%s\\Daily Scripts' % reg_key : b'@daily',
+ b'%s\\Monthly Scripts' % reg_key : b'@monthly',
+ b'%s\\Weekly Scripts' % reg_key : b'@weekly',
+ b'%s\\Hourly Scripts' % reg_key : b'@hourly' }
+ for keyname in sections.keys():
+ # Stage the Registry.pol file with test data
+ stage = preg.file()
+ e = preg.entry()
+ e.keyname = keyname
+ e.valuename = b'Software\\Policies\\Samba\\Unix Settings'
+ e.type = 1
+ e.data = b'echo hello world'
+ stage.num_entries = 1
+ stage.entries = [e]
+ ret = stage_file(reg_pol, ndr_pack(stage))
+ self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
+
+ # Process all gpos, intentionally skipping the privilege drop
+ ext.process_group_policy([], gpos)
+ # Dump the fake crontab setup for testing
+ p = Popen(['crontab', '-l'], stdout=PIPE)
+ crontab, _ = p.communicate()
+ entry = b'%s %s' % (sections[keyname], e.data.encode())
+ self.assertIn(entry, crontab,
+ 'The crontab entry was not installed')
+
+ # Remove policy
+ gp_db = store.get_gplog(os.environ.get('DC_USERNAME'))
+ del_gpos = get_deleted_gpos_list(gp_db, [])
+ ext.process_group_policy(del_gpos, [])
+ # Dump the fake crontab setup for testing
+ p = Popen(['crontab', '-l'], stdout=PIPE)
+ crontab, _ = p.communicate()
+ self.assertNotIn(entry, crontab,
+ 'Unapply failed to cleanup crontab entry')
+
+ # Unstage the Registry.pol file
+ unstage_file(reg_pol)
from samba.gpclass import apply_gp, unapply_gp, GPOStorage, rsop
from samba.gp_sec_ext import gp_krb_ext, gp_access_ext
from samba.gp_ext_loader import get_gp_client_side_extensions
-from samba.gp_scripts_ext import gp_scripts_ext
+from samba.gp_scripts_ext import gp_scripts_ext, gp_user_scripts_ext
from samba.gp_sudoers_ext import gp_sudoers_ext
from samba.vgp_sudoers_ext import vgp_sudoers_ext
from samba.gp_smb_conf_ext import gp_smb_conf_ext
gp_extensions.append(gp_cert_auto_enroll_ext)
gp_extensions.extend(machine_exts)
elif opts.target == 'User':
+ gp_extensions.append(gp_user_scripts_ext)
gp_extensions.extend(user_exts)
if opts.rsop: