git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: wait for 'policy-loader' to finish on the resolver startup
author: Aleš Mrázek <ales.mrazek@nic.cz>
Fri, 17 May 2024 13:09:44 +0000 (15:09 +0200)
committer: Aleš Mrázek <ales.mrazek@nic.cz>
Tue, 2 Jul 2024 12:07:48 +0000 (14:07 +0200)
When starting the resolver, we wait for 'policy-loader' to finish loading the policy rules into the cache, where the rules are shared between kresd workers. Only after that are the other processes started. Otherwise, the workers might start without the configured rules in the cache while they are already resolving DNS traffic.

manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/kresd_controller/interface.py

index 29e43f7a201fbefdd59f4569e806179900352713..e0d6c84ed6a2dcc6be3c1ed36da5e19d4f66f493 100644 (file)
@@ -19,6 +19,10 @@ from knot_resolver_manager.kresd_controller.interface import (
     SubprocessStatus,
     SubprocessType,
 )
+from knot_resolver_manager.kresd_controller.registered_workers import (
+    command_registered_workers,
+    get_registered_workers_kresids,
+)
 from knot_resolver_manager.utils.functional import Result
 from knot_resolver_manager.utils.modeling.types import NoneType
 
@@ -110,16 +114,22 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
         logger.debug("Looking for already running workers")
         await self._collect_already_running_workers()
 
-        # registering the function calls them immediately, therefore after this, the config is applied
+        # register and immediately call a callback that applies policy rules configuration
+        await config_store.register_on_change_callback(
+            only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])(
+                self.apply_policy_rules_config
+            )
+        )
+
+        # register and immediately call a verifier that validates config with 'canary' kresd process
         await config_store.register_verifier(self.validate_config)
 
-        # register callback to apply config for workers and cache-gc
+        # register and immediately call a callback to apply config to all 'kresd' workers and 'cache-gc'
         await config_store.register_on_change_callback(
             only_on_real_changes(
                 lambda config: [
                     config.nsid,
                     config.hostname,
-                    config.rundir,
                     config.workers,
                     config.max_workers,
                     config.webmgmt,
@@ -136,13 +146,6 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
             )(self.apply_config)
         )
 
-        # register callback for policy rules config sections
-        await config_store.register_on_change_callback(
-            only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])(
-                self.apply_policy_loader_config
-            )
-        )
-
         # register controller config change listeners
         await config_store.register_verifier(_deny_max_worker_changes)
 
@@ -186,16 +189,16 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
 
     async def _run_policy_loader(self, config: KresConfig) -> None:
         if self._policy_loader:
-            await self._policy_loader.start()
+            await self._policy_loader.start(config)
         else:
             subprocess = await self._controller.create_subprocess(config, SubprocessType.POLICY_LOADER)
             await subprocess.start()
             self._policy_loader = subprocess
 
-    async def _stop_policy_loader(self) -> None:
-        assert self._policy_loader is not None
-        await self._policy_loader.stop()
-        self._policy_loader = None
+    def _is_policy_loader_exited(self) -> bool:
+        if self._policy_loader:
+            return self._policy_loader.status() is SubprocessStatus.EXITED
+        return False
 
     def _is_gc_running(self) -> bool:
         return self._gc is not None
@@ -261,24 +264,38 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
                 await self._reload_system_state()
                 await self.apply_config(config, _noretry=True)
 
-    async def apply_policy_loader_config(self, config: KresConfig, _noretry: bool = False) -> None:
+    async def apply_policy_rules_config(self, config: KresConfig, _noretry: bool = False) -> None:
         try:
             async with self._manager_lock:
-                logger.debug("Running kresd policy loader")
+                logger.debug("Running kresd 'policy-loader'")
                 await self._run_policy_loader(config)
+
+                # wait for 'policy-loader' to finish
+                logger.debug("Waiting for 'policy-loader' to finish loading policy rules")
+                while not self._is_policy_loader_exited():
+                    await asyncio.sleep(1)
+
+                # command all running 'kresd' workers to reset their old policy rules,
+                # unless we're just starting up and there are none to reset
+                if get_registered_workers_kresids():
+                    logger.debug("Resetting policy rules for all running 'kresd' workers")
+                    cmd_results = await command_registered_workers("require('ffi').C.kr_rules_reset()")
+                    for worker, res in cmd_results.items():
+                        if res != 0:
+                            logger.error("Failed to reset policy rules in %s: %s", worker, res)
         except SubprocessControllerException as e:
             if _noretry:
                 raise e
             elif self._fix_counter.is_too_high():
-                logger.error(f"Failed to apply kresd policy loader config: {e}")
+                logger.error(f"Failed to apply configured policy rules: {e}")
                 logger.error("There have already been problems recently, refusing to try to fix it.")
                 await self.forced_shutdown()  # possible improvement - the person who requested this change won't get a response this way
             else:
-                logger.error(f"Failed to apply kresd policy loader config: {e}")
+                logger.error(f"Failed to apply configured policy rules: {e}")
                 logger.warning("Reloading system state and trying again.")
                 self._fix_counter.increase()
                 await self._reload_system_state()
-                await self.apply_policy_loader_config(config, _noretry=True)
+                await self.apply_policy_rules_config(config, _noretry=True)
 
     async def stop(self):
         if self._watchdog_task is not None:
index bd1ba59fdb4916b47323238436d101ba41462a54..63caea49367fe4351531cd1b4940e8a0c52c08fc 100644 (file)
@@ -113,7 +113,9 @@ class Subprocess(ABC):
         self._config = config
         self._registered_worker: bool = False
 
-    async def start(self) -> None:
+    async def start(self, new_config: Optional[KresConfig] = None) -> None:
+        if new_config:
+            self._config = new_config
 
         config_file: Optional[Path] = None
         if self.type is SubprocessType.KRESD: