From: Vasek Sraier Date: Tue, 22 Mar 2022 11:51:24 +0000 (+0100) Subject: manager systemd backend now uses slices X-Git-Tag: v6.0.0a1~38^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=13545051511c3dfec15ab478b98d20a436c87570;p=thirdparty%2Fknot-resolver.git manager systemd backend now uses slices - added id field to config, which determines the slice name (must be unique across the whole system) - we no longer stop all processes individually, instead we leave it up to the subprocess controller to stop all (and it just stops the slice) --- diff --git a/manager/etc/knot-resolver/config.dev.yml b/manager/etc/knot-resolver/config.dev.yml index 6360004a6..9bba35e4e 100644 --- a/manager/etc/knot-resolver/config.dev.yml +++ b/manager/etc/knot-resolver/config.dev.yml @@ -1,3 +1,4 @@ +id: dev cache: storage: ../cache logging: diff --git a/manager/knot_resolver_manager/constants.py b/manager/knot_resolver_manager/constants.py index 06c7c7d45..44eea9308 100644 --- a/manager/knot_resolver_manager/constants.py +++ b/manager/knot_resolver_manager/constants.py @@ -1,11 +1,9 @@ import logging from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING -from knot_resolver_manager.config_store import ConfigStore from knot_resolver_manager.datamodel.config_schema import KresConfig from knot_resolver_manager.utils import which -from knot_resolver_manager.utils.functional import Result if TYPE_CHECKING: from knot_resolver_manager.kresd_controller.interface import KresID @@ -61,40 +59,3 @@ WATCHDOG_INTERVAL: float = 5 """ Used in KresdManager. It's a number of seconds in between system health checks. """ - - -class _UserConstants: - """ - Class for accessing constants, which are technically not constants as they are user configurable. - """ - - def __init__(self, config_store: ConfigStore) -> None: - self._config_store = config_store - - @property - def SERVICE_GROUP_ID(self) -> str: - return self._config_store.get().server.groupid - - -_user_constants: Optional[_UserConstants] = None - - -async def _deny_groupid_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]: - if config_old.server.groupid != config_new.server.groupid: - return Result.err( - "/server/groupid: Based on the groupid, the manager recognizes his subprocesses," - " so it is not possible to change it while services are running." - ) - return Result.ok(None) - - -async def init_user_constants(config_store: ConfigStore) -> None: - global _user_constants - _user_constants = _UserConstants(config_store) - - await config_store.register_verifier(_deny_groupid_changes) - - -def user_constants() -> _UserConstants: - assert _user_constants is not None - return _user_constants diff --git a/manager/knot_resolver_manager/datamodel/config_schema.py b/manager/knot_resolver_manager/datamodel/config_schema.py index fbd9ee7be..3b346ffb4 100644 --- a/manager/knot_resolver_manager/datamodel/config_schema.py +++ b/manager/knot_resolver_manager/datamodel/config_schema.py @@ -1,4 +1,5 @@ import os +import re import sys from typing import Dict, Optional, Union @@ -20,6 +21,7 @@ from knot_resolver_manager.datamodel.server_schema import ServerSchema from knot_resolver_manager.datamodel.static_hints_schema import StaticHintsSchema from knot_resolver_manager.datamodel.stub_zone_schema import StubZoneSchema from knot_resolver_manager.datamodel.types import DomainName +from knot_resolver_manager.datamodel.types.base_types import PatternBase from knot_resolver_manager.datamodel.view_schema import ViewSchema from knot_resolver_manager.utils import SchemaNode @@ -53,12 +55,17 @@ def _import_lua_template() -> Template: _MAIN_TEMPLATE = _import_lua_template() +class IDPattern(PatternBase): + _re = re.compile(r"[a-zA-Z0-9]*") + + class KresConfig(SchemaNode): class Raw(SchemaNode): """ Knot Resolver declarative configuration. --- + id: System-wide unique identifier of this manager instance. Used for grouping logs and tagging kresd processes. server: DNS server control and management configuration. options: Fine-tuning global parameters of DNS resolver operation. network: Network connections and protocols configuration. @@ -76,6 +83,7 @@ class KresConfig(SchemaNode): lua: Custom Lua configuration. """ + id: IDPattern server: ServerSchema = ServerSchema() options: OptionsSchema = OptionsSchema() network: NetworkSchema = NetworkSchema() @@ -94,6 +102,7 @@ class KresConfig(SchemaNode): _PREVIOUS_SCHEMA = Raw + id: str server: ServerSchema options: OptionsSchema network: NetworkSchema diff --git a/manager/knot_resolver_manager/datamodel/server_schema.py b/manager/knot_resolver_manager/datamodel/server_schema.py index 0bdb9f6e1..9c58961d5 100644 --- a/manager/knot_resolver_manager/datamodel/server_schema.py +++ b/manager/knot_resolver_manager/datamodel/server_schema.py @@ -101,7 +101,6 @@ class ServerSchema(SchemaNode): --- hostname: Internal DNS resolver hostname. Default is machine hostname. - groupid: Additional identifier in case more DNS resolvers are running on single machine. nsid: Name Server Identifier (RFC 5001) which allows DNS clients to request resolver to send back its NSID along with the reply to a DNS request. workers: The number of running kresd (Knot Resolver daemon) workers. If set to 'auto', it is equal to number of CPUs available. use_cache_gc: Use (start) kres-cache-gc (cache garbage collector) automatically. @@ -113,7 +112,6 @@ class ServerSchema(SchemaNode): """ hostname: Optional[str] = None - groupid: str = "m" nsid: Optional[str] = None workers: Union[Literal["auto"], IntPositive] = IntPositive(1) use_cache_gc: bool = True @@ -126,7 +124,6 @@ class ServerSchema(SchemaNode): _PREVIOUS_SCHEMA = Raw hostname: str - groupid: str nsid: Optional[str] workers: IntPositive use_cache_gc: bool diff --git a/manager/knot_resolver_manager/kres_manager.py b/manager/knot_resolver_manager/kres_manager.py index 588b8416c..c479db09e 100644 --- a/manager/knot_resolver_manager/kres_manager.py +++ b/manager/knot_resolver_manager/kres_manager.py @@ -215,9 +215,8 @@ class KresManager: # pylint: disable=too-many-instance-attributes pass async with self._manager_lock: - await self._ensure_number_of_children(KresConfig(), 0) - if self._gc is not None: - await self._stop_gc() + # we could stop all the children one by one right now + # we won't do that and we leave that up to the subprocess controller to do that while it is shutting down await self._controller.shutdown_controller() async def forced_shutdown(self) -> None: diff --git a/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py b/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py index 58d454f50..9b0ae4172 100644 --- a/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py +++ b/manager/knot_resolver_manager/kresd_controller/systemd/__init__.py @@ -1,3 +1,4 @@ +import asyncio import logging import os import re @@ -5,8 +6,8 @@ from typing import Dict, Iterable, List, Optional, Union from knot_resolver_manager import compat from knot_resolver_manager.compat.asyncio import to_thread -from knot_resolver_manager.constants import user_constants from knot_resolver_manager.datamodel.config_schema import KresConfig +from knot_resolver_manager.exceptions import SubprocessControllerException from knot_resolver_manager.kresd_controller.interface import ( KresID, Subprocess, @@ -17,10 +18,14 @@ from knot_resolver_manager.kresd_controller.interface import ( from knot_resolver_manager.kresd_controller.systemd.dbus_api import ( SystemdType, Unit, + is_unit_failed, + list_our_slice_processes, list_units, reset_failed_unit, restart_unit, + start_slice, start_transient_kresd_unit, + stop_slice, stop_unit, ) from knot_resolver_manager.utils import phantom_use @@ -34,21 +39,17 @@ KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd_([0-9]+).service$") def _is_service_name_ours(name: str) -> bool: - pref_len = len(user_constants().SERVICE_GROUP_ID) - is_ours = name == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME - is_ours |= name.startswith(user_constants().SERVICE_GROUP_ID) and bool( - KRESD_SERVICE_BASE_PATTERN.match(name[pref_len:]) - ) + is_ours = name == GC_SERVICE_BASE_NAME + is_ours |= bool(KRESD_SERVICE_BASE_PATTERN.match(name)) return is_ours class SystemdKresID(KresID): @staticmethod def from_string(val: str) -> "SystemdKresID": - if val == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME: + if val == GC_SERVICE_BASE_NAME: return SystemdKresID.new(SubprocessType.GC, -1) else: - val = val[len(user_constants().SERVICE_GROUP_ID) :] kid = KRESD_SERVICE_BASE_PATTERN.search(val) if kid: return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[0])) @@ -57,9 +58,9 @@ class SystemdKresID(KresID): def __str__(self) -> str: if self.subprocess_type is SubprocessType.GC: - return user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME + return GC_SERVICE_BASE_NAME elif self.subprocess_type is SubprocessType.KRESD: - return f"{user_constants().SERVICE_GROUP_ID}kresd_{self._id}.service" + return f"kresd_{self._id}.service" else: raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}") @@ -129,31 +130,41 @@ class SystemdSubprocessController(SubprocessController): async def get_all_running_instances(self) -> Iterable[Subprocess]: assert self._controller_config is not None - res: List[SystemdSubprocess] = [] - units = await compat.asyncio.to_thread(list_units, self._systemd_type) - for unit in units: - if _is_service_name_ours(unit.name): - if unit.state == "failed": + units = await compat.asyncio.to_thread(list_our_slice_processes, self._controller_config, self._systemd_type) + + async def load(name: str) -> Optional[SystemdSubprocess]: + assert self._controller_config is not None + + if _is_service_name_ours(name): + failed = await to_thread(is_unit_failed, self._systemd_type, name) + if failed: # if a unit is failed, remove it from the system by reseting its state - logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", unit.name) - await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, unit.name) - continue - - res.append( - SystemdSubprocess( - self._controller_config, - self._systemd_type, - SystemdKresID.from_string(unit.name), - ) + logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", name) + await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, name) + return None + + return SystemdSubprocess( + self._controller_config, + self._systemd_type, + SystemdKresID.from_string(name), ) + else: + return None - return res + subprocesses = await asyncio.gather(*[load(name) for name in units]) + return filter(lambda x: x is not None, subprocesses) # type: ignore async def initialize_controller(self, config: KresConfig) -> None: self._controller_config = config + try: + await to_thread(start_slice, self._controller_config, self._systemd_type) + except SubprocessControllerException as e: + logger.warning( + f"Failed to create systemd slice for our subprocesses: '{e}'. There is/was a manager running with the same ID." + ) async def shutdown_controller(self) -> None: - pass + await to_thread(stop_slice, self._controller_config, self._systemd_type) async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess: assert self._controller_config is not None diff --git a/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py b/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py index 4f0c03672..b12ed6094 100644 --- a/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py +++ b/manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py @@ -5,7 +5,7 @@ import logging import os from enum import Enum, auto from threading import Thread -from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar from gi.repository import GLib # type: ignore[import] from pydbus import SystemBus # type: ignore[import] @@ -160,6 +160,16 @@ def reset_failed_unit(typ: SystemdType, unit_name: str) -> None: systemd.ResetFailedUnit(unit_name) +@_wrap_dbus_errors +def _list_slice_services(typ: SystemdType, slice_name: str) -> Set[str]: + # uses DBus method call, which is not documented, but is present since 2016 and is used by `systemctl status` + # appears for the first time in commit 291d565a04263452c03beaf537773ade4f0b1617 in systemd + + systemd = _create_manager_proxy(typ) + data = systemd.GetUnitProcesses(slice_name) + return set((p[0].split("/")[-1] for p in data)) + + @_wrap_dbus_errors def restart_unit(type_: SystemdType, unit_name: str) -> None: systemd = _create_manager_proxy(type_) @@ -170,6 +180,10 @@ def restart_unit(type_: SystemdType, unit_name: str) -> None: _wait_for_job_completion(systemd, job) +def _slice_name(config: KresConfig) -> str: + return f"kres-{config.id}.slice" + + def _kresd_unit_properties(config: KresConfig, kres_id: KresID) -> List[Tuple[str, str]]: val: Any = [ ("Description", GLib.Variant("s", "transient Knot Resolver unit started by Knot Resolver Manager")), @@ -195,6 +209,7 @@ def _kresd_unit_properties(config: KresConfig, kres_id: KresID) -> List[Tuple[st ("Restart", GLib.Variant("s", "always")), ("LimitNOFILE", GLib.Variant("t", 524288)), ("Environment", GLib.Variant("as", [f"SYSTEMD_INSTANCE={kres_id}"])), + ("Slice", GLib.Variant("s", _slice_name(config))), ] if config.server.watchdog: @@ -230,19 +245,21 @@ def _gc_unit_properties(config: KresConfig) -> Any: ("RestartUSec", GLib.Variant("t", 30000000)), ("StartLimitIntervalUSec", GLib.Variant("t", 400000000)), ("StartLimitBurst", GLib.Variant("u", 10)), + ("Slice", GLib.Variant("s", _slice_name(config))), ] return val -@_wrap_dbus_errors -def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: KresID) -> None: - properties = { - SubprocessType.KRESD: _kresd_unit_properties(config, kres_id), - SubprocessType.GC: _gc_unit_properties(config), - }[kres_id.subprocess_type] - name = str(kres_id) +def _kres_slice_properties() -> Any: + val: Any = [ + ("Description", GLib.Variant("s", "Knot Resolver processes")), + ] + return val - systemd = _create_manager_proxy(type_) + +@_wrap_dbus_errors +def _start_transient_unit(systemd_type: SystemdType, name: str, properties: Any) -> None: + systemd = _create_manager_proxy(systemd_type) def job(): return systemd.StartTransientUnit(name, "fail", properties, []) @@ -262,6 +279,28 @@ def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: raise SubprocessControllerException(f"Failed to start systemd transient service '{name}': {e}") from e +def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: KresID) -> None: + properties = { + SubprocessType.KRESD: _kresd_unit_properties(config, kres_id), + SubprocessType.GC: _gc_unit_properties(config), + }[kres_id.subprocess_type] + name = str(kres_id) + + _start_transient_unit(type_, name, properties) + + +def start_slice(config: KresConfig, systemd_type: SystemdType) -> None: + _start_transient_unit(systemd_type, _slice_name(config), _kres_slice_properties()) + + +def stop_slice(config: KresConfig, systemd_type: SystemdType) -> None: + stop_unit(systemd_type, _slice_name(config)) + + +def list_our_slice_processes(config: KresConfig, systemd_type: SystemdType) -> Set[str]: + return _list_slice_services(systemd_type, _slice_name(config)) + + @_wrap_dbus_errors def start_unit(type_: SystemdType, unit_name: str) -> None: systemd = _create_manager_proxy(type_) @@ -289,6 +328,14 @@ def list_unit_files(type_: SystemdType) -> List[str]: return [str(x[0]) for x in files] +@_wrap_dbus_errors +def is_unit_failed(typ: SystemdType, unit_name: str) -> bool: + systemd = _create_manager_proxy(typ) + unit_path = systemd.LoadUnit(unit_name) + unit_object = _create_object_proxy(typ, ".systemd1", unit_path) + return unit_object.ActiveState == "failed" + + @_wrap_dbus_errors def can_load_unit(type_: SystemdType, unit_name: str) -> bool: systemd = _create_manager_proxy(type_) diff --git a/manager/knot_resolver_manager/server.py b/manager/knot_resolver_manager/server.py index 9b2a08e8d..a35f4a10d 100644 --- a/manager/knot_resolver_manager/server.py +++ b/manager/knot_resolver_manager/server.py @@ -19,7 +19,7 @@ from aiohttp.web_runner import AppRunner, TCPSite, UnixSite from knot_resolver_manager import log, statistics from knot_resolver_manager.compat import asyncio as asyncio_compat from knot_resolver_manager.config_store import ConfigStore -from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME, init_user_constants +from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME from knot_resolver_manager.datamodel.config_schema import KresConfig from knot_resolver_manager.datamodel.server_schema import ManagementSchema from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException @@ -305,7 +305,6 @@ async def _load_config(config: ParsedTree) -> KresConfig: async def _init_config_store(config: ParsedTree) -> ConfigStore: config_validated = await _load_config(config) config_store = ConfigStore(config_validated) - await init_user_constants(config_store) return config_store @@ -355,6 +354,9 @@ def _lock_working_directory(attempt: int = 0) -> None: os.kill(pid, 0) except OSError as e: if e.errno == errno.ESRCH: + logger.warning( + "Detected old lock file in the working directory, previous instance of the manager must have crashed." + ) os.unlink(PID_FILE_NAME) _lock_working_directory(attempt=attempt + 1) return