]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
Revert "systemd: termination notifications; pretty ids improvement"
authorVasek Sraier <git@vakabus.cz>
Wed, 21 Jul 2021 21:00:59 +0000 (23:00 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:52 +0000 (16:17 +0200)
This reverts commit 0e36c7ff19c680ee9a0098745e7f9fe9c4920a67. Integration tests fail
and I don't know why. The commit is moved onto a separate branch and the work on it
will continue there, until it works properly.

manager/README.md
manager/knot_resolver_manager/kres_id.py [deleted file]
manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/kresd_controller/interface.py
manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py
manager/knot_resolver_manager/kresd_controller/systemd/__init__.py
manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py
manager/knot_resolver_manager/utils/async_utils.py
manager/pyproject.toml

index 60681b890b33e76040f33fa951944e733855f593..4d352721243ca43d415c746ded23dbf5145e53f9 100644 (file)
@@ -57,6 +57,10 @@ Before commiting, please ensure that both `poe check` and `poe test` pass. Those
 
 This project uses [`apkg`](https://gitlab.nic.cz/packaging/apkg) for packaging. See [`distro/README.md`](distro/README.md) for packaging specific instructions.
 
+Not yet properly implemented. Ideal situation would be a command like `poe package` which would create all possible packages.
+
+Temporary solution to build a wheel/sdist - just call `poetry build`. The result will be in the `dist/` directory.
+
 ## FAQ
 
 ### What all those dev dependencies for?
diff --git a/manager/knot_resolver_manager/kres_id.py b/manager/knot_resolver_manager/kres_id.py
deleted file mode 100644 (file)
index ce072ed..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-import itertools
-import weakref
-from typing import Optional
-
-from knot_resolver_manager.utils import ignore_exceptions_optional
-
-
-class KresID:
-    """
-    ID object. Effectively only a wrapper around an int, so that the references
-    behave normally (bypassing integer interning and other optimizations)
-    """
-
-    def __init__(self, n: int):
-        self._id = n
-        self._repr: Optional[str] = None
-
-    def set_custom_str_representation(self, representation: str):
-        self._repr = representation
-
-    def __str__(self) -> str:
-        if self._repr is None:
-            return str(self._id)
-        else:
-            return self._repr
-
-    def __hash__(self) -> int:
-        return self._id
-
-    def __eq__(self, o: object) -> bool:
-        return isinstance(o, KresID) and self._id == o._id
-
-
-_used: "weakref.WeakSet[KresID]" = weakref.WeakSet()
-
-
-def alloc() -> KresID:
-    for i in itertools.count(start=1):
-        val = KresID(i)
-        if val not in _used:
-            _used.add(val)
-            return val
-
-    raise RuntimeError("Reached an end of an infinite loop. How?")
-
-
-def alloc_from_string(val: str) -> KresID:
-    int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val)
-    if int_val is not None:
-        res = KresID(int_val)
-        assert res not in _used
-        _used.add(res)
-        return res
-    else:
-        res = alloc()
-        res.set_custom_str_representation(val)
-        return res
index 5e65e62e3514be0bff98c2feb7a48c61a3237ff3..f369d05a0a24edc6ff6fe4cce68e671f349856b7 100644 (file)
@@ -1,9 +1,10 @@
 import asyncio
+import itertools
 import logging
+import weakref
 from subprocess import SubprocessError
 from typing import List, Optional, Type
 
-from knot_resolver_manager import kres_id
 from knot_resolver_manager.constants import KRESD_CONFIG_FILE
 from knot_resolver_manager.exceptions import ValidationException
 from knot_resolver_manager.kresd_controller import get_best_controller_implementation
@@ -15,6 +16,45 @@ from .datamodel import KresConfig
 logger = logging.getLogger(__name__)
 
 
+class _PrettyID:
+    """
+    ID object. Effectively only a wrapper around an int, so that the references
+    behave normally (bypassing integer interning and other optimizations)
+    """
+
+    def __init__(self, n: int):
+        self._id = n
+
+    def __str__(self):
+        return str(self._id)
+
+    def __hash__(self) -> int:
+        return self._id
+
+    def __eq__(self, o: object) -> bool:
+        return isinstance(o, _PrettyID) and self._id == o._id
+
+
+class _PrettyIDAllocator:
+    """
+    Pretty numeric ID allocator. Keeps weak refences to the IDs it has
+    allocated. The IDs get recycled once the previously allocated ID
+    objects get garbage collected
+    """
+
+    def __init__(self):
+        self._used: "weakref.WeakSet[_PrettyID]" = weakref.WeakSet()
+
+    def alloc(self) -> _PrettyID:
+        for i in itertools.count(start=1):
+            val = _PrettyID(i)
+            if val not in self._used:
+                self._used.add(val)
+                return val
+
+        raise RuntimeError("Reached an end of an infinite loop. How?")
+
+
 class KresManager:
     """
     Core of the whole operation. Orchestrates individual instances under some
@@ -43,13 +83,14 @@ class KresManager:
         self._manager_lock = asyncio.Lock()
         self._controller: SubprocessController
         self._last_used_config: Optional[KresConfig] = None
+        self._id_allocator = _PrettyIDAllocator()
 
     async def load_system_state(self):
         async with self._manager_lock:
             await self._collect_already_running_children()
 
     async def _spawn_new_worker(self):
-        subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, kres_id.alloc())
+        subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, self._id_allocator.alloc())
         await subprocess.start()
         self._workers.append(subprocess)
 
@@ -88,7 +129,7 @@ class KresManager:
         return self._gc is not None
 
     async def _start_gc(self):
-        subprocess = await self._controller.create_subprocess(SubprocessType.GC, kres_id.alloc())
+        subprocess = await self._controller.create_subprocess(SubprocessType.GC, "gc")
         await subprocess.start()
         self._gc = subprocess
 
index 052a597447dc7a122fe99806ac1701d4132c0d34..7a87843376de3c432ec90ad89e218c825236906d 100644 (file)
@@ -1,8 +1,6 @@
 from enum import Enum, auto
 from typing import Iterable
 
-from knot_resolver_manager.kres_id import KresID
-
 
 class SubprocessType(Enum):
     KRESD = auto()
@@ -67,7 +65,7 @@ class SubprocessController:
         """
         raise NotImplementedError()
 
-    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
+    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
         """
         Return a Subprocess object which can be operated on. The subprocess is not
         started or in any way active after this call. That has to be performaed manually
index bb865b08609d3332eb818f976a8beea3af170711..383df85bb0f5fe8fb3b67a50109a4c206db51805 100644 (file)
@@ -3,7 +3,6 @@ from asyncio.futures import Future
 from typing import Any, Iterable, Set
 
 from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.kres_id import KresID, alloc_from_string
 from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
 
 from .config import (
@@ -23,9 +22,9 @@ logger = logging.getLogger(__name__)
 
 
 class SupervisordSubprocess(Subprocess):
-    def __init__(self, controller: "SupervisordSubprocessController", id_: KresID, type_: SubprocessType):
+    def __init__(self, controller: "SupervisordSubprocessController", id_: object, type_: SubprocessType):
         self._controller: "SupervisordSubprocessController" = controller
-        self._id: KresID = id_
+        self._id = id_
         self._type: SubprocessType = type_
 
     @property
@@ -64,8 +63,6 @@ class SupervisordSubprocessController(SubprocessController):
         res = await is_supervisord_available()
         if not res:
             logger.info("Failed to find usable supervisord.")
-        
-        logger.debug("Detection - supervisord controller is available for use")
         return res
 
     async def _update_config_with_real_state(self):
@@ -73,7 +70,7 @@ class SupervisordSubprocessController(SubprocessController):
         if running:
             ids = await list_ids_from_existing_config()
             for tp, id_ in ids:
-                self._running_instances.add(SupervisordSubprocess(self, alloc_from_string(id_), tp))
+                self._running_instances.add(SupervisordSubprocess(self, id_, tp))
 
     async def get_all_running_instances(self) -> Iterable[Subprocess]:
         await self._update_config_with_real_state()
@@ -106,5 +103,5 @@ class SupervisordSubprocessController(SubprocessController):
         assert subprocess in self._running_instances
         await restart(subprocess.id)
 
-    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
+    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
         return SupervisordSubprocess(self, id_hint, subprocess_type)
index ec2b2b3117dacd153c86b6e177d9645cd37f5387..98a0e3707158e630475ede2cac86aa875e2ac429 100644 (file)
@@ -1,53 +1,17 @@
 import logging
 import os
-from asyncio.futures import Future
 from enum import Enum, auto
-from typing import Any, Callable, Coroutine, Dict, Iterable, List, Optional, Tuple
+from typing import Iterable, List
 
 from knot_resolver_manager import compat
-from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.kres_id import KresID, alloc, alloc_from_string
 from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
 from knot_resolver_manager.utils.async_utils import call
-from knot_resolver_manager.utils.types import NoneType
 
 from . import dbus_api as systemd
 
 logger = logging.getLogger(__name__)
 
 
-_CallbackType = Callable[[], Coroutine[Any, NoneType, NoneType]]
-_callbacks: Dict[Tuple[systemd.SystemdType, str], _CallbackType] = dict()
-_dispatcher_task: "Optional[Future[NoneType]]" = None
-
-
-async def _monitor_unit_termination(type_: systemd.SystemdType) -> NoneType:
-    dispatcher = systemd.UnitRemovedEventDispatcher(type_)
-
-    try:
-        dispatcher.start()
-
-        while True:
-            event = await dispatcher.next_event()
-            if (type_, event) in _callbacks:
-                await _callbacks[(type_, event)]()
-    finally:
-        dispatcher.stop()
-
-
-def _register_terminated_callback(type_: systemd.SystemdType, unit_name: str, callback: _CallbackType):
-    assert (type_, unit_name) not in _callbacks
-    _callbacks[(type_, unit_name)] = callback
-
-    global _dispatcher_task
-    if _dispatcher_task is None:
-        _dispatcher_task = create_task(_monitor_unit_termination(type_))
-
-
-def _unregister_terminated_callback(type_: systemd.SystemdType, unit_name: str):
-    del _callbacks[(type_, unit_name)]
-
-
 class SystemdPersistanceType(Enum):
     PERSISTENT = auto()
     TRANSIENT = auto()
@@ -57,19 +21,15 @@ class SystemdSubprocess(Subprocess):
     def __init__(
         self,
         type_: SubprocessType,
-        id_: KresID,
+        id_: object,
         systemd_type: systemd.SystemdType,
         persistance_type: SystemdPersistanceType = SystemdPersistanceType.PERSISTENT,
-        already_running: bool = False,
     ):
         self._type = type_
-        self._id: KresID = id_
+        self._id = id_
         self._systemd_type = systemd_type
         self._persistance_type = persistance_type
 
-        if already_running:
-            _register_terminated_callback(systemd_type, self.id, self._on_unexpected_termination)
-
     @property
     def id(self):
         if self._type is SubprocessType.GC:
@@ -87,19 +47,13 @@ class SystemdSubprocess(Subprocess):
     async def is_running(self) -> bool:
         raise NotImplementedError()
 
-    async def _on_unexpected_termination(self):
-        logger.warning("Detected unexpected termination of unit %s", self.id)
-
     async def start(self):
-        _register_terminated_callback(self._systemd_type, self.id, self._on_unexpected_termination)
-
         if self._persistance_type is SystemdPersistanceType.PERSISTENT:
             await compat.asyncio.to_thread(systemd.start_unit, self._systemd_type, self.id)
         elif self._persistance_type is SystemdPersistanceType.TRANSIENT:
             await compat.asyncio.to_thread(systemd.start_transient_unit, self._systemd_type, self.id, self._type)
 
     async def stop(self):
-        _unregister_terminated_callback(self._systemd_type, self.id)
         await compat.asyncio.to_thread(systemd.stop_unit, self._systemd_type, self.id)
 
     async def restart(self):
@@ -114,7 +68,6 @@ class SystemdSubprocessController(SubprocessController):
     ):
         self._systemd_type = systemd_type
         self._persistance_type = persistance_type
-        self._unit_removed_event_dispatcher = systemd.UnitRemovedEventDispatcher(systemd_type)
 
     def __str__(self):
         if self._systemd_type == systemd.SystemdType.SESSION:
@@ -164,17 +117,9 @@ class SystemdSubprocessController(SubprocessController):
             if u.startswith("kresd") and u.endswith(".service"):
                 iden = u.replace("kresd", "")[1:].replace(".service", "")
                 persistance_type = SystemdPersistanceType.PERSISTENT if "@" in u else SystemdPersistanceType.TRANSIENT
-                res.append(
-                    SystemdSubprocess(
-                        SubprocessType.KRESD,
-                        alloc_from_string(iden),
-                        self._systemd_type,
-                        persistance_type,
-                        already_running=True,
-                    )
-                )
+                res.append(SystemdSubprocess(SubprocessType.KRESD, iden, self._systemd_type, persistance_type))
             elif u == "kres-cache-gc.service":
-                res.append(SystemdSubprocess(SubprocessType.GC, alloc(), self._systemd_type, already_running=True))
+                res.append(SystemdSubprocess(SubprocessType.GC, "", self._systemd_type))
         return res
 
     async def initialize_controller(self) -> None:
@@ -183,5 +128,5 @@ class SystemdSubprocessController(SubprocessController):
     async def shutdown_controller(self) -> None:
         pass
 
-    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
+    async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
         return SystemdSubprocess(subprocess_type, id_hint, self._systemd_type, self._persistance_type)
index 91d920b538e1f8dcaca1c53b71fd8ad2ddeb9cb7..9997f9d29153550b99c47beb273425279a4c59f8 100644 (file)
@@ -13,7 +13,6 @@ from typing_extensions import Literal
 from knot_resolver_manager.constants import KRES_CACHE_DIR, KRESD_CONFIG_FILE, RUNTIME_DIR
 from knot_resolver_manager.exceptions import SubprocessControllerException
 from knot_resolver_manager.kresd_controller.interface import SubprocessType
-from knot_resolver_manager.utils.async_utils import BlockingEventDispatcher
 
 
 class SystemdType(Enum):
@@ -193,21 +192,3 @@ def can_load_unit(type_: SystemdType, unit_name: str) -> bool:
     except Exception:
         # if this fails in any way, we can assume that the unit is not properly loaded
         return False
-
-
-class UnitRemovedEventDispatcher(BlockingEventDispatcher[str]):
-    def __init__(self, type_: SystemdType) -> None:
-        super().__init__(name="UnitRemovedEventDispatcher")
-        self._systemd = _create_manager_proxy(type_)
-        self._loop: Any = GLib.MainLoop()
-
-    def _handler(self, name: str, _unit: Any):
-        self.dispatch_event(name)
-
-    def run(self) -> None:
-        self._systemd.UnitRemoved.connect(self._handler)
-        self._loop.run()
-
-    def stop(self) -> None:
-        if self.is_alive():
-            self._loop.quit()
index b54eab20a4e04bcc964bf5c068b4727a419539e1..1b22781e7e14d5b72c5263e9336c2da67df51a0a 100644 (file)
@@ -4,8 +4,7 @@ import pkgutil
 import time
 from asyncio import create_subprocess_exec, create_subprocess_shell
 from pathlib import PurePath
-from threading import Thread
-from typing import Generic, List, Optional, TypeVar, Union
+from typing import List, Optional, Union
 
 from knot_resolver_manager.compat.asyncio import to_thread
 
@@ -82,27 +81,3 @@ async def wait_for_process_termination(pid: int, sleep_sec: float = 0):
 
 async def read_resource(package: str, filename: str) -> Optional[bytes]:
     return await to_thread(pkgutil.get_data, package, filename)
-
-
-T = TypeVar("T")
-
-
-class BlockingEventDispatcher(Thread, Generic[T]):
-    def __init__(self, name: str = "blocking_event_dispatcher") -> None:
-        super().__init__(name=name, daemon=True)
-        # warning: the asyncio queue is not thread safe
-        self._removed_unit_names: "asyncio.Queue[T]" = asyncio.Queue()
-        self._main_event_loop = asyncio.get_event_loop()
-
-    def dispatch_event(self, event: T):
-        """
-        Method to dispatch events from the blocking thread
-        """
-
-        async def add_to_queue():
-            await self._removed_unit_names.put(event)
-
-        self._main_event_loop.call_soon_threadsafe(add_to_queue)
-
-    async def next_event(self) -> T:
-        return await self._removed_unit_names.get()
index 610d0cb23dd964abadee869485a5ddf1d81288cb..ad51eadb295f80755c333115ebe1e98bc4d7cee7 100644 (file)
@@ -111,7 +111,6 @@ disable= [
     "logging-fstring-interpolation", # see https://github.com/PyCQA/pylint/issues/1788
     "no-else-raise", # not helpful for readability, when we want explicit branches
     "raising-bad-type", # handled by type checker
-    "too-many-arguments",  # sure, but how can we change the signatures to take less arguments? artificially create objects with arguments? That's stupid...
 ]
 
 [tool.pylint.SIMILARITIES]