From: Aleš Mrázek Date: Fri, 26 Apr 2024 12:33:05 +0000 (+0200) Subject: manager: apply policy config separately X-Git-Tag: v6.0.8~8^2~16 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c0af6f51c4d7bbaaa05060c3fd4fee13678f7bef;p=thirdparty%2Fknot-resolver.git manager: apply policy config separately --- diff --git a/manager/knot_resolver_manager/kres_manager.py b/manager/knot_resolver_manager/kres_manager.py index 072c73fc3..1c2a08319 100644 --- a/manager/knot_resolver_manager/kres_manager.py +++ b/manager/knot_resolver_manager/kres_manager.py @@ -6,7 +6,7 @@ from subprocess import SubprocessError from typing import Callable, List, Optional from knot_resolver_manager.compat.asyncio import create_task -from knot_resolver_manager.config_store import ConfigStore +from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes from knot_resolver_manager.constants import ( FIX_COUNTER_DECREASE_INTERVAL_SEC, MANAGER_FIX_ATTEMPT_MAX_COUNTER, @@ -77,6 +77,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes self._workers: List[Subprocess] = [] self._gc: Optional[Subprocess] = None + self._policy_loader: Optional[Subprocess] = None self._manager_lock = asyncio.Lock() self._controller: SubprocessController self._watchdog_task: Optional["asyncio.Task[None]"] = None @@ -111,7 +112,35 @@ class KresManager: # pylint: disable=too-many-instance-attributes # registering the function calls them immediately, therefore after this, the config is applied await config_store.register_verifier(self.validate_config) - await config_store.register_on_change_callback(self.apply_config) + + # register callback to apply config for workers and cache-gc + await config_store.register_on_change_callback( + only_on_real_changes( + lambda config: [ + config.nsid, + config.hostname, + config.rundir, + config.workers, + config.max_workers, + config.webmgmt, + config.options, + config.network, + config.cache, + config.dnssec, + config.dns64, + config.logging, + config.monitoring, + config.lua, + ] + )(self.apply_config) + ) + + # register callback for policy rules config sections + await config_store.register_on_change_callback( + only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])( + self.apply_policy_loader_config + ) + ) # register controller config change listeners await config_store.register_verifier(_deny_max_worker_changes) @@ -135,6 +164,9 @@ class KresManager: # pylint: disable=too-many-instance-attributes elif subp.type == SubprocessType.GC: assert self._gc is None self._gc = subp + elif subp.type == SubprocessType.POLICY_LOADER: + assert self._policy_loader is None + self._policy_loader = subp else: raise RuntimeError("unexpected subprocess type") @@ -151,6 +183,19 @@ class KresManager: # pylint: disable=too-many-instance-attributes while len(self._workers) < n: await self._spawn_new_worker(config) + async def _run_policy_loader(self, config: KresConfig) -> None: + if self._policy_loader: + await self._policy_loader.start() + else: + subprocess = await self._controller.create_subprocess(config, SubprocessType.POLICY_LOADER) + await subprocess.start() + self._policy_loader = subprocess + + async def _stop_policy_loader(self) -> None: + assert self._policy_loader is not None + await self._policy_loader.stop() + self._policy_loader = None + def _is_gc_running(self) -> bool: return self._gc is not None @@ -183,6 +228,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes async def _reload_system_state(self) -> None: async with self._manager_lock: self._workers = [] + self._policy_loader = None self._gc = None await self._collect_already_running_workers() @@ -214,6 +260,25 @@ class KresManager: # pylint: disable=too-many-instance-attributes await self._reload_system_state() await self.apply_config(config, _noretry=True) + async def apply_policy_loader_config(self, config: KresConfig, _noretry: bool = False) -> None: + try: + async with self._manager_lock: + logger.debug("Running kresd policy loader") + await self._run_policy_loader(config) + except SubprocessControllerException as e: + if _noretry: + raise e + elif self._fix_counter.is_too_high(): + logger.error(f"Failed to apply kresd policy loader config: {e}") + logger.error("There have already been problems recently, refusing to try to fix it.") + await self.forced_shutdown() # possible improvement - the person who requested this change won't get a response this way + else: + logger.error(f"Failed to apply kresd policy loader config: {e}") + logger.warning("Reloading system state and trying again.") + self._fix_counter.increase() + await self._reload_system_state() + await self.apply_policy_loader_config(config, _noretry=True) + async def stop(self): if self._watchdog_task is not None: self._watchdog_task.cancel() # cancel it @@ -267,6 +332,9 @@ class KresManager: # pylint: disable=too-many-instance-attributes expected_ids = [x.id for x in self._workers] if self._gc: expected_ids.append(self._gc.id) + if self._policy_loader: + expected_ids.append(self._policy_loader.id) + invoke_callback = False for eid in expected_ids: @@ -286,7 +354,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes non_registered_ids = detected_subprocesses.keys() - set(expected_ids) if len(non_registered_ids) != 0: logger.error( - "Found additional kresd instances in the system, which shouldn't be there - %s", + "Found additional process in the system, which shouldn't be there - %s", non_registered_ids, ) invoke_callback = True