+++ /dev/null
-#!/usr/bin/python3
-import optparse
-import os, sys, re
-import pickle
-try:
- from firewall.core.rich import Rich_Rule
-except ImportError:
- Rich_Rule = None
-
-sys.path.insert(0, "bin/python")
-
-if __name__ == "__main__":
- parser = optparse.OptionParser('firewall-cmd [options]')
- parser.add_option('--list-interfaces', default=False, action="store_true")
- parser.add_option('--permanent', default=False, action="store_true")
- parser.add_option('--new-zone')
- parser.add_option('--get-zones', default=False, action="store_true")
- parser.add_option('--delete-zone')
- parser.add_option('--zone')
- parser.add_option('--add-interface')
- parser.add_option('--add-rich-rule')
- parser.add_option('--remove-rich-rule')
- parser.add_option('--list-rich-rules', default=False, action="store_true")
-
- (opts, args) = parser.parse_args()
-
- # Use a dir we can write to in the testenv
- if 'LOCAL_PATH' in os.environ:
- data_dir = os.path.realpath(os.environ.get('LOCAL_PATH'))
- else:
- data_dir = os.path.dirname(os.path.realpath(__file__))
- dump_file = os.path.join(data_dir, 'firewall-cmd.dump')
- if os.path.exists(dump_file):
- with open(dump_file, 'rb') as r:
- data = pickle.load(r)
- else:
- data = {}
-
- if opts.list_interfaces:
- if not opts.zone: # default zone dummy interface
- print('eth0')
- else:
- assert 'zone_interfaces' in data
- assert opts.zone in data['zone_interfaces'].keys()
- for interface in data['zone_interfaces'][opts.zone]:
- sys.stdout.write('%s ' % interface)
- print()
- elif opts.new_zone:
- if 'zones' not in data:
- data['zones'] = []
- data['zones'].append(opts.new_zone)
- elif opts.get_zones:
- if 'zones' in data:
- for zone in data['zones']:
- sys.stdout.write('%s ' % zone)
- print()
- elif opts.delete_zone:
- assert 'zones' in data
- assert opts.delete_zone in data['zones']
- data['zones'].remove(opts.delete_zone)
- if len(data['zones']) == 0:
- del data['zones']
- if 'zone_interfaces' in data and opts.zone in data['zone_interfaces'].keys():
- del data['zone_interfaces'][opts.zone]
- elif opts.add_interface:
- assert opts.zone
- assert 'zones' in data
- assert opts.zone in data['zones']
- if 'zone_interfaces' not in data:
- data['zone_interfaces'] = {}
- if opts.zone not in data['zone_interfaces'].keys():
- data['zone_interfaces'][opts.zone] = []
- data['zone_interfaces'][opts.zone].append(opts.add_interface)
- elif opts.add_rich_rule:
- assert opts.zone
- if 'rules' not in data:
- data['rules'] = {}
- if opts.zone not in data['rules']:
- data['rules'][opts.zone] = []
- # Test rule parsing if firewalld is installed
- if Rich_Rule:
- # Parsing failure will throw an exception
- data['rules'][opts.zone].append(str(Rich_Rule(rule_str=opts.add_rich_rule)))
- else:
- data['rules'][opts.zone].append(opts.add_rich_rule)
- elif opts.remove_rich_rule:
- assert opts.zone
- assert 'rules' in data
- assert opts.zone in data['rules'].keys()
- if Rich_Rule:
- rich_rule = str(Rich_Rule(rule_str=opts.remove_rich_rule))
- assert rich_rule in data['rules'][opts.zone]
- data['rules'][opts.zone].remove(rich_rule)
- else:
- assert opts.remove_rich_rule in data['rules'][opts.zone]
- data['rules'][opts.zone].remove(opts.remove_rich_rule)
- elif opts.list_rich_rules:
- assert opts.zone
- assert 'rules' in data
- assert opts.zone in data['rules'].keys()
- for rule in data['rules'][opts.zone]:
- print(rule)
-
- if opts.permanent:
- if data == {}:
- if os.path.exists(dump_file):
- os.unlink(dump_file)
- else:
- with open(dump_file, 'wb') as w:
- pickle.dump(data, w)
from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext
from samba.gp_firefox_ext import gp_firefox_ext
from samba.gp_chromium_ext import gp_chromium_ext
-from samba.gp_firewalld_ext import gp_firewalld_ext
import logging
from samba.credentials import Credentials
from samba.gp_msgs_ext import gp_msgs_ext
from samba import Ldb
from samba.auth import system_session
import json
-from shutil import which
realm = os.environ.get('REALM')
policies = realm + '/POLICIES'
}
"""
-firewalld_reg_pol = \
-b"""
-<?xml version="1.0" encoding="utf-8"?>
-<PolFile num_entries="6" signature="PReg" version="1">
- <Entry type="4" type_name="REG_DWORD">
- <Key>Software\Policies\Samba\Unix Settings\Firewalld</Key>
- <ValueName>Zones</ValueName>
- <Value>1</Value>
- </Entry>
- <Entry type="4" type_name="REG_DWORD">
- <Key>Software\Policies\Samba\Unix Settings\Firewalld</Key>
- <ValueName>Rules</ValueName>
- <Value>1</Value>
- </Entry>
- <Entry type="1" type_name="REG_SZ">
- <Key>Software\Policies\Samba\Unix Settings\Firewalld\Rules</Key>
- <ValueName>Rules</ValueName>
- <Value>{"work": [{"rule": {"family": "ipv4"}, "source address": "172.25.1.7", "service name": "ftp", "reject": {}}]}</Value>
- </Entry>
- <Entry type="1" type_name="REG_SZ">
- <Key>Software\Policies\Samba\Unix Settings\Firewalld\Zones</Key>
- <ValueName>**delvals.</ValueName>
- <Value> </Value>
- </Entry>
- <Entry type="1" type_name="REG_SZ">
- <Key>Software\Policies\Samba\Unix Settings\Firewalld\Zones</Key>
- <ValueName>work</ValueName>
- <Value>work</Value>
- </Entry>
- <Entry type="1" type_name="REG_SZ">
- <Key>Software\Policies\Samba\Unix Settings\Firewalld\Zones</Key>
- <ValueName>home</ValueName>
- <Value>home</Value>
- </Entry>
-</PolFile>
-"""
-
def days2rel_nttime(val):
seconds = 60
minutes = 60
def tearDown(self):
super(GPOTests, self).tearDown()
- def nottest_gpo_list(self):
+ def test_gpo_list(self):
global poldir, dspath
ads = gpo.ADS_STRUCT(self.server, self.lp, self.creds)
if ads.connect():
self.assertEqual(gpos[i].ds_path, ds_paths[i],
'ds_path did not match expected %s' % gpos[i].ds_path)
- def nottest_gpo_ads_does_not_segfault(self):
+ def test_gpo_ads_does_not_segfault(self):
try:
ads = gpo.ADS_STRUCT(self.server, 42, self.creds)
except:
pass
- def nottest_gpt_version(self):
+ def test_gpt_version(self):
global gpt_data
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
self.assertEqual(gpo.gpo_get_sysvol_gpt_version(gpo_path)[1], old_vers,
'gpo_get_sysvol_gpt_version() did not return the expected version')
- def nottest_check_refresh_gpo_list(self):
+ def test_check_refresh_gpo_list(self):
cache = self.lp.cache_path('gpo_cache')
ads = gpo.ADS_STRUCT(self.server, self.lp, self.creds)
if ads.connect():
self.assertTrue(os.path.exists(gpt_ini),
'GPT.INI was not cached for %s' % guid)
- def nottest_check_refresh_gpo_list_malicious_paths(self):
+ def test_check_refresh_gpo_list_malicious_paths(self):
# the path cannot contain ..
path = '/usr/local/samba/var/locks/sysvol/../../../../../../root/'
self.assertRaises(OSError, check_safe_path, path)
self.assertEqual(result, after, 'check_safe_path() didn\'t'
' correctly convert \\ to /')
- def nottest_check_safe_path_typesafe_name(self):
+ def test_check_safe_path_typesafe_name(self):
path = '\\\\toady.suse.de\\SysVol\\toady.suse.de\\Policies\\' \
'{31B2F340-016D-11D2-945F-00C04FB984F9}\\GPT.INI'
expected_path = 'toady.suse.de/Policies/' \
self.assertEqual(result, expected_path,
'check_safe_path unable to detect variable case sysvol components')
- def nottest_gpt_ext_register(self):
+ def test_gpt_ext_register(self):
this_path = os.path.dirname(os.path.realpath(__file__))
samba_path = os.path.realpath(os.path.join(this_path, '../../../'))
ext_path = os.path.join(samba_path, 'python/samba/gp_sec_ext.py')
parser.remove_section('test_section')
atomic_write_conf(lp, parser)
- def nottest_gp_log_get_applied(self):
+ def test_gp_log_get_applied(self):
local_path = self.lp.get('path', 'sysvol')
guids = ['{31B2F340-016D-11D2-945F-00C04FB984F9}',
'{6AC1786C-016F-11D2-945F-00C04FB984F9}']
ret = gpupdate_unapply(self.lp)
self.assertEqual(ret, 0, 'gpupdate unapply failed')
- def nottest_process_group_policy(self):
+ def test_process_group_policy(self):
local_path = self.lp.cache_path('gpo_cache')
guids = ['{31B2F340-016D-11D2-945F-00C04FB984F9}',
'{6AC1786C-016F-11D2-945F-00C04FB984F9}']
gpttmpl = gpofile % (local_path, guid)
unstage_file(gpttmpl)
- def nottest_gp_scripts(self):
+ def test_gp_scripts(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
- def nottest_gp_sudoers(self):
+ def test_gp_sudoers(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
- def nottest_vgp_sudoers(self):
+ def test_vgp_sudoers(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
manifest = os.path.join(local_path, policies, guid, 'MACHINE',
# Unstage the Registry.pol file
unstage_file(manifest)
- def nottest_gp_inf_ext_utf(self):
+ def test_gp_inf_ext_utf(self):
logger = logging.getLogger('gpo_tests')
cache_dir = self.lp.get('cache directory')
store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
self.assertEquals(inf_conf.get('Kerberos Policy', 'MaxTicketAge'),
'99', 'MaxTicketAge was not read from the file')
- def nottest_rsop(self):
+ def test_rsop(self):
logger = logging.getLogger('gpo_tests')
cache_dir = self.lp.get('cache directory')
local_path = self.lp.cache_path('gpo_cache')
ret = rsop(self.lp)
self.assertEquals(ret, 0, 'gpupdate --rsop failed!')
- def nottest_gp_unapply(self):
+ def test_gp_unapply(self):
logger = logging.getLogger('gpo_tests')
cache_dir = self.lp.get('cache directory')
local_path = self.lp.cache_path('gpo_cache')
unstage_file(gpofile % guid)
unstage_file(reg_pol % guid)
- def nottest_smb_conf_ext(self):
+ def test_smb_conf_ext(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
- def nottest_gp_motd(self):
+ def test_gp_motd(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
- def nottest_vgp_symlink(self):
+ def test_vgp_symlink(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
manifest = os.path.join(local_path, policies, guid, 'MACHINE',
# Unstage the manifest.xml file
unstage_file(manifest)
- def nottest_vgp_files(self):
+ def test_vgp_files(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
manifest = os.path.join(local_path, policies, guid, 'MACHINE',
unstage_file(manifest)
unstage_file(source_file)
- def nottest_vgp_openssh(self):
+ def test_vgp_openssh(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
manifest = os.path.join(local_path, policies, guid, 'MACHINE',
# Unstage the Registry.pol file
unstage_file(manifest)
- def nottest_vgp_startup_scripts(self):
+ def test_vgp_startup_scripts(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
manifest = os.path.join(local_path, policies, guid, 'MACHINE',
unstage_file(manifest)
unstage_file(test_script)
- def nottest_vgp_motd(self):
+ def test_vgp_motd(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
manifest = os.path.join(local_path, policies, guid, 'MACHINE',
# Unstage the Registry.pol file
unstage_file(manifest)
- def nottest_vgp_issue(self):
+ def test_vgp_issue(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
manifest = os.path.join(local_path, policies, guid, 'MACHINE',
# Unstage the manifest.xml file
unstage_file(manifest)
- def nottest_vgp_access(self):
+ def test_vgp_access(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
allow = os.path.join(local_path, policies, guid, 'MACHINE',
unstage_file(allow)
unstage_file(deny)
- def nottest_gnome_settings(self):
+ def test_gnome_settings(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
- def notest_gp_cert_auto_enroll_ext(self):
+ def test_gp_cert_auto_enroll_ext(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
- def nottest_gp_user_scripts_ext(self):
+ def test_gp_user_scripts_ext(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
- def nottest_gp_firefox_ext(self):
+ def test_gp_firefox_ext(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
- def nottest_gp_chromium_ext(self):
+ def test_gp_chromium_ext(self):
local_path = self.lp.cache_path('gpo_cache')
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
reg_pol = os.path.join(local_path, policies, guid,
# Unstage the Registry.pol file
unstage_file(reg_pol)
-
- def test_gp_firewalld_ext(self):
- local_path = self.lp.cache_path('gpo_cache')
- guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
- reg_pol = os.path.join(local_path, policies, guid,
- 'MACHINE/REGISTRY.POL')
- logger = logging.getLogger('gpo_tests')
- cache_dir = self.lp.get('cache directory')
- store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
-
- machine_creds = Credentials()
- machine_creds.guess(self.lp)
- machine_creds.set_machine_account()
-
- # Initialize the group policy extension
- ext = gp_firewalld_ext(logger, self.lp, machine_creds,
- machine_creds.get_username(), store)
-
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
-
- # Stage the Registry.pol file with test data
- parser = GPPolParser()
- parser.load_xml(etree.fromstring(firewalld_reg_pol.strip()))
- ret = stage_file(reg_pol, ndr_pack(parser.pol_file))
- self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
-
- ext.process_group_policy([], gpos)
-
- # Check that the policy was applied
- firewall_cmd = which('firewall-cmd')
- cmd = [firewall_cmd, '--get-zones']
- p = Popen(cmd, stdout=PIPE, stderr=PIPE)
- out, err = p.communicate()
- self.assertIn(b'work', out, 'Failed to apply zones')
- self.assertIn(b'home', out, 'Failed to apply zones')
-
- cmd = [firewall_cmd, '--zone=work', '--list-interfaces']
- p = Popen(cmd, stdout=PIPE, stderr=PIPE)
- out, err = p.communicate()
- self.assertIn(b'eth0', out, 'Failed to set interface on zone')
-
- cmd = [firewall_cmd, '--zone=home', '--list-interfaces']
- p = Popen(cmd, stdout=PIPE, stderr=PIPE)
- out, err = p.communicate()
- self.assertIn(b'eth0', out, 'Failed to set interface on zone')
-
- cmd = [firewall_cmd, '--zone=work', '--list-rich-rules']
- p = Popen(cmd, stdout=PIPE, stderr=PIPE)
- out, err = p.communicate()
- rule = b'rule family=ipv4 source address=172.25.1.7 ' + \
- b'service name=ftp reject'
- self.assertEquals(rule, out.strip(), 'Failed to set rich rule')
-
- # Verify RSOP does not fail
- ext.rsop([g for g in gpos if g.name == guid][0])
-
- # Unapply the policy
- gp_db = store.get_gplog(machine_creds.get_username())
- del_gpos = get_deleted_gpos_list(gp_db, [])
- ext.process_group_policy(del_gpos, [])
-
- # Check that the policy was unapplied
- cmd = [firewall_cmd, '--get-zones']
- p = Popen(cmd, stdout=PIPE, stderr=PIPE)
- out, err = p.communicate()
- self.assertNotIn(b'work', out, 'Failed to unapply zones')
- self.assertNotIn(b'home', out, 'Failed to unapply zones')
-
- # Unstage the Registry.pol file
- unstage_file(reg_pol)