behave normally (bypassing integer interning and other optimizations)
"""
+ _used: "weakref.WeakSet[KresID]" = weakref.WeakSet()
+
+ @staticmethod
+ def alloc(_custom_name_id: bool = False) -> "KresID":
+ for i in itertools.count(start=1):
+ val = KresID(i if not _custom_name_id else -i)
+ if val not in KresID._used:
+ KresID._used.add(val)
+ return val
+
+ raise RuntimeError("Reached an end of an infinite loop. How?")
+
+ @staticmethod
+ def from_string(val: str) -> "KresID":
+ """
+ Create a new KresID instance with ID based on the given string. There are no guarantees
+ that the returned KresID is unique.
+ """
+ int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val)
+ if int_val is not None:
+ res = KresID(int_val)
+ else:
+ # this would be for example 'gc'
+ # we want a special value, so that they do not clash with normal numerical values
+ res = KresID.alloc(_custom_name_id=True)
+ res.set_custom_str_representation(val)
+
+ KresID._used.add(res)
+ return res
+
def __init__(self, n: int):
self._id = n
self._repr: Optional[str] = None
def __eq__(self, o: object) -> bool:
return isinstance(o, KresID) and self._id == o._id
-
-
-_used: "weakref.WeakSet[KresID]" = weakref.WeakSet()
-
-
-def alloc(_custom_name_id: bool = False) -> KresID:
- for i in itertools.count(start=1):
- val = KresID(i if not _custom_name_id else -i)
- if val not in _used:
- _used.add(val)
- return val
-
- raise RuntimeError("Reached an end of an infinite loop. How?")
-
-
-def alloc_from_string(val: str) -> KresID:
- int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val)
- if int_val is not None:
- res = KresID(int_val)
- assert res not in _used, "Force allocating a KresID, which already exists..."
- _used.add(res)
- return res
- else:
- # this would be for example 'gc'
- # we want a special value, so that they do not clash with normal numerical values
- res = alloc(_custom_name_id=True)
- res.set_custom_str_representation(val)
- return res
-
-
-def lookup_from_string(val: str) -> KresID:
- for allocated_id in _used:
- if str(allocated_id) == val:
- return allocated_id
-
- raise IndexError(f"ID with identifier '{val}' was not allocated")
import asyncio
import logging
import sys
-from asyncio.futures import Future
from subprocess import SubprocessError
from typing import List, Optional
import knot_resolver_manager.kresd_controller
-from knot_resolver_manager import kres_id
from knot_resolver_manager.compat.asyncio import create_task
from knot_resolver_manager.config_store import ConfigStore
from knot_resolver_manager.constants import WATCHDOG_INTERVAL
SubprocessStatus,
SubprocessType,
)
-from knot_resolver_manager.statistics import register_resolver_metrics_for, unregister_resolver_metrics_for
from knot_resolver_manager.utils.functional import Result
from knot_resolver_manager.utils.types import NoneType
self._gc: Optional[Subprocess] = None
self._manager_lock = asyncio.Lock()
self._controller: SubprocessController
- self._watchdog_task: Optional["Future[None]"] = None
+ self._watchdog_task: Optional["asyncio.Task[None]"] = None
@staticmethod
async def create(selected_controller: Optional[SubprocessController], config_store: ConfigStore) -> "KresManager":
await self._collect_already_running_children()
async def _spawn_new_worker(self, config: KresConfig) -> None:
- subprocess = await self._controller.create_subprocess(config, SubprocessType.KRESD, kres_id.alloc())
+ subprocess = await self._controller.create_subprocess(config, SubprocessType.KRESD)
await subprocess.start()
-
- register_resolver_metrics_for(subprocess)
self._workers.append(subprocess)
async def _stop_a_worker(self) -> None:
raise IndexError("Can't stop a kresd when there are no running")
subprocess = self._workers.pop()
- unregister_resolver_metrics_for(subprocess)
await subprocess.stop()
async def _collect_already_running_children(self) -> None:
return self._gc is not None
async def _start_gc(self, config: KresConfig) -> None:
- subprocess = await self._controller.create_subprocess(config, SubprocessType.GC, kres_id.alloc())
+ subprocess = await self._controller.create_subprocess(config, SubprocessType.GC)
await subprocess.start()
self._gc = subprocess
async def stop(self):
if self._watchdog_task is not None:
- self._watchdog_task.cancel()
+ self._watchdog_task.cancel() # cancel it
+ try:
+ await self._watchdog_task # and let it really finish
+ except asyncio.CancelledError:
+ pass
async with self._manager_lock:
await self._ensure_number_of_children(KresConfig(), 0)
async def _instability_handler(self) -> None:
logger.error(
- "Instability callback invoked. Something is wrong, no idea how to react."
- " Performing suicide. See you later!"
+ "Instability detected. Something is wrong, no idea how to react." " Performing suicide. See you later!"
)
sys.exit(1)
while True:
await asyncio.sleep(WATCHDOG_INTERVAL)
- # gather current state
- detected_subprocesses = await self._controller.get_subprocess_status()
- worker_ids = [x.id for x in self._workers]
- invoke_callback = False
-
- for w in worker_ids:
- if w not in detected_subprocesses:
- logger.error("Expected to find subprocess with id '%s' in the system, but did not.", w)
- invoke_callback = True
- continue
-
- if detected_subprocesses[w] is SubprocessStatus.FAILED:
- logger.error("Subprocess '%s' is failed.", w)
- invoke_callback = True
- continue
-
- if detected_subprocesses[w] is SubprocessStatus.UNKNOWN:
- logger.warning("Subprocess '%s' is in unknown state!", w)
+ try:
+ # gather current state
+ detected_subprocesses = await self._controller.get_subprocess_status()
+ worker_ids = [x.id for x in self._workers]
+ invoke_callback = False
+
+ for w in worker_ids:
+ if w not in detected_subprocesses:
+ logger.error("Expected to find subprocess with id '%s' in the system, but did not.", w)
+ invoke_callback = True
+ continue
+
+ if detected_subprocesses[w] is SubprocessStatus.FAILED:
+ logger.error("Subprocess '%s' is failed.", w)
+ invoke_callback = True
+ continue
+
+ if detected_subprocesses[w] is SubprocessStatus.UNKNOWN:
+ logger.warning("Subprocess '%s' is in unknown state!", w)
+
+ except asyncio.CancelledError:
+ raise
+ except BaseException:
+ invoke_callback = True
+ logger.error("Knot Resolver watchdog failed with an unexpected exception.", exc_info=True)
if invoke_callback:
await self._instability_handler()
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
from knot_resolver_manager.kres_id import KresID
+from knot_resolver_manager.statistics import register_resolver_metrics_for, unregister_resolver_metrics_for
from knot_resolver_manager.utils.async_utils import writefile
One SubprocessInstance corresponds to one manager's subprocess
"""
- def __init__(self, config: KresConfig) -> None:
+ def __init__(self, config: KresConfig, typ: SubprocessType, custom_id: Optional[KresID] = None) -> None:
+ self._id = KresID.alloc() if custom_id is None else custom_id
self._config = config
+ self._metrics_registered: bool = False
+ self._type = typ
async def start(self) -> None:
# create config file
await writefile(kresd_config_file(self._config, self.id), lua_config)
try:
await self._start()
+ register_resolver_metrics_for(self)
+ self._metrics_registered = True
except SubprocessControllerException as e:
kresd_config_file(self._config, self.id).unlink()
raise e
await self._restart()
async def stop(self) -> None:
+ if self._metrics_registered:
+ unregister_resolver_metrics_for(self)
await self._stop()
kresd_config_file(self._config, self.id).unlink()
@property
def type(self) -> SubprocessType:
- raise NotImplementedError()
+ return self._type
@property
def id(self) -> KresID:
- raise NotImplementedError()
+ return self._id
async def command(self, cmd: str) -> str:
reader: asyncio.StreamReader
"""
raise NotImplementedError()
- async def create_subprocess(
- self, subprocess_config: KresConfig, subprocess_type: SubprocessType, id_hint: KresID
- ) -> Subprocess:
+ async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
"""
Return a Subprocess object which can be operated on. The subprocess is not
started or in any way active after this call. That has to be performaed manually
supervisord_subprocess_log_dir,
)
from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.kres_id import KresID, alloc_from_string, lookup_from_string
+from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.kresd_controller.interface import (
Subprocess,
SubprocessController,
status = SubprocessStatus.UNKNOWN
return status
- return {lookup_from_string(pr["name"]): convert(pr) for pr in processes}
+ return {KresID.from_string(pr["name"]): convert(pr) for pr in processes}
async def _list_ids_from_existing_config(cfg: KresConfig) -> List[Tuple[SubprocessType, KresID]]:
for section in cp.sections():
if section.startswith("program:"):
program_id = section.replace("program:", "")
- iid = alloc_from_string(program_id)
+ iid = KresID.from_string(program_id)
typ = SubprocessType[cp[section].get("type")]
res.append((typ, iid))
return res
class SupervisordSubprocess(Subprocess):
def __init__(
- self, config: KresConfig, controller: "SupervisordSubprocessController", id_: KresID, type_: SubprocessType
+ self,
+ config: KresConfig,
+ controller: "SupervisordSubprocessController",
+ typ: SubprocessType,
+ custom_id: Optional[KresID] = None,
):
- super().__init__(config)
+ super().__init__(config, typ, custom_id=custom_id)
self._controller: "SupervisordSubprocessController" = controller
- self._id: KresID = id_
- self._type: SubprocessType = type_
-
- @property
- def type(self) -> SubprocessType:
- return self._type
-
- @property
- def id(self) -> KresID:
- return self._id
async def _start(self) -> None:
return await self._controller.start_subprocess(self)
if running:
ids = await _list_ids_from_existing_config(config)
for tp, id_ in ids:
- self._running_instances.add(SupervisordSubprocess(self._controller_config, self, id_, tp))
+ self._running_instances.add(SupervisordSubprocess(self._controller_config, self, tp, custom_id=id_))
async def get_all_running_instances(self) -> Iterable[Subprocess]:
assert self._controller_config is not None
assert subprocess in self._running_instances
await _restart(self._controller_config, subprocess.id)
- async def create_subprocess(
- self, subprocess_config: KresConfig, subprocess_type: SubprocessType, id_hint: KresID
- ) -> Subprocess:
- return SupervisordSubprocess(subprocess_config, self, id_hint, subprocess_type)
+ async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
+ return SupervisordSubprocess(subprocess_config, self, subprocess_type)
async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
return await to_thread(_list_subprocesses, self._controller_config)
from knot_resolver_manager import compat
from knot_resolver_manager.compat.asyncio import to_thread
from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.kres_id import KresID, alloc_from_string, lookup_from_string
+from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.kresd_controller.interface import (
Subprocess,
SubprocessController,
SubprocessStatus,
SubprocessType,
)
+from knot_resolver_manager.kresd_controller.systemd.dbus_api import (
+ GC_SERVICE_NAME,
+ SystemdType,
+ Unit,
+ is_service_name_ours,
+ kres_id_from_service_name,
+ list_units,
+ reset_failed_unit,
+ restart_unit,
+ service_name_from_kres_id,
+ start_transient_kresd_unit,
+ stop_unit,
+)
from knot_resolver_manager.utils import phantom_use
from knot_resolver_manager.utils.async_utils import call
-from . import dbus_api as systemd
-
logger = logging.getLogger(__name__)
class SystemdSubprocess(Subprocess):
def __init__(
- self,
- config: KresConfig,
- typ: SubprocessType,
- id_: KresID,
- systemd_type: systemd.SystemdType,
+ self, config: KresConfig, typ: SubprocessType, systemd_type: SystemdType, custom_id: Optional[KresID] = None
):
- super().__init__(config)
- self._type = typ
- self._id: KresID = id_
+ super().__init__(config, typ, custom_id=custom_id)
self._systemd_type = systemd_type
- @property
- def id(self) -> KresID:
- return self._id
-
- @property
- def systemd_id(self) -> str:
- if self._type is SubprocessType.GC:
- return systemd.GC_SERVICE_NAME
- else:
- return f"kresd_{self._id}.service"
-
- @staticmethod
- def is_unit_name_ours(unit_name: str) -> bool:
- is_ours = unit_name == systemd.GC_SERVICE_NAME
- is_ours |= unit_name.startswith("kresd_") and unit_name.endswith(".service")
- return is_ours
-
- @property
- def type(self):
- return self._type
-
- async def is_running(self) -> bool:
- raise NotImplementedError()
-
async def _start(self):
await compat.asyncio.to_thread(
- systemd.start_transient_kresd_unit, self._config, self._systemd_type, self.id, self._type
+ start_transient_kresd_unit, self._config, self._systemd_type, self.id, self._type
)
- async def stop(self):
- await compat.asyncio.to_thread(systemd.stop_unit, self._systemd_type, self.systemd_id)
+ async def _stop(self):
+ await compat.asyncio.to_thread(stop_unit, self._systemd_type, service_name_from_kres_id(self.id))
async def _restart(self):
- await compat.asyncio.to_thread(systemd.restart_unit, self._systemd_type, self.systemd_id)
+ await compat.asyncio.to_thread(restart_unit, self._systemd_type, service_name_from_kres_id(self.id))
class SystemdSubprocessController(SubprocessController):
- def __init__(self, systemd_type: systemd.SystemdType):
+ def __init__(self, systemd_type: SystemdType):
self._systemd_type = systemd_type
self._controller_config: Optional[KresConfig] = None
def __str__(self):
- if self._systemd_type == systemd.SystemdType.SESSION:
+ if self._systemd_type == SystemdType.SESSION:
return "systemd-session"
- elif self._systemd_type == systemd.SystemdType.SYSTEM:
+ elif self._systemd_type == SystemdType.SYSTEM:
return "systemd"
else:
raise NotImplementedError("unknown systemd type")
phantom_use(config)
# try to run systemctl (should be quite fast)
- cmd = f"systemctl {'--user' if self._systemd_type == systemd.SystemdType.SESSION else ''} status"
+ cmd = f"systemctl {'--user' if self._systemd_type == SystemdType.SESSION else ''} status"
ret = await call(cmd, shell=True, discard_output=True)
if ret != 0:
logger.info(
# check that we run under root for non-session systemd
try:
- if self._systemd_type is systemd.SystemdType.SYSTEM and os.geteuid() != 0:
+ if self._systemd_type is SystemdType.SYSTEM and os.geteuid() != 0:
logger.info(
"Systemd (%s) looks functional, but we are not running as root. Assuming not enough privileges",
self._systemd_type,
assert self._controller_config is not None
res: List[SystemdSubprocess] = []
- units = await compat.asyncio.to_thread(systemd.list_units, self._systemd_type)
+ units = await compat.asyncio.to_thread(list_units, self._systemd_type)
for unit in units:
- if unit.name.startswith("kresd") and unit.name.endswith(".service"):
- iden = unit.name.replace("kresd", "")[1:].replace(".service", "")
-
+ if is_service_name_ours(unit.name):
if unit.state == "failed":
# if a unit is failed, remove it from the system by reseting its state
- # should work for both transient and persistent units
logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", unit.name)
- await compat.asyncio.to_thread(systemd.reset_failed_unit, self._systemd_type, unit.name)
+ await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, unit.name)
continue
res.append(
SystemdSubprocess(
self._controller_config,
SubprocessType.KRESD,
- alloc_from_string(iden),
self._systemd_type,
+ custom_id=kres_id_from_service_name(unit.name),
)
)
- elif unit.name == systemd.GC_SERVICE_NAME:
- # we can't easily check, if the unit is transient or not without additional systemd call
- # we ignore it for now and assume the default persistency state. It shouldn't cause any
- # problems, because interactions with the process are done the same way in all cases
+ elif unit.name == GC_SERVICE_NAME:
res.append(
SystemdSubprocess(
- self._controller_config, SubprocessType.GC, alloc_from_string("gc"), self._systemd_type
+ self._controller_config,
+ SubprocessType.GC,
+ self._systemd_type,
+ custom_id=kres_id_from_service_name(unit.name),
)
)
+
return res
async def initialize_controller(self, config: KresConfig) -> None:
async def shutdown_controller(self) -> None:
pass
- async def create_subprocess(
- self, subprocess_config: KresConfig, subprocess_type: SubprocessType, id_hint: KresID
- ) -> Subprocess:
+ async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
assert self._controller_config is not None
- return SystemdSubprocess(subprocess_config, subprocess_type, id_hint, self._systemd_type)
+ custom_id = KresID.from_string("gc") if subprocess_type == SubprocessType.GC else None
+ return SystemdSubprocess(subprocess_config, subprocess_type, self._systemd_type, custom_id=custom_id)
async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
assert self._controller_config is not None
else:
return SubprocessStatus.UNKNOWN
- data: List[systemd.Unit] = await to_thread(systemd.list_units, self._systemd_type)
- our_data = filter(lambda u: SystemdSubprocess.is_unit_name_ours(u.name), data)
- return {lookup_from_string(u.name): convert(u.state) for u in our_data}
+ data: List[Unit] = await to_thread(list_units, self._systemd_type)
+ our_data = filter(lambda u: is_service_name_ours(u.name), data)
+ return {kres_id_from_service_name(u.name): convert(u.state) for u in our_data}
GC_SERVICE_NAME = "kres-managed-cache-gc.service"
+def kres_id_from_service_name(service_name: str) -> KresID:
+ v = service_name.replace("kresd_", "").replace(".service", "")
+ return KresID.from_string(v)
+
+
+def service_name_from_kres_id(kid: KresID) -> str:
+ return f"kresd_{kid}.service"
+
+
+def is_service_name_ours(service_name: str) -> bool:
+ is_ours = service_name == GC_SERVICE_NAME
+ is_ours |= service_name.startswith("kresd_") and service_name.endswith(".service")
+ return is_ours
+
+
class SystemdType(Enum):
SYSTEM = auto()
SESSION = auto()
config: KresConfig, type_: SystemdType, kres_id: KresID, subprocess_type: SubprocessType
) -> None:
name, properties = {
- SubprocessType.KRESD: (f"kresd_{kres_id}.service", _kresd_unit_properties(config, kres_id)),
- SubprocessType.GC: (GC_SERVICE_NAME, _gc_unit_properties(config)),
+ SubprocessType.KRESD: (service_name_from_kres_id(kres_id), _kresd_unit_properties(config, kres_id)),
+ SubprocessType.GC: (service_name_from_kres_id(kres_id), _gc_unit_properties(config)),
}[subprocess_type]
systemd = _create_manager_proxy(type_)
import asyncio
import json
import logging
-from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Tuple, TypeVar
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Generator, List, Optional, Tuple, TypeVar
from prometheus_client import Histogram, exposition # type: ignore
from prometheus_client.bridge.graphite import GraphiteBridge # type: ignore
from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.kres_id import KresID
-from knot_resolver_manager.kresd_controller.interface import Subprocess
from knot_resolver_manager.utils.functional import Result
+if TYPE_CHECKING:
+ from knot_resolver_manager.kresd_controller.interface import Subprocess
+
+
logger = logging.getLogger(__name__)
MANAGER_REQUEST_RECONFIGURE_LATENCY = Histogram(
"manager_request_reconfigure_latency", "Time it takes to change configuration"
)
-_REGISTERED_RESOLVERS: Dict[KresID, Subprocess] = {}
+_REGISTERED_RESOLVERS: "Dict[KresID, Subprocess]" = {}
T = TypeVar("T")
async def _command_registered_resolvers(cmd: str) -> Dict[KresID, str]:
- async def single_pair(sub: Subprocess) -> Tuple[KresID, str]:
+ async def single_pair(sub: "Subprocess") -> Tuple[KresID, str]:
return sub.id, await sub.command(cmd)
pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_RESOLVERS.values()))
_resolver_collector: Optional[ResolverCollector] = None
-def unregister_resolver_metrics_for(subprocess: Subprocess) -> None:
+def unregister_resolver_metrics_for(subprocess: "Subprocess") -> None:
"""
- Cancel metric collection from resolver subprocess
+ Cancel metric collection from resolver "Subprocess"
"""
del _REGISTERED_RESOLVERS[subprocess.id]
-def register_resolver_metrics_for(subprocess: Subprocess) -> None:
+def register_resolver_metrics_for(subprocess: "Subprocess") -> None:
"""
- Register resolver subprocess for metric collection
+ Register resolver "Subprocess" for metric collection
"""
_REGISTERED_RESOLVERS[subprocess.id] = subprocess
echo
echo Building Knot Resolver
echo ----------------------
+echo -e "${blue}In case of an compilation error, run this command to try to fix it:${reset}"
+echo -e "\t${blue}rm -r $(realpath .install_kresd) $(realpath .build_kresd)${reset}"
+echo
cd ..
mkdir -p manager/.build_kresd manager/.install_kresd
meson manager/.build_kresd --prefix=$(realpath manager/.install_kresd) --default-library=static --buildtype=debug