[flake8]
-max-line-length = 200
\ No newline at end of file
+max-line-length = 200
+extend-ignore = E203
\ No newline at end of file
import logging
from pathlib import Path
+from typing import TYPE_CHECKING, Optional
+from knot_resolver_manager.config_store import ConfigStore
from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.utils import which
+from knot_resolver_manager.utils.functional import Result
+
+if TYPE_CHECKING:
+ from knot_resolver_manager.kresd_controller.interface import KresID
STARTUP_LOG_LEVEL = logging.DEBUG
DEFAULT_MANAGER_CONFIG_FILE = Path("/etc/knot-resolver/config.yml")
return config.cache.storage.to_path()
-def kresd_config_file(config: KresConfig, kres_id: KresID) -> Path:
- return Path(f"{config.server.groupid}kresd_{kres_id}.conf")
+def kresd_config_file(_config: KresConfig, kres_id: "KresID") -> Path:
+ return Path(f"{kres_id}.conf")
def supervisord_config_file(_config: KresConfig) -> Path:
"""
Used in KresdManager. It's a number of seconds in between system health checks.
"""
+
+
+class _UserConstants:
+ """
+ Class for accessing constants, which are technically not constants as they are user configurable.
+ """
+
+ def __init__(self, config_store: ConfigStore) -> None:
+ self._config_store = config_store
+
+ @property
+ def SERVICE_GROUP_ID(self) -> str:
+ return self._config_store.get().server.groupid
+
+
+_user_constants: Optional[_UserConstants] = None
+
+
+async def _deny_groupid_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+ if config_old.server.groupid != config_new.server.groupid:
+ return Result.err(
+ "/server/groupid: Based on the groupid, the manager recognizes his subprocesses,"
+ " so it is not possible to change it while services are running."
+ )
+ return Result.ok(None)
+
+
+async def init_user_constants(config_store: ConfigStore) -> None:
+ global _user_constants
+ _user_constants = _UserConstants(config_store)
+
+ await config_store.register_verifier(_deny_groupid_changes)
+
+
+def user_constants() -> _UserConstants:
+ assert _user_constants is not None
+ return _user_constants
+++ /dev/null
-import itertools
-import weakref
-from typing import Optional
-
-from knot_resolver_manager.utils import ignore_exceptions_optional
-
-
-class KresID:
- """
- ID object. Effectively only a wrapper around an int, so that the references
- behave normally (bypassing integer interning and other optimizations)
- """
-
- _used: "weakref.WeakSet[KresID]" = weakref.WeakSet()
-
- @staticmethod
- def alloc(_custom_name_id: bool = False) -> "KresID":
- for i in itertools.count(start=1):
- val = KresID(i if not _custom_name_id else -i)
- if val not in KresID._used:
- KresID._used.add(val)
- return val
-
- raise RuntimeError("Reached an end of an infinite loop. How?")
-
- @staticmethod
- def from_string(val: str) -> "KresID":
- """
- Create a new KresID instance with ID based on the given string. There are no guarantees
- that the returned KresID is unique.
- """
- int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val)
- if int_val is not None:
- res = KresID(int_val)
- else:
- # this would be for example 'gc'
- # we want a special value, so that they do not clash with normal numerical values
- res = KresID.alloc(_custom_name_id=True)
- res.set_custom_str_representation(val)
-
- KresID._used.add(res)
- return res
-
- def __init__(self, n: int):
- self._id = n
- self._repr: Optional[str] = None
-
- def set_custom_str_representation(self, representation: str) -> None:
- self._repr = representation
-
- def __str__(self) -> str:
- if self._repr is None:
- return str(self._id)
- else:
- return self._repr
-
- def __repr__(self) -> str:
- return f"KresID({self})"
-
- def __hash__(self) -> int:
- if self._repr:
- return hash(self._repr)
- return self._id
-
- def __eq__(self, o: object) -> bool:
- if isinstance(o, KresID):
- ret = self._id == o._id
- if self._repr:
- ret |= self._repr == o._repr
- return ret
- return False
try:
# gather current state
- detected_subprocesses = await self._controller.get_subprocess_status()
+ async with self._manager_lock:
+ detected_subprocesses = await self._controller.get_subprocess_status()
expected_ids = [x.id for x in self._workers]
if self._gc:
expected_ids.append(self._gc.id)
import asyncio
+import itertools
import sys
from enum import Enum, auto
-from typing import Dict, Iterable, Optional
+from typing import Dict, Iterable, Optional, Type, TypeVar
+from weakref import WeakValueDictionary
from knot_resolver_manager.constants import kresd_config_file
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
-from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.statistics import register_resolver_metrics_for, unregister_resolver_metrics_for
from knot_resolver_manager.utils.async_utils import writefile
GC = auto()
+T = TypeVar("T", bound="KresID")
+
+
+class KresID:
+ """
+ ID object used for identifying subprocesses.
+ """
+
+ _used: "WeakValueDictionary[int, KresID]" = WeakValueDictionary()
+
+ @classmethod
+ def alloc(cls: Type[T], typ: SubprocessType) -> T:
+ # we split them in order to make the numbers nice (no gaps, pretty naming)
+ # there are no strictly technical reasons to do this
+ #
+ # GC - negative IDs
+ # KRESD - positive IDs
+ if typ is SubprocessType.GC:
+ start = -1
+ step = -1
+ elif typ is SubprocessType.KRESD:
+ start = 1
+ step = 1
+ else:
+ raise RuntimeError(f"Unexpected subprocess type {typ}")
+
+ # find free ID closest to zero
+ for i in itertools.count(start=start, step=step):
+ if i not in cls._used:
+ res = cls.new(typ, i)
+ return res
+
+ raise RuntimeError("Reached an end of an infinite loop. How?")
+
+ @classmethod
+ def new(cls: "Type[T]", typ: SubprocessType, n: int) -> "T":
+ if n in cls._used:
+ # Ignoring typing here, because I can't find a way how to make the _used dict
+ # typed based on subclass. I am not even sure that it's different between subclasses,
+ # it's probably still the same dict. But we don't really care about it
+ return cls._used[n] # type: ignore
+ else:
+ val = cls(typ, n, _i_know_what_i_am_doing=True)
+ cls._used[n] = val
+ return val
+
+ def __init__(self, typ: SubprocessType, n: int, _i_know_what_i_am_doing: bool = False):
+ if not _i_know_what_i_am_doing:
+ raise RuntimeError("Don't do this. You seem to have no idea what it does")
+
+ self._id = n
+ self._type = typ
+
+ @property
+ def subprocess_type(self) -> SubprocessType:
+ return self._type
+
+ def __repr__(self) -> str:
+ return f"KresID({self})"
+
+ def __hash__(self) -> int:
+ return self._id
+
+ def __eq__(self, o: object) -> bool:
+ if isinstance(o, KresID):
+ return self._id == o._id
+ return False
+
+ def __str__(self) -> str:
+ """
+ Returns string representation of the ID usable directly in the underlying service manager
+ """
+ raise NotImplementedError()
+
+ @staticmethod
+ def from_string(val: str) -> "KresID":
+ """
+ Inverse of __str__
+ """
+ raise NotImplementedError()
+
+
class Subprocess:
"""
One SubprocessInstance corresponds to one manager's subprocess
"""
- def __init__(self, config: KresConfig, typ: SubprocessType, custom_id: Optional[KresID] = None) -> None:
- self._id = KresID.alloc() if custom_id is None else custom_id
+ def __init__(self, config: KresConfig, kid: KresID) -> None:
+ self._id = kid
self._config = config
self._metrics_registered: bool = False
- self._type = typ
async def start(self) -> None:
# create config file
@property
def type(self) -> SubprocessType:
- return self._type
+ return self.id.subprocess_type
@property
def id(self) -> KresID:
import signal
from os import kill
from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
+from typing import Any, Dict, Iterable, List, Optional, Set, Union
from xmlrpc.client import ServerProxy
import supervisor.xmlrpc # type: ignore[import]
)
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
-from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.kresd_controller.interface import (
+ KresID,
Subprocess,
SubprocessController,
SubprocessStatus,
logger = logging.getLogger(__name__)
+class SupervisordKresID(KresID):
+ @staticmethod
+ def from_string(val: str) -> "SupervisordKresID":
+ if val == "gc":
+ return SupervisordKresID.new(SubprocessType.GC, -1)
+ else:
+ val = val.replace("kresd", "")
+ return SupervisordKresID.new(SubprocessType.KRESD, int(val))
+
+ def __str__(self) -> str:
+ if self.subprocess_type is SubprocessType.GC:
+ return "gc"
+ elif self.subprocess_type is SubprocessType.KRESD:
+ return f"kresd{self._id}"
+ else:
+ raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
+
+
@dataclass
class _Instance:
"""
status = SubprocessStatus.UNKNOWN
return status
- return {KresID.from_string(pr["name"]): convert(pr) for pr in processes}
+ return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes}
-async def _list_ids_from_existing_config(cfg: KresConfig) -> List[Tuple[SubprocessType, KresID]]:
+async def _list_ids_from_existing_config(cfg: KresConfig) -> List[SupervisordKresID]:
config = await readfile(supervisord_config_file(cfg))
cp = configparser.ConfigParser()
cp.read_string(config)
- res: List[Tuple[SubprocessType, KresID]] = []
+ res: List[SupervisordKresID] = []
for section in cp.sections():
if section.startswith("program:"):
program_id = section.replace("program:", "")
- iid = KresID.from_string(program_id)
- typ = SubprocessType[cp[section].get("type")]
- res.append((typ, iid))
+ kid = SupervisordKresID.from_string(program_id)
+ res.append(kid)
return res
self,
config: KresConfig,
controller: "SupervisordSubprocessController",
- typ: SubprocessType,
- custom_id: Optional[KresID] = None,
+ base_id: Union[SubprocessType, SupervisordKresID],
):
- super().__init__(config, typ, custom_id=custom_id)
+ if isinstance(base_id, SubprocessType):
+ super().__init__(config, SupervisordKresID.alloc(base_id))
+ else:
+ super().__init__(config, base_id)
self._controller: "SupervisordSubprocessController" = controller
async def _start(self) -> None:
running = await _is_supervisord_running(config)
if running:
ids = await _list_ids_from_existing_config(config)
- for tp, id_ in ids:
- self._running_instances.add(SupervisordSubprocess(self._controller_config, self, tp, custom_id=id_))
+ for id_ in ids:
+ self._running_instances.add(SupervisordSubprocess(self._controller_config, self, id_))
async def get_all_running_instances(self) -> Iterable[Subprocess]:
assert self._controller_config is not None
import logging
import os
-from typing import Dict, Iterable, List, Optional
+import re
+from typing import Dict, Iterable, List, Optional, Union
from knot_resolver_manager import compat
from knot_resolver_manager.compat.asyncio import to_thread
+from knot_resolver_manager.constants import user_constants
from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.kresd_controller.interface import (
+ KresID,
Subprocess,
SubprocessController,
SubprocessStatus,
SubprocessType,
)
from knot_resolver_manager.kresd_controller.systemd.dbus_api import (
- GC_SERVICE_BASE_NAME,
SystemdType,
Unit,
- create_service_name,
- is_service_name_ours,
- kres_id_from_service_name,
list_units,
reset_failed_unit,
restart_unit,
logger = logging.getLogger(__name__)
+GC_SERVICE_BASE_NAME = "kres_cache_gc.service"
+KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd_([0-9]+).service$")
+
+
+def _is_service_name_ours(name: str) -> bool:
+ pref_len = len(user_constants().SERVICE_GROUP_ID)
+ is_ours = name == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME
+ is_ours |= name.startswith(user_constants().SERVICE_GROUP_ID) and bool(
+ KRESD_SERVICE_BASE_PATTERN.match(name[pref_len:])
+ )
+ return is_ours
+
+
+class SystemdKresID(KresID):
+ @staticmethod
+ def from_string(val: str) -> "SystemdKresID":
+ if val == user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME:
+ return SystemdKresID.new(SubprocessType.GC, -1)
+ else:
+ val = val[len(user_constants().SERVICE_GROUP_ID) :]
+ kid = KRESD_SERVICE_BASE_PATTERN.search(val)
+ if kid:
+ return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[0]))
+ else:
+ raise RuntimeError("Trying to parse systemd service name which does not match our expectations")
+
+ def __str__(self) -> str:
+ if self.subprocess_type is SubprocessType.GC:
+ return user_constants().SERVICE_GROUP_ID + GC_SERVICE_BASE_NAME
+ elif self.subprocess_type is SubprocessType.KRESD:
+ return f"{user_constants().SERVICE_GROUP_ID}kresd_{self._id}.service"
+ else:
+ raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
+
+
class SystemdSubprocess(Subprocess):
- def __init__(
- self, config: KresConfig, typ: SubprocessType, systemd_type: SystemdType, custom_id: Optional[KresID] = None
- ):
- super().__init__(config, typ, custom_id=custom_id)
+ def __init__(self, config: KresConfig, systemd_type: SystemdType, id_base: Union[SubprocessType, KresID]):
+ if isinstance(id_base, SubprocessType):
+ super().__init__(config, SystemdKresID.alloc(id_base))
+ else:
+ super().__init__(config, id_base)
self._systemd_type = systemd_type
async def _start(self):
- await compat.asyncio.to_thread(
- start_transient_kresd_unit, self._config, self._systemd_type, self.id, self._type
- )
+ await compat.asyncio.to_thread(start_transient_kresd_unit, self._config, self._systemd_type, self.id)
async def _stop(self):
- await compat.asyncio.to_thread(stop_unit, self._systemd_type, create_service_name(self.id, self._config))
+ await compat.asyncio.to_thread(stop_unit, self._systemd_type, str(self.id))
async def _restart(self):
- await compat.asyncio.to_thread(restart_unit, self._systemd_type, create_service_name(self.id, self._config))
+ await compat.asyncio.to_thread(restart_unit, self._systemd_type, str(self.id))
class SystemdSubprocessController(SubprocessController):
res: List[SystemdSubprocess] = []
units = await compat.asyncio.to_thread(list_units, self._systemd_type)
for unit in units:
- if is_service_name_ours(unit.name, self._controller_config):
+ if _is_service_name_ours(unit.name):
if unit.state == "failed":
# if a unit is failed, remove it from the system by reseting its state
logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", unit.name)
res.append(
SystemdSubprocess(
self._controller_config,
- SubprocessType.GC
- if unit.name == self._controller_config.server.groupid + GC_SERVICE_BASE_NAME
- else SubprocessType.KRESD,
self._systemd_type,
- custom_id=kres_id_from_service_name(unit.name, self._controller_config),
+ SystemdKresID.from_string(unit.name),
)
)
async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
assert self._controller_config is not None
-
- custom_id = None
- if subprocess_type == SubprocessType.GC:
- custom_id = KresID.from_string(subprocess_config.server.groupid + GC_SERVICE_BASE_NAME)
-
- return SystemdSubprocess(subprocess_config, subprocess_type, self._systemd_type, custom_id=custom_id)
+ return SystemdSubprocess(subprocess_config, self._systemd_type, subprocess_type)
async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
assert self._controller_config is not None
data: List[Unit] = await to_thread(list_units, self._systemd_type)
- our_data = filter(lambda u: is_service_name_ours(u.name, self._controller_config), data) # type: ignore
- return {kres_id_from_service_name(u.name, self._controller_config): convert(u.state) for u in our_data}
+ our_data = filter(lambda u: _is_service_name_ours(u.name), data)
+ return {SystemdKresID.from_string(u.name): convert(u.state) for u in our_data}
import logging
import os
-import re
from enum import Enum, auto
from threading import Thread
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
from knot_resolver_manager.constants import kres_gc_executable, kresd_cache_dir, kresd_config_file, kresd_executable
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
-from knot_resolver_manager.kres_id import KresID
-from knot_resolver_manager.kresd_controller.interface import SubprocessType
+from knot_resolver_manager.kresd_controller.interface import KresID, SubprocessType
logger = logging.getLogger(__name__)
-GC_SERVICE_BASE_NAME = "kres_cache_gc.service"
-KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd_([0-9]+).service$")
-
-
-def service_name_remove_prefix(service_name: str, prefix: str) -> str:
- return service_name[len(prefix) :] if service_name.startswith(prefix) else service_name # noqa: E203
-
-
-def kres_id_from_service_name(service_name: str, config: KresConfig) -> KresID:
- service_name_noprefix = service_name_remove_prefix(service_name, config.server.groupid)
- kid = KRESD_SERVICE_BASE_PATTERN.search(service_name_noprefix)
- if kid:
- return KresID.from_string(kid.groups()[0])
- return KresID.from_string(service_name)
-
-
-def create_service_name(kid: KresID, config: KresConfig) -> str:
- rep = str(kid)
- if rep.isnumeric():
- return f"{config.server.groupid}kresd_{rep}.service"
- return rep
-
-
-def is_service_name_ours(service_name: str, config: KresConfig) -> bool:
- service_name_noprefix = service_name_remove_prefix(service_name, config.server.groupid)
- is_ours = service_name_noprefix == GC_SERVICE_BASE_NAME
- is_ours |= bool(KRESD_SERVICE_BASE_PATTERN.match(service_name_noprefix))
- return is_ours
-
class SystemdType(Enum):
SYSTEM = auto()
return event_hander
+ loop: Any = None
+
def event_loop_isolation_thread() -> None:
- loop: Any = GLib.MainLoop()
+ nonlocal loop
+ loop = GLib.MainLoop()
systemd.JobRemoved.connect(_wait_for_job_completion_handler(loop))
loop.run()
# first start the thread to watch for results to prevent race conditions
- thread = Thread(target=event_loop_isolation_thread)
+ thread = Thread(target=event_loop_isolation_thread, name="glib-loop-isolation-thread")
thread.start()
# then create the job
- job_path = job_creating_func()
+ try:
+ job_path = job_creating_func()
+ except BaseException:
+ loop.quit()
+ raise
# then wait for the results
thread.join()
@_wrap_dbus_errors
-def start_transient_kresd_unit(
- config: KresConfig, type_: SystemdType, kres_id: KresID, subprocess_type: SubprocessType
-) -> None:
- name, properties = {
- SubprocessType.KRESD: (create_service_name(kres_id, config), _kresd_unit_properties(config, kres_id)),
- SubprocessType.GC: (create_service_name(kres_id, config), _gc_unit_properties(config)),
- }[subprocess_type]
+def start_transient_kresd_unit(config: KresConfig, type_: SystemdType, kres_id: KresID) -> None:
+ properties = {
+ SubprocessType.KRESD: _kresd_unit_properties(config, kres_id),
+ SubprocessType.GC: _gc_unit_properties(config),
+ }[kres_id.subprocess_type]
+ name = str(kres_id)
systemd = _create_manager_proxy(type_)
from knot_resolver_manager import log, statistics
from knot_resolver_manager.compat import asyncio as asyncio_compat
from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE
+from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, init_user_constants
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.datamodel.server_schema import ManagementSchema
from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException, TreeException
)
return Result.ok(None)
- async def _deny_groupid_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
- if config_old.server.groupid != config_new.server.groupid:
- return Result.err(
- "/server/groupid: Based on the groupid, the manager recognizes his subprocesses,"
- " so it is not possible to change it while services are running."
- )
- return Result.ok(None)
-
async def sigint_handler(self) -> None:
logger.info("Received SIGINT, triggering graceful shutdown")
self.shutdown_event.set()
asyncio_compat.add_async_signal_handler(signal.SIGHUP, self.sighup_handler)
await self.runner.setup()
await self.config_store.register_verifier(self._deny_management_changes)
- await self.config_store.register_verifier(self._deny_groupid_changes)
await self.config_store.register_on_change_callback(self._reconfigure)
async def wait_for_shutdown(self) -> None:
async def _init_config_store(config: ParsedTree) -> ConfigStore:
config_validated = await _load_config(config)
- return ConfigStore(config_validated)
+ config_store = ConfigStore(config_validated)
+ await init_user_constants(config_store)
+ return config_store
async def _init_manager(config_store: ConfigStore) -> KresManager:
from knot_resolver_manager import compat
from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.utils.functional import Result
if TYPE_CHECKING:
- from knot_resolver_manager.kresd_controller.interface import Subprocess
+ from knot_resolver_manager.kresd_controller.interface import KresID, Subprocess
logger = logging.getLogger(__name__)
return decorator
-async def _command_registered_resolvers(cmd: str) -> Dict[KresID, str]:
- async def single_pair(sub: "Subprocess") -> Tuple[KresID, str]:
+async def _command_registered_resolvers(cmd: str) -> "Dict[KresID, str]":
+ async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]":
return sub.id, await sub.command(cmd)
pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_RESOLVERS.values()))
class ResolverCollector:
def __init__(self, config_store: ConfigStore) -> None:
- self._stats_raw: Optional[Dict[KresID, str]] = None
+ self._stats_raw: "Optional[Dict[KresID, str]]" = None
self._config_store: ConfigStore = config_store
self._collection_task: "Optional[asyncio.Task[None]]" = None
self._skip_immediate_collection: bool = False
# when not running, we can start a new loop (we are not in the manager's main thread)
compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
- def _create_resolver_metrics_loaded_gauge(self, kid: KresID, loaded: bool) -> GaugeMetricFamily:
+ def _create_resolver_metrics_loaded_gauge(self, kid: "KresID", loaded: bool) -> GaugeMetricFamily:
return _gauge(
"resolver_metrics_loaded",
"0 if metrics from resolver instance were not loaded, otherwise 1",
# this function prevents the collector registry from invoking the collect function on startup
return []
- def _parse_resolver_metrics(self, instance_id: KresID, metrics: Any) -> Generator[Metric, None, None]:
+ def _parse_resolver_metrics(self, instance_id: "KresID", metrics: Any) -> Generator[Metric, None, None]:
sid = str(instance_id)
# response latency histogram