]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
Failed worker notifications
authorVaclav Sraier <vaclav.sraier@nic.cz>
Sat, 21 Aug 2021 14:29:25 +0000 (14:29 +0000)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:52 +0000 (16:17 +0200)
17 files changed:
manager/README.md
manager/containers/ci/Containerfile
manager/etc/knot-resolver/.gitignore
manager/integration/runner.py
manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/kres_id.py [new file with mode: 0644]
manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/kresd_controller/__init__.py
manager/knot_resolver_manager/kresd_controller/interface.py
manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py
manager/knot_resolver_manager/kresd_controller/supervisord/config.py
manager/knot_resolver_manager/kresd_controller/systemd/__init__.py
manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py
manager/knot_resolver_manager/server.py
manager/knot_resolver_manager/utils/__init__.py
manager/knot_resolver_manager/utils/async_utils.py
manager/pyproject.toml

index 4d352721243ca43d415c746ded23dbf5145e53f9..60681b890b33e76040f33fa951944e733855f593 100644 (file)
@@ -57,10 +57,6 @@ Before commiting, please ensure that both `poe check` and `poe test` pass. Those
 
 This project uses [`apkg`](https://gitlab.nic.cz/packaging/apkg) for packaging. See [`distro/README.md`](distro/README.md) for packaging specific instructions.
 
-Not yet properly implemented. Ideal situation would be a command like `poe package` which would create all possible packages.
-
-Temporary solution to build a wheel/sdist - just call `poetry build`. The result will be in the `dist/` directory.
-
 ## FAQ
 
 ### What all those dev dependencies for?
index 5de016b4e2699df7b3509d552ae3902fdb47adc7..9984e883421a81b7980763bed421cdcafe01f772 100644 (file)
@@ -19,6 +19,7 @@ ENV POETRY_NO_INTERACTION=1 \
   PYTHONHASHSEED=random \
   PYTHONDONTWRITEBYTECODE=1
 
+RUN dnf install -y knot-resolver procps-ng
 
 # How does this work?
 # ===================
@@ -39,7 +40,7 @@ ENV POETRY_NO_INTERACTION=1 \
 # We do not have to install the dependencies every single time. They are cached in the container itself and 
 # we can rebuild it only when it's definition or the list of dependencies changes.
 
-COPY pyproject.toml package.json .
+COPY pyproject.toml poetry.lock package.json .
 RUN source $HOME/.poetry/env \
   && poetry config --list \
   && poetry env use $(which python3.6) \
index 7a814b8799b61e8986e36b153627e0295238a0b4..fb64123a482a7bd9528c02f59e191d05037149a7 100644 (file)
@@ -1,2 +1,2 @@
 runtime/
-cache/
\ No newline at end of file
+cache/ 
\ No newline at end of file
index 3ae46e2871d18b41c6408de3ebf124f93146e3e3..5741f6ee3c3572fbd5c175742807fb25af74d795 100644 (file)
@@ -1,5 +1,5 @@
 import logging
-from sys import exc_info
+import sys
 from typing import Callable
 
 from knot_resolver_manager.client import KnotManagerClient, count_running_kresds, start_manager_in_background
@@ -15,14 +15,15 @@ Test = Callable[[KnotManagerClient], None]
 logger = logging.getLogger(__name__)
 
 
-def test_wrapper(test: Test):
+def test_wrapper(test: Test) -> bool:
     p = start_manager_in_background(HOST, PORT)
     client = KnotManagerClient(BASE_URL)
     client.wait_for_initialization()
 
     logger.info("Starting test %s", test.__name__)
     try:
-        res = test(client)
+        test(client)
+        res = True
     except AssertionError:
         logger.error("Test %s failed", exc_info=True)
         res = False
@@ -77,5 +78,7 @@ def crash_resistance(client: KnotManagerClient):
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
-    test_wrapper(worker_count)
-    # test_wrapper(crash_resistance)
+    success = True
+    success &= test_wrapper(worker_count)
+    # success &= test_wrapper(crash_resistance)
+    sys.exit(int(not success))
index ec68541037fb955a36348610b5f1f94ad45c8d0b..ebdb97d5f59666c8d9fb728154238c5d3768ab15 100644 (file)
@@ -31,3 +31,8 @@ SUPERVISORD_SUBPROCESS_LOG_DIR.mkdir(exist_ok=True)
 MANAGER_CONFIG_FILE = CONFIGURATION_DIR / "config.yml"
 
 LISTEN_SOCKET_PATH = RUNTIME_DIR / "manager.sock"
+
+"""
+Used in KresdManager. It's a number of seconds in between system health checks.
+"""
+WATCHDOG_INTERVAL: float = 5
diff --git a/manager/knot_resolver_manager/kres_id.py b/manager/knot_resolver_manager/kres_id.py
new file mode 100644 (file)
index 0000000..ce072ed
--- /dev/null
@@ -0,0 +1,57 @@
+import itertools
+import weakref
+from typing import Optional
+
+from knot_resolver_manager.utils import ignore_exceptions_optional
+
+
+class KresID:
+    """
+    ID object. Effectively only a wrapper around an int, so that the references
+    behave normally (bypassing integer interning and other optimizations)
+    """
+
+    def __init__(self, n: int):
+        self._id = n
+        self._repr: Optional[str] = None
+
+    def set_custom_str_representation(self, representation: str):
+        self._repr = representation
+
+    def __str__(self) -> str:
+        if self._repr is None:
+            return str(self._id)
+        else:
+            return self._repr
+
+    def __hash__(self) -> int:
+        return self._id
+
+    def __eq__(self, o: object) -> bool:
+        return isinstance(o, KresID) and self._id == o._id
+
+
+_used: "weakref.WeakSet[KresID]" = weakref.WeakSet()
+
+
+def alloc() -> KresID:
+    for i in itertools.count(start=1):
+        val = KresID(i)
+        if val not in _used:
+            _used.add(val)
+            return val
+
+    raise RuntimeError("Reached an end of an infinite loop. How?")
+
+
+def alloc_from_string(val: str) -> KresID:
+    int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val)
+    if int_val is not None:
+        res = KresID(int_val)
+        assert res not in _used
+        _used.add(res)
+        return res
+    else:
+        res = alloc()
+        res.set_custom_str_representation(val)
+        return res
index f369d05a0a24edc6ff6fe4cce68e671f349856b7..ef661e3cfba04f39363b7abedaac97a58b65f289 100644 (file)
@@ -1,14 +1,21 @@
 import asyncio
-import itertools
 import logging
-import weakref
+import sys
+from asyncio.futures import Future
 from subprocess import SubprocessError
-from typing import List, Optional, Type
+from typing import List, Optional
 
-from knot_resolver_manager.constants import KRESD_CONFIG_FILE
+import knot_resolver_manager.kresd_controller
+from knot_resolver_manager import kres_id
+from knot_resolver_manager.compat.asyncio import create_task
+from knot_resolver_manager.constants import KRESD_CONFIG_FILE, WATCHDOG_INTERVAL
 from knot_resolver_manager.exceptions import ValidationException
-from knot_resolver_manager.kresd_controller import get_best_controller_implementation
-from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
+from knot_resolver_manager.kresd_controller.interface import (
+    Subprocess,
+    SubprocessController,
+    SubprocessStatus,
+    SubprocessType,
+)
 from knot_resolver_manager.utils.async_utils import writefile
 
 from .datamodel import KresConfig
@@ -16,45 +23,6 @@ from .datamodel import KresConfig
 logger = logging.getLogger(__name__)
 
 
-class _PrettyID:
-    """
-    ID object. Effectively only a wrapper around an int, so that the references
-    behave normally (bypassing integer interning and other optimizations)
-    """
-
-    def __init__(self, n: int):
-        self._id = n
-
-    def __str__(self):
-        return str(self._id)
-
-    def __hash__(self) -> int:
-        return self._id
-
-    def __eq__(self, o: object) -> bool:
-        return isinstance(o, _PrettyID) and self._id == o._id
-
-
-class _PrettyIDAllocator:
-    """
-    Pretty numeric ID allocator. Keeps weak refences to the IDs it has
-    allocated. The IDs get recycled once the previously allocated ID
-    objects get garbage collected
-    """
-
-    def __init__(self):
-        self._used: "weakref.WeakSet[_PrettyID]" = weakref.WeakSet()
-
-    def alloc(self) -> _PrettyID:
-        for i in itertools.count(start=1):
-            val = _PrettyID(i)
-            if val not in self._used:
-                self._used.add(val)
-                return val
-
-        raise RuntimeError("Reached an end of an infinite loop. How?")
-
-
 class KresManager:
     """
     Core of the whole operation. Orchestrates individual instances under some
@@ -63,34 +31,68 @@ class KresManager:
     Instantiate with `KresManager.create()`, not with the usual constructor!
     """
 
-    @classmethod
-    async def create(cls: Type["KresManager"], controller: Optional[SubprocessController]) -> "KresManager":
-        obj = cls()
-        await obj._async_init(controller)  # pylint: disable=protected-access
-        return obj
+    _instance_lock = asyncio.Lock()
+    _instance: Optional["KresManager"] = None
+
+    @staticmethod
+    async def create_instance(selected_controller: Optional[SubprocessController]) -> "KresManager":
+        """
+        Creates new singleton instance of KresManager. Can be called only once. Afterwards, use
+        `KresManager.get_instance()` to obtain the already existing instance
+        """
+
+        assert KresManager._instance is None
+
+        async with KresManager._instance_lock:
+            # trying to create, but racing and somebody already did it
+            if KresManager._instance is not None:
+                raise AssertionError("Must NOT call `create_instance` multiple times - race detected!")
+
+            # create it for real
+            inst = KresManager(_i_know_what_i_am_doing=True)
+            await inst._async_init(selected_controller)  # pylint: disable=protected-access
+            KresManager._instance = inst
+            return inst
+
+    @staticmethod
+    def get_instance() -> "KresManager":
+        """
+        Obtain reference to the singleton instance of this class. If you want to create an instance,
+        use `create_instance()`
+        """
+        assert KresManager._instance is not None
+        return KresManager._instance
 
     async def _async_init(self, selected_controller: Optional[SubprocessController]):
         if selected_controller is None:
-            self._controller = await get_best_controller_implementation()
+            self._controller = await knot_resolver_manager.kresd_controller.get_best_controller_implementation()
         else:
             self._controller = selected_controller
         await self._controller.initialize_controller()
+        self._watchdog_task = create_task(self._watchdog())
         await self.load_system_state()
 
-    def __init__(self):
+    def __init__(self, _i_know_what_i_am_doing: bool = False):
+        if not _i_know_what_i_am_doing:
+            logger.error(
+                "Trying to create an instance of KresManager using normal contructor. Please use "
+                "`KresManager.get_instance()` instead"
+            )
+            sys.exit(1)
+
         self._workers: List[Subprocess] = []
         self._gc: Optional[Subprocess] = None
         self._manager_lock = asyncio.Lock()
         self._controller: SubprocessController
         self._last_used_config: Optional[KresConfig] = None
-        self._id_allocator = _PrettyIDAllocator()
+        self._watchdog_task: Optional["Future[None]"] = None
 
     async def load_system_state(self):
         async with self._manager_lock:
             await self._collect_already_running_children()
 
     async def _spawn_new_worker(self):
-        subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, self._id_allocator.alloc())
+        subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, kres_id.alloc())
         await subprocess.start()
         self._workers.append(subprocess)
 
@@ -129,7 +131,7 @@ class KresManager:
         return self._gc is not None
 
     async def _start_gc(self):
-        subprocess = await self._controller.create_subprocess(SubprocessType.GC, "gc")
+        subprocess = await self._controller.create_subprocess(SubprocessType.GC, kres_id.alloc())
         await subprocess.start()
         self._gc = subprocess
 
@@ -175,5 +177,38 @@ class KresManager:
             await self._ensure_number_of_children(0)
             await self._controller.shutdown_controller()
 
+        if self._watchdog_task is not None:
+            self._watchdog_task.cancel()
+
     def get_last_used_config(self) -> Optional[KresConfig]:
         return self._last_used_config
+
+    async def _instability_handler(self) -> None:
+        logger.error("Instability callback invoked. No idea how to react, performing suicide. See you later!")
+        sys.exit(1)
+
+    async def _watchdog(self) -> None:
+        while True:
+            await asyncio.sleep(WATCHDOG_INTERVAL)
+
+            # gather current state
+            units = {u.id: u for u in await self._controller.get_subprocess_info()}
+            worker_ids = [x.id for x in self._workers]
+            invoke_callback = False
+
+            for w in worker_ids:
+                if w not in units:
+                    logger.error("Expected to find subprocess with id '%s' in the system, but did not.", w)
+                    invoke_callback = True
+                    continue
+
+                if units[w].status is SubprocessStatus.FAILED:
+                    logger.error("Subprocess '%s' is failed.", w)
+                    invoke_callback = True
+                    continue
+
+                if units[w].status is SubprocessStatus.UNKNOWN:
+                    logger.warning("Subprocess '%s' is in unknown state!", w)
+
+            if invoke_callback:
+                await self._instability_handler()
index 3d2f7c29d1de91180de41de54c672c6e2c2b2ad1..c3e6750dd3095388c4cf98ca8043897cb722214d 100644 (file)
@@ -1,6 +1,6 @@
 """
 This file contains autodetection logic for available subprocess controllers. Because we have to catch errors
-from imports, you can not see a simple list, but it's more complicated.
+from imports, they are located in functions which are invoked at the end of this file.
 """
 # pylint: disable=import-outside-toplevel
 
index 7a87843376de3c432ec90ad89e218c825236906d..6ed63f2ebacccf3da03fa4937b1d7c943bad396e 100644 (file)
@@ -1,5 +1,8 @@
+from dataclasses import dataclass
 from enum import Enum, auto
-from typing import Iterable
+from typing import Iterable, List
+
+from knot_resolver_manager.kres_id import KresID
 
 
 class SubprocessType(Enum):
@@ -39,6 +42,18 @@ class Subprocess:
         return hash(type(self)) ^ hash(self.type) ^ hash(self.id)
 
 
+class SubprocessStatus(Enum):
+    RUNNING = auto()
+    FAILED = auto()
+    UNKNOWN = auto()
+
+
+@dataclass
+class SubprocessInfo:
+    id: str
+    status: SubprocessStatus
+
+
 class SubprocessController:
     """
     The common Subprocess Controller interface. This is what KresManager requires and what has to be implemented by all
@@ -65,7 +80,7 @@ class SubprocessController:
         """
         raise NotImplementedError()
 
-    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
+    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
         """
         Return a Subprocess object which can be operated on. The subprocess is not
         started or in any way active after this call. That has to be performaed manually
@@ -74,3 +89,10 @@ class SubprocessController:
         Must NOT be called before initialize_controller()
         """
         raise NotImplementedError()
+
+    async def get_subprocess_info(self) -> List[SubprocessInfo]:
+        """
+        Get a status of running subprocesses as seen by the controller. This method  actively polls
+        for information.
+        """
+        raise NotImplementedError()
index 383df85bb0f5fe8fb3b67a50109a4c206db51805..ee51e93107fa2d46d3f71cc243f4ccd924a702f1 100644 (file)
@@ -1,9 +1,14 @@
 import logging
-from asyncio.futures import Future
-from typing import Any, Iterable, Set
-
-from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
+from typing import Iterable, List, Set
+
+from knot_resolver_manager.compat.asyncio import to_thread
+from knot_resolver_manager.kres_id import KresID, alloc_from_string
+from knot_resolver_manager.kresd_controller.interface import (
+    Subprocess,
+    SubprocessController,
+    SubprocessInfo,
+    SubprocessType,
+)
 
 from .config import (
     SupervisordConfig,
@@ -11,20 +16,20 @@ from .config import (
     is_supervisord_available,
     is_supervisord_running,
     list_ids_from_existing_config,
+    list_subprocesses,
     restart,
     start_supervisord,
     stop_supervisord,
     update_config,
-    watchdog,
 )
 
 logger = logging.getLogger(__name__)
 
 
 class SupervisordSubprocess(Subprocess):
-    def __init__(self, controller: "SupervisordSubprocessController", id_: object, type_: SubprocessType):
+    def __init__(self, controller: "SupervisordSubprocessController", id_: KresID, type_: SubprocessType):
         self._controller: "SupervisordSubprocessController" = controller
-        self._id = id_
+        self._id: KresID = id_
         self._type: SubprocessType = type_
 
     @property
@@ -51,7 +56,6 @@ class SupervisordSubprocess(Subprocess):
 class SupervisordSubprocessController(SubprocessController):
     def __init__(self):
         self._running_instances: Set[SupervisordSubprocess] = set()
-        self._watchdog_task: "Future[Any]"
 
     def __str__(self):
         return "supervisord"
@@ -63,6 +67,8 @@ class SupervisordSubprocessController(SubprocessController):
         res = await is_supervisord_available()
         if not res:
             logger.info("Failed to find usable supervisord.")
+
+        logger.debug("Detection - supervisord controller is available for use")
         return res
 
     async def _update_config_with_real_state(self):
@@ -70,7 +76,7 @@ class SupervisordSubprocessController(SubprocessController):
         if running:
             ids = await list_ids_from_existing_config()
             for tp, id_ in ids:
-                self._running_instances.add(SupervisordSubprocess(self, id_, tp))
+                self._running_instances.add(SupervisordSubprocess(self, alloc_from_string(id_), tp))
 
     async def get_all_running_instances(self) -> Iterable[Subprocess]:
         await self._update_config_with_real_state()
@@ -83,10 +89,8 @@ class SupervisordSubprocessController(SubprocessController):
         if not await is_supervisord_running():
             config = self._create_config()
             await start_supervisord(config)
-        self._watchdog_task = create_task(watchdog())
 
     async def shutdown_controller(self) -> None:
-        self._watchdog_task.cancel()
         await stop_supervisord()
 
     async def start_subprocess(self, subprocess: SupervisordSubprocess):
@@ -103,5 +107,8 @@ class SupervisordSubprocessController(SubprocessController):
         assert subprocess in self._running_instances
         await restart(subprocess.id)
 
-    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
+    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
         return SupervisordSubprocess(self, id_hint, subprocess_type)
+
+    async def get_subprocess_info(self) -> List[SubprocessInfo]:
+        return await to_thread(list_subprocesses)
index ae77b729c03f5a2b669dbb30c2128fc833944eba..3124762341b5dfce217481656358d5023a2a6535 100644 (file)
@@ -1,9 +1,7 @@
-import asyncio
 import configparser
 import logging
 import os
 import signal
-import sys
 from os import kill
 from pathlib import Path
 from typing import Any, List, Optional, Set, Tuple
@@ -12,7 +10,6 @@ from xmlrpc.client import ServerProxy
 import supervisor.xmlrpc
 from jinja2 import Template
 
-from knot_resolver_manager.compat.asyncio import to_thread
 from knot_resolver_manager.compat.dataclasses import dataclass
 from knot_resolver_manager.constants import (
     GC_EXECUTABLE,
@@ -27,7 +24,12 @@ from knot_resolver_manager.constants import (
     SUPERVISORD_SOCK,
     SUPERVISORD_SUBPROCESS_LOG_DIR,
 )
-from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessType
+from knot_resolver_manager.kresd_controller.interface import (
+    Subprocess,
+    SubprocessInfo,
+    SubprocessStatus,
+    SubprocessType,
+)
 from knot_resolver_manager.utils.async_utils import (
     call,
     read_resource,
@@ -122,13 +124,28 @@ async def is_supervisord_running() -> bool:
         return True
 
 
-def list_fatal_subprocesses_ids() -> List[str]:
+def list_subprocesses() -> List[SubprocessInfo]:
     proxy = ServerProxy(
         "http://127.0.0.1",
         transport=supervisor.xmlrpc.SupervisorTransport(None, None, serverurl="unix://" + str(SUPERVISORD_SOCK)),
     )
     processes: Any = proxy.supervisor.getAllProcessInfo()
-    return [pr["name"] for pr in processes if pr["statename"] == "FATAL"]
+
+    def convert(proc: Any) -> SubprocessInfo:
+        conversion_tbl = {
+            "FATAL": SubprocessStatus.FAILED,
+            "EXITED": SubprocessStatus.FAILED,
+            "RUNNING": SubprocessStatus.RUNNING,
+        }
+
+        if proc["statename"] in conversion_tbl:
+            status = conversion_tbl[proc["statename"]]
+        else:
+            status = SubprocessStatus.UNKNOWN
+
+        return SubprocessInfo(id=proc["name"], status=status)
+
+    return [convert(pr) for pr in processes]
 
 
 def create_id(type_name: SubprocessType, id_: object) -> str:
@@ -151,39 +168,3 @@ async def list_ids_from_existing_config() -> List[Tuple[SubprocessType, str]]:
             program_id = section.replace("program:", "")
             res.append(parse_id(program_id))
     return res
-
-
-async def watchdog() -> None:
-    while True:
-        # the sleep is split into two, because is very likely a problem will occur
-        # just after start
-        await asyncio.sleep(10)
-
-        logger.debug("Watchdog running and checking system's sanity")
-
-        # check that supervisord is running fine
-        if not await is_supervisord_running():
-            logger.error(
-                "Supervisord is not running! It might have crashed, it might "
-                "have been stopped. This behavior is unexpected and we can't "
-                "really tell what's happening right now. The safest option is "
-                "to terminate... Sorry and bye!"
-            )
-            sys.exit(1)
-
-        # check that there are no subprocesses in FATAL state
-        fatals = await to_thread(list_fatal_subprocesses_ids)
-        if len(fatals) != 0:
-            logger.error(
-                "Some kresd instances are in a FATAL state. The configuration "
-                "provided was probably invalid and passed the validation. You "
-                "might find it helpful to look at logfiles for ids %s."
-                "Sadly, there is no safer way forward than a simple suicide.",
-                fatals,
-            )
-            logger.info("Killing supervisord")
-            await stop_supervisord()
-            logger.info("So Long, and Thanks for All the Fish")
-            sys.exit(1)
-
-        await asyncio.sleep(WATCHDOG_INTERVAL - 10)
index 98a0e3707158e630475ede2cac86aa875e2ac429..6fb456730393ac70148de37500ad687425a2aad5 100644 (file)
@@ -4,7 +4,15 @@ from enum import Enum, auto
 from typing import Iterable, List
 
 from knot_resolver_manager import compat
-from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
+from knot_resolver_manager.compat.asyncio import to_thread
+from knot_resolver_manager.kres_id import KresID, alloc, alloc_from_string
+from knot_resolver_manager.kresd_controller.interface import (
+    Subprocess,
+    SubprocessController,
+    SubprocessInfo,
+    SubprocessStatus,
+    SubprocessType,
+)
 from knot_resolver_manager.utils.async_utils import call
 
 from . import dbus_api as systemd
@@ -21,12 +29,12 @@ class SystemdSubprocess(Subprocess):
     def __init__(
         self,
         type_: SubprocessType,
-        id_: object,
+        id_: KresID,
         systemd_type: systemd.SystemdType,
         persistance_type: SystemdPersistanceType = SystemdPersistanceType.PERSISTENT,
     ):
         self._type = type_
-        self._id = id_
+        self._id: KresID = id_
         self._systemd_type = systemd_type
         self._persistance_type = persistance_type
 
@@ -40,6 +48,12 @@ class SystemdSubprocess(Subprocess):
             ]
             return f"kresd{sep}{self._id}.service"
 
+    @staticmethod
+    def id_could_be_ours(unit_name: str) -> bool:
+        is_ours = unit_name == "kres-cache-gc.service"
+        is_ours |= unit_name.startswith("kresd") and unit_name.endswith(".service")
+        return is_ours
+
     @property
     def type(self):
         return self._type
@@ -47,6 +61,9 @@ class SystemdSubprocess(Subprocess):
     async def is_running(self) -> bool:
         raise NotImplementedError()
 
+    async def _on_unexpected_termination(self):
+        logger.warning("Detected unexpected termination of unit %s", self.id)
+
     async def start(self):
         if self._persistance_type is SystemdPersistanceType.PERSISTENT:
             await compat.asyncio.to_thread(systemd.start_unit, self._systemd_type, self.id)
@@ -111,15 +128,22 @@ class SystemdSubprocessController(SubprocessController):
 
     async def get_all_running_instances(self) -> Iterable[Subprocess]:
         res: List[SystemdSubprocess] = []
-        units = await compat.asyncio.to_thread(systemd.list_units, self._systemd_type)
+        units = await compat.asyncio.to_thread(systemd.list_unit_names, self._systemd_type)
         for unit in units:
             u: str = unit
             if u.startswith("kresd") and u.endswith(".service"):
                 iden = u.replace("kresd", "")[1:].replace(".service", "")
                 persistance_type = SystemdPersistanceType.PERSISTENT if "@" in u else SystemdPersistanceType.TRANSIENT
-                res.append(SystemdSubprocess(SubprocessType.KRESD, iden, self._systemd_type, persistance_type))
+                res.append(
+                    SystemdSubprocess(
+                        SubprocessType.KRESD,
+                        alloc_from_string(iden),
+                        self._systemd_type,
+                        persistance_type,
+                    )
+                )
             elif u == "kres-cache-gc.service":
-                res.append(SystemdSubprocess(SubprocessType.GC, "", self._systemd_type))
+                res.append(SystemdSubprocess(SubprocessType.GC, alloc(), self._systemd_type))
         return res
 
     async def initialize_controller(self) -> None:
@@ -128,5 +152,18 @@ class SystemdSubprocessController(SubprocessController):
     async def shutdown_controller(self) -> None:
         pass
 
-    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
+    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
         return SystemdSubprocess(subprocess_type, id_hint, self._systemd_type, self._persistance_type)
+
+    async def get_subprocess_info(self) -> List[SubprocessInfo]:
+        def convert(u: systemd.Unit) -> SubprocessInfo:
+            status_lookup_table = {"failed": SubprocessStatus.FAILED, "running": SubprocessStatus.RUNNING}
+
+            if u.state in status_lookup_table:
+                status = status_lookup_table[u.state]
+            else:
+                status = SubprocessStatus.UNKNOWN
+
+            return SubprocessInfo(id=u.name, status=status)
+
+        return list(map(convert, await to_thread(systemd.list_units, self._systemd_type)))
index 9997f9d29153550b99c47beb273425279a4c59f8..8b0e364a598b2d48d34aabc602e292bb709b7595 100644 (file)
@@ -1,6 +1,8 @@
 # pyright: reportUnknownMemberType=false
 # pyright: reportMissingTypeStubs=false
 
+import logging
+from dataclasses import dataclass
 from enum import Enum, auto
 from threading import Thread
 from typing import Any, Callable, Dict, List, Optional, Tuple, Union
@@ -14,6 +16,8 @@ from knot_resolver_manager.constants import KRES_CACHE_DIR, KRESD_CONFIG_FILE, R
 from knot_resolver_manager.exceptions import SubprocessControllerException
 from knot_resolver_manager.kresd_controller.interface import SubprocessType
 
+logger = logging.getLogger(__name__)
+
 
 class SystemdType(Enum):
     SYSTEM = auto()
@@ -89,8 +93,26 @@ def get_unit_file_state(
     return res
 
 
-def list_units(type_: SystemdType) -> List[str]:
-    return [str(u[0]) for u in _create_manager_proxy(type_).ListUnits()]
+@dataclass
+class Unit:
+    name: str
+    state: str
+
+
+def _list_units_internal(type_: SystemdType) -> List[Any]:
+    return _create_manager_proxy(type_).ListUnits()
+
+
+def list_units(type_: SystemdType) -> List[Unit]:
+    return [Unit(name=str(u[0]), state=str(u[4])) for u in _list_units_internal(type_)]
+
+
+def list_unit_names(type_: SystemdType) -> List[str]:
+    return [str(u[0]) for u in _list_units_internal(type_)]
+
+
+def list_failed_unit_names(type_: SystemdType) -> List[str]:
+    return [str(u[0]) for u in _list_units_internal(type_) if str(u[3]) == "failed"]
 
 
 def restart_unit(type_: SystemdType, unit_name: str):
index da41541ffff253ce18a60ce182471c0188b2ad88..5f669986779cc439ed3d866d319b26bd9f73dfe1 100644 (file)
@@ -1,7 +1,6 @@
 import asyncio
 import logging
 import sys
-from functools import partial
 from http import HTTPStatus
 from pathlib import Path
 from time import time
@@ -21,20 +20,19 @@ from knot_resolver_manager.utils.dataclasses_parservalidator import Format
 from .datamodel import KresConfig
 from .kres_manager import KresManager
 
-_MANAGER = "kres_manager"
 _SHUTDOWN_EVENT = "shutdown-event"
 
 logger = logging.getLogger(__name__)
 
 
-async def _index(request: web.Request) -> web.Response:
+async def _index(_request: web.Request) -> web.Response:
     """
     Dummy index handler to indicate that the server is indeed running...
     """
     return json_response(
         {
             "msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
-            "status": "RUNNING" if get_kres_manager(request.app) is not None else "INITIALIZING",
+            "status": "RUNNING",
         }
     )
 
@@ -46,7 +44,7 @@ async def _apply_config(request: web.Request) -> web.Response:
 
     document_path = request.match_info["path"]
 
-    manager: KresManager = get_kres_manager(request.app)
+    manager: KresManager = KresManager.get_instance()
     if manager is None:
         # handle the case when the manager is not yet initialized
         return web.Response(
@@ -99,13 +97,6 @@ def stop_server(app: web.Application):
     logger.info("Shutdown event triggered...")
 
 
-def get_kres_manager(app: web.Application) -> KresManager:
-    if _MANAGER not in app:
-        raise ValueError("Accessing manager in an application where it was not defined")
-
-    return app[_MANAGER]
-
-
 class _DefaultSentinel:
     pass
 
@@ -116,7 +107,6 @@ _DEFAULT_SENTINEL = _DefaultSentinel()
 async def _init_manager(
     config: Union[None, Path, KresConfig, _DefaultSentinel],
     subprocess_controller_name: Optional[str],
-    app: web.Application,
 ):
     """
     Called asynchronously when the application initializes.
@@ -129,8 +119,7 @@ async def _init_manager(
 
         # Create KresManager. This will perform autodetection of available service managers and
         # select the most appropriate to use (or use the one configured directly)
-        manager = await KresManager.create(controller)
-        app[_MANAGER] = manager
+        manager = await KresManager.create_instance(controller)
 
         # Initial configuration of the manager
         if config is None:
@@ -167,13 +156,12 @@ async def start_server(
 ):
     start_time = time()
 
-    app = web.Application(middlewares=[error_handler])
+    # before starting any server, initialize the subprocess controller etc.
+    await _init_manager(config, subprocess_controller_name)
 
-    app[_MANAGER] = None
+    app = web.Application(middlewares=[error_handler])
     app[_SHUTDOWN_EVENT] = asyncio.Event()
 
-    app.on_startup.append(partial(_init_manager, config, subprocess_controller_name))
-
     # configure routing
     setup_routes(app)
 
@@ -196,5 +184,5 @@ async def start_server(
     await app[_SHUTDOWN_EVENT].wait()
     logger.info("Gracefull shutdown triggered. Cleaning up...")
     await runner.cleanup()
-    await get_kres_manager(app).stop()
+    await KresManager.get_instance().stop()
     logger.info(f"The manager run for {round(time() - start_time)} seconds... Hope it served well. Bye!")
index 0acb1df763c405c7ff1668c2b1b00dac54e56618..c9efbff7495c6cf6fb457100ce0b268e8cd96860 100644 (file)
@@ -55,7 +55,6 @@ def contains_element_matching(cond: Callable[[T], bool], arr: Iterable[T]) -> bo
 __all__ = [
     "ignore_exceptions_optional",
     "ignore_exceptions",
-    "types",
     "DataclassParserValidatorMixin",
     "Overloaded",
 ]
index 1b22781e7e14d5b72c5263e9336c2da67df51a0a..b54eab20a4e04bcc964bf5c068b4727a419539e1 100644 (file)
@@ -4,7 +4,8 @@ import pkgutil
 import time
 from asyncio import create_subprocess_exec, create_subprocess_shell
 from pathlib import PurePath
-from typing import List, Optional, Union
+from threading import Thread
+from typing import Generic, List, Optional, TypeVar, Union
 
 from knot_resolver_manager.compat.asyncio import to_thread
 
@@ -81,3 +82,27 @@ async def wait_for_process_termination(pid: int, sleep_sec: float = 0):
 
 async def read_resource(package: str, filename: str) -> Optional[bytes]:
     return await to_thread(pkgutil.get_data, package, filename)
+
+
+T = TypeVar("T")
+
+
+class BlockingEventDispatcher(Thread, Generic[T]):
+    def __init__(self, name: str = "blocking_event_dispatcher") -> None:
+        super().__init__(name=name, daemon=True)
+        # warning: the asyncio queue is not thread safe
+        self._removed_unit_names: "asyncio.Queue[T]" = asyncio.Queue()
+        self._main_event_loop = asyncio.get_event_loop()
+
+    def dispatch_event(self, event: T):
+        """
+        Method to dispatch events from the blocking thread
+        """
+
+        async def add_to_queue():
+            await self._removed_unit_names.put(event)
+
+        self._main_event_loop.call_soon_threadsafe(add_to_queue)
+
+    async def next_event(self) -> T:
+        return await self._removed_unit_names.get()
index ad51eadb295f80755c333115ebe1e98bc4d7cee7..610d0cb23dd964abadee869485a5ddf1d81288cb 100644 (file)
@@ -111,6 +111,7 @@ disable= [
     "logging-fstring-interpolation", # see https://github.com/PyCQA/pylint/issues/1788
     "no-else-raise", # not helpful for readability, when we want explicit branches
     "raising-bad-type", # handled by type checker
+    "too-many-arguments",  # sure, but how can we change the signatures to take less arguments? artificially create objects with arguments? That's stupid...
 ]
 
 [tool.pylint.SIMILARITIES]