From 01564966aef0f02b7e613886df2f057c44608d4f Mon Sep 17 00:00:00 2001 From: Vasek Sraier Date: Wed, 21 Jul 2021 23:00:59 +0200 Subject: [PATCH] Revert "systemd: termination notifications; pretty ids improvement" This reverts commit 0e36c7ff19c680ee9a0098745e7f9fe9c4920a67. Integration tests fail and I don't know why. The commit is moved onto a separate branch and the work on it will continue there, until it works properly. --- manager/README.md | 4 ++ manager/knot_resolver_manager/kres_id.py | 57 ---------------- manager/knot_resolver_manager/kres_manager.py | 47 ++++++++++++- .../kresd_controller/interface.py | 4 +- .../kresd_controller/supervisord/__init__.py | 11 ++- .../kresd_controller/systemd/__init__.py | 67 ++----------------- .../kresd_controller/systemd/dbus_api.py | 19 ------ .../utils/async_utils.py | 27 +------- manager/pyproject.toml | 1 - 9 files changed, 60 insertions(+), 177 deletions(-) delete mode 100644 manager/knot_resolver_manager/kres_id.py diff --git a/manager/README.md b/manager/README.md index 60681b890..4d3527212 100644 --- a/manager/README.md +++ b/manager/README.md @@ -57,6 +57,10 @@ Before commiting, please ensure that both `poe check` and `poe test` pass. Those This project uses [`apkg`](https://gitlab.nic.cz/packaging/apkg) for packaging. See [`distro/README.md`](distro/README.md) for packaging specific instructions. +Not yet properly implemented. Ideal situation would be a command like `poe package` which would create all possible packages. + +Temporary solution to build a wheel/sdist - just call `poetry build`. The result will be in the `dist/` directory. + ## FAQ ### What all those dev dependencies for? diff --git a/manager/knot_resolver_manager/kres_id.py b/manager/knot_resolver_manager/kres_id.py deleted file mode 100644 index ce072ed37..000000000 --- a/manager/knot_resolver_manager/kres_id.py +++ /dev/null @@ -1,57 +0,0 @@ -import itertools -import weakref -from typing import Optional - -from knot_resolver_manager.utils import ignore_exceptions_optional - - -class KresID: - """ - ID object. Effectively only a wrapper around an int, so that the references - behave normally (bypassing integer interning and other optimizations) - """ - - def __init__(self, n: int): - self._id = n - self._repr: Optional[str] = None - - def set_custom_str_representation(self, representation: str): - self._repr = representation - - def __str__(self) -> str: - if self._repr is None: - return str(self._id) - else: - return self._repr - - def __hash__(self) -> int: - return self._id - - def __eq__(self, o: object) -> bool: - return isinstance(o, KresID) and self._id == o._id - - -_used: "weakref.WeakSet[KresID]" = weakref.WeakSet() - - -def alloc() -> KresID: - for i in itertools.count(start=1): - val = KresID(i) - if val not in _used: - _used.add(val) - return val - - raise RuntimeError("Reached an end of an infinite loop. How?") - - -def alloc_from_string(val: str) -> KresID: - int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val) - if int_val is not None: - res = KresID(int_val) - assert res not in _used - _used.add(res) - return res - else: - res = alloc() - res.set_custom_str_representation(val) - return res diff --git a/manager/knot_resolver_manager/kres_manager.py b/manager/knot_resolver_manager/kres_manager.py index 5e65e62e3..f369d05a0 100644 --- a/manager/knot_resolver_manager/kres_manager.py +++ b/manager/knot_resolver_manager/kres_manager.py @@ -1,9 +1,10 @@ import asyncio +import itertools import logging +import weakref from subprocess import SubprocessError from typing import List, Optional, Type -from knot_resolver_manager import kres_id from knot_resolver_manager.constants import KRESD_CONFIG_FILE from knot_resolver_manager.exceptions import ValidationException from knot_resolver_manager.kresd_controller import get_best_controller_implementation @@ -15,6 +16,45 @@ from .datamodel import KresConfig logger = logging.getLogger(__name__) +class _PrettyID: + """ + ID object. Effectively only a wrapper around an int, so that the references + behave normally (bypassing integer interning and other optimizations) + """ + + def __init__(self, n: int): + self._id = n + + def __str__(self): + return str(self._id) + + def __hash__(self) -> int: + return self._id + + def __eq__(self, o: object) -> bool: + return isinstance(o, _PrettyID) and self._id == o._id + + +class _PrettyIDAllocator: + """ + Pretty numeric ID allocator. Keeps weak refences to the IDs it has + allocated. The IDs get recycled once the previously allocated ID + objects get garbage collected + """ + + def __init__(self): + self._used: "weakref.WeakSet[_PrettyID]" = weakref.WeakSet() + + def alloc(self) -> _PrettyID: + for i in itertools.count(start=1): + val = _PrettyID(i) + if val not in self._used: + self._used.add(val) + return val + + raise RuntimeError("Reached an end of an infinite loop. How?") + + class KresManager: """ Core of the whole operation. Orchestrates individual instances under some @@ -43,13 +83,14 @@ class KresManager: self._manager_lock = asyncio.Lock() self._controller: SubprocessController self._last_used_config: Optional[KresConfig] = None + self._id_allocator = _PrettyIDAllocator() async def load_system_state(self): async with self._manager_lock: await self._collect_already_running_children() async def _spawn_new_worker(self): - subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, kres_id.alloc()) + subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, self._id_allocator.alloc()) await subprocess.start() self._workers.append(subprocess) @@ -88,7 +129,7 @@ class KresManager: return self._gc is not None async def _start_gc(self): - subprocess = await self._controller.create_subprocess(SubprocessType.GC, kres_id.alloc()) + subprocess = await self._controller.create_subprocess(SubprocessType.GC, "gc") await subprocess.start() self._gc = subprocess diff --git a/manager/knot_resolver_manager/kresd_controller/interface.py b/manager/knot_resolver_manager/kresd_controller/interface.py index 052a59744..7a8784337 100644 --- a/manager/knot_resolver_manager/kresd_controller/interface.py +++ b/manager/knot_resolver_manager/kresd_controller/interface.py @@ -1,8 +1,6 @@ from enum import Enum, auto from typing import Iterable -from knot_resolver_manager.kres_id import KresID - class SubprocessType(Enum): KRESD = auto() @@ -67,7 +65,7 @@ class SubprocessController: """ raise NotImplementedError() - async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess: + async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess: """ Return a Subprocess object which can be operated on. The subprocess is not started or in any way active after this call. That has to be performaed manually diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py b/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py index bb865b086..383df85bb 100644 --- a/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py @@ -3,7 +3,6 @@ from asyncio.futures import Future from typing import Any, Iterable, Set from knot_resolver_manager.compat.asyncio import create_task -from knot_resolver_manager.kres_id import KresID, alloc_from_string from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType from .config import ( @@ -23,9 +22,9 @@ logger = logging.getLogger(__name__) class SupervisordSubprocess(Subprocess): - def __init__(self, controller: "SupervisordSubprocessController", id_: KresID, type_: SubprocessType): + def __init__(self, controller: "SupervisordSubprocessController", id_: object, type_: SubprocessType): self._controller: "SupervisordSubprocessController" = controller - self._id: KresID = id_ + self._id = id_ self._type: SubprocessType = type_ @property @@ -64,8 +63,6 @@ class SupervisordSubprocessController(SubprocessController): res = await is_supervisord_available() if not res: logger.info("Failed to find usable supervisord.") - - logger.debug("Detection - supervisord controller is available for use") return res async def _update_config_with_real_state(self): @@ -73,7 +70,7 @@ class SupervisordSubprocessController(SubprocessController): if running: ids = await list_ids_from_existing_config() for tp, id_ in ids: - self._running_instances.add(SupervisordSubprocess(self, alloc_from_string(id_), tp)) + self._running_instances.add(SupervisordSubprocess(self, id_, tp)) async def get_all_running_instances(self) -> Iterable[Subprocess]: await self._update_config_with_real_state() @@ -106,5 +103,5 @@ class SupervisordSubprocessController(SubprocessController): assert subprocess in self._running_instances await restart(subprocess.id) - async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess: + async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess: return SupervisordSubprocess(self, id_hint, subprocess_type) diff --git a/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py b/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py index ec2b2b311..98a0e3707 100644 --- a/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py +++ b/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py @@ -1,53 +1,17 @@ import logging import os -from asyncio.futures import Future from enum import Enum, auto -from typing import Any, Callable, Coroutine, Dict, Iterable, List, Optional, Tuple +from typing import Iterable, List from knot_resolver_manager import compat -from knot_resolver_manager.compat.asyncio import create_task -from knot_resolver_manager.kres_id import KresID, alloc, alloc_from_string from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType from knot_resolver_manager.utils.async_utils import call -from knot_resolver_manager.utils.types import NoneType from . import dbus_api as systemd logger = logging.getLogger(__name__) -_CallbackType = Callable[[], Coroutine[Any, NoneType, NoneType]] -_callbacks: Dict[Tuple[systemd.SystemdType, str], _CallbackType] = dict() -_dispatcher_task: "Optional[Future[NoneType]]" = None - - -async def _monitor_unit_termination(type_: systemd.SystemdType) -> NoneType: - dispatcher = systemd.UnitRemovedEventDispatcher(type_) - - try: - dispatcher.start() - - while True: - event = await dispatcher.next_event() - if (type_, event) in _callbacks: - await _callbacks[(type_, event)]() - finally: - dispatcher.stop() - - -def _register_terminated_callback(type_: systemd.SystemdType, unit_name: str, callback: _CallbackType): - assert (type_, unit_name) not in _callbacks - _callbacks[(type_, unit_name)] = callback - - global _dispatcher_task - if _dispatcher_task is None: - _dispatcher_task = create_task(_monitor_unit_termination(type_)) - - -def _unregister_terminated_callback(type_: systemd.SystemdType, unit_name: str): - del _callbacks[(type_, unit_name)] - - class SystemdPersistanceType(Enum): PERSISTENT = auto() TRANSIENT = auto() @@ -57,19 +21,15 @@ class SystemdSubprocess(Subprocess): def __init__( self, type_: SubprocessType, - id_: KresID, + id_: object, systemd_type: systemd.SystemdType, persistance_type: SystemdPersistanceType = SystemdPersistanceType.PERSISTENT, - already_running: bool = False, ): self._type = type_ - self._id: KresID = id_ + self._id = id_ self._systemd_type = systemd_type self._persistance_type = persistance_type - if already_running: - _register_terminated_callback(systemd_type, self.id, self._on_unexpected_termination) - @property def id(self): if self._type is SubprocessType.GC: @@ -87,19 +47,13 @@ class SystemdSubprocess(Subprocess): async def is_running(self) -> bool: raise NotImplementedError() - async def _on_unexpected_termination(self): - logger.warning("Detected unexpected termination of unit %s", self.id) - async def start(self): - _register_terminated_callback(self._systemd_type, self.id, self._on_unexpected_termination) - if self._persistance_type is SystemdPersistanceType.PERSISTENT: await compat.asyncio.to_thread(systemd.start_unit, self._systemd_type, self.id) elif self._persistance_type is SystemdPersistanceType.TRANSIENT: await compat.asyncio.to_thread(systemd.start_transient_unit, self._systemd_type, self.id, self._type) async def stop(self): - _unregister_terminated_callback(self._systemd_type, self.id) await compat.asyncio.to_thread(systemd.stop_unit, self._systemd_type, self.id) async def restart(self): @@ -114,7 +68,6 @@ class SystemdSubprocessController(SubprocessController): ): self._systemd_type = systemd_type self._persistance_type = persistance_type - self._unit_removed_event_dispatcher = systemd.UnitRemovedEventDispatcher(systemd_type) def __str__(self): if self._systemd_type == systemd.SystemdType.SESSION: @@ -164,17 +117,9 @@ class SystemdSubprocessController(SubprocessController): if u.startswith("kresd") and u.endswith(".service"): iden = u.replace("kresd", "")[1:].replace(".service", "") persistance_type = SystemdPersistanceType.PERSISTENT if "@" in u else SystemdPersistanceType.TRANSIENT - res.append( - SystemdSubprocess( - SubprocessType.KRESD, - alloc_from_string(iden), - self._systemd_type, - persistance_type, - already_running=True, - ) - ) + res.append(SystemdSubprocess(SubprocessType.KRESD, iden, self._systemd_type, persistance_type)) elif u == "kres-cache-gc.service": - res.append(SystemdSubprocess(SubprocessType.GC, alloc(), self._systemd_type, already_running=True)) + res.append(SystemdSubprocess(SubprocessType.GC, "", self._systemd_type)) return res async def initialize_controller(self) -> None: @@ -183,5 +128,5 @@ class SystemdSubprocessController(SubprocessController): async def shutdown_controller(self) -> None: pass - async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess: + async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess: return SystemdSubprocess(subprocess_type, id_hint, self._systemd_type, self._persistance_type) diff --git a/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py b/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py index 91d920b53..9997f9d29 100644 --- a/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py +++ b/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py @@ -13,7 +13,6 @@ from typing_extensions import Literal from knot_resolver_manager.constants import KRES_CACHE_DIR, KRESD_CONFIG_FILE, RUNTIME_DIR from knot_resolver_manager.exceptions import SubprocessControllerException from knot_resolver_manager.kresd_controller.interface import SubprocessType -from knot_resolver_manager.utils.async_utils import BlockingEventDispatcher class SystemdType(Enum): @@ -193,21 +192,3 @@ def can_load_unit(type_: SystemdType, unit_name: str) -> bool: except Exception: # if this fails in any way, we can assume that the unit is not properly loaded return False - - -class UnitRemovedEventDispatcher(BlockingEventDispatcher[str]): - def __init__(self, type_: SystemdType) -> None: - super().__init__(name="UnitRemovedEventDispatcher") - self._systemd = _create_manager_proxy(type_) - self._loop: Any = GLib.MainLoop() - - def _handler(self, name: str, _unit: Any): - self.dispatch_event(name) - - def run(self) -> None: - self._systemd.UnitRemoved.connect(self._handler) - self._loop.run() - - def stop(self) -> None: - if self.is_alive(): - self._loop.quit() diff --git a/manager/knot_resolver_manager/utils/async_utils.py b/manager/knot_resolver_manager/utils/async_utils.py index b54eab20a..1b22781e7 100644 --- a/manager/knot_resolver_manager/utils/async_utils.py +++ b/manager/knot_resolver_manager/utils/async_utils.py @@ -4,8 +4,7 @@ import pkgutil import time from asyncio import create_subprocess_exec, create_subprocess_shell from pathlib import PurePath -from threading import Thread -from typing import Generic, List, Optional, TypeVar, Union +from typing import List, Optional, Union from knot_resolver_manager.compat.asyncio import to_thread @@ -82,27 +81,3 @@ async def wait_for_process_termination(pid: int, sleep_sec: float = 0): async def read_resource(package: str, filename: str) -> Optional[bytes]: return await to_thread(pkgutil.get_data, package, filename) - - -T = TypeVar("T") - - -class BlockingEventDispatcher(Thread, Generic[T]): - def __init__(self, name: str = "blocking_event_dispatcher") -> None: - super().__init__(name=name, daemon=True) - # warning: the asyncio queue is not thread safe - self._removed_unit_names: "asyncio.Queue[T]" = asyncio.Queue() - self._main_event_loop = asyncio.get_event_loop() - - def dispatch_event(self, event: T): - """ - Method to dispatch events from the blocking thread - """ - - async def add_to_queue(): - await self._removed_unit_names.put(event) - - self._main_event_loop.call_soon_threadsafe(add_to_queue) - - async def next_event(self) -> T: - return await self._removed_unit_names.get() diff --git a/manager/pyproject.toml b/manager/pyproject.toml index 610d0cb23..ad51eadb2 100644 --- a/manager/pyproject.toml +++ b/manager/pyproject.toml @@ -111,7 +111,6 @@ disable= [ "logging-fstring-interpolation", # see https://github.com/PyCQA/pylint/issues/1788 "no-else-raise", # not helpful for readability, when we want explicit branches "raising-bad-type", # handled by type checker - "too-many-arguments", # sure, but how can we change the signatures to take less arguments? artificially create objects with arguments? That's stupid... ] [tool.pylint.SIMILARITIES] -- 2.47.3