git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: error handling retry logic based on ERROR_HANDLING.md
author Vasek Sraier <git@vakabus.cz>
Fri, 11 Mar 2022 15:28:21 +0000 (16:28 +0100)
committer Aleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:54 +0000 (16:17 +0200)
manager/knot_resolver_manager/compat/asyncio.py
manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py

index 70485022b58295f3a6fdb36362010c264072ea5e..173e7b9c081f1dc4bea5570997e018cd55416471 100644 (file)
@@ -32,21 +32,7 @@ async def to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
     else:
         loop = asyncio.get_event_loop()
         pfunc = functools.partial(func, *args, **kwargs)
-        exc: Optional[BaseException] = None
-
-        def exc_catcher():
-            nonlocal exc
-
-            try:
-                return pfunc()
-            except BaseException as e:
-                exc = e
-                return None
-
-        res = await loop.run_in_executor(None, exc_catcher)
-        # propagate exception in this thread
-        if exc is not None:
-            raise exc
+        res = await loop.run_in_executor(None, pfunc)
         return res
 
 
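The removed `exc_catcher` wrapper duplicated behaviour that `loop.run_in_executor` already provides: an exception raised in the worker thread is stored on the returned future and re-raised by `await` in the calling coroutine. A minimal standalone sketch of that behaviour (illustrative only, not part of the commit; `boom` and `main` are made-up names):

import asyncio
import functools


def boom() -> None:
    raise ValueError("raised in a worker thread")


async def main() -> None:
    loop = asyncio.get_running_loop()
    pfunc = functools.partial(boom)
    try:
        await loop.run_in_executor(None, pfunc)
    except ValueError as e:
        # the exception crossed the thread boundary without any helper
        print(f"caught: {e}")


asyncio.run(main())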
index 5bade6bca606f083ce8c866c1c891fd25b36bbfd..7e3e5b75409ca02f13d194965e585a70047c6c58 100644 (file)
@@ -12,6 +12,8 @@ if TYPE_CHECKING:
 
 STARTUP_LOG_LEVEL = logging.DEBUG
 DEFAULT_MANAGER_CONFIG_FILE = Path("/etc/knot-resolver/config.yml")
+MANAGER_FIX_ATTEMPT_MAX_COUNTER = 2
+FIX_COUNTER_DECREASE_INTERVAL_SEC = 30 * 60
 
 
 def kresd_executable() -> Path:
index ed487197e70523972a2bc78c8a5c1c967e749675..067cb62d109bc1bf47607a384f60c067e8580d83 100644 (file)
@@ -1,13 +1,19 @@
 import asyncio
 import logging
 import sys
+import time
 from subprocess import SubprocessError
-from typing import List, Optional
+from typing import List, NoReturn, Optional
 
 import knot_resolver_manager.kresd_controller
 from knot_resolver_manager.compat.asyncio import create_task
 from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.constants import WATCHDOG_INTERVAL
+from knot_resolver_manager.constants import (
+    FIX_COUNTER_DECREASE_INTERVAL_SEC,
+    MANAGER_FIX_ATTEMPT_MAX_COUNTER,
+    WATCHDOG_INTERVAL,
+)
+from knot_resolver_manager.exceptions import SubprocessControllerException
 from knot_resolver_manager.kresd_controller.interface import (
     Subprocess,
     SubprocessController,
@@ -22,6 +28,31 @@ from .datamodel import KresConfig
 logger = logging.getLogger(__name__)
 
 
+class _FixCounter:
+    def __init__(self) -> None:
+        self._counter = 0
+        self._timestamp = time.time()
+
+    def increase(self) -> None:
+        self._counter += 1
+        self._timestamp = time.time()
+
+    def try_decrease(self) -> None:
+        if time.time() - self._timestamp > FIX_COUNTER_DECREASE_INTERVAL_SEC:
+            if self._counter > 0:
+                self._counter -= 1
+                self._timestamp = time.time()
+                logger.info(
+                    f"Enough time has passed since the last detected instability, decreasing fix attempt counter to {self._counter}"
+                )
+
+    def __str__(self) -> str:
+        return str(self._counter)
+
+    def is_too_high(self) -> bool:
+        return self._counter > MANAGER_FIX_ATTEMPT_MAX_COUNTER
+
+
 class KresManager:
     """
     Core of the whole operation. Orchestrates individual instances under some
@@ -43,6 +74,8 @@ class KresManager:
         self._manager_lock = asyncio.Lock()
         self._controller: SubprocessController
         self._watchdog_task: Optional["asyncio.Task[None]"] = None
+        self._fix_counter: _FixCounter = _FixCounter()
+        self._config_store: ConfigStore
 
     @staticmethod
     async def create(selected_controller: Optional[SubprocessController], config_store: ConfigStore) -> "KresManager":
@@ -61,18 +94,15 @@ class KresManager:
             )
         else:
             self._controller = selected_controller
+        self._config_store = config_store
         await self._controller.initialize_controller(config_store.get())
         self._watchdog_task = create_task(self._watchdog())
-        await self._load_system_state()
+        await self._collect_already_running_workers()
 
         # registering the function calls them immediately, therefore after this, the config is applied
         await config_store.register_verifier(self.validate_config)
         await config_store.register_on_change_callback(self.apply_config)
 
-    async def _load_system_state(self) -> None:
-        async with self._manager_lock:
-            await self._collect_already_running_children()
-
     async def _spawn_new_worker(self, config: KresConfig) -> None:
         subprocess = await self._controller.create_subprocess(config, SubprocessType.KRESD)
         await subprocess.start()
@@ -85,7 +115,7 @@ class KresManager:
         subprocess = self._workers.pop()
         await subprocess.stop()
 
-    async def _collect_already_running_children(self) -> None:
+    async def _collect_already_running_workers(self) -> None:
         for subp in await self._controller.get_all_running_instances():
             if subp.type == SubprocessType.KRESD:
                 self._workers.append(subp)
@@ -137,19 +167,39 @@ class KresManager:
             logger.debug("Canary process test passed.")
             return Result.ok(None)
 
-    async def apply_config(self, config: KresConfig) -> None:
+    async def _reload_system_state(self) -> None:
         async with self._manager_lock:
-            logger.debug("Applying new config to all workers")
-            await self._ensure_number_of_children(config, int(config.server.workers))
-            await self._rolling_restart(config)
-
-            if self._is_gc_running() != config.server.use_cache_gc:
-                if config.server.use_cache_gc:
-                    logger.debug("Starting cache GC")
-                    await self._start_gc(config)
-                else:
-                    logger.debug("Stopping cache GC")
-                    await self._stop_gc()
+            self._workers = []
+            self._gc = None
+            await self._collect_already_running_workers()
+
+    async def apply_config(self, config: KresConfig, _noretry: bool = False) -> None:
+        try:
+            async with self._manager_lock:
+                logger.debug("Applying config to all workers")
+                await self._ensure_number_of_children(config, int(config.server.workers))
+                await self._rolling_restart(config)
+
+                if self._is_gc_running() != config.server.use_cache_gc:
+                    if config.server.use_cache_gc:
+                        logger.debug("Starting cache GC")
+                        await self._start_gc(config)
+                    else:
+                        logger.debug("Stopping cache GC")
+                        await self._stop_gc()
+        except SubprocessControllerException as e:
+            if _noretry:
+                raise
+            elif self._fix_counter.is_too_high():
+                logger.error(f"Failed to apply config: {e}")
+                logger.error("There have already been problems recently, refusing to try to fix it.")
+                await self.forced_shutdown()  # possible improvement - the person who requested this change won't get a response this way
+            else:
+                logger.error(f"Failed to apply config: {e}")
+                logger.warning("Reloading system state and trying again.")
+                self._fix_counter.increase()
+                await self._reload_system_state()
+                await self.apply_config(config, _noretry=True)
 
     async def stop(self):
         if self._watchdog_task is not None:
@@ -165,16 +215,41 @@ class KresManager:
                 await self._stop_gc()
             await self._controller.shutdown_controller()
 
-    async def _instability_handler(self) -> None:
-        logger.error(
-            "Instability detected. Something is wrong, no idea how to react." " Performing suicide. See you later!"
+    async def forced_shutdown(self) -> NoReturn:
+        logger.warning("Collecting all remaining workers...")
+        await self._reload_system_state()
+
+        logger.warning("Stopping all workers...")
+        await self.stop()
+        logger.warning(
+            "All workers stopped. Terminating. You might see an exception stack trace at the end of the log."
         )
         sys.exit(1)
 
+    async def _instability_handler(self) -> None:
+        if self._fix_counter.is_too_high():
+            logger.error(
+                "Already attempted to many times to fix system state. Refusing to try again and shutting down."
+            )
+            await self.forced_shutdown()
+
+        try:
+            logger.warning("Instability detected. Dropping known list of workers and reloading it from the system.")
+            self._fix_counter.increase()
+            await self._reload_system_state()
+            logger.warning("Workers reloaded. Applying old config....")
+            await self.apply_config(self._config_store.get(), _noretry=True)
+            logger.warning(f"System stability hopefully renewed. Fix attempt counter is currently {self._fix_counter}")
+        except BaseException:
+            logger.error("Failed attempting to fix an error. Forcefully shutting down.", exc_info=True)
+            await self.forced_shutdown()
+
     async def _watchdog(self) -> None:
         while True:
             await asyncio.sleep(WATCHDOG_INTERVAL)
 
+            self._fix_counter.try_decrease()
+
             try:
                 # gather current state
                 async with self._manager_lock:
@@ -205,4 +280,9 @@ class KresManager:
                 logger.error("Knot Resolver watchdog failed with an unexpected exception.", exc_info=True)
 
             if invoke_callback:
-                await self._instability_handler()
+                try:
+                    await self._instability_handler()
+                except Exception:
+                    logger.error("Watchdog failed while invoking instability callback", exc_info=True)
+                    logger.error("Violently terminating!")
+                    sys.exit(1)
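To make the retry limit concrete, the sketch below replays the counter arithmetic in isolation. The FixCounter here is a trimmed copy of the _FixCounter added above, kept standalone so it runs without the manager; the loop and print calls are purely illustrative. With MANAGER_FIX_ATTEMPT_MAX_COUNTER = 2, is_too_high() only starts returning True once a third failure has been recorded inside the decay window, and since the manager checks is_too_high() before incrementing, this bounds how many automatic fix attempts it makes before forced_shutdown():

import time

MANAGER_FIX_ATTEMPT_MAX_COUNTER = 2


class FixCounter:
    """Standalone copy of _FixCounter, reduced to what the demo needs."""

    def __init__(self) -> None:
        self._counter = 0
        self._timestamp = time.time()

    def increase(self) -> None:
        self._counter += 1
        self._timestamp = time.time()

    def is_too_high(self) -> bool:
        return self._counter > MANAGER_FIX_ATTEMPT_MAX_COUNTER


counter = FixCounter()
for attempt in range(1, 5):
    counter.increase()  # one increment per failed fix attempt
    print(f"attempt {attempt}: too high = {counter.is_too_high()}")
# the first two recorded failures keep is_too_high() False, so the manager
# is still willing to retry; the third pushes the counter past the limit
# and any further problem leads to forced_shutdown()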
index e087f1de410670eb7bbb2d09c7d713c76f76de63..51afef52ac509a084a991eb475d9af0c3e922aa7 100644 (file)
@@ -110,6 +110,7 @@ def _wait_for_job_completion(systemd: Any, job_creating_func: Callable[[], str],
         job_path = job_creating_func()
     except BaseException:
         loop.quit()
+        thread.join()
         raise
 
     # then wait for the results
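The dbus_api.py change follows the usual quit-then-join pattern: if creating the systemd job raises, the GLib main loop running in the helper thread is asked to quit and the thread is joined before the exception propagates, so no orphaned thread outlives the call. A minimal sketch of the same pattern, using a plain threading.Event as a stand-in for the GLib main loop (the real code depends on PyGObject; `_wait_for_job` and `failing_job` are hypothetical names):

import threading


def _wait_for_job(job_creating_func):
    # helper thread that blocks until asked to stop; it stands in for the
    # GLib.MainLoop that the real dbus_api.py runs in the background
    stop = threading.Event()
    thread = threading.Thread(target=stop.wait)
    thread.start()

    try:
        job = job_creating_func()
    except BaseException:
        stop.set()     # equivalent of loop.quit()
        thread.join()  # the join added by this commit: no orphaned thread on error
        raise

    # ... the real code keeps waiting for the job result here ...
    stop.set()
    thread.join()
    return job


def failing_job() -> str:
    raise RuntimeError("dbus call failed")


try:
    _wait_for_job(failing_job)
except RuntimeError:
    print("exception propagated, helper thread already joined")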