From: Aleš Mrázek Date: Fri, 17 May 2024 13:09:44 +0000 (+0200) Subject: manager: wait for 'policy-loader' to finish on the resolver startup X-Git-Tag: v6.0.8~8^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d62fe8e300c6954a9c8d53fc9a70bd2e5e58d75d;p=thirdparty%2Fknot-resolver.git manager: wait for 'policy-loader' to finish on the resolver startup When starting the resolver, we wait for policy-loader until policy rules are successfully loaded into the cache where the rules are shared between kresd workers. After that, other processes are started. Otherwise, the workers might start without the configured rules in the cache while they are already resolving DNS traffic. --- diff --git a/manager/knot_resolver_manager/kres_manager.py b/manager/knot_resolver_manager/kres_manager.py index 29e43f7a2..e0d6c84ed 100644 --- a/manager/knot_resolver_manager/kres_manager.py +++ b/manager/knot_resolver_manager/kres_manager.py @@ -19,6 +19,10 @@ from knot_resolver_manager.kresd_controller.interface import ( SubprocessStatus, SubprocessType, ) +from knot_resolver_manager.kresd_controller.registered_workers import ( + command_registered_workers, + get_registered_workers_kresids, +) from knot_resolver_manager.utils.functional import Result from knot_resolver_manager.utils.modeling.types import NoneType @@ -110,16 +114,22 @@ class KresManager: # pylint: disable=too-many-instance-attributes logger.debug("Looking for already running workers") await self._collect_already_running_workers() - # registering the function calls them immediately, therefore after this, the config is applied + # register and immediately call a callback that applies policy rules configuration + await config_store.register_on_change_callback( + only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])( + self.apply_policy_rules_config + ) + ) + + # register and immediately call a verifier that validates config with 'canary' kresd process await 
config_store.register_verifier(self.validate_config) - # register callback to apply config for workers and cache-gc + # register and immediately call a callback to apply config to all 'kresd' workers and 'cache-gc' await config_store.register_on_change_callback( only_on_real_changes( lambda config: [ config.nsid, config.hostname, - config.rundir, config.workers, config.max_workers, config.webmgmt, @@ -136,13 +146,6 @@ class KresManager: # pylint: disable=too-many-instance-attributes )(self.apply_config) ) - # register callback for policy rules config sections - await config_store.register_on_change_callback( - only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])( - self.apply_policy_loader_config - ) - ) - # register controller config change listeners await config_store.register_verifier(_deny_max_worker_changes) @@ -186,16 +189,16 @@ class KresManager: # pylint: disable=too-many-instance-attributes async def _run_policy_loader(self, config: KresConfig) -> None: if self._policy_loader: - await self._policy_loader.start() + await self._policy_loader.start(config) else: subprocess = await self._controller.create_subprocess(config, SubprocessType.POLICY_LOADER) await subprocess.start() self._policy_loader = subprocess - async def _stop_policy_loader(self) -> None: - assert self._policy_loader is not None - await self._policy_loader.stop() - self._policy_loader = None + def _is_policy_loader_exited(self) -> bool: + if self._policy_loader: + return self._policy_loader.status() is SubprocessStatus.EXITED + return False def _is_gc_running(self) -> bool: return self._gc is not None @@ -261,24 +264,38 @@ class KresManager: # pylint: disable=too-many-instance-attributes await self._reload_system_state() await self.apply_config(config, _noretry=True) - async def apply_policy_loader_config(self, config: KresConfig, _noretry: bool = False) -> None: + async def apply_policy_rules_config(self, config: KresConfig, _noretry: bool = False) -> None: 
try: async with self._manager_lock: - logger.debug("Running kresd policy loader") + logger.debug("Running kresd 'policy-loader'") await self._run_policy_loader(config) + + # wait for 'policy-loader' to finish + logger.debug("Waiting for 'policy-loader' to finish loading policy rules") + while not self._is_policy_loader_exited(): + await asyncio.sleep(1) + + # command all running 'kresd' workers to reset their old policy rules, + # unless we're just starting up and there are none to reset + if get_registered_workers_kresids(): + logger.debug("Resetting policy rules for all running 'kresd' workers") + cmd_results = await command_registered_workers("require('ffi').C.kr_rules_reset()") + for worker, res in cmd_results.items(): + if res != 0: + logger.error("Failed to reset policy rules in %s: %s", worker, res) except SubprocessControllerException as e: if _noretry: raise e elif self._fix_counter.is_too_high(): - logger.error(f"Failed to apply kresd policy loader config: {e}") + logger.error(f"Failed to apply configured policy rules: {e}") logger.error("There have already been problems recently, refusing to try to fix it.") await self.forced_shutdown() # possible improvement - the person who requested this change won't get a response this way else: - logger.error(f"Failed to apply kresd policy loader config: {e}") + logger.error(f"Failed to apply configured policy rules: {e}") logger.warning("Reloading system state and trying again.") self._fix_counter.increase() await self._reload_system_state() - await self.apply_policy_loader_config(config, _noretry=True) + await self.apply_policy_rules_config(config, _noretry=True) async def stop(self): if self._watchdog_task is not None: diff --git a/manager/knot_resolver_manager/kresd_controller/interface.py b/manager/knot_resolver_manager/kresd_controller/interface.py index bd1ba59fd..63caea493 100644 --- a/manager/knot_resolver_manager/kresd_controller/interface.py +++ 
b/manager/knot_resolver_manager/kresd_controller/interface.py @@ -113,7 +113,9 @@ class Subprocess(ABC): self._config = config self._registered_worker: bool = False - async def start(self) -> None: + async def start(self, new_config: Optional[KresConfig] = None) -> None: + if new_config: + self._config = new_config config_file: Optional[Path] = None if self.type is SubprocessType.KRESD: