]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: apply policy config separately
authorAleš Mrázek <ales.mrazek@nic.cz>
Fri, 26 Apr 2024 12:33:05 +0000 (14:33 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Tue, 2 Jul 2024 12:07:48 +0000 (14:07 +0200)
manager/knot_resolver_manager/kres_manager.py

index 072c73fc31ff1075147587c9b744233712a92aee..1c2a0831912b52a98def1d369e9f5233e3a08653 100644 (file)
@@ -6,7 +6,7 @@ from subprocess import SubprocessError
 from typing import Callable, List, Optional
 
 from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.config_store import ConfigStore
+from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
 from knot_resolver_manager.constants import (
     FIX_COUNTER_DECREASE_INTERVAL_SEC,
     MANAGER_FIX_ATTEMPT_MAX_COUNTER,
@@ -77,6 +77,7 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
 
         self._workers: List[Subprocess] = []
         self._gc: Optional[Subprocess] = None
+        self._policy_loader: Optional[Subprocess] = None
         self._manager_lock = asyncio.Lock()
         self._controller: SubprocessController
         self._watchdog_task: Optional["asyncio.Task[None]"] = None
@@ -111,7 +112,35 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
 
         # registering the function calls them immediately, therefore after this, the config is applied
         await config_store.register_verifier(self.validate_config)
-        await config_store.register_on_change_callback(self.apply_config)
+
+        # register callback to apply config for workers and cache-gc
+        await config_store.register_on_change_callback(
+            only_on_real_changes(
+                lambda config: [
+                    config.nsid,
+                    config.hostname,
+                    config.rundir,
+                    config.workers,
+                    config.max_workers,
+                    config.webmgmt,
+                    config.options,
+                    config.network,
+                    config.cache,
+                    config.dnssec,
+                    config.dns64,
+                    config.logging,
+                    config.monitoring,
+                    config.lua,
+                ]
+            )(self.apply_config)
+        )
+
+        # register callback for policy rules config sections
+        await config_store.register_on_change_callback(
+            only_on_real_changes(lambda config: [config.views, config.local_data, config.forward])(
+                self.apply_policy_loader_config
+            )
+        )
 
         # register controller config change listeners
         await config_store.register_verifier(_deny_max_worker_changes)
@@ -135,6 +164,9 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
             elif subp.type == SubprocessType.GC:
                 assert self._gc is None
                 self._gc = subp
+            elif subp.type == SubprocessType.POLICY_LOADER:
+                assert self._policy_loader is None
+                self._policy_loader = subp
             else:
                 raise RuntimeError("unexpected subprocess type")
 
@@ -151,6 +183,19 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
         while len(self._workers) < n:
             await self._spawn_new_worker(config)
 
+    async def _run_policy_loader(self, config: KresConfig) -> None:
+        if self._policy_loader:
+            await self._policy_loader.start()
+        else:
+            subprocess = await self._controller.create_subprocess(config, SubprocessType.POLICY_LOADER)
+            await subprocess.start()
+            self._policy_loader = subprocess
+
+    async def _stop_policy_loader(self) -> None:
+        assert self._policy_loader is not None
+        await self._policy_loader.stop()
+        self._policy_loader = None
+
     def _is_gc_running(self) -> bool:
         return self._gc is not None
 
@@ -183,6 +228,7 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
     async def _reload_system_state(self) -> None:
         async with self._manager_lock:
             self._workers = []
+            self._policy_loader = None
             self._gc = None
             await self._collect_already_running_workers()
 
@@ -214,6 +260,25 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
                 await self._reload_system_state()
                 await self.apply_config(config, _noretry=True)
 
+    async def apply_policy_loader_config(self, config: KresConfig, _noretry: bool = False) -> None:
+        try:
+            async with self._manager_lock:
+                logger.debug("Running kresd policy loader")
+                await self._run_policy_loader(config)
+        except SubprocessControllerException as e:
+            if _noretry:
+                raise e
+            elif self._fix_counter.is_too_high():
+                logger.error(f"Failed to apply kresd policy loader config: {e}")
+                logger.error("There have already been problems recently, refusing to try to fix it.")
+                await self.forced_shutdown()  # possible improvement - the person who requested this change won't get a response this way
+            else:
+                logger.error(f"Failed to apply kresd policy loader config: {e}")
+                logger.warning("Reloading system state and trying again.")
+                self._fix_counter.increase()
+                await self._reload_system_state()
+                await self.apply_policy_loader_config(config, _noretry=True)
+
     async def stop(self):
         if self._watchdog_task is not None:
             self._watchdog_task.cancel()  # cancel it
@@ -267,6 +332,9 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
                 expected_ids = [x.id for x in self._workers]
                 if self._gc:
                     expected_ids.append(self._gc.id)
+                if self._policy_loader:
+                    expected_ids.append(self._policy_loader.id)
+
                 invoke_callback = False
 
                 for eid in expected_ids:
@@ -286,7 +354,7 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
                 non_registered_ids = detected_subprocesses.keys() - set(expected_ids)
                 if len(non_registered_ids) != 0:
                     logger.error(
-                        "Found additional kresd instances in the system, which shouldn't be there - %s",
+                        "Found additional process in the system, which shouldn't be there - %s",
                         non_registered_ids,
                     )
                     invoke_callback = True