from samba.gp.gp_centrify_sudoers_ext import gp_centrify_sudoers_ext
from samba.gp.gp_centrify_crontab_ext import gp_centrify_crontab_ext, \
gp_user_centrify_crontab_ext
+from samba.gp.gp_drive_maps_ext import gp_drive_maps_user_ext
from samba.common import get_bytes
from samba.dcerpc import preg
from samba.ndr import ndr_pack
from samba.gp_parse.gp_pol import GPPolParser
from glob import glob
from configparser import ConfigParser
-from samba.gp.gpclass import get_dc_hostname
+from samba.gp.gpclass import get_dc_hostname, expand_pref_variables
from samba import Ldb
import ldb as _ldb
from samba.auth import system_session
</PolFile>
"""
+drive_maps_xml = b"""<?xml version="1.0" encoding="utf-8"?>
+<Drives clsid="{8FDDCC1A-0C3C-43cd-A6B4-71A6DF20DA8C}"><Drive clsid="{935D1B74-9CB8-4e3c-9914-7DD559B7A417}" name="A:" status="A:" image="2" changed="2023-03-08 19:23:02" uid="{1641E121-DEF3-418D-A428-2D8DF4749504}" bypassErrors="1"><Properties action="U" thisDrive="NOCHANGE" allDrives="NOCHANGE" userName="" path="\\\\example.com\\test" label="TEST" persistent="1" useLetter="0" letter="A"/></Drive>
+</Drives>
+"""
+
def days2rel_nttime(val):
seconds = 60
minutes = 60
# Unstage the Registry.pol files
unstage_file(reg_pol)
unstage_file(reg_pol2)
+
+ def test_gp_drive_maps_user_ext(self):
+ local_path = self.lp.cache_path('gpo_cache')
+ guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
+ xml_path = os.path.join(local_path, policies, guid,
+ 'USER/PREFERENCES/DRIVES/DRIVES.XML')
+ cache_dir = self.lp.get('cache directory')
+ store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+ machine_creds = Credentials()
+ machine_creds.guess(self.lp)
+ machine_creds.set_machine_account()
+
+ # Initialize the group policy extension
+ ext = gp_drive_maps_user_ext(self.lp, machine_creds,
+ os.environ.get('DC_USERNAME'), store)
+
+ ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+ if ads.connect():
+ gpos = ads.get_gpo_list(machine_creds.get_username())
+
+ # Stage the Drives.xml file with test data
+ ret = stage_file(xml_path, drive_maps_xml)
+ self.assertTrue(ret, 'Could not create the target %s' % xml_path)
+
+ # Process all gpos, intentionally skipping the privilege drop
+ ext.process_group_policy([], gpos)
+ # Dump the fake crontab setup for testing
+ p = Popen(['crontab', '-l'], stdout=PIPE)
+ crontab, _ = p.communicate()
+ entry = b'@hourly gio mount smb://example.com/test'
+ self.assertIn(entry, crontab,
+ 'The crontab entry was not installed')
+
+ # Check that a call to gpupdate --rsop also succeeds
+ ret = rsop(self.lp)
+ self.assertEquals(ret, 0, 'gpupdate --rsop failed!')
+
+ # Unstage the Drives.xml
+ unstage_file(xml_path)
+
+ # Modify the policy and ensure it is updated
+ xml_conf = etree.fromstring(drive_maps_xml.strip())
+ drives = xml_conf.findall('Drive')
+ props = drives[0].find('Properties')
+ props.attrib['action'] = 'D'
+ ret = stage_file(xml_path,
+ etree.tostring(xml_conf, encoding='unicode'))
+ self.assertTrue(ret, 'Could not create the target %s' % xml_path)
+
+ # Process all gpos, intentionally skipping the privilege drop
+ ext.process_group_policy([], gpos)
+ # Dump the fake crontab setup for testing
+ p = Popen(['crontab', '-l'], stdout=PIPE)
+ crontab, _ = p.communicate()
+ self.assertNotIn(entry+b'\n', crontab,
+ 'The old crontab entry was not removed')
+ entry = entry + b' --unmount'
+ self.assertIn(entry, crontab,
+ 'The crontab entry was not installed')
+
+ # Remove policy
+ gp_db = store.get_gplog(os.environ.get('DC_USERNAME'))
+ del_gpos = get_deleted_gpos_list(gp_db, [])
+ ext.process_group_policy(del_gpos, [])
+ # Dump the fake crontab setup for testing
+ p = Popen(['crontab', '-l'], stdout=PIPE)
+ crontab, _ = p.communicate()
+ self.assertNotIn(entry, crontab,
+ 'Unapply failed to cleanup crontab entry')
+
+ # Unstage the Drives.xml
+ unstage_file(xml_path)
+
+ # Modify the policy to set 'run once', ensure there is no cron entry
+ xml_conf = etree.fromstring(drive_maps_xml.strip())
+ drives = xml_conf.findall('Drive')
+ filters = etree.SubElement(drives[0], 'Filters')
+ etree.SubElement(filters, 'FilterRunOnce')
+ ret = stage_file(xml_path,
+ etree.tostring(xml_conf, encoding='unicode'))
+ self.assertTrue(ret, 'Could not create the target %s' % xml_path)
+
+ # Process all gpos, intentionally skipping the privilege drop
+ ext.process_group_policy([], gpos)
+ # Dump the fake crontab setup for testing
+ p = Popen(['crontab', '-l'], stdout=PIPE)
+ crontab, _ = p.communicate()
+ entry = b'@hourly gio mount smb://example.com/test'
+ self.assertNotIn(entry, crontab,
+ 'The crontab entry was added despite run-once request')
+
+ # Remove policy
+ gp_db = store.get_gplog(os.environ.get('DC_USERNAME'))
+ del_gpos = get_deleted_gpos_list(gp_db, [])
+ ext.process_group_policy(del_gpos, [])
+
+ # Unstage the Drives.xml
+ unstage_file(xml_path)
+
+ def test_expand_pref_variables(self):
+ cache_path = self.lp.cache_path(os.path.join('gpo_cache'))
+ gpt_path = 'TEST'
+ username = 'test_uname'
+ test_vars = { 'AppDataDir': os.path.expanduser('~/.config'),
+ 'ComputerName': self.lp.get('netbios name'),
+ 'DesktopDir': os.path.expanduser('~/Desktop'),
+ 'DomainName': self.lp.get('realm'),
+ 'GptPath': os.path.join(cache_path,
+ check_safe_path(gpt_path).upper()),
+ 'LogonDomain': self.lp.get('realm'),
+ 'LogonUser': username,
+ 'SystemDrive': '/',
+ 'TempDir': '/tmp'
+ }
+ for exp_var, val in test_vars.items():
+ self.assertEqual(expand_pref_variables('%%%s%%' % exp_var,
+ gpt_path,
+ self.lp,
+ username),
+ val, 'Failed to expand variable %s' % exp_var)
+ # With the time variables, we can't test for an exact time, so let's do
+ # simple checks instead.
+ time_vars = ['DateTime', 'DateTimeEx', 'LocalTime',
+ 'LocalTimeEx', 'TimeStamp']
+ for time_var in time_vars:
+ self.assertNotEqual(expand_pref_variables('%%%s%%' % time_var,
+ gpt_path,
+ self.lp,
+ username),
+ None, 'Failed to expand variable %s' % time_var)
+
+ # Here we test to ensure undefined preference variables cause an error.
+ # The reason for testing these is to ensure we don't apply nonsense
+ # policies when they can't be defined. Also, these tests will fail if
+ # one of these is implemented in the future (forcing us to write a test
+ # anytime these are implemented).
+ undef_vars = ['BinaryComputerSid',
+ 'BinaryUserSid',
+ 'CommonAppdataDir',
+ 'CommonDesktopDir',
+ 'CommonFavoritesDir',
+ 'CommonProgramsDir',
+ 'CommonStartUpDir',
+ 'CurrentProccessId',
+ 'CurrentThreadId',
+ 'FavoritesDir',
+ 'GphPath',
+ 'GroupPolicyVersion',
+ 'LastDriveMapped',
+ 'LastError',
+ 'LastErrorText',
+ 'LdapComputerSid',
+ 'LdapUserSid',
+ 'LogonServer',
+ 'LogonUserSid',
+ 'MacAddress',
+ 'NetPlacesDir',
+ 'OsVersion',
+ 'ProgramFilesDir',
+ 'ProgramsDir',
+ 'RecentDocumentsDir',
+ 'ResultCode',
+ 'ResultText',
+ 'ReversedComputerSid',
+ 'ReversedUserSid',
+ 'SendToDir',
+ 'StartMenuDir',
+ 'StartUpDir',
+ 'SystemDir',
+ 'TraceFile',
+ 'WindowsDir'
+ ]
+ for undef_var in undef_vars:
+ try:
+ expand_pref_variables('%%%s%%' % undef_var, gpt_path, self.lp)
+ except NameError:
+ pass
+ else:
+ self.fail('Undefined variable %s caused no error' % undef_var)