- added id field to config, which determines the slice name (must be unique across the whole system)
- we no longer stop all processes individually, instead we leave it up to the subprocess controller to stop all (and it just stops the slice)
+id: dev
cache:
storage: ../cache
logging:
import logging
from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
-from knot_resolver_manager.config_store import ConfigStore
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.utils import which
-from knot_resolver_manager.utils.functional import Result
if TYPE_CHECKING:
from knot_resolver_manager.kresd_controller.interface import KresID
"""
Used in KresdManager. It's a number of seconds in between system health checks.
"""
-
-
-class _UserConstants:
- """
- Class for accessing constants, which are technically not constants as they are user configurable.
- """
-
- def __init__(self, config_store: ConfigStore) -> None:
- self._config_store = config_store
-
- @property
- def SERVICE_GROUP_ID(self) -> str:
- return self._config_store.get().server.groupid
-
-
-_user_constants: Optional[_UserConstants] = None
-
-
-async def _deny_groupid_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
- if config_old.server.groupid != config_new.server.groupid:
- return Result.err(
- "/server/groupid: Based on the groupid, the manager recognizes his subprocesses,"
- " so it is not possible to change it while services are running."
- )
- return Result.ok(None)
-
-
-async def init_user_constants(config_store: ConfigStore) -> None:
- global _user_constants
- _user_constants = _UserConstants(config_store)
-
- await config_store.register_verifier(_deny_groupid_changes)
-
-
-def user_constants() -> _UserConstants:
- assert _user_constants is not None
- return _user_constants
import os
+import re
import sys
from typing import Dict, Optional, Union
from knot_resolver_manager.datamodel.static_hints_schema import StaticHintsSchema
from knot_resolver_manager.datamodel.stub_zone_schema import StubZoneSchema
from knot_resolver_manager.datamodel.types import DomainName
+from knot_resolver_manager.datamodel.types.base_types import PatternBase
from knot_resolver_manager.datamodel.view_schema import ViewSchema
from knot_resolver_manager.utils import SchemaNode
_MAIN_TEMPLATE = _import_lua_template()
+class IDPattern(PatternBase):
+ _re = re.compile(r"[a-zA-Z0-9]*")
+
+
class KresConfig(SchemaNode):
class Raw(SchemaNode):
"""
Knot Resolver declarative configuration.
---
+ id: System-wide unique identifier of this manager instance. Used for grouping logs and tagging kresd processes.
server: DNS server control and management configuration.
options: Fine-tuning global parameters of DNS resolver operation.
network: Network connections and protocols configuration.
lua: Custom Lua configuration.
"""
+ id: IDPattern
server: ServerSchema = ServerSchema()
options: OptionsSchema = OptionsSchema()
network: NetworkSchema = NetworkSchema()
_PREVIOUS_SCHEMA = Raw
+ id: str
server: ServerSchema
options: OptionsSchema
network: NetworkSchema
---
hostname: Internal DNS resolver hostname. Default is machine hostname.
- groupid: Additional identifier in case more DNS resolvers are running on single machine.
nsid: Name Server Identifier (RFC 5001) which allows DNS clients to request resolver to send back its NSID along with the reply to a DNS request.
workers: The number of running kresd (Knot Resolver daemon) workers. If set to 'auto', it is equal to number of CPUs available.
use_cache_gc: Use (start) kres-cache-gc (cache garbage collector) automatically.
"""
hostname: Optional[str] = None
- groupid: str = "m"
nsid: Optional[str] = None
workers: Union[Literal["auto"], IntPositive] = IntPositive(1)
use_cache_gc: bool = True
_PREVIOUS_SCHEMA = Raw
hostname: str
- groupid: str
nsid: Optional[str]
workers: IntPositive
use_cache_gc: bool
pass
async with self._manager_lock:
- await self._ensure_number_of_children(KresConfig(), 0)
- if self._gc is not None:
- await self._stop_gc()
+ # we could stop all the children one by one right now
+ # we won't do that and we leave that up to the subprocess controller to do that while it is shutting down
await self._controller.shutdown_controller()
async def forced_shutdown(self) -> None:
+import asyncio
import logging
import os
import re
from knot_resolver_manager import compat
from knot_resolver_manager.compat.asyncio import to_thread
-from knot_resolver_manager.constants import user_constants
from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.exceptions import SubprocessControllerException
from knot_resolver_manager.kresd_controller.interface import (
KresID,
Subprocess,
from knot_resolver_manager.kresd_controller.systemd.dbus_api import (
SystemdType,
Unit,
+ is_unit_failed,
+ list_our_slice_processes,
list_units,
reset_failed_unit,
restart_unit,
+ start_slice,
start_transient_kresd_unit,
+ stop_slice,
stop_unit,
)
from knot_resolver_manager.utils import phantom_use
def _is_service_name_ours(name: str) -> bool:
- pref_len = len(user_constants().SERVICE_GROUP_ID)
- is_ours = name == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME
- is_ours |= name.startswith(user_constants().SERVICE_GROUP_ID) and bool(
- KRESD_SERVICE_BASE_PATTERN.match(name[pref_len:])
- )
+ is_ours = name == GC_SERVICE_BASE_NAME
+ is_ours |= bool(KRESD_SERVICE_BASE_PATTERN.match(name))
return is_ours
class SystemdKresID(KresID):
@staticmethod
def from_string(val: str) -> "SystemdKresID":
- if val == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME:
+ if val == GC_SERVICE_BASE_NAME:
return SystemdKresID.new(SubprocessType.GC, -1)
else:
- val = val[len(user_constants().SERVICE_GROUP_ID) :]
kid = KRESD_SERVICE_BASE_PATTERN.search(val)
if kid:
return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[0]))
def __str__(self) -> str:
if self.subprocess_type is SubprocessType.GC:
- return user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME
+ return GC_SERVICE_BASE_NAME
elif self.subprocess_type is SubprocessType.KRESD:
- return f"{user_constants().SERVICE_GROUP_ID}kresd_{self._id}.service"
+ return f"kresd_{self._id}.service"
else:
raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
async def get_all_running_instances(self) -> Iterable[Subprocess]:
assert self._controller_config is not None
- res: List[SystemdSubprocess] = []
- units = await compat.asyncio.to_thread(list_units, self._systemd_type)
- for unit in units:
- if _is_service_name_ours(unit.name):
- if unit.state == "failed":
+ units = await compat.asyncio.to_thread(list_our_slice_processes, self._controller_config, self._systemd_type)
+
+ async def load(name: str) -> Optional[SystemdSubprocess]:
+ assert self._controller_config is not None
+
+ if _is_service_name_ours(name):
+ failed = await to_thread(is_unit_failed, self._systemd_type, name)
+ if failed:
# if a unit is failed, remove it from the system by reseting its state
- logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", unit.name)
- await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, unit.name)
- continue
-
- res.append(
- SystemdSubprocess(
- self._controller_config,
- self._systemd_type,
- SystemdKresID.from_string(unit.name),
- )
+ logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", name)
+ await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, name)
+ return None
+
+ return SystemdSubprocess(
+ self._controller_config,
+ self._systemd_type,
+ SystemdKresID.from_string(name),
)
+ else:
+ return None
- return res
+ subprocesses = await asyncio.gather(*[load(name) for name in units])
+ return filter(lambda x: x is not None, subprocesses) # type: ignore
async def initialize_controller(self, config: KresConfig) -> None:
self._controller_config = config
+ try:
+ await to_thread(start_slice, self._controller_config, self._systemd_type)
+ except SubprocessControllerException as e:
+ logger.warning(
+ f"Failed to create systemd slice for our subprocesses: '{e}'. There is/was a manager running with the same ID."
+ )
async def shutdown_controller(self) -> None:
- pass
+ await to_thread(stop_slice, self._controller_config, self._systemd_type)
async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
assert self._controller_config is not None
import os
from enum import Enum, auto
from threading import Thread
-from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar
from gi.repository import GLib # type: ignore[import]
from pydbus import SystemBus # type: ignore[import]
systemd.ResetFailedUnit(unit_name)
+@_wrap_dbus_errors
+def _list_slice_services(typ: SystemdType, slice_name: str) -> Set[str]:
+ # uses DBus method call, which is not documented, but is present since 2016 and is used by `systemctl status`
+ # appears for the first time in commit 291d565a04263452c03beaf537773ade4f0b1617 in systemd
+
+ systemd = _create_manager_proxy(typ)
+ data = systemd.GetUnitProcesses(slice_name)
+ return set((p[0].split("/")[-1] for p in data))
+
+
@_wrap_dbus_errors
def restart_unit(type_: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(type_)
_wait_for_job_completion(systemd, job)
+def _slice_name(config: KresConfig) -> str:
+ return f"kres-{config.id}.slice"
+
+
def _kresd_unit_properties(config: KresConfig, kres_id: KresID) -> List[Tuple[str, str]]:
val: Any = [
("Description", GLib.Variant("s", "transient Knot Resolver unit started by Knot Resolver Manager")),
("Restart", GLib.Variant("s", "always")),
("LimitNOFILE", GLib.Variant("t", 524288)),
("Environment", GLib.Variant("as", [f"SYSTEMD_INSTANCE={kres_id}"])),
+ ("Slice", GLib.Variant("s", _slice_name(config))),
]
if config.server.watchdog:
("RestartUSec", GLib.Variant("t", 30000000)),
("StartLimitIntervalUSec", GLib.Variant("t", 400000000)),
("StartLimitBurst", GLib.Variant("u", 10)),
+ ("Slice", GLib.Variant("s", _slice_name(config))),
]
return val
-@_wrap_dbus_errors
-def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: KresID) -> None:
- properties = {
- SubprocessType.KRESD: _kresd_unit_properties(config, kres_id),
- SubprocessType.GC: _gc_unit_properties(config),
- }[kres_id.subprocess_type]
- name = str(kres_id)
+def _kres_slice_properties() -> Any:
+ val: Any = [
+ ("Description", GLib.Variant("s", "Knot Resolver processes")),
+ ]
+ return val
- systemd = _create_manager_proxy(type_)
+
+@_wrap_dbus_errors
+def _start_transient_unit(systemd_type: SystemdType, name: str, properties: Any) -> None:
+ systemd = _create_manager_proxy(systemd_type)
def job():
return systemd.StartTransientUnit(name, "fail", properties, [])
raise SubprocessControllerException(f"Failed to start systemd transient service '{name}': {e}") from e
+def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: KresID) -> None:
+ properties = {
+ SubprocessType.KRESD: _kresd_unit_properties(config, kres_id),
+ SubprocessType.GC: _gc_unit_properties(config),
+ }[kres_id.subprocess_type]
+ name = str(kres_id)
+
+ _start_transient_unit(type_, name, properties)
+
+
+def start_slice(config: KresConfig, systemd_type: SystemdType) -> None:
+ _start_transient_unit(systemd_type, _slice_name(config), _kres_slice_properties())
+
+
+def stop_slice(config: KresConfig, systemd_type: SystemdType) -> None:
+ stop_unit(systemd_type, _slice_name(config))
+
+
+def list_our_slice_processes(config: KresConfig, systemd_type: SystemdType) -> Set[str]:
+ return _list_slice_services(systemd_type, _slice_name(config))
+
+
@_wrap_dbus_errors
def start_unit(type_: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(type_)
return [str(x[0]) for x in files]
+@_wrap_dbus_errors
+def is_unit_failed(typ: SystemdType, unit_name: str) -> bool:
+ systemd = _create_manager_proxy(typ)
+ unit_path = systemd.LoadUnit(unit_name)
+ unit_object = _create_object_proxy(typ, ".systemd1", unit_path)
+ return unit_object.ActiveState == "failed"
+
+
@_wrap_dbus_errors
def can_load_unit(type_: SystemdType, unit_name: str) -> bool:
systemd = _create_manager_proxy(type_)
from knot_resolver_manager import log, statistics
from knot_resolver_manager.compat import asyncio as asyncio_compat
from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME, init_user_constants
+from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.datamodel.server_schema import ManagementSchema
from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException
async def _init_config_store(config: ParsedTree) -> ConfigStore:
config_validated = await _load_config(config)
config_store = ConfigStore(config_validated)
- await init_user_constants(config_store)
return config_store
os.kill(pid, 0)
except OSError as e:
if e.errno == errno.ESRCH:
+ logger.warning(
+ "Detected old lock file in the working directory, previous instance of the manager must have crashed."
+ )
os.unlink(PID_FILE_NAME)
_lock_working_directory(attempt=attempt + 1)
return