]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
gpo: Test Group Policy User Scripts
authorDavid Mulder <dmulder@suse.com>
Tue, 20 Jul 2021 17:13:21 +0000 (11:13 -0600)
committerJeremy Allison <jra@samba.org>
Fri, 13 Aug 2021 19:14:30 +0000 (19:14 +0000)
Signed-off-by: David Mulder <dmulder@suse.com>
Reviewed-by: Jeremy Allison <jra@samba.org>
python/samba/gp_scripts_ext.py
python/samba/tests/bin/crontab [new file with mode: 0755]
python/samba/tests/gpo.py
selftest/knownfail.d/gpo [new file with mode: 0644]
source4/scripting/bin/samba-gpupdate

index 80e2262019d681b412922381a130a6693dc3cfc9..70e668159a2b2db71d957cc2df763f0127b579aa 100644 (file)
@@ -73,9 +73,9 @@ class gp_scripts_ext(gp_pol_ext):
                                 self.gp_db.store(str(self), attribute, f.name)
                         self.gp_db.commit()
 
-    def rsop(self, gpo):
+    def rsop(self, gpo, target='MACHINE'):
         output = {}
-        pol_file = 'MACHINE/Registry.pol'
+        pol_file = '%s/Registry.pol' % target
         if gpo.file_sys_path:
             path = os.path.join(gpo.file_sys_path, pol_file)
             pol_conf = self.parse(path)
@@ -88,3 +88,10 @@ class gp_scripts_ext(gp_pol_ext):
                         output[key] = []
                     output[key].append(e.data)
         return output
+
+class gp_user_scripts_ext(gp_scripts_ext):
+    def process_group_policy(self, deleted_gpo_list, changed_gpo_list):
+        pass
+
+    def rsop(self, gpo):
+        return super().rsop(gpo, target='USER')
diff --git a/python/samba/tests/bin/crontab b/python/samba/tests/bin/crontab
new file mode 100755 (executable)
index 0000000..764d584
--- /dev/null
@@ -0,0 +1,29 @@
+#!/usr/bin/python3
+import optparse
+import os, sys
+from shutil import copy
+
+sys.path.insert(0, "bin/python")
+
+if __name__ == "__main__":
+    parser = optparse.OptionParser('crontab <file> [options]')
+    parser.add_option('-l', action="store_true")
+    parser.add_option('-u')
+
+    (opts, args) = parser.parse_args()
+
+    # Use a dir we can write to in the testenv
+    if 'LOCAL_PATH' in os.environ:
+        data_dir = os.path.realpath(os.environ.get('LOCAL_PATH'))
+    else:
+        data_dir = os.path.dirname(os.path.realpath(__file__))
+    dump_file = os.path.join(data_dir, 'crontab.dump')
+    if opts.u:
+        assert opts.u == os.environ.get('DC_USERNAME')
+    if len(args) == 1:
+        assert os.path.exists(args[0])
+        copy(args[0], dump_file)
+    elif opts.l:
+        if os.path.exists(dump_file):
+            with open(dump_file, 'r') as r:
+                print(r.read())
index 52510e505952f541d06500d8a39798af10f02add..6fdf9664f48f79e8f4055a7a625cc85be9c2516a 100644 (file)
@@ -24,8 +24,11 @@ from samba.gpclass import check_refresh_gpo_list, check_safe_path, \
     check_guid, parse_gpext_conf, atomic_write_conf, get_deleted_gpos_list
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile, TemporaryDirectory
+from samba import gpclass
+# Disable privilege dropping for testing
+gpclass.drop_privileges = lambda _, func, *args : func(*args)
 from samba.gp_sec_ext import gp_krb_ext, gp_access_ext
-from samba.gp_scripts_ext import gp_scripts_ext
+from samba.gp_scripts_ext import gp_scripts_ext, gp_user_scripts_ext
 from samba.gp_sudoers_ext import gp_sudoers_ext
 from samba.vgp_sudoers_ext import vgp_sudoers_ext
 from samba.vgp_symlink_ext import vgp_symlink_ext
@@ -2002,3 +2005,64 @@ class GPOTests(tests.TestCase):
 
         # Unstage the Registry.pol file
         unstage_file(reg_pol)
+
+    def test_gp_user_scripts_ext(self):
+        local_path = self.lp.cache_path('gpo_cache')
+        guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
+        reg_pol = os.path.join(local_path, policies, guid,
+                               'USER/REGISTRY.POL')
+        logger = logging.getLogger('gpo_tests')
+        cache_dir = self.lp.get('cache directory')
+        store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+        machine_creds = Credentials()
+        machine_creds.guess(self.lp)
+        machine_creds.set_machine_account()
+
+        # Initialize the group policy extension
+        ext = gp_user_scripts_ext(logger, self.lp, machine_creds,
+                                  os.environ.get('DC_USERNAME'), store)
+
+        ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+        if ads.connect():
+            gpos = ads.get_gpo_list(machine_creds.get_username())
+
+        reg_key = b'Software\\Policies\\Samba\\Unix Settings'
+        sections = { b'%s\\Daily Scripts' % reg_key : b'@daily',
+                     b'%s\\Monthly Scripts' % reg_key : b'@monthly',
+                     b'%s\\Weekly Scripts' % reg_key : b'@weekly',
+                     b'%s\\Hourly Scripts' % reg_key : b'@hourly' }
+        for keyname in sections.keys():
+            # Stage the Registry.pol file with test data
+            stage = preg.file()
+            e = preg.entry()
+            e.keyname = keyname
+            e.valuename = b'Software\\Policies\\Samba\\Unix Settings'
+            e.type = 1
+            e.data = b'echo hello world'
+            stage.num_entries = 1
+            stage.entries = [e]
+            ret = stage_file(reg_pol, ndr_pack(stage))
+            self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
+
+            # Process all gpos, intentionally skipping the privilege drop
+            ext.process_group_policy([], gpos)
+            # Dump the fake crontab setup for testing
+            p = Popen(['crontab', '-l'], stdout=PIPE)
+            crontab, _ = p.communicate()
+            entry = b'%s %s' % (sections[keyname], e.data.encode())
+            self.assertIn(entry, crontab,
+                'The crontab entry was not installed')
+
+            # Remove policy
+            gp_db = store.get_gplog(os.environ.get('DC_USERNAME'))
+            del_gpos = get_deleted_gpos_list(gp_db, [])
+            ext.process_group_policy(del_gpos, [])
+            # Dump the fake crontab setup for testing
+            p = Popen(['crontab', '-l'], stdout=PIPE)
+            crontab, _ = p.communicate()
+            self.assertNotIn(entry, crontab,
+                'Unapply failed to cleanup crontab entry')
+
+            # Unstage the Registry.pol file
+            unstage_file(reg_pol)
diff --git a/selftest/knownfail.d/gpo b/selftest/knownfail.d/gpo
new file mode 100644 (file)
index 0000000..138d933
--- /dev/null
@@ -0,0 +1 @@
+^samba.tests.gpo.samba.tests.gpo.GPOTests.test_gp_user_scripts_ext
index 4d13f9dae10acfd90240fe76b1618b2950eb4575..01c50f0b6b97980d9d7964a24d7f7db390e404a7 100755 (executable)
@@ -32,7 +32,7 @@ from samba import getopt as options
 from samba.gpclass import apply_gp, unapply_gp, GPOStorage, rsop
 from samba.gp_sec_ext import gp_krb_ext, gp_access_ext
 from samba.gp_ext_loader import get_gp_client_side_extensions
-from samba.gp_scripts_ext import gp_scripts_ext
+from samba.gp_scripts_ext import gp_scripts_ext, gp_user_scripts_ext
 from samba.gp_sudoers_ext import gp_sudoers_ext
 from samba.vgp_sudoers_ext import vgp_sudoers_ext
 from samba.gp_smb_conf_ext import gp_smb_conf_ext
@@ -123,6 +123,7 @@ if __name__ == "__main__":
         gp_extensions.append(gp_cert_auto_enroll_ext)
         gp_extensions.extend(machine_exts)
     elif opts.target == 'User':
+        gp_extensions.append(gp_user_scripts_ext)
         gp_extensions.extend(user_exts)
 
     if opts.rsop: