SubprocessStatus,
SubprocessType,
)
+from knot_resolver_manager.kresd_controller.registered_workers import (
+ command_registered_workers,
+ get_registered_workers_kresids,
+)
from knot_resolver_manager.utils.functional import Result
from knot_resolver_manager.utils.modeling.types import NoneType
logger.debug("Looking for already running workers")
await self._collect_already_running_workers()
- # registering the function calls them immediately, therefore after this, the config is applied
+ # register and immediately call a callback that applies policy rules configuration
+ await config_store.register_on_change_callback(
+ only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])(
+ self.apply_policy_rules_config
+ )
+ )
+
+ # register and immediately call a verififier that validates config with 'canary' kresd process
await config_store.register_verifier(self.validate_config)
- # register callback to apply config for workers and cache-gc
+ # register and immediately call a callback to apply config to all 'kresd' workers and 'cache-gc'
await config_store.register_on_change_callback(
only_on_real_changes(
lambda config: [
config.nsid,
config.hostname,
- config.rundir,
config.workers,
config.max_workers,
config.webmgmt,
)(self.apply_config)
)
- # register callback for policy rules config sections
- await config_store.register_on_change_callback(
- only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])(
- self.apply_policy_loader_config
- )
- )
-
# register controller config change listeners
await config_store.register_verifier(_deny_max_worker_changes)
async def _run_policy_loader(self, config: KresConfig) -> None:
if self._policy_loader:
- await self._policy_loader.start()
+ await self._policy_loader.start(config)
else:
subprocess = await self._controller.create_subprocess(config, SubprocessType.POLICY_LOADER)
await subprocess.start()
self._policy_loader = subprocess
- async def _stop_policy_loader(self) -> None:
- assert self._policy_loader is not None
- await self._policy_loader.stop()
- self._policy_loader = None
+ def _is_policy_loader_exited(self) -> bool:
+ if self._policy_loader:
+ return self._policy_loader.status() is SubprocessStatus.EXITED
+ return False
def _is_gc_running(self) -> bool:
return self._gc is not None
await self._reload_system_state()
await self.apply_config(config, _noretry=True)
- async def apply_policy_loader_config(self, config: KresConfig, _noretry: bool = False) -> None:
+ async def apply_policy_rules_config(self, config: KresConfig, _noretry: bool = False) -> None:
try:
async with self._manager_lock:
- logger.debug("Running kresd policy loader")
+ logger.debug("Running kresd 'policy-loader'")
await self._run_policy_loader(config)
+
+ # wait for 'policy-loader' to finish
+ logger.debug("Waiting for 'policy-loader' to finish loading policy rules")
+ while not self._is_policy_loader_exited():
+ await asyncio.sleep(1)
+
+ # command all running 'kresd' workers to reset their old policy rules,
+ # unless we're just starting up and there are none to reset
+ if get_registered_workers_kresids():
+ logger.debug("Resetting policy rules for all running 'kresd' workers")
+ cmd_results = await command_registered_workers("require('ffi').C.kr_rules_reset()")
+ for worker, res in cmd_results.items():
+ if res != 0:
+ logger.error("Failed to reset policy rules in %s: %s", worker, res)
except SubprocessControllerException as e:
if _noretry:
raise e
elif self._fix_counter.is_too_high():
- logger.error(f"Failed to apply kresd policy loader config: {e}")
+ logger.error(f"Failed to apply configured policy rules: {e}")
logger.error("There have already been problems recently, refusing to try to fix it.")
await self.forced_shutdown() # possible improvement - the person who requested this change won't get a response this way
else:
- logger.error(f"Failed to apply kresd policy loader config: {e}")
+ logger.error(f"Failed to apply configured policy rules: {e}")
logger.warning("Reloading system state and trying again.")
self._fix_counter.increase()
await self._reload_system_state()
- await self.apply_policy_loader_config(config, _noretry=True)
+ await self.apply_policy_rules_config(config, _noretry=True)
async def stop(self):
if self._watchdog_task is not None: