]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager systemd backend now uses slices
authorVasek Sraier <git@vakabus.cz>
Tue, 22 Mar 2022 11:51:24 +0000 (12:51 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:54 +0000 (16:17 +0200)
- added id field to config, which determines the slice name (must be unique across the whole system)
- we no longer stop all processes individually, instead we leave it up to the subprocess controller to stop all (and it just stops the slice)

manager/etc/knot-resolver/config.dev.yml
manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/datamodel/config_schema.py
manager/knot_resolver_manager/datamodel/server_schema.py
manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/kresd_controller/systemd/__init__.py
manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py
manager/knot_resolver_manager/server.py

index 6360004a60f77645b68fc9c0e6d89bd5a6872a9a..9bba35e4ef41c30733d5a1404ffdf28eab94939e 100644 (file)
@@ -1,3 +1,4 @@
+id: dev
 cache:
   storage: ../cache
 logging:
index 06c7c7d45103f8bdedf25ad29ec46cd1dc5ac7fd..44eea9308c052572fa61e9f4d7d31a52ebeb67f5 100644 (file)
@@ -1,11 +1,9 @@
 import logging
 from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
 
-from knot_resolver_manager.config_store import ConfigStore
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.utils import which
-from knot_resolver_manager.utils.functional import Result
 
 if TYPE_CHECKING:
     from knot_resolver_manager.kresd_controller.interface import KresID
@@ -61,40 +59,3 @@ WATCHDOG_INTERVAL: float = 5
 """
 Used in KresdManager. It's a number of seconds in between system health checks.
 """
-
-
-class _UserConstants:
-    """
-    Class for accessing constants, which are technically not constants as they are user configurable.
-    """
-
-    def __init__(self, config_store: ConfigStore) -> None:
-        self._config_store = config_store
-
-    @property
-    def SERVICE_GROUP_ID(self) -> str:
-        return self._config_store.get().server.groupid
-
-
-_user_constants: Optional[_UserConstants] = None
-
-
-async def _deny_groupid_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
-    if config_old.server.groupid != config_new.server.groupid:
-        return Result.err(
-            "/server/groupid: Based on the groupid, the manager recognizes his subprocesses,"
-            " so it is not possible to change it while services are running."
-        )
-    return Result.ok(None)
-
-
-async def init_user_constants(config_store: ConfigStore) -> None:
-    global _user_constants
-    _user_constants = _UserConstants(config_store)
-
-    await config_store.register_verifier(_deny_groupid_changes)
-
-
-def user_constants() -> _UserConstants:
-    assert _user_constants is not None
-    return _user_constants
index fbd9ee7bed698d19ae16e802255a70ddad197596..3b346ffb42abe444deeaff9e9920e8885c1f88b2 100644 (file)
@@ -1,4 +1,5 @@
 import os
+import re
 import sys
 from typing import Dict, Optional, Union
 
@@ -20,6 +21,7 @@ from knot_resolver_manager.datamodel.server_schema import ServerSchema
 from knot_resolver_manager.datamodel.static_hints_schema import StaticHintsSchema
 from knot_resolver_manager.datamodel.stub_zone_schema import StubZoneSchema
 from knot_resolver_manager.datamodel.types import DomainName
+from knot_resolver_manager.datamodel.types.base_types import PatternBase
 from knot_resolver_manager.datamodel.view_schema import ViewSchema
 from knot_resolver_manager.utils import SchemaNode
 
@@ -53,12 +55,17 @@ def _import_lua_template() -> Template:
 _MAIN_TEMPLATE = _import_lua_template()
 
 
+class IDPattern(PatternBase):
+    _re = re.compile(r"[a-zA-Z0-9]*")
+
+
 class KresConfig(SchemaNode):
     class Raw(SchemaNode):
         """
         Knot Resolver declarative configuration.
 
         ---
+        id: System-wide unique identifier of this manager instance. Used for grouping logs and tagging kresd processes.
         server: DNS server control and management configuration.
         options: Fine-tuning global parameters of DNS resolver operation.
         network: Network connections and protocols configuration.
@@ -76,6 +83,7 @@ class KresConfig(SchemaNode):
         lua: Custom Lua configuration.
         """
 
+        id: IDPattern
         server: ServerSchema = ServerSchema()
         options: OptionsSchema = OptionsSchema()
         network: NetworkSchema = NetworkSchema()
@@ -94,6 +102,7 @@ class KresConfig(SchemaNode):
 
     _PREVIOUS_SCHEMA = Raw
 
+    id: str
     server: ServerSchema
     options: OptionsSchema
     network: NetworkSchema
index 0bdb9f6e130bc3c10d783a07a2137c38617a93aa..9c58961d5124d633d8fce109ba7fbf74549701e5 100644 (file)
@@ -101,7 +101,6 @@ class ServerSchema(SchemaNode):
 
         ---
         hostname: Internal DNS resolver hostname. Default is machine hostname.
-        groupid: Additional identifier in case more DNS resolvers are running on single machine.
         nsid: Name Server Identifier (RFC 5001) which allows DNS clients to request resolver to send back its NSID along with the reply to a DNS request.
         workers: The number of running kresd (Knot Resolver daemon) workers. If set to 'auto', it is equal to number of CPUs available.
         use_cache_gc: Use (start) kres-cache-gc (cache garbage collector) automatically.
@@ -113,7 +112,6 @@ class ServerSchema(SchemaNode):
         """
 
         hostname: Optional[str] = None
-        groupid: str = "m"
         nsid: Optional[str] = None
         workers: Union[Literal["auto"], IntPositive] = IntPositive(1)
         use_cache_gc: bool = True
@@ -126,7 +124,6 @@ class ServerSchema(SchemaNode):
     _PREVIOUS_SCHEMA = Raw
 
     hostname: str
-    groupid: str
     nsid: Optional[str]
     workers: IntPositive
     use_cache_gc: bool
index 588b8416c30b18f03bc1012670ecc9c6cc839fa2..c479db09e4a955677df968b63ff6dac48475c2c0 100644 (file)
@@ -215,9 +215,8 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
                 pass
 
         async with self._manager_lock:
-            await self._ensure_number_of_children(KresConfig(), 0)
-            if self._gc is not None:
-                await self._stop_gc()
+            # we could stop all the children one by one right now
+            # we won't do that and we leave that up to the subprocess controller to do that while it is shutting down
             await self._controller.shutdown_controller()
 
     async def forced_shutdown(self) -> None:
index 58d454f505d49a63219f210244d00c68edf64b88..9b0ae41728a9fcc5f8e75f8d6a4852e8f924d180 100644 (file)
@@ -1,3 +1,4 @@
+import asyncio
 import logging
 import os
 import re
@@ -5,8 +6,8 @@ from typing import Dict, Iterable, List, Optional, Union
 
 from knot_resolver_manager import compat
 from knot_resolver_manager.compat.asyncio import to_thread
-from knot_resolver_manager.constants import user_constants
 from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.exceptions import SubprocessControllerException
 from knot_resolver_manager.kresd_controller.interface import (
     KresID,
     Subprocess,
@@ -17,10 +18,14 @@ from knot_resolver_manager.kresd_controller.interface import (
 from knot_resolver_manager.kresd_controller.systemd.dbus_api import (
     SystemdType,
     Unit,
+    is_unit_failed,
+    list_our_slice_processes,
     list_units,
     reset_failed_unit,
     restart_unit,
+    start_slice,
     start_transient_kresd_unit,
+    stop_slice,
     stop_unit,
 )
 from knot_resolver_manager.utils import phantom_use
@@ -34,21 +39,17 @@ KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd_([0-9]+).service$")
 
 
 def _is_service_name_ours(name: str) -> bool:
-    pref_len = len(user_constants().SERVICE_GROUP_ID)
-    is_ours = name == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME
-    is_ours |= name.startswith(user_constants().SERVICE_GROUP_ID) and bool(
-        KRESD_SERVICE_BASE_PATTERN.match(name[pref_len:])
-    )
+    is_ours = name == GC_SERVICE_BASE_NAME
+    is_ours |= bool(KRESD_SERVICE_BASE_PATTERN.match(name))
     return is_ours
 
 
 class SystemdKresID(KresID):
     @staticmethod
     def from_string(val: str) -> "SystemdKresID":
-        if val == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME:
+        if val == GC_SERVICE_BASE_NAME:
             return SystemdKresID.new(SubprocessType.GC, -1)
         else:
-            val = val[len(user_constants().SERVICE_GROUP_ID) :]
             kid = KRESD_SERVICE_BASE_PATTERN.search(val)
             if kid:
                 return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[0]))
@@ -57,9 +58,9 @@ class SystemdKresID(KresID):
 
     def __str__(self) -> str:
         if self.subprocess_type is SubprocessType.GC:
-            return user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME
+            return GC_SERVICE_BASE_NAME
         elif self.subprocess_type is SubprocessType.KRESD:
-            return f"{user_constants().SERVICE_GROUP_ID}kresd_{self._id}.service"
+            return f"kresd_{self._id}.service"
         else:
             raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
 
@@ -129,31 +130,41 @@ class SystemdSubprocessController(SubprocessController):
     async def get_all_running_instances(self) -> Iterable[Subprocess]:
         assert self._controller_config is not None
 
-        res: List[SystemdSubprocess] = []
-        units = await compat.asyncio.to_thread(list_units, self._systemd_type)
-        for unit in units:
-            if _is_service_name_ours(unit.name):
-                if unit.state == "failed":
+        units = await compat.asyncio.to_thread(list_our_slice_processes, self._controller_config, self._systemd_type)
+
+        async def load(name: str) -> Optional[SystemdSubprocess]:
+            assert self._controller_config is not None
+
+            if _is_service_name_ours(name):
+                failed = await to_thread(is_unit_failed, self._systemd_type, name)
+                if failed:
                     # if a unit is failed, remove it from the system by reseting its state
-                    logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", unit.name)
-                    await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, unit.name)
-                    continue
-
-                res.append(
-                    SystemdSubprocess(
-                        self._controller_config,
-                        self._systemd_type,
-                        SystemdKresID.from_string(unit.name),
-                    )
+                    logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", name)
+                    await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, name)
+                    return None
+
+                return SystemdSubprocess(
+                    self._controller_config,
+                    self._systemd_type,
+                    SystemdKresID.from_string(name),
                 )
+            else:
+                return None
 
-        return res
+        subprocesses = await asyncio.gather(*[load(name) for name in units])
+        return filter(lambda x: x is not None, subprocesses)  # type: ignore
 
     async def initialize_controller(self, config: KresConfig) -> None:
         self._controller_config = config
+        try:
+            await to_thread(start_slice, self._controller_config, self._systemd_type)
+        except SubprocessControllerException as e:
+            logger.warning(
+                f"Failed to create systemd slice for our subprocesses: '{e}'. There is/was a manager running with the same ID."
+            )
 
     async def shutdown_controller(self) -> None:
-        pass
+        await to_thread(stop_slice, self._controller_config, self._systemd_type)
 
     async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
         assert self._controller_config is not None
index 4f0c0367250636f42084d15d5e8ced301a2ad706..b12ed609449130afda70a3f43ae0b5da98054303 100644 (file)
@@ -5,7 +5,7 @@ import logging
 import os
 from enum import Enum, auto
 from threading import Thread
-from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar
 
 from gi.repository import GLib  # type: ignore[import]
 from pydbus import SystemBus  # type: ignore[import]
@@ -160,6 +160,16 @@ def reset_failed_unit(typ: SystemdType, unit_name: str) -> None:
     systemd.ResetFailedUnit(unit_name)
 
 
+@_wrap_dbus_errors
+def _list_slice_services(typ: SystemdType, slice_name: str) -> Set[str]:
+    # uses DBus method call, which is not documented, but is present since 2016 and is used by `systemctl status`
+    # appears for the first time in commit 291d565a04263452c03beaf537773ade4f0b1617 in systemd
+
+    systemd = _create_manager_proxy(typ)
+    data = systemd.GetUnitProcesses(slice_name)
+    return set((p[0].split("/")[-1] for p in data))
+
+
 @_wrap_dbus_errors
 def restart_unit(type_: SystemdType, unit_name: str) -> None:
     systemd = _create_manager_proxy(type_)
@@ -170,6 +180,10 @@ def restart_unit(type_: SystemdType, unit_name: str) -> None:
     _wait_for_job_completion(systemd, job)
 
 
+def _slice_name(config: KresConfig) -> str:
+    return f"kres-{config.id}.slice"
+
+
 def _kresd_unit_properties(config: KresConfig, kres_id: KresID) -> List[Tuple[str, str]]:
     val: Any = [
         ("Description", GLib.Variant("s", "transient Knot Resolver unit started by Knot Resolver Manager")),
@@ -195,6 +209,7 @@ def _kresd_unit_properties(config: KresConfig, kres_id: KresID) -> List[Tuple[st
         ("Restart", GLib.Variant("s", "always")),
         ("LimitNOFILE", GLib.Variant("t", 524288)),
         ("Environment", GLib.Variant("as", [f"SYSTEMD_INSTANCE={kres_id}"])),
+        ("Slice", GLib.Variant("s", _slice_name(config))),
     ]
 
     if config.server.watchdog:
@@ -230,19 +245,21 @@ def _gc_unit_properties(config: KresConfig) -> Any:
         ("RestartUSec", GLib.Variant("t", 30000000)),
         ("StartLimitIntervalUSec", GLib.Variant("t", 400000000)),
         ("StartLimitBurst", GLib.Variant("u", 10)),
+        ("Slice", GLib.Variant("s", _slice_name(config))),
     ]
     return val
 
 
-@_wrap_dbus_errors
-def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: KresID) -> None:
-    properties = {
-        SubprocessType.KRESD: _kresd_unit_properties(config, kres_id),
-        SubprocessType.GC: _gc_unit_properties(config),
-    }[kres_id.subprocess_type]
-    name = str(kres_id)
+def _kres_slice_properties() -> Any:
+    val: Any = [
+        ("Description", GLib.Variant("s", "Knot Resolver processes")),
+    ]
+    return val
 
-    systemd = _create_manager_proxy(type_)
+
+@_wrap_dbus_errors
+def _start_transient_unit(systemd_type: SystemdType, name: str, properties: Any) -> None:
+    systemd = _create_manager_proxy(systemd_type)
 
     def job():
         return systemd.StartTransientUnit(name, "fail", properties, [])
@@ -262,6 +279,28 @@ def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id:
         raise SubprocessControllerException(f"Failed to start systemd transient service '{name}': {e}") from e
 
 
+def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: KresID) -> None:
+    properties = {
+        SubprocessType.KRESD: _kresd_unit_properties(config, kres_id),
+        SubprocessType.GC: _gc_unit_properties(config),
+    }[kres_id.subprocess_type]
+    name = str(kres_id)
+
+    _start_transient_unit(type_, name, properties)
+
+
+def start_slice(config: KresConfig, systemd_type: SystemdType) -> None:
+    _start_transient_unit(systemd_type, _slice_name(config), _kres_slice_properties())
+
+
+def stop_slice(config: KresConfig, systemd_type: SystemdType) -> None:
+    stop_unit(systemd_type, _slice_name(config))
+
+
+def list_our_slice_processes(config: KresConfig, systemd_type: SystemdType) -> Set[str]:
+    return _list_slice_services(systemd_type, _slice_name(config))
+
+
 @_wrap_dbus_errors
 def start_unit(type_: SystemdType, unit_name: str) -> None:
     systemd = _create_manager_proxy(type_)
@@ -289,6 +328,14 @@ def list_unit_files(type_: SystemdType) -> List[str]:
     return [str(x[0]) for x in files]
 
 
+@_wrap_dbus_errors
+def is_unit_failed(typ: SystemdType, unit_name: str) -> bool:
+    systemd = _create_manager_proxy(typ)
+    unit_path = systemd.LoadUnit(unit_name)
+    unit_object = _create_object_proxy(typ, ".systemd1", unit_path)
+    return unit_object.ActiveState == "failed"
+
+
 @_wrap_dbus_errors
 def can_load_unit(type_: SystemdType, unit_name: str) -> bool:
     systemd = _create_manager_proxy(type_)
index 9b2a08e8d78ef8052e2fcd8ac61197dc0cf21ea2..a35f4a10dd104a6ea6e94157b022f4717cf7b50c 100644 (file)
@@ -19,7 +19,7 @@ from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
 from knot_resolver_manager import log, statistics
 from knot_resolver_manager.compat import asyncio as asyncio_compat
 from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME, init_user_constants
+from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.datamodel.server_schema import ManagementSchema
 from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException
@@ -305,7 +305,6 @@ async def _load_config(config: ParsedTree) -> KresConfig:
 async def _init_config_store(config: ParsedTree) -> ConfigStore:
     config_validated = await _load_config(config)
     config_store = ConfigStore(config_validated)
-    await init_user_constants(config_store)
     return config_store
 
 
@@ -355,6 +354,9 @@ def _lock_working_directory(attempt: int = 0) -> None:
                 os.kill(pid, 0)
             except OSError as e:
                 if e.errno == errno.ESRCH:
+                    logger.warning(
+                        "Detected old lock file in the working directory, previous instance of the manager must have crashed."
+                    )
                     os.unlink(PID_FILE_NAME)
                     _lock_working_directory(attempt=attempt + 1)
                     return