From: Vasek Sraier
Date: Fri, 11 Mar 2022 15:28:21 +0000 (+0100)
Subject: manager: error handling retry logic based on ERROR_HANDLING.md
X-Git-Tag: v6.0.0a1~40^2~1
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fef19785e60593745223100f8be411a109010dcd;p=thirdparty%2Fknot-resolver.git

manager: error handling retry logic based on ERROR_HANDLING.md
---

diff --git a/manager/knot_resolver_manager/compat/asyncio.py b/manager/knot_resolver_manager/compat/asyncio.py
index 70485022b..173e7b9c0 100644
--- a/manager/knot_resolver_manager/compat/asyncio.py
+++ b/manager/knot_resolver_manager/compat/asyncio.py
@@ -32,21 +32,7 @@ async def to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
     else:
         loop = asyncio.get_event_loop()
         pfunc = functools.partial(func, *args, **kwargs)
-        exc: Optional[BaseException] = None
-
-        def exc_catcher():
-            nonlocal exc
-
-            try:
-                return pfunc()
-            except BaseException as e:
-                exc = e
-                return None
-
-        res = await loop.run_in_executor(None, exc_catcher)
-        # propagate exception in this thread
-        if exc is not None:
-            raise exc
+        res = await loop.run_in_executor(None, pfunc)
         return res
diff --git a/manager/knot_resolver_manager/constants.py b/manager/knot_resolver_manager/constants.py
index 5bade6bca..7e3e5b754 100644
--- a/manager/knot_resolver_manager/constants.py
+++ b/manager/knot_resolver_manager/constants.py
@@ -12,6 +12,8 @@ if TYPE_CHECKING:
 
 STARTUP_LOG_LEVEL = logging.DEBUG
 DEFAULT_MANAGER_CONFIG_FILE = Path("/etc/knot-resolver/config.yml")
+MANAGER_FIX_ATTEMPT_MAX_COUNTER = 2
+FIX_COUNTER_DECREASE_INTERVAL_SEC = 30 * 60
 
 
 def kresd_executable() -> Path:
diff --git a/manager/knot_resolver_manager/kres_manager.py b/manager/knot_resolver_manager/kres_manager.py
index ed487197e..067cb62d1 100644
--- a/manager/knot_resolver_manager/kres_manager.py
+++ b/manager/knot_resolver_manager/kres_manager.py
@@ -1,13 +1,19 @@
 import asyncio
 import logging
 import sys
+import time
 from subprocess import SubprocessError
-from typing import List, Optional
+from typing import List, NoReturn, Optional
 
 import knot_resolver_manager.kresd_controller
 from knot_resolver_manager.compat.asyncio import create_task
 from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.constants import WATCHDOG_INTERVAL
+from knot_resolver_manager.constants import (
+    FIX_COUNTER_DECREASE_INTERVAL_SEC,
+    MANAGER_FIX_ATTEMPT_MAX_COUNTER,
+    WATCHDOG_INTERVAL,
+)
+from knot_resolver_manager.exceptions import SubprocessControllerException
 from knot_resolver_manager.kresd_controller.interface import (
     Subprocess,
     SubprocessController,
@@ -22,6 +28,31 @@ from .datamodel import KresConfig
 
 logger = logging.getLogger(__name__)
 
+
+class _FixCounter:
+    def __init__(self) -> None:
+        self._counter = 0
+        self._timestamp = time.time()
+
+    def increase(self) -> None:
+        self._counter += 1
+        self._timestamp = time.time()
+
+    def try_decrease(self) -> None:
+        if time.time() - self._timestamp > FIX_COUNTER_DECREASE_INTERVAL_SEC:
+            if self._counter > 0:
+                self._counter -= 1
+                logger.info(
+                    f"Enough time has passed since the last detected instability, decreasing fix attempt counter to {self._counter}"
+                )
+            self._timestamp = time.time()
+
+    def __str__(self) -> str:
+        return str(self._counter)
+
+    def is_too_high(self) -> bool:
+        return self._counter > MANAGER_FIX_ATTEMPT_MAX_COUNTER
+
+
 class KresManager:
     """
     Core of the whole operation. Orchestrates individual instances under some
@@ -43,6 +74,8 @@ class KresManager:
         self._manager_lock = asyncio.Lock()
         self._controller: SubprocessController
         self._watchdog_task: Optional["asyncio.Task[None]"] = None
+        self._fix_counter: _FixCounter = _FixCounter()
+        self._config_store: ConfigStore
 
     @staticmethod
     async def create(selected_controller: Optional[SubprocessController], config_store: ConfigStore) -> "KresManager":
@@ -61,18 +94,15 @@ class KresManager:
             )
         else:
             self._controller = selected_controller
+        self._config_store = config_store
         await self._controller.initialize_controller(config_store.get())
         self._watchdog_task = create_task(self._watchdog())
-        await self._load_system_state()
+        await self._collect_already_running_workers()
 
         # registering the function calls them immediately, therefore after this, the config is applied
         await config_store.register_verifier(self.validate_config)
         await config_store.register_on_change_callback(self.apply_config)
 
-    async def _load_system_state(self) -> None:
-        async with self._manager_lock:
-            await self._collect_already_running_children()
-
     async def _spawn_new_worker(self, config: KresConfig) -> None:
         subprocess = await self._controller.create_subprocess(config, SubprocessType.KRESD)
         await subprocess.start()
@@ -85,7 +115,7 @@
         subprocess = self._workers.pop()
         await subprocess.stop()
 
-    async def _collect_already_running_children(self) -> None:
+    async def _collect_already_running_workers(self) -> None:
         for subp in await self._controller.get_all_running_instances():
             if subp.type == SubprocessType.KRESD:
                 self._workers.append(subp)
@@ -137,19 +167,39 @@
         logger.debug("Canary process test passed.")
         return Result.ok(None)
 
-    async def apply_config(self, config: KresConfig) -> None:
+    async def _reload_system_state(self) -> None:
         async with self._manager_lock:
-            logger.debug("Applying new config to all workers")
-            await self._ensure_number_of_children(config, int(config.server.workers))
-            await self._rolling_restart(config)
-
-            if self._is_gc_running() != config.server.use_cache_gc:
-                if config.server.use_cache_gc:
-                    logger.debug("Starting cache GC")
-                    await self._start_gc(config)
-                else:
-                    logger.debug("Stopping cache GC")
-                    await self._stop_gc()
+            self._workers = []
+            self._gc = None
+            await self._collect_already_running_workers()
+
+    async def apply_config(self, config: KresConfig, _noretry: bool = False) -> None:
+        try:
+            async with self._manager_lock:
+                logger.debug("Applying config to all workers")
+                await self._ensure_number_of_children(config, int(config.server.workers))
+                await self._rolling_restart(config)
+
+                if self._is_gc_running() != config.server.use_cache_gc:
+                    if config.server.use_cache_gc:
+                        logger.debug("Starting cache GC")
+                        await self._start_gc(config)
+                    else:
+                        logger.debug("Stopping cache GC")
+                        await self._stop_gc()
+        except SubprocessControllerException as e:
+            if _noretry:
+                raise
+            elif self._fix_counter.is_too_high():
+                logger.error(f"Failed to apply config: {e}")
+                logger.error("There have already been problems recently, refusing to try to fix it.")
+                await self.forced_shutdown()  # possible improvement - the person who requested this change won't get a response this way
+            else:
+                logger.error(f"Failed to apply config: {e}")
+                logger.warning("Reloading system state and trying again.")
+                self._fix_counter.increase()
+                await self._reload_system_state()
+                await self.apply_config(config, _noretry=True)
 
     async def stop(self):
         if self._watchdog_task is not None:
@@ -165,16 +215,41 @@ class KresManager:
         await self._stop_gc()
         await self._controller.shutdown_controller()
 
-    async def _instability_handler(self) -> None:
-        logger.error(
-            "Instability detected. Something is wrong, no idea how to react." " Performing suicide. See you later!"
+    async def forced_shutdown(self) -> NoReturn:
+        logger.warning("Collecting all remaining workers...")
+        await self._reload_system_state()
+
+        logger.warning("Stopping all workers...")
+        await self.stop()
+        logger.warning(
+            "All workers stopped. Terminating. You might see an exception stack trace at the end of the log."
         )
         sys.exit(1)
 
+    async def _instability_handler(self) -> None:
+        if self._fix_counter.is_too_high():
+            logger.error(
+                "Already attempted too many times to fix the system state. Refusing to try again and shutting down."
+            )
+            await self.forced_shutdown()
+
+        try:
+            logger.warning("Instability detected. Dropping known list of workers and reloading it from the system.")
+            self._fix_counter.increase()
+            await self._reload_system_state()
+            logger.warning("Workers reloaded. Applying old config...")
+            await self.apply_config(self._config_store.get(), _noretry=True)
+            logger.warning(f"System stability hopefully renewed. Fix attempt counter is currently {self._fix_counter}")
+        except BaseException:
+            logger.error("Failed attempting to fix an error. Forcefully shutting down.", exc_info=True)
+            await self.forced_shutdown()
+
     async def _watchdog(self) -> None:
         while True:
             await asyncio.sleep(WATCHDOG_INTERVAL)
 
+            self._fix_counter.try_decrease()
+
             try:
                 # gather current state
                 async with self._manager_lock:
@@ -205,4 +280,9 @@
                 logger.error("Knot Resolver watchdog failed with an unexpected exception.", exc_info=True)
 
             if invoke_callback:
-                await self._instability_handler()
+                try:
+                    await self._instability_handler()
+                except Exception:
+                    logger.error("Watchdog failed while invoking instability callback.", exc_info=True)
+                    logger.error("Violently terminating!")
+                    sys.exit(1)
diff --git a/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py b/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py
index e087f1de4..51afef52a 100644
--- a/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py
+++ b/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py
@@ -110,6 +110,7 @@ def _wait_for_job_completion(systemd: Any, job_creating_func: Callable[[], str],
         job_path = job_creating_func()
     except BaseException:
         loop.quit()
+        thread.join()
         raise
 
     # then wait for the results
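
The retry policy the diff introduces can be summarized as: every failed fix attempt bumps a counter, the counter decays while the system stays healthy (the watchdog calls try_decrease periodically), and once it exceeds MANAGER_FIX_ATTEMPT_MAX_COUNTER the manager shuts down instead of looping. Below is a condensed, illustrative sketch of that pattern, not part of the patch; the constants and counter behaviour follow the diff, while the callables apply, reload_state and force_shutdown are hypothetical stand-ins for the manager's async methods.

```python
import time

MANAGER_FIX_ATTEMPT_MAX_COUNTER = 2
FIX_COUNTER_DECREASE_INTERVAL_SEC = 30 * 60


class FixCounter:
    """Counts recent fix attempts; one attempt is forgotten after a quiet period."""

    def __init__(self) -> None:
        self._counter = 0
        self._timestamp = time.time()

    def increase(self) -> None:
        self._counter += 1
        self._timestamp = time.time()

    def try_decrease(self) -> None:
        # called periodically (by the watchdog in the patch)
        if time.time() - self._timestamp > FIX_COUNTER_DECREASE_INTERVAL_SEC and self._counter > 0:
            self._counter -= 1
            self._timestamp = time.time()

    def is_too_high(self) -> bool:
        return self._counter > MANAGER_FIX_ATTEMPT_MAX_COUNTER


def apply_config_with_retry(apply, reload_state, force_shutdown, counter: FixCounter) -> None:
    """At most one retry; repeated instability leads to a forced shutdown."""
    try:
        apply()
    except Exception:
        if counter.is_too_high():
            force_shutdown()  # too many recent problems: give up instead of looping
            return
        counter.increase()
        reload_state()        # drop the cached worker list, re-read it from the system
        apply()               # a second failure propagates to the caller (_noretry=True in the patch)
```

The same counter gates both paths in the diff: apply_config retries once after reloading system state, and the watchdog's _instability_handler performs the identical reload-and-reapply step before giving up.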