SubprocessStatus,
SubprocessType,
)
+from knot_resolver_manager.kresd_controller.registered_workers import (
+ command_registered_workers,
+ get_registered_workers_kresids,
+)
from knot_resolver_manager.utils.functional import Result
from knot_resolver_manager.utils.modeling.types import NoneType
self._gc: Optional[Subprocess] = None
self._policy_loader: Optional[Subprocess] = None
self._manager_lock = asyncio.Lock()
+ self._workers_reset_needed: bool = False
self._controller: SubprocessController
self._watchdog_task: Optional["asyncio.Task[None]"] = None
self._fix_counter: _FixCounter = _FixCounter()
# register and immediately call a callback to apply config to all 'kresd' workers and 'cache-gc'
await config_store.register_on_change_callback(only_on_real_changes_update(config_nodes)(self.apply_config))
+ # register callback to reset policy rules for each 'kresd' worker
+ await config_store.register_on_change_callback(self.reset_workers_policy_rules)
+
# register controller config change listeners
await config_store.register_verifier(_deny_max_worker_changes)
self._gc = None
await self._collect_already_running_workers()
+ async def reset_workers_policy_rules(self, _config: KresConfig) -> None:
+
+ # command all running 'kresd' workers to reset their old policy rules,
+ # unless the workers have already been started with a new config so reset is not needed
+ if self._workers_reset_needed and get_registered_workers_kresids():
+ logger.debug("Resetting policy rules for all running 'kresd' workers")
+ cmd_results = await command_registered_workers("require('ffi').C.kr_rules_reset()")
+ for worker, res in cmd_results.items():
+ if res != 0:
+ logger.error("Failed to reset policy rules in %s: %s", worker, res)
+ else:
+ logger.debug(
+ "Skipped resetting policy rules for all running 'kresd' workers:"
+ " the workers are already running with new configuration"
+ )
+
async def apply_config(self, config: KresConfig, _noretry: bool = False) -> None:
try:
async with self._manager_lock:
await self._reload_system_state()
await self.apply_config(config, _noretry=True)
+ self._workers_reset_needed = False
+
async def load_policy_rules(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
try:
async with self._manager_lock:
logger.error(f"Failed to load policy rules: {e}")
return Result.err("kresd 'policy-loader' process failed to start. Config might be invalid.")
+ self._workers_reset_needed = True
logger.debug("Loading policy rules has been successfully completed")
return Result.ok(None)