from typing import Callable, List, Optional
from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.config_store import ConfigStore
+from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
from knot_resolver_manager.constants import (
FIX_COUNTER_DECREASE_INTERVAL_SEC,
MANAGER_FIX_ATTEMPT_MAX_COUNTER,
self._workers: List[Subprocess] = []
self._gc: Optional[Subprocess] = None
+ self._policy_loader: Optional[Subprocess] = None
self._manager_lock = asyncio.Lock()
self._controller: SubprocessController
self._watchdog_task: Optional["asyncio.Task[None]"] = None
# registering the function calls them immediately, therefore after this, the config is applied
await config_store.register_verifier(self.validate_config)
- await config_store.register_on_change_callback(self.apply_config)
+
+ # register callback to apply config for workers and cache-gc
+ await config_store.register_on_change_callback(
+ only_on_real_changes(
+ lambda config: [
+ config.nsid,
+ config.hostname,
+ config.rundir,
+ config.workers,
+ config.max_workers,
+ config.webmgmt,
+ config.options,
+ config.network,
+ config.cache,
+ config.dnssec,
+ config.dns64,
+ config.logging,
+ config.monitoring,
+ config.lua,
+ ]
+ )(self.apply_config)
+ )
+
+ # register callback for policy rules config sections
+ await config_store.register_on_change_callback(
+ only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])(
+ self.apply_policy_loader_config
+ )
+ )
# register controller config change listeners
await config_store.register_verifier(_deny_max_worker_changes)
elif subp.type == SubprocessType.GC:
assert self._gc is None
self._gc = subp
+ elif subp.type == SubprocessType.POLICY_LOADER:
+ assert self._policy_loader is None
+ self._policy_loader = subp
else:
raise RuntimeError("unexpected subprocess type")
while len(self._workers) < n:
await self._spawn_new_worker(config)
+ async def _run_policy_loader(self, config: KresConfig) -> None:
+ if self._policy_loader:
+ await self._policy_loader.start()
+ else:
+ subprocess = await self._controller.create_subprocess(config, SubprocessType.POLICY_LOADER)
+ await subprocess.start()
+ self._policy_loader = subprocess
+
+ async def _stop_policy_loader(self) -> None:
+ assert self._policy_loader is not None
+ await self._policy_loader.stop()
+ self._policy_loader = None
+
def _is_gc_running(self) -> bool:
return self._gc is not None
async def _reload_system_state(self) -> None:
async with self._manager_lock:
self._workers = []
+ self._policy_loader = None
self._gc = None
await self._collect_already_running_workers()
await self._reload_system_state()
await self.apply_config(config, _noretry=True)
+ async def apply_policy_loader_config(self, config: KresConfig, _noretry: bool = False) -> None:
+ try:
+ async with self._manager_lock:
+ logger.debug("Running kresd policy loader")
+ await self._run_policy_loader(config)
+ except SubprocessControllerException as e:
+ if _noretry:
+ raise e
+ elif self._fix_counter.is_too_high():
+ logger.error(f"Failed to apply kresd policy loader config: {e}")
+ logger.error("There have already been problems recently, refusing to try to fix it.")
+ await self.forced_shutdown() # possible improvement - the person who requested this change won't get a response this way
+ else:
+ logger.error(f"Failed to apply kresd policy loader config: {e}")
+ logger.warning("Reloading system state and trying again.")
+ self._fix_counter.increase()
+ await self._reload_system_state()
+ await self.apply_policy_loader_config(config, _noretry=True)
+
async def stop(self):
if self._watchdog_task is not None:
self._watchdog_task.cancel() # cancel it
expected_ids = [x.id for x in self._workers]
if self._gc:
expected_ids.append(self._gc.id)
+ if self._policy_loader:
+ expected_ids.append(self._policy_loader.id)
+
invoke_callback = False
for eid in expected_ids:
non_registered_ids = detected_subprocesses.keys() - set(expected_ids)
if len(non_registered_ids) != 0:
logger.error(
- "Found additional kresd instances in the system, which shouldn't be there - %s",
+ "Found additional process in the system, which shouldn't be there - %s",
non_registered_ids,
)
invoke_callback = True