import asyncio
import logging
import sys
+import time
from subprocess import SubprocessError
-from typing import List, Optional
+from typing import List, NoReturn, Optional
import knot_resolver_manager.kresd_controller
from knot_resolver_manager.compat.asyncio import create_task
from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.constants import WATCHDOG_INTERVAL
+from knot_resolver_manager.constants import (
+ FIX_COUNTER_DECREASE_INTERVAL_SEC,
+ MANAGER_FIX_ATTEMPT_MAX_COUNTER,
+ WATCHDOG_INTERVAL,
+)
+from knot_resolver_manager.exceptions import SubprocessControllerException
from knot_resolver_manager.kresd_controller.interface import (
Subprocess,
SubprocessController,
logger = logging.getLogger(__name__)
+class _FixCounter:
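+    """
+    Tracks how many times the manager has recently tried to fix the system
+    state. Every fix attempt increases the counter; it decays again over
+    time (see try_decrease), so only repeated instability within a short
+    period makes is_too_high() return True.
+    """
+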
+ def __init__(self) -> None:
+ self._counter = 0
+ self._timestamp = time.time()
+
+ def increase(self) -> None:
+ self._counter += 1
+ self._timestamp = time.time()
+
+    def try_decrease(self) -> None:
+        if time.time() - self._timestamp > FIX_COUNTER_DECREASE_INTERVAL_SEC:
+            if self._counter > 0:
+                self._counter -= 1
+                logger.info(
+                    f"Enough time has passed since the last detected instability, decreasing fix attempt counter to {self._counter}"
+                )
+            self._timestamp = time.time()
+
+ def __str__(self) -> str:
+ return str(self._counter)
+
+ def is_too_high(self) -> bool:
+ return self._counter > MANAGER_FIX_ATTEMPT_MAX_COUNTER
+
+
class KresManager:
"""
Core of the whole operation. Orchestrates individual instances under some
self._manager_lock = asyncio.Lock()
self._controller: SubprocessController
self._watchdog_task: Optional["asyncio.Task[None]"] = None
+ self._fix_counter: _FixCounter = _FixCounter()
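+        # kept so the instability handler can re-apply the current config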
+ self._config_store: ConfigStore
@staticmethod
async def create(selected_controller: Optional[SubprocessController], config_store: ConfigStore) -> "KresManager":
)
else:
self._controller = selected_controller
+ self._config_store = config_store
await self._controller.initialize_controller(config_store.get())
self._watchdog_task = create_task(self._watchdog())
- await self._load_system_state()
+ await self._collect_already_running_workers()
# registering the function calls them immediately, therefore after this, the config is applied
await config_store.register_verifier(self.validate_config)
await config_store.register_on_change_callback(self.apply_config)
- async def _load_system_state(self) -> None:
- async with self._manager_lock:
- await self._collect_already_running_children()
-
async def _spawn_new_worker(self, config: KresConfig) -> None:
subprocess = await self._controller.create_subprocess(config, SubprocessType.KRESD)
await subprocess.start()
subprocess = self._workers.pop()
await subprocess.stop()
- async def _collect_already_running_children(self) -> None:
+ async def _collect_already_running_workers(self) -> None:
for subp in await self._controller.get_all_running_instances():
if subp.type == SubprocessType.KRESD:
self._workers.append(subp)
logger.debug("Canary process test passed.")
return Result.ok(None)
- async def apply_config(self, config: KresConfig) -> None:
+ async def _reload_system_state(self) -> None:
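+        """Forget the known workers and GC, then re-collect running workers from the system."""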
async with self._manager_lock:
- logger.debug("Applying new config to all workers")
- await self._ensure_number_of_children(config, int(config.server.workers))
- await self._rolling_restart(config)
-
- if self._is_gc_running() != config.server.use_cache_gc:
- if config.server.use_cache_gc:
- logger.debug("Starting cache GC")
- await self._start_gc(config)
- else:
- logger.debug("Stopping cache GC")
- await self._stop_gc()
+ self._workers = []
+ self._gc = None
+ await self._collect_already_running_workers()
+
+ async def apply_config(self, config: KresConfig, _noretry: bool = False) -> None:
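+        # If the subprocess controller fails, reload the system state and retry
+        # once; _noretry stops the recursive retry from looping forever.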
+ try:
+ async with self._manager_lock:
+ logger.debug("Applying config to all workers")
+ await self._ensure_number_of_children(config, int(config.server.workers))
+ await self._rolling_restart(config)
+
+ if self._is_gc_running() != config.server.use_cache_gc:
+ if config.server.use_cache_gc:
+ logger.debug("Starting cache GC")
+ await self._start_gc(config)
+ else:
+ logger.debug("Stopping cache GC")
+ await self._stop_gc()
+ except SubprocessControllerException as e:
+ if _noretry:
+ raise
+ elif self._fix_counter.is_too_high():
+ logger.error(f"Failed to apply config: {e}")
+ logger.error("There have already been problems recently, refusing to try to fix it.")
+                await self.forced_shutdown()  # possible improvement: whoever requested this change never receives a response this way
+ else:
+ logger.error(f"Failed to apply config: {e}")
+ logger.warning("Reloading system state and trying again.")
+ self._fix_counter.increase()
+ await self._reload_system_state()
+ await self.apply_config(config, _noretry=True)
async def stop(self):
if self._watchdog_task is not None:
await self._stop_gc()
await self._controller.shutdown_controller()
- async def _instability_handler(self) -> None:
- logger.error(
- "Instability detected. Something is wrong, no idea how to react." " Performing suicide. See you later!"
+ async def forced_shutdown(self) -> NoReturn:
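+        """Stop everything we know about as cleanly as possible, then terminate the process."""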
+ logger.warning("Collecting all remaining workers...")
+ await self._reload_system_state()
+
+ logger.warning("Stopping all workers...")
+ await self.stop()
+ logger.warning(
+ "All workers stopped. Terminating. You might see an exception stack trace at the end of the log."
)
sys.exit(1)
+ async def _instability_handler(self) -> None:
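+        # Invoked by the watchdog when the observed system state diverges from
+        # the expected one: re-collect workers and re-apply the current config,
+        # or force a shutdown if fixes keep failing.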
+ if self._fix_counter.is_too_high():
+            logger.error(
+                "Already attempted too many times to fix the system state. Refusing to try again and shutting down."
+            )
+ await self.forced_shutdown()
+
+ try:
+            logger.warning("Instability detected. Dropping the known list of workers and reloading it from the system.")
+ self._fix_counter.increase()
+ await self._reload_system_state()
+            logger.warning("Workers reloaded. Re-applying the current config...")
+            await self.apply_config(self._config_store.get(), _noretry=True)
+            logger.warning(f"System stability hopefully restored. Fix attempt counter is currently {self._fix_counter}")
+ except BaseException:
+ logger.error("Failed attempting to fix an error. Forcefully shutting down.", exc_info=True)
+ await self.forced_shutdown()
+
async def _watchdog(self) -> None:
while True:
await asyncio.sleep(WATCHDOG_INTERVAL)
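+        # let the fix attempt counter decay while the system stays stable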
+ self._fix_counter.try_decrease()
+
try:
# gather current state
async with self._manager_lock:
logger.error("Knot Resolver watchdog failed with an unexpected exception.", exc_info=True)
if invoke_callback:
- await self._instability_handler()
+ try:
+ await self._instability_handler()
+ except Exception:
+                logger.error("Watchdog failed while invoking the instability callback.", exc_info=True)
+ logger.error("Violently terminating!")
+ sys.exit(1)