From: Vasek Sraier Date: Sun, 27 Feb 2022 12:23:08 +0000 (+0100) Subject: manager: id allocation system: rewritten once more with id subclassing X-Git-Tag: v6.0.0a1~41^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3d37c6e4228d2fccd3a29b10b44f8326f445c4b1;p=thirdparty%2Fknot-resolver.git manager: id allocation system: rewritten once more with id subclassing --- diff --git a/manager/.flake8 b/manager/.flake8 index 4a464999f..3a7c8e749 100644 --- a/manager/.flake8 +++ b/manager/.flake8 @@ -1,2 +1,3 @@ [flake8] -max-line-length = 200 \ No newline at end of file +max-line-length = 200 +extend-ignore = E203 \ No newline at end of file diff --git a/manager/knot_resolver_manager/constants.py b/manager/knot_resolver_manager/constants.py index 3f1f0a8c7..5bade6bca 100644 --- a/manager/knot_resolver_manager/constants.py +++ b/manager/knot_resolver_manager/constants.py @@ -1,9 +1,14 @@ import logging from pathlib import Path +from typing import TYPE_CHECKING, Optional +from knot_resolver_manager.config_store import ConfigStore from knot_resolver_manager.datamodel.config_schema import KresConfig -from knot_resolver_manager.kres_id import KresID from knot_resolver_manager.utils import which +from knot_resolver_manager.utils.functional import Result + +if TYPE_CHECKING: + from knot_resolver_manager.kresd_controller.interface import KresID STARTUP_LOG_LEVEL = logging.DEBUG DEFAULT_MANAGER_CONFIG_FILE = Path("/etc/knot-resolver/config.yml") @@ -21,8 +26,8 @@ def kresd_cache_dir(config: KresConfig) -> Path: return config.cache.storage.to_path() -def kresd_config_file(config: KresConfig, kres_id: KresID) -> Path: - return Path(f"{config.server.groupid}kresd_{kres_id}.conf") +def kresd_config_file(_config: KresConfig, kres_id: "KresID") -> Path: + return Path(f"{kres_id}.conf") def supervisord_config_file(_config: KresConfig) -> Path: @@ -53,3 +58,40 @@ WATCHDOG_INTERVAL: float = 5 """ Used in KresdManager. It's a number of seconds in between system health checks. """ + + +class _UserConstants: + """ + Class for accessing constants, which are technically not constants as they are user configurable. + """ + + def __init__(self, config_store: ConfigStore) -> None: + self._config_store = config_store + + @property + def SERVICE_GROUP_ID(self) -> str: + return self._config_store.get().server.groupid + + +_user_constants: Optional[_UserConstants] = None + + +async def _deny_groupid_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]: + if config_old.server.groupid != config_new.server.groupid: + return Result.err( + "/server/groupid: Based on the groupid, the manager recognizes his subprocesses," + " so it is not possible to change it while services are running." + ) + return Result.ok(None) + + +async def init_user_constants(config_store: ConfigStore) -> None: + global _user_constants + _user_constants = _UserConstants(config_store) + + await config_store.register_verifier(_deny_groupid_changes) + + +def user_constants() -> _UserConstants: + assert _user_constants is not None + return _user_constants diff --git a/manager/knot_resolver_manager/kres_id.py b/manager/knot_resolver_manager/kres_id.py deleted file mode 100644 index 76cccd8b7..000000000 --- a/manager/knot_resolver_manager/kres_id.py +++ /dev/null @@ -1,71 +0,0 @@ -import itertools -import weakref -from typing import Optional - -from knot_resolver_manager.utils import ignore_exceptions_optional - - -class KresID: - """ - ID object. Effectively only a wrapper around an int, so that the references - behave normally (bypassing integer interning and other optimizations) - """ - - _used: "weakref.WeakSet[KresID]" = weakref.WeakSet() - - @staticmethod - def alloc(_custom_name_id: bool = False) -> "KresID": - for i in itertools.count(start=1): - val = KresID(i if not _custom_name_id else -i) - if val not in KresID._used: - KresID._used.add(val) - return val - - raise RuntimeError("Reached an end of an infinite loop. How?") - - @staticmethod - def from_string(val: str) -> "KresID": - """ - Create a new KresID instance with ID based on the given string. There are no guarantees - that the returned KresID is unique. - """ - int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val) - if int_val is not None: - res = KresID(int_val) - else: - # this would be for example 'gc' - # we want a special value, so that they do not clash with normal numerical values - res = KresID.alloc(_custom_name_id=True) - res.set_custom_str_representation(val) - - KresID._used.add(res) - return res - - def __init__(self, n: int): - self._id = n - self._repr: Optional[str] = None - - def set_custom_str_representation(self, representation: str) -> None: - self._repr = representation - - def __str__(self) -> str: - if self._repr is None: - return str(self._id) - else: - return self._repr - - def __repr__(self) -> str: - return f"KresID({self})" - - def __hash__(self) -> int: - if self._repr: - return hash(self._repr) - return self._id - - def __eq__(self, o: object) -> bool: - if isinstance(o, KresID): - ret = self._id == o._id - if self._repr: - ret |= self._repr == o._repr - return ret - return False diff --git a/manager/knot_resolver_manager/kres_manager.py b/manager/knot_resolver_manager/kres_manager.py index 444bebcfb..ed487197e 100644 --- a/manager/knot_resolver_manager/kres_manager.py +++ b/manager/knot_resolver_manager/kres_manager.py @@ -177,7 +177,8 @@ class KresManager: try: # gather current state - detected_subprocesses = await self._controller.get_subprocess_status() + async with self._manager_lock: + detected_subprocesses = await self._controller.get_subprocess_status() expected_ids = [x.id for x in self._workers] if self._gc: expected_ids.append(self._gc.id) diff --git a/manager/knot_resolver_manager/kresd_controller/interface.py b/manager/knot_resolver_manager/kresd_controller/interface.py index 8a815fc22..3aa08f6ef 100644 --- a/manager/knot_resolver_manager/kresd_controller/interface.py +++ b/manager/knot_resolver_manager/kresd_controller/interface.py @@ -1,12 +1,13 @@ import asyncio +import itertools import sys from enum import Enum, auto -from typing import Dict, Iterable, Optional +from typing import Dict, Iterable, Optional, Type, TypeVar +from weakref import WeakValueDictionary from knot_resolver_manager.constants import kresd_config_file from knot_resolver_manager.datamodel.config_schema import KresConfig from knot_resolver_manager.exceptions import SubprocessControllerException -from knot_resolver_manager.kres_id import KresID from knot_resolver_manager.statistics import register_resolver_metrics_for, unregister_resolver_metrics_for from knot_resolver_manager.utils.async_utils import writefile @@ -16,16 +17,97 @@ class SubprocessType(Enum): GC = auto() +T = TypeVar("T", bound="KresID") + + +class KresID: + """ + ID object used for identifying subprocesses. + """ + + _used: "WeakValueDictionary[int, KresID]" = WeakValueDictionary() + + @classmethod + def alloc(cls: Type[T], typ: SubprocessType) -> T: + # we split them in order to make the numbers nice (no gaps, pretty naming) + # there are no strictly technical reasons to do this + # + # GC - negative IDs + # KRESD - positive IDs + if typ is SubprocessType.GC: + start = -1 + step = -1 + elif typ is SubprocessType.KRESD: + start = 1 + step = 1 + else: + raise RuntimeError(f"Unexpected subprocess type {typ}") + + # find free ID closest to zero + for i in itertools.count(start=start, step=step): + if i not in cls._used: + res = cls.new(typ, i) + return res + + raise RuntimeError("Reached an end of an infinite loop. How?") + + @classmethod + def new(cls: "Type[T]", typ: SubprocessType, n: int) -> "T": + if n in cls._used: + # Ignoring typing here, because I can't find a way how to make the _used dict + # typed based on subclass. I am not even sure that it's different between subclasses, + # it's probably still the same dict. But we don't really care about it + return cls._used[n] # type: ignore + else: + val = cls(typ, n, _i_know_what_i_am_doing=True) + cls._used[n] = val + return val + + def __init__(self, typ: SubprocessType, n: int, _i_know_what_i_am_doing: bool = False): + if not _i_know_what_i_am_doing: + raise RuntimeError("Don't do this. You seem to have no idea what it does") + + self._id = n + self._type = typ + + @property + def subprocess_type(self) -> SubprocessType: + return self._type + + def __repr__(self) -> str: + return f"KresID({self})" + + def __hash__(self) -> int: + return self._id + + def __eq__(self, o: object) -> bool: + if isinstance(o, KresID): + return self._id == o._id + return False + + def __str__(self) -> str: + """ + Returns string representation of the ID usable directly in the underlying service manager + """ + raise NotImplementedError() + + @staticmethod + def from_string(val: str) -> "KresID": + """ + Inverse of __str__ + """ + raise NotImplementedError() + + class Subprocess: """ One SubprocessInstance corresponds to one manager's subprocess """ - def __init__(self, config: KresConfig, typ: SubprocessType, custom_id: Optional[KresID] = None) -> None: - self._id = KresID.alloc() if custom_id is None else custom_id + def __init__(self, config: KresConfig, kid: KresID) -> None: + self._id = kid self._config = config self._metrics_registered: bool = False - self._type = typ async def start(self) -> None: # create config file @@ -70,7 +152,7 @@ class Subprocess: @property def type(self) -> SubprocessType: - return self._type + return self.id.subprocess_type @property def id(self) -> KresID: diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py b/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py index 9d4570f5f..8844bedcb 100644 --- a/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py @@ -4,7 +4,7 @@ import os import signal from os import kill from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import Any, Dict, Iterable, List, Optional, Set, Union from xmlrpc.client import ServerProxy import supervisor.xmlrpc # type: ignore[import] @@ -26,8 +26,8 @@ from knot_resolver_manager.constants import ( ) from knot_resolver_manager.datamodel.config_schema import KresConfig from knot_resolver_manager.exceptions import SubprocessControllerException -from knot_resolver_manager.kres_id import KresID from knot_resolver_manager.kresd_controller.interface import ( + KresID, Subprocess, SubprocessController, SubprocessStatus, @@ -44,6 +44,24 @@ from knot_resolver_manager.utils.async_utils import ( logger = logging.getLogger(__name__) +class SupervisordKresID(KresID): + @staticmethod + def from_string(val: str) -> "SupervisordKresID": + if val == "gc": + return SupervisordKresID.new(SubprocessType.GC, -1) + else: + val = val.replace("kresd", "") + return SupervisordKresID.new(SubprocessType.KRESD, int(val)) + + def __str__(self) -> str: + if self.subprocess_type is SubprocessType.GC: + return "gc" + elif self.subprocess_type is SubprocessType.KRESD: + return f"kresd{self._id}" + else: + raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}") + + @dataclass class _Instance: """ @@ -182,21 +200,20 @@ def _list_subprocesses(config: KresConfig) -> Dict[KresID, SubprocessStatus]: status = SubprocessStatus.UNKNOWN return status - return {KresID.from_string(pr["name"]): convert(pr) for pr in processes} + return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes} -async def _list_ids_from_existing_config(cfg: KresConfig) -> List[Tuple[SubprocessType, KresID]]: +async def _list_ids_from_existing_config(cfg: KresConfig) -> List[SupervisordKresID]: config = await readfile(supervisord_config_file(cfg)) cp = configparser.ConfigParser() cp.read_string(config) - res: List[Tuple[SubprocessType, KresID]] = [] + res: List[SupervisordKresID] = [] for section in cp.sections(): if section.startswith("program:"): program_id = section.replace("program:", "") - iid = KresID.from_string(program_id) - typ = SubprocessType[cp[section].get("type")] - res.append((typ, iid)) + kid = SupervisordKresID.from_string(program_id) + res.append(kid) return res @@ -205,10 +222,12 @@ class SupervisordSubprocess(Subprocess): self, config: KresConfig, controller: "SupervisordSubprocessController", - typ: SubprocessType, - custom_id: Optional[KresID] = None, + base_id: Union[SubprocessType, SupervisordKresID], ): - super().__init__(config, typ, custom_id=custom_id) + if isinstance(base_id, SubprocessType): + super().__init__(config, SupervisordKresID.alloc(base_id)) + else: + super().__init__(config, base_id) self._controller: "SupervisordSubprocessController" = controller async def _start(self) -> None: @@ -249,8 +268,8 @@ class SupervisordSubprocessController(SubprocessController): running = await _is_supervisord_running(config) if running: ids = await _list_ids_from_existing_config(config) - for tp, id_ in ids: - self._running_instances.add(SupervisordSubprocess(self._controller_config, self, tp, custom_id=id_)) + for id_ in ids: + self._running_instances.add(SupervisordSubprocess(self._controller_config, self, id_)) async def get_all_running_instances(self) -> Iterable[Subprocess]: assert self._controller_config is not None diff --git a/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py b/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py index 2d56ada2b..417f5bdb7 100644 --- a/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py +++ b/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py @@ -1,24 +1,22 @@ import logging import os -from typing import Dict, Iterable, List, Optional +import re +from typing import Dict, Iterable, List, Optional, Union from knot_resolver_manager import compat from knot_resolver_manager.compat.asyncio import to_thread +from knot_resolver_manager.constants import user_constants from knot_resolver_manager.datamodel.config_schema import KresConfig -from knot_resolver_manager.kres_id import KresID from knot_resolver_manager.kresd_controller.interface import ( + KresID, Subprocess, SubprocessController, SubprocessStatus, SubprocessType, ) from knot_resolver_manager.kresd_controller.systemd.dbus_api import ( - GC_SERVICE_BASE_NAME, SystemdType, Unit, - create_service_name, - is_service_name_ours, - kres_id_from_service_name, list_units, reset_failed_unit, restart_unit, @@ -31,23 +29,57 @@ from knot_resolver_manager.utils.async_utils import call logger = logging.getLogger(__name__) +GC_SERVICE_BASE_NAME = "kres_cache_gc.service" +KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd_([0-9]+).service$") + + +def _is_service_name_ours(name: str) -> bool: + pref_len = len(user_constants().SERVICE_GROUP_ID) + is_ours = name == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME + is_ours |= name.startswith(user_constants().SERVICE_GROUP_ID) and bool( + KRESD_SERVICE_BASE_PATTERN.match(name[pref_len:]) + ) + return is_ours + + +class SystemdKresID(KresID): + @staticmethod + def from_string(val: str) -> "SystemdKresID": + if val == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME: + return SystemdKresID.new(SubprocessType.GC, -1) + else: + val = val[len(user_constants().SERVICE_GROUP_ID) :] + kid = KRESD_SERVICE_BASE_PATTERN.search(val) + if kid: + return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[0])) + else: + raise RuntimeError("Trying to parse systemd service name which does not match our expectations") + + def __str__(self) -> str: + if self.subprocess_type is SubprocessType.GC: + return user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME + elif self.subprocess_type is SubprocessType.KRESD: + return f"{user_constants().SERVICE_GROUP_ID}kresd_{self._id}.service" + else: + raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}") + + class SystemdSubprocess(Subprocess): - def __init__( - self, config: KresConfig, typ: SubprocessType, systemd_type: SystemdType, custom_id: Optional[KresID] = None - ): - super().__init__(config, typ, custom_id=custom_id) + def __init__(self, config: KresConfig, systemd_type: SystemdType, id_base: Union[SubprocessType, KresID]): + if isinstance(id_base, SubprocessType): + super().__init__(config, SystemdKresID.alloc(id_base)) + else: + super().__init__(config, id_base) self._systemd_type = systemd_type async def _start(self): - await compat.asyncio.to_thread( - start_transient_kresd_unit, self._config, self._systemd_type, self.id, self._type - ) + await compat.asyncio.to_thread(start_transient_kresd_unit, self._config, self._systemd_type, self.id) async def _stop(self): - await compat.asyncio.to_thread(stop_unit, self._systemd_type, create_service_name(self.id, self._config)) + await compat.asyncio.to_thread(stop_unit, self._systemd_type, str(self.id)) async def _restart(self): - await compat.asyncio.to_thread(restart_unit, self._systemd_type, create_service_name(self.id, self._config)) + await compat.asyncio.to_thread(restart_unit, self._systemd_type, str(self.id)) class SystemdSubprocessController(SubprocessController): @@ -97,7 +129,7 @@ class SystemdSubprocessController(SubprocessController): res: List[SystemdSubprocess] = [] units = await compat.asyncio.to_thread(list_units, self._systemd_type) for unit in units: - if is_service_name_ours(unit.name, self._controller_config): + if _is_service_name_ours(unit.name): if unit.state == "failed": # if a unit is failed, remove it from the system by reseting its state logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", unit.name) @@ -107,11 +139,8 @@ class SystemdSubprocessController(SubprocessController): res.append( SystemdSubprocess( self._controller_config, - SubprocessType.GC - if unit.name == self._controller_config.server.groupid + GC_SERVICE_BASE_NAME - else SubprocessType.KRESD, self._systemd_type, - custom_id=kres_id_from_service_name(unit.name, self._controller_config), + SystemdKresID.from_string(unit.name), ) ) @@ -125,12 +154,7 @@ class SystemdSubprocessController(SubprocessController): async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess: assert self._controller_config is not None - - custom_id = None - if subprocess_type == SubprocessType.GC: - custom_id = KresID.from_string(subprocess_config.server.groupid + GC_SERVICE_BASE_NAME) - - return SystemdSubprocess(subprocess_config, subprocess_type, self._systemd_type, custom_id=custom_id) + return SystemdSubprocess(subprocess_config, self._systemd_type, subprocess_type) async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]: assert self._controller_config is not None @@ -144,5 +168,5 @@ class SystemdSubprocessController(SubprocessController): data: List[Unit] = await to_thread(list_units, self._systemd_type) - our_data = filter(lambda u: is_service_name_ours(u.name, self._controller_config), data) # type: ignore - return {kres_id_from_service_name(u.name, self._controller_config): convert(u.state) for u in our_data} + our_data = filter(lambda u: _is_service_name_ours(u.name), data) + return {SystemdKresID.from_string(u.name): convert(u.state) for u in our_data} diff --git a/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py b/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py index b0270c9e4..b18e4d487 100644 --- a/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py +++ b/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py @@ -3,7 +3,6 @@ import logging import os -import re from enum import Enum, auto from threading import Thread from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar @@ -17,40 +16,10 @@ from knot_resolver_manager.compat.dataclasses import dataclass from knot_resolver_manager.constants import kres_gc_executable, kresd_cache_dir, kresd_config_file, kresd_executable from knot_resolver_manager.datamodel.config_schema import KresConfig from knot_resolver_manager.exceptions import SubprocessControllerException -from knot_resolver_manager.kres_id import KresID -from knot_resolver_manager.kresd_controller.interface import SubprocessType +from knot_resolver_manager.kresd_controller.interface import KresID, SubprocessType logger = logging.getLogger(__name__) -GC_SERVICE_BASE_NAME = "kres_cache_gc.service" -KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd_([0-9]+).service$") - - -def service_name_remove_prefix(service_name: str, prefix: str) -> str: - return service_name[len(prefix) :] if service_name.startswith(prefix) else service_name # noqa: E203 - - -def kres_id_from_service_name(service_name: str, config: KresConfig) -> KresID: - service_name_noprefix = service_name_remove_prefix(service_name, config.server.groupid) - kid = KRESD_SERVICE_BASE_PATTERN.search(service_name_noprefix) - if kid: - return KresID.from_string(kid.groups()[0]) - return KresID.from_string(service_name) - - -def create_service_name(kid: KresID, config: KresConfig) -> str: - rep = str(kid) - if rep.isnumeric(): - return f"{config.server.groupid}kresd_{rep}.service" - return rep - - -def is_service_name_ours(service_name: str, config: KresConfig) -> bool: - service_name_noprefix = service_name_remove_prefix(service_name, config.server.groupid) - is_ours = service_name_noprefix == GC_SERVICE_BASE_NAME - is_ours |= bool(KRESD_SERVICE_BASE_PATTERN.match(service_name_noprefix)) - return is_ours - class SystemdType(Enum): SYSTEM = auto() @@ -117,17 +86,24 @@ def _wait_for_job_completion(systemd: Any, job_creating_func: Callable[[], str]) return event_hander + loop: Any = None + def event_loop_isolation_thread() -> None: - loop: Any = GLib.MainLoop() + nonlocal loop + loop = GLib.MainLoop() systemd.JobRemoved.connect(_wait_for_job_completion_handler(loop)) loop.run() # first start the thread to watch for results to prevent race conditions - thread = Thread(target=event_loop_isolation_thread) + thread = Thread(target=event_loop_isolation_thread, name="glib-loop-isolation-thread") thread.start() # then create the job - job_path = job_creating_func() + try: + job_path = job_creating_func() + except BaseException: + loop.quit() + raise # then wait for the results thread.join() @@ -247,13 +223,12 @@ def _gc_unit_properties(config: KresConfig) -> Any: @_wrap_dbus_errors -def start_transient_kresd_unit( - config: KresConfig, type_: SystemdType, kres_id: KresID, subprocess_type: SubprocessType -) -> None: - name, properties = { - SubprocessType.KRESD: (create_service_name(kres_id, config), _kresd_unit_properties(config, kres_id)), - SubprocessType.GC: (create_service_name(kres_id, config), _gc_unit_properties(config)), - }[subprocess_type] +def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: KresID) -> None: + properties = { + SubprocessType.KRESD: _kresd_unit_properties(config, kres_id), + SubprocessType.GC: _gc_unit_properties(config), + }[kres_id.subprocess_type] + name = str(kres_id) systemd = _create_manager_proxy(type_) diff --git a/manager/knot_resolver_manager/server.py b/manager/knot_resolver_manager/server.py index e7e68ee4b..95c20400d 100644 --- a/manager/knot_resolver_manager/server.py +++ b/manager/knot_resolver_manager/server.py @@ -17,7 +17,7 @@ from aiohttp.web_runner import AppRunner, TCPSite, UnixSite from knot_resolver_manager import log, statistics from knot_resolver_manager.compat import asyncio as asyncio_compat from knot_resolver_manager.config_store import ConfigStore -from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE +from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, init_user_constants from knot_resolver_manager.datamodel.config_schema import KresConfig from knot_resolver_manager.datamodel.server_schema import ManagementSchema from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException, TreeException @@ -88,14 +88,6 @@ class Server: ) return Result.ok(None) - async def _deny_groupid_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]: - if config_old.server.groupid != config_new.server.groupid: - return Result.err( - "/server/groupid: Based on the groupid, the manager recognizes his subprocesses," - " so it is not possible to change it while services are running." - ) - return Result.ok(None) - async def sigint_handler(self) -> None: logger.info("Received SIGINT, triggering graceful shutdown") self.shutdown_event.set() @@ -129,7 +121,6 @@ class Server: asyncio_compat.add_async_signal_handler(signal.SIGHUP, self.sighup_handler) await self.runner.setup() await self.config_store.register_verifier(self._deny_management_changes) - await self.config_store.register_verifier(self._deny_groupid_changes) await self.config_store.register_on_change_callback(self._reconfigure) async def wait_for_shutdown(self) -> None: @@ -292,7 +283,9 @@ async def _load_config(config: ParsedTree) -> KresConfig: async def _init_config_store(config: ParsedTree) -> ConfigStore: config_validated = await _load_config(config) - return ConfigStore(config_validated) + config_store = ConfigStore(config_validated) + await init_user_constants(config_store) + return config_store async def _init_manager(config_store: ConfigStore) -> KresManager: diff --git a/manager/knot_resolver_manager/statistics.py b/manager/knot_resolver_manager/statistics.py index 949f6c977..d21a3915d 100644 --- a/manager/knot_resolver_manager/statistics.py +++ b/manager/knot_resolver_manager/statistics.py @@ -16,11 +16,10 @@ from prometheus_client.core import ( # type: ignore from knot_resolver_manager import compat from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes from knot_resolver_manager.datamodel.config_schema import KresConfig -from knot_resolver_manager.kres_id import KresID from knot_resolver_manager.utils.functional import Result if TYPE_CHECKING: - from knot_resolver_manager.kresd_controller.interface import Subprocess + from knot_resolver_manager.kresd_controller.interface import KresID, Subprocess logger = logging.getLogger(__name__) @@ -51,8 +50,8 @@ def async_timing_histogram(metric: Histogram) -> Callable[[Callable[..., Awaitab return decorator -async def _command_registered_resolvers(cmd: str) -> Dict[KresID, str]: - async def single_pair(sub: "Subprocess") -> Tuple[KresID, str]: +async def _command_registered_resolvers(cmd: str) -> "Dict[KresID, str]": + async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]": return sub.id, await sub.command(cmd) pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_RESOLVERS.values())) @@ -81,7 +80,7 @@ def _histogram( class ResolverCollector: def __init__(self, config_store: ConfigStore) -> None: - self._stats_raw: Optional[Dict[KresID, str]] = None + self._stats_raw: "Optional[Dict[KresID, str]]" = None self._config_store: ConfigStore = config_store self._collection_task: "Optional[asyncio.Task[None]]" = None self._skip_immediate_collection: bool = False @@ -137,7 +136,7 @@ class ResolverCollector: # when not running, we can start a new loop (we are not in the manager's main thread) compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True)) - def _create_resolver_metrics_loaded_gauge(self, kid: KresID, loaded: bool) -> GaugeMetricFamily: + def _create_resolver_metrics_loaded_gauge(self, kid: "KresID", loaded: bool) -> GaugeMetricFamily: return _gauge( "resolver_metrics_loaded", "0 if metrics from resolver instance were not loaded, otherwise 1", @@ -179,7 +178,7 @@ class ResolverCollector: # this function prevents the collector registry from invoking the collect function on startup return [] - def _parse_resolver_metrics(self, instance_id: KresID, metrics: Any) -> Generator[Metric, None, None]: + def _parse_resolver_metrics(self, instance_id: "KresID", metrics: Any) -> Generator[Metric, None, None]: sid = str(instance_id) # response latency histogram