From: David Mulder Date: Tue, 29 Jun 2021 19:38:02 +0000 (+0000) Subject: samba-tool: gpo load/remove commands X-Git-Tag: tevent-0.14.1~40 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ee37e3cd32e98088cf3b4faba4cc2a2ce3a70175;p=thirdparty%2Fsamba.git samba-tool: gpo load/remove commands These commands allow the setting of various group policies on the sysvol. Signed-off-by: David Mulder Reviewed-by: Andrew Bartlett Tested-by: Kees van Vloten --- diff --git a/python/samba/netcmd/gpo.py b/python/samba/netcmd/gpo.py index 6e64cc46a47..e790f2e8f4b 100644 --- a/python/samba/netcmd/gpo.py +++ b/python/samba/netcmd/gpo.py @@ -19,6 +19,7 @@ # along with this program. If not, see . # import os +import sys import samba.getopt as options import ldb import re @@ -76,6 +77,7 @@ from samba.ntstatus import ( NT_STATUS_ACCESS_DENIED ) from samba.netcmd.gpcommon import create_directory_hier, smb_connection +from samba.policies import RegistryGroupPolicies def gpo_flags_string(value): @@ -672,6 +674,128 @@ class cmd_show(GPOCommand): self.outf.write("\n") +class cmd_load(GPOCommand): + """Load policies onto a GPO. + + Reads json from standard input until EOF, unless a json formatted + file is provided via --content. + + Example json_input: + [ + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "StartPage", + "class": "USER", + "type": "REG_SZ", + "data": "homepage" + }, + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "URL", + "class": "USER", + "type": "REG_SZ", + "data": "google.com" + }, + ] + """ + + synopsis = "%prog [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "versionopts": options.VersionOptions, + "credopts": options.CredentialsOptions, + } + + takes_args = ['gpo'] + + takes_options = [ + Option("-H", help="LDB URL for database or target server", type=str), + Option("--content", help="JSON file of policy inputs", type=str) + ] + + def run(self, gpo, H=None, content=None, sambaopts=None, credopts=None, + versionopts=None): + if content is None: + policy_defs = json.loads(sys.stdin.read()) + elif os.path.exists(content): + with open(content, 'rb') as r: + policy_defs = json.load(r) + else: + raise CommandError("The JSON content file does not exist") + + self.lp = sambaopts.get_loadparm() + self.creds = credopts.get_credentials(self.lp, fallback_machine=True) + reg = RegistryGroupPolicies(gpo, self.lp, self.creds, H) + try: + reg.merge_s(policy_defs) + except NTSTATUSError as e: + if e.args[0] == NT_STATUS_ACCESS_DENIED: + raise CommandError("The authenticated user does " + "not have sufficient privileges") + else: + raise + + +class cmd_remove(GPOCommand): + """Remove policies from a GPO. + + Reads json from standard input until EOF, unless a json formatted + file is provided via --content. + + Example json_input: + [ + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "StartPage", + "class": "USER", + }, + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "URL", + "class": "USER", + }, + ] + """ + + synopsis = "%prog [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "versionopts": options.VersionOptions, + "credopts": options.CredentialsOptions, + } + + takes_args = ['gpo'] + + takes_options = [ + Option("-H", help="LDB URL for database or target server", type=str), + Option("--content", help="JSON file of policy inputs", type=str) + ] + + def run(self, gpo, H=None, content=None, sambaopts=None, credopts=None, + versionopts=None): + if content is None: + policy_defs = json.loads(sys.stdin.read()) + elif os.path.exists(content): + with open(content, 'rb') as r: + policy_defs = json.load(r) + else: + raise CommandError("The JSON content file does not exist") + + self.lp = sambaopts.get_loadparm() + self.creds = credopts.get_credentials(self.lp, fallback_machine=True) + reg = RegistryGroupPolicies(gpo, self.lp, self.creds, H) + try: + reg.remove_s(policy_defs) + except NTSTATUSError as e: + if e.args[0] == NT_STATUS_ACCESS_DENIED: + raise CommandError("The authenticated user does " + "not have sufficient privileges") + else: + raise + + class cmd_getlink(GPOCommand): """List GPO Links for a container.""" @@ -4092,6 +4216,8 @@ class cmd_gpo(SuperCommand): subcommands["listall"] = cmd_listall() subcommands["list"] = cmd_list() subcommands["show"] = cmd_show() + subcommands["load"] = cmd_load() + subcommands["remove"] = cmd_remove() subcommands["getlink"] = cmd_getlink() subcommands["setlink"] = cmd_setlink() subcommands["dellink"] = cmd_dellink() diff --git a/python/samba/policies.py b/python/samba/policies.py new file mode 100644 index 00000000000..5751c7aea0f --- /dev/null +++ b/python/samba/policies.py @@ -0,0 +1,226 @@ +# Utilities for working with policies in SYSVOL Registry.pol files +# +# Copyright (C) David Mulder 2022 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import json +from samba.ndr import ndr_unpack, ndr_pack +from samba.dcerpc import preg +from samba.netcmd.common import netcmd_finddc +from samba.netcmd.gpcommon import create_directory_hier, smb_connection +from samba import NTSTATUSError +from numbers import Number +from samba.registry import str_regtype +from samba.ntstatus import ( + NT_STATUS_OBJECT_NAME_INVALID, + NT_STATUS_OBJECT_NAME_NOT_FOUND, + NT_STATUS_OBJECT_PATH_NOT_FOUND +) + +class RegistryGroupPolicies(object): + def __init__(self, gpo, lp, creds, host=None): + self.gpo = gpo + self.lp = lp + self.creds = creds + realm = self.lp.get('realm') + self.pol_dir = '\\'.join([realm.lower(), 'Policies', gpo, '%s']) + self.pol_file = '\\'.join([self.pol_dir, 'Registry.pol']) + + + if host and host.startswith('ldap://'): + dc_hostname = host[7:] + else: + dc_hostname = netcmd_finddc(self.lp, self.creds) + + self.conn = smb_connection(dc_hostname, + 'sysvol', + lp=self.lp, + creds=self.creds) + + def __load_registry_pol(self, pol_file): + try: + pol_data = ndr_unpack(preg.file, self.conn.loadfile(pol_file)) + except NTSTATUSError as e: + if e.args[0] in [NT_STATUS_OBJECT_NAME_INVALID, + NT_STATUS_OBJECT_NAME_NOT_FOUND, + NT_STATUS_OBJECT_PATH_NOT_FOUND]: + pol_data = preg.file() # The file doesn't exist + else: + raise + return pol_data + + def __save_registry_pol(self, pol_dir, pol_file, pol_data): + create_directory_hier(self.conn, pol_dir) + self.conn.savefile(pol_file, ndr_pack(pol_data)) + + def __validate_json(self, json_input, remove=False): + if type(json_input) != list: + raise SyntaxError('JSON not formatted correctly') + for entry in json_input: + if type(entry) != dict: + raise SyntaxError('JSON not formatted correctly') + keys = ['keyname', 'valuename', 'class'] + if not remove: + keys.extend(['data', 'type']) + if not all([k in entry for k in keys]): + raise SyntaxError('JSON not formatted correctly') + + def __determine_data_type(self, entry): + if isinstance(entry['type'], Number): + return entry['type'] + else: + for i in range(12): + if str_regtype(i) == entry['type'].upper(): + return i + return 0 # REG_NONE + + def __pol_replace(self, pol_data, entry): + for e in pol_data.entries: + if e.keyname == entry['keyname'] and \ + e.valuename == entry['valuename']: + e.data = entry['data'] + break + else: + e = preg.entry() + e.keyname = entry['keyname'] + e.valuename = entry['valuename'] + e.type = self.__determine_data_type(entry) + e.data = entry['data'] + entries = list(pol_data.entries) + entries.append(e) + pol_data.entries = entries + pol_data.num_entries = len(entries) + + def __pol_remove(self, pol_data, entry): + entries = [] + for e in pol_data.entries: + if not (e.keyname == entry['keyname'] and \ + e.valuename == entry['valuename']): + entries.append(e) + pol_data.entries = entries + pol_data.num_entries = len(entries) + + def remove_s(self, json_input): + '''remove_s + json_input: JSON list of entries to remove from GPO + + Example json_input: + [ + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "StartPage", + "class": "USER", + }, + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "URL", + "class": "USER", + }, + ] + ''' + self.__validate_json(json_input, remove=True) + user_pol_data = self.__load_registry_pol(self.pol_file % 'User') + machine_pol_data = self.__load_registry_pol(self.pol_file % 'Machine') + + for entry in json_input: + cls = entry['class'].lower() + if cls == 'machine' or cls == 'both': + self.__pol_remove(machine_pol_data, entry) + if cls == 'user' or cls == 'both': + self.__pol_remove(user_pol_data, entry) + self.__save_registry_pol(self.pol_dir % 'User', + self.pol_file % 'User', + user_pol_data) + self.__save_registry_pol(self.pol_dir % 'Machine', + self.pol_file % 'Machine', + machine_pol_data) + + def merge_s(self, json_input): + '''merge_s + json_input: JSON list of entries to merge into GPO + + Example json_input: + [ + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "StartPage", + "class": "USER", + "type": "REG_SZ", + "data": "homepage" + }, + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "URL", + "class": "USER", + "type": "REG_SZ", + "data": "google.com" + }, + ] + ''' + self.__validate_json(json_input) + user_pol_data = self.__load_registry_pol(self.pol_file % 'User') + machine_pol_data = self.__load_registry_pol(self.pol_file % 'Machine') + + for entry in json_input: + cls = entry['class'].lower() + if cls == 'machine' or cls == 'both': + self.__pol_replace(machine_pol_data, entry) + if cls == 'user' or cls == 'both': + self.__pol_replace(user_pol_data, entry) + self.__save_registry_pol(self.pol_dir % 'User', + self.pol_file % 'User', + user_pol_data) + self.__save_registry_pol(self.pol_dir % 'Machine', + self.pol_file % 'Machine', + machine_pol_data) + + def replace_s(self, json_input): + '''replace_s + json_input: JSON list of entries to replace entries in GPO + + Example json_input: + [ + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "StartPage", + "class": "USER", + "data": "homepage" + }, + { + "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage", + "valuename": "URL", + "class": "USER", + "data": "google.com" + }, + ] + ''' + self.__validate_json(json_input) + user_pol_data = preg.file() + machine_pol_data = preg.file() + + for entry in json_input: + cls = entry['class'].lower() + if cls == 'machine' or cls == 'both': + self.__pol_replace(machine_pol_data, entry) + if cls == 'user' or cls == 'both': + self.__pol_replace(user_pol_data, entry) + if user_pol_data.num_entries > 0: + self.__save_registry_pol(self.pol_dir % 'User', + self.pol_file % 'User', + user_pol_data) + if machine_pol_data.num_entries > 0: + self.__save_registry_pol(self.pol_dir % 'Machine', + self.pol_file % 'Machine', + machine_pol_data)