SubprocessStatus,
SubprocessType,
)
-from knot_resolver_manager.kresd_controller.registered_workers import (
- command_registered_workers,
- get_registered_workers_kresids,
-)
from knot_resolver_manager.utils.functional import Result
from knot_resolver_manager.utils.modeling.types import NoneType
logger.debug("Looking for already running workers")
await self._collect_already_running_workers()
- # register and immediately call a callback that applies policy rules configuration
- await config_store.register_on_change_callback(self.apply_policy_rules_config)
+ # register and immediately call a verifier that loads policy rules into the rules database
+ await config_store.register_verifier(self.load_policy_rules)
# configuration nodes that are relevant to kresd workers and the cache garbage collector
def config_nodes(config: KresConfig) -> List[Any]:
config.lua,
]
- # register and immediately call a verififier that validates config with 'canary' kresd process
+ # register and immediately call a verifier that validates config with 'canary' kresd process
await config_store.register_verifier(only_on_real_changes_verifier(config_nodes)(self.validate_config))
# register and immediately call a callback to apply config to all 'kresd' workers and 'cache-gc'
await self._reload_system_state()
await self.apply_config(config, _noretry=True)
- async def apply_policy_rules_config(self, config: KresConfig, _noretry: bool = False) -> None:
+ async def load_policy_rules(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
try:
async with self._manager_lock:
logger.debug("Running kresd 'policy-loader'")
- await self._run_policy_loader(config)
+ await self._run_policy_loader(new)
# wait for 'policy-loader' to finish
logger.debug("Waiting for 'policy-loader' to finish loading policy rules")
while not self._is_policy_loader_exited():
await asyncio.sleep(1)
- # command all running 'kresd' workers to reset their old policy rules,
- # unless we're just starting up and there are none to reset
- if get_registered_workers_kresids():
- logger.debug("Resetting policy rules for all running 'kresd' workers")
- cmd_results = await command_registered_workers("require('ffi').C.kr_rules_reset()")
- for worker, res in cmd_results.items():
- if res != 0:
- logger.error("Failed to reset policy rules in %s: %s", worker, res)
- except SubprocessControllerException as e:
- if _noretry:
- raise e
- elif self._fix_counter.is_too_high():
- logger.error(f"Failed to apply configured policy rules: {e}")
- logger.error("There have already been problems recently, refusing to try to fix it.")
- await self.forced_shutdown() # possible improvement - the person who requested this change won't get a response this way
- else:
- logger.error(f"Failed to apply configured policy rules: {e}")
- logger.warning("Reloading system state and trying again.")
- self._fix_counter.increase()
- await self._reload_system_state()
- await self.apply_policy_rules_config(config, _noretry=True)
+ except (SubprocessError, SubprocessControllerException) as e:
+ logger.error(f"Failed to load policy rules: {e}")
+ return Result.err("kresd 'policy-loader' process failed to start. Config might be invalid.")
+
+ logger.debug("Loading policy rules has been successfully completed")
+ return Result.ok(None)
async def stop(self):
if self._watchdog_task is not None: