From cd73e4101347f1e3c1bb865f9a9c361b3771fd34 Mon Sep 17 00:00:00 2001 From: David Mulder Date: Tue, 12 Oct 2021 12:54:09 -0600 Subject: [PATCH] gp: Test Firewalld Group Policy Apply Signed-off-by: David Mulder Reviewed-by: Jeremy Allison --- python/samba/gp_firewalld_ext.py | 25 +++++++ python/samba/tests/bin/firewall-cmd | 110 +++++++++++++++++++++++++++ python/samba/tests/gpo.py | 111 ++++++++++++++++++++++++++++ selftest/knownfail.d/gpo | 1 + 4 files changed, 247 insertions(+) create mode 100644 python/samba/gp_firewalld_ext.py create mode 100755 python/samba/tests/bin/firewall-cmd create mode 100644 selftest/knownfail.d/gpo diff --git a/python/samba/gp_firewalld_ext.py b/python/samba/gp_firewalld_ext.py new file mode 100644 index 00000000000..e6dede47d69 --- /dev/null +++ b/python/samba/gp_firewalld_ext.py @@ -0,0 +1,25 @@ +# gp_firewalld_ext samba gpo policy +# Copyright (C) David Mulder 2021 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from samba.gpclass import gp_pol_ext + +class gp_firewalld_ext(gp_pol_ext): + def process_group_policy(self, deleted_gpo_list, changed_gpo_list): + pass + + def rsop(self, gpo): + output = {} + return output diff --git a/python/samba/tests/bin/firewall-cmd b/python/samba/tests/bin/firewall-cmd new file mode 100755 index 00000000000..503ae9a772c --- /dev/null +++ b/python/samba/tests/bin/firewall-cmd @@ -0,0 +1,110 @@ +#!/usr/bin/python3 +import optparse +import os, sys, re +import pickle +try: + from firewall.core.rich import Rich_Rule +except ImportError: + Rich_Rule = None + +sys.path.insert(0, "bin/python") + +if __name__ == "__main__": + parser = optparse.OptionParser('firewall-cmd [options]') + parser.add_option('--list-interfaces', default=False, action="store_true") + parser.add_option('--permanent', default=False, action="store_true") + parser.add_option('--new-zone') + parser.add_option('--get-zones', default=False, action="store_true") + parser.add_option('--delete-zone') + parser.add_option('--zone') + parser.add_option('--add-interface') + parser.add_option('--add-rich-rule') + parser.add_option('--remove-rich-rule') + parser.add_option('--list-rich-rules', default=False, action="store_true") + + (opts, args) = parser.parse_args() + + # Use a dir we can write to in the testenv + if 'LOCAL_PATH' in os.environ: + data_dir = os.path.realpath(os.environ.get('LOCAL_PATH')) + else: + data_dir = os.path.dirname(os.path.realpath(__file__)) + dump_file = os.path.join(data_dir, 'firewall-cmd.dump') + if os.path.exists(dump_file): + with open(dump_file, 'rb') as r: + data = pickle.load(r) + else: + data = {} + + if opts.list_interfaces: + if not opts.zone: # default zone dummy interface + print('eth0') + else: + assert 'zone_interfaces' in data + assert opts.zone in data['zone_interfaces'].keys() + for interface in data['zone_interfaces'][opts.zone]: + sys.stdout.write('%s ' % interface) + print() + elif opts.new_zone: + if 'zones' not in data: + data['zones'] = [] + data['zones'].append(opts.new_zone) + elif opts.get_zones: + if 'zones' in data: + for zone in data['zones']: + sys.stdout.write('%s ' % zone) + print() + elif opts.delete_zone: + assert 'zones' in data + assert opts.delete_zone in data['zones'] + data['zones'].remove(opts.delete_zone) + if len(data['zones']) == 0: + del data['zones'] + if 'zone_interfaces' in data and opts.zone in data['zone_interfaces'].keys(): + del data['zone_interfaces'][opts.zone] + elif opts.add_interface: + assert opts.zone + assert 'zones' in data + assert opts.zone in data['zones'] + if 'zone_interfaces' not in data: + data['zone_interfaces'] = {} + if opts.zone not in data['zone_interfaces'].keys(): + data['zone_interfaces'][opts.zone] = [] + data['zone_interfaces'][opts.zone].append(opts.add_interface) + elif opts.add_rich_rule: + assert opts.zone + if 'rules' not in data: + data['rules'] = {} + if opts.zone not in data['rules']: + data['rules'][opts.zone] = [] + # Test rule parsing if firewalld is installed + if Rich_Rule: + # Parsing failure will throw an exception + data['rules'][opts.zone].append(str(Rich_Rule(rule_str=opts.add_rich_rule))) + else: + data['rules'][opts.zone].append(opts.add_rich_rule) + elif opts.remove_rich_rule: + assert opts.zone + assert 'rules' in data + assert opts.zone in data['rules'].keys() + if Rich_Rule: + rich_rule = str(Rich_Rule(rule_str=opts.remove_rich_rule)) + assert rich_rule in data['rules'][opts.zone] + data['rules'][opts.zone].remove(rich_rule) + else: + assert opts.remove_rich_rule in data['rules'][opts.zone] + data['rules'][opts.zone].remove(opts.remove_rich_rule) + elif opts.list_rich_rules: + assert opts.zone + assert 'rules' in data + assert opts.zone in data['rules'].keys() + for rule in data['rules'][opts.zone]: + print(rule) + + if opts.permanent: + if data == {}: + if os.path.exists(dump_file): + os.unlink(dump_file) + else: + with open(dump_file, 'wb') as w: + pickle.dump(data, w) diff --git a/python/samba/tests/gpo.py b/python/samba/tests/gpo.py index 05f12312c6e..35399a63043 100644 --- a/python/samba/tests/gpo.py +++ b/python/samba/tests/gpo.py @@ -44,6 +44,7 @@ from samba.gp_gnome_settings_ext import gp_gnome_settings_ext from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext from samba.gp_firefox_ext import gp_firefox_ext from samba.gp_chromium_ext import gp_chromium_ext +from samba.gp_firewalld_ext import gp_firewalld_ext import logging from samba.credentials import Credentials from samba.gp_msgs_ext import gp_msgs_ext @@ -61,6 +62,7 @@ from samba.gpclass import get_dc_hostname from samba import Ldb from samba.auth import system_session import json +from shutil import which realm = os.environ.get('REALM') policies = realm + '/POLICIES' @@ -6832,6 +6834,43 @@ b""" } """ +firewalld_reg_pol = \ +b""" + + + + Software\Policies\Samba\Unix Settings\Firewalld + Zones + 1 + + + Software\Policies\Samba\Unix Settings\Firewalld + Rules + 1 + + + Software\Policies\Samba\Unix Settings\Firewalld\Rules + Rules + {"work": [{"rule": {"family": "ipv4"}, "source address": "172.25.1.7", "service name": "ftp", "reject": {}}]} + + + Software\Policies\Samba\Unix Settings\Firewalld\Zones + **delvals. + + + + Software\Policies\Samba\Unix Settings\Firewalld\Zones + work + work + + + Software\Policies\Samba\Unix Settings\Firewalld\Zones + home + home + + +""" + def days2rel_nttime(val): seconds = 60 minutes = 60 @@ -8805,3 +8844,75 @@ class GPOTests(tests.TestCase): # Unstage the Registry.pol file unstage_file(reg_pol) + + def test_gp_firewalld_ext(self): + local_path = self.lp.cache_path('gpo_cache') + guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}' + reg_pol = os.path.join(local_path, policies, guid, + 'MACHINE/REGISTRY.POL') + logger = logging.getLogger('gpo_tests') + cache_dir = self.lp.get('cache directory') + store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb')) + + machine_creds = Credentials() + machine_creds.guess(self.lp) + machine_creds.set_machine_account() + + # Initialize the group policy extension + ext = gp_firewalld_ext(logger, self.lp, machine_creds, + machine_creds.get_username(), store) + + ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds) + if ads.connect(): + gpos = ads.get_gpo_list(machine_creds.get_username()) + + # Stage the Registry.pol file with test data + parser = GPPolParser() + parser.load_xml(etree.fromstring(firewalld_reg_pol.strip())) + ret = stage_file(reg_pol, ndr_pack(parser.pol_file)) + self.assertTrue(ret, 'Could not create the target %s' % reg_pol) + + ext.process_group_policy([], gpos) + + # Check that the policy was applied + firewall_cmd = which('firewall-cmd') + cmd = [firewall_cmd, '--get-zones'] + p = Popen(cmd, stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + self.assertIn(b'work', out, 'Failed to apply zones') + self.assertIn(b'home', out, 'Failed to apply zones') + + cmd = [firewall_cmd, '--zone=work', '--list-interfaces'] + p = Popen(cmd, stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + self.assertIn(b'eth0', out, 'Failed to set interface on zone') + + cmd = [firewall_cmd, '--zone=home', '--list-interfaces'] + p = Popen(cmd, stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + self.assertIn(b'eth0', out, 'Failed to set interface on zone') + + cmd = [firewall_cmd, '--zone=work', '--list-rich-rules'] + p = Popen(cmd, stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + rule = b'rule family=ipv4 source address=172.25.1.7 ' + \ + b'service name=ftp reject' + self.assertEquals(rule, out.strip(), 'Failed to set rich rule') + + # Verify RSOP does not fail + ext.rsop([g for g in gpos if g.name == guid][0]) + + # Unapply the policy + gp_db = store.get_gplog(machine_creds.get_username()) + del_gpos = get_deleted_gpos_list(gp_db, []) + ext.process_group_policy(del_gpos, []) + + # Check that the policy was unapplied + cmd = [firewall_cmd, '--get-zones'] + p = Popen(cmd, stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + self.assertNotIn(b'work', out, 'Failed to unapply zones') + self.assertNotIn(b'home', out, 'Failed to unapply zones') + + # Unstage the Registry.pol file + unstage_file(reg_pol) diff --git a/selftest/knownfail.d/gpo b/selftest/knownfail.d/gpo new file mode 100644 index 00000000000..74e2de0dd39 --- /dev/null +++ b/selftest/knownfail.d/gpo @@ -0,0 +1 @@ +^samba.tests.gpo.samba.tests.gpo.GPOTests.test_gp_firewalld_ext -- 2.47.2