--- /dev/null
+#!/usr/bin/python3
+import optparse
+import os, sys, re
+import pickle
+try:
+ from firewall.core.rich import Rich_Rule
+except ImportError:
+ Rich_Rule = None
+
+sys.path.insert(0, "bin/python")
+
+if __name__ == "__main__":
+ parser = optparse.OptionParser('firewall-cmd [options]')
+ parser.add_option('--list-interfaces', default=False, action="store_true")
+ parser.add_option('--permanent', default=False, action="store_true")
+ parser.add_option('--new-zone')
+ parser.add_option('--get-zones', default=False, action="store_true")
+ parser.add_option('--delete-zone')
+ parser.add_option('--zone')
+ parser.add_option('--add-interface')
+ parser.add_option('--add-rich-rule')
+ parser.add_option('--remove-rich-rule')
+ parser.add_option('--list-rich-rules', default=False, action="store_true")
+
+ (opts, args) = parser.parse_args()
+
+ # Use a dir we can write to in the testenv
+ if 'LOCAL_PATH' in os.environ:
+ data_dir = os.path.realpath(os.environ.get('LOCAL_PATH'))
+ else:
+ data_dir = os.path.dirname(os.path.realpath(__file__))
+ dump_file = os.path.join(data_dir, 'firewall-cmd.dump')
+ if os.path.exists(dump_file):
+ with open(dump_file, 'rb') as r:
+ data = pickle.load(r)
+ else:
+ data = {}
+
+ if opts.list_interfaces:
+ if not opts.zone: # default zone dummy interface
+ print('eth0')
+ else:
+ assert 'zone_interfaces' in data
+ assert opts.zone in data['zone_interfaces'].keys()
+ for interface in data['zone_interfaces'][opts.zone]:
+ sys.stdout.write('%s ' % interface)
+ print()
+ elif opts.new_zone:
+ if 'zones' not in data:
+ data['zones'] = []
+ data['zones'].append(opts.new_zone)
+ elif opts.get_zones:
+ if 'zones' in data:
+ for zone in data['zones']:
+ sys.stdout.write('%s ' % zone)
+ print()
+ elif opts.delete_zone:
+ assert 'zones' in data
+ assert opts.delete_zone in data['zones']
+ data['zones'].remove(opts.delete_zone)
+ if len(data['zones']) == 0:
+ del data['zones']
+ if 'zone_interfaces' in data and opts.zone in data['zone_interfaces'].keys():
+ del data['zone_interfaces'][opts.zone]
+ elif opts.add_interface:
+ assert opts.zone
+ assert 'zones' in data
+ assert opts.zone in data['zones']
+ if 'zone_interfaces' not in data:
+ data['zone_interfaces'] = {}
+ if opts.zone not in data['zone_interfaces'].keys():
+ data['zone_interfaces'][opts.zone] = []
+ data['zone_interfaces'][opts.zone].append(opts.add_interface)
+ elif opts.add_rich_rule:
+ assert opts.zone
+ if 'rules' not in data:
+ data['rules'] = {}
+ if opts.zone not in data['rules']:
+ data['rules'][opts.zone] = []
+ # Test rule parsing if firewalld is installed
+ if Rich_Rule:
+ # Parsing failure will throw an exception
+ data['rules'][opts.zone].append(str(Rich_Rule(rule_str=opts.add_rich_rule)))
+ else:
+ data['rules'][opts.zone].append(opts.add_rich_rule)
+ elif opts.remove_rich_rule:
+ assert opts.zone
+ assert 'rules' in data
+ assert opts.zone in data['rules'].keys()
+ if Rich_Rule:
+ rich_rule = str(Rich_Rule(rule_str=opts.remove_rich_rule))
+ assert rich_rule in data['rules'][opts.zone]
+ data['rules'][opts.zone].remove(rich_rule)
+ else:
+ assert opts.remove_rich_rule in data['rules'][opts.zone]
+ data['rules'][opts.zone].remove(opts.remove_rich_rule)
+ elif opts.list_rich_rules:
+ assert opts.zone
+ assert 'rules' in data
+ assert opts.zone in data['rules'].keys()
+ for rule in data['rules'][opts.zone]:
+ print(rule)
+
+ if opts.permanent:
+ if data == {}:
+ if os.path.exists(dump_file):
+ os.unlink(dump_file)
+ else:
+ with open(dump_file, 'wb') as w:
+ pickle.dump(data, w)
from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext
from samba.gp_firefox_ext import gp_firefox_ext
from samba.gp_chromium_ext import gp_chromium_ext
+from samba.gp_firewalld_ext import gp_firewalld_ext
import logging
from samba.credentials import Credentials
from samba.gp_msgs_ext import gp_msgs_ext
from samba import Ldb
from samba.auth import system_session
import json
+from shutil import which
realm = os.environ.get('REALM')
policies = realm + '/POLICIES'
}
"""
+firewalld_reg_pol = \
+b"""
+<?xml version="1.0" encoding="utf-8"?>
+<PolFile num_entries="6" signature="PReg" version="1">
+ <Entry type="4" type_name="REG_DWORD">
+ <Key>Software\Policies\Samba\Unix Settings\Firewalld</Key>
+ <ValueName>Zones</ValueName>
+ <Value>1</Value>
+ </Entry>
+ <Entry type="4" type_name="REG_DWORD">
+ <Key>Software\Policies\Samba\Unix Settings\Firewalld</Key>
+ <ValueName>Rules</ValueName>
+ <Value>1</Value>
+ </Entry>
+ <Entry type="1" type_name="REG_SZ">
+ <Key>Software\Policies\Samba\Unix Settings\Firewalld\Rules</Key>
+ <ValueName>Rules</ValueName>
+ <Value>{"work": [{"rule": {"family": "ipv4"}, "source address": "172.25.1.7", "service name": "ftp", "reject": {}}]}</Value>
+ </Entry>
+ <Entry type="1" type_name="REG_SZ">
+ <Key>Software\Policies\Samba\Unix Settings\Firewalld\Zones</Key>
+ <ValueName>**delvals.</ValueName>
+ <Value> </Value>
+ </Entry>
+ <Entry type="1" type_name="REG_SZ">
+ <Key>Software\Policies\Samba\Unix Settings\Firewalld\Zones</Key>
+ <ValueName>work</ValueName>
+ <Value>work</Value>
+ </Entry>
+ <Entry type="1" type_name="REG_SZ">
+ <Key>Software\Policies\Samba\Unix Settings\Firewalld\Zones</Key>
+ <ValueName>home</ValueName>
+ <Value>home</Value>
+ </Entry>
+</PolFile>
+"""
+
def days2rel_nttime(val):
seconds = 60
minutes = 60
# Unstage the Registry.pol file
unstage_file(reg_pol)
+
+ def test_gp_firewalld_ext(self):
+ local_path = self.lp.cache_path('gpo_cache')
+ guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
+ reg_pol = os.path.join(local_path, policies, guid,
+ 'MACHINE/REGISTRY.POL')
+ logger = logging.getLogger('gpo_tests')
+ cache_dir = self.lp.get('cache directory')
+ store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+ machine_creds = Credentials()
+ machine_creds.guess(self.lp)
+ machine_creds.set_machine_account()
+
+ # Initialize the group policy extension
+ ext = gp_firewalld_ext(logger, self.lp, machine_creds,
+ machine_creds.get_username(), store)
+
+ ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+ if ads.connect():
+ gpos = ads.get_gpo_list(machine_creds.get_username())
+
+ # Stage the Registry.pol file with test data
+ parser = GPPolParser()
+ parser.load_xml(etree.fromstring(firewalld_reg_pol.strip()))
+ ret = stage_file(reg_pol, ndr_pack(parser.pol_file))
+ self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
+
+ ext.process_group_policy([], gpos)
+
+ # Check that the policy was applied
+ firewall_cmd = which('firewall-cmd')
+ cmd = [firewall_cmd, '--get-zones']
+ p = Popen(cmd, stdout=PIPE, stderr=PIPE)
+ out, err = p.communicate()
+ self.assertIn(b'work', out, 'Failed to apply zones')
+ self.assertIn(b'home', out, 'Failed to apply zones')
+
+ cmd = [firewall_cmd, '--zone=work', '--list-interfaces']
+ p = Popen(cmd, stdout=PIPE, stderr=PIPE)
+ out, err = p.communicate()
+ self.assertIn(b'eth0', out, 'Failed to set interface on zone')
+
+ cmd = [firewall_cmd, '--zone=home', '--list-interfaces']
+ p = Popen(cmd, stdout=PIPE, stderr=PIPE)
+ out, err = p.communicate()
+ self.assertIn(b'eth0', out, 'Failed to set interface on zone')
+
+ cmd = [firewall_cmd, '--zone=work', '--list-rich-rules']
+ p = Popen(cmd, stdout=PIPE, stderr=PIPE)
+ out, err = p.communicate()
+ rule = b'rule family=ipv4 source address=172.25.1.7 ' + \
+ b'service name=ftp reject'
+ self.assertEquals(rule, out.strip(), 'Failed to set rich rule')
+
+ # Verify RSOP does not fail
+ ext.rsop([g for g in gpos if g.name == guid][0])
+
+ # Unapply the policy
+ gp_db = store.get_gplog(machine_creds.get_username())
+ del_gpos = get_deleted_gpos_list(gp_db, [])
+ ext.process_group_policy(del_gpos, [])
+
+ # Check that the policy was unapplied
+ cmd = [firewall_cmd, '--get-zones']
+ p = Popen(cmd, stdout=PIPE, stderr=PIPE)
+ out, err = p.communicate()
+ self.assertNotIn(b'work', out, 'Failed to unapply zones')
+ self.assertNotIn(b'home', out, 'Failed to unapply zones')
+
+ # Unstage the Registry.pol file
+ unstage_file(reg_pol)