cache:
- storage: etc/knot-resolver/cache
+ storage: ../cache
logging:
level: debug
network:
ip: 127.0.0.1
port: 5000
log-level: DEBUG
+ rundir: etc/knot-resolver/runtime
-cache/
\ No newline at end of file
+cache/
+run/
\ No newline at end of file
ip: 127.0.0.1
port: 5001
log-level: DEBUG
+ rundir: integration/run
cache:
- storage: integration/cache
+ storage: cache
import click
from knot_resolver_manager import compat
-from knot_resolver_manager.constants import MANAGER_CONFIG_FILE, STARTUP_LOG_LEVEL
+from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, STARTUP_LOG_LEVEL
from knot_resolver_manager.kresd_controller import list_controller_names
from knot_resolver_manager.server import start_server
nargs=1,
required=False,
default=None,
- help="Overrides default config location at '" + str(MANAGER_CONFIG_FILE) + "'",
+ help="Overrides default config location at '" + str(DEFAULT_MANAGER_CONFIG_FILE) + "'",
)
@click.option("--list-backends", "-l", type=bool, is_flag=True, default=False)
def main(config: Optional[str], list_backends: bool):
"""Knot Resolver Manager
[listen] ... numeric port or a path for a Unix domain socket, default is """ + str(
- MANAGER_CONFIG_FILE
+ DEFAULT_MANAGER_CONFIG_FILE
)
# print list of backends and exit (if specified)
sys.exit(0)
# where to look for config
- config_path = MANAGER_CONFIG_FILE if config is None else Path(config)
+ config_path = DEFAULT_MANAGER_CONFIG_FILE if config is None else Path(config)
compat.asyncio.run(start_server(config=config_path))
except BaseException as e:
logger.error("Task in thread failed...", exc_info=True)
exc = e
+ return None
res = await loop.run_in_executor(None, exc_catcher)
# propagate exception in this thread
# https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L8
# version 3.7 and higher, call directly
- if sys.version_info.major >= 3 and sys.version_info.minor >= 7 and False:
- return asyncio.run(coro, debug=debug)
-
+ # disabled due to incompatibilities
+ # if sys.version_info.major >= 3 and sys.version_info.minor >= 7 and False:
+ # return asyncio.run(coro, debug=debug)
+ # else:
# earlier versions, run with default executor
- else:
- # Explicitelly create a new loop to match behaviour of asyncio.run
- loop = asyncio.events.new_event_loop()
- asyncio.set_event_loop(loop)
- if debug is not None:
- loop.set_debug(debug)
- return loop.run_until_complete(coro)
- # asyncio.run would cancel all running tasks, but it would use internal API for that
- # so let's ignore it and let the tasks die
+ # Explicitly create a new loop to match behaviour of asyncio.run
+ loop = asyncio.events.new_event_loop()
+ asyncio.set_event_loop(loop)
+ if debug is not None:
+ loop.set_debug(debug)
+ return loop.run_until_complete(coro)
+ # asyncio.run would cancel all running tasks, but it would use internal API for that
+ # so let's ignore it and let the tasks die
--- /dev/null
+import asyncio
+from asyncio import Lock
+from typing import Awaitable, Callable, List, Tuple
+
+from knot_resolver_manager.datamodel import KresConfig
+from knot_resolver_manager.exceptions import DataException, KresdManagerException
+from knot_resolver_manager.utils.functional import Result
+
+VerifyCallback = Callable[[KresConfig, KresConfig], Awaitable[Result[None, str]]]
+UpdateCallback = Callable[[KresConfig], Awaitable[None]]
+
+
+class ConfigStore:
+ def __init__(self, initial_config: KresConfig):
+ self._config = initial_config
+ self._verifiers: List[VerifyCallback] = []
+ self._callbacks: List[UpdateCallback] = []
+ self._update_lock: Lock = Lock()
+
+ async def update(self, config: KresConfig):
+ # invoke pre-change verifiers
+ results: Tuple[Result[None, str], ...] = await asyncio.gather(
+ *[ver(self._config, config) for ver in self._verifiers]
+ )
+ err_res = filter(lambda r: r.is_err(), results)
+ errs = list(map(lambda r: r.unwrap_err(), err_res))
+ if len(errs) > 0:
+ raise KresdManagerException("Validation of the new config failed. The reasons are:", *errs)
+
+ async with self._update_lock:
+ # update the stored config with the new version
+ self._config = config
+
+ # invoke change callbacks
+ for call in self._callbacks:
+ await call(config)
+
+ async def register_verifier(self, verifier: VerifyCallback):
+ self._verifiers.append(verifier)
+ res = await verifier(self.get(), self.get())
+ if res.is_err():
+ raise DataException(f"Initial config verification failed with error: {res.unwrap_err()}")
+
+ async def register_on_change_callback(self, callback: UpdateCallback):
+ """
+ Registers new callback and immediatelly calls it with current config
+ """
+
+ self._callbacks.append(callback)
+ await callback(self.get())
+
+ def get(self) -> KresConfig:
+ return self._config
import logging
from pathlib import Path
+from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.kres_id import KresID
+
STARTUP_LOG_LEVEL = logging.DEBUG
+DEFAULT_MANAGER_CONFIG_FILE = Path("/etc/knot-resolver/config.yml")
+KRESD_EXECUTABLE = Path("/usr/sbin/kresd")
+GC_EXECUTABLE = Path("/usr/sbin/kres-cache-gc")
-CONFIGURATION_DIR = Path("etc/knot-resolver").absolute()
-CONFIGURATION_DIR.mkdir(exist_ok=True)
-RUNTIME_DIR = Path("etc/knot-resolver/runtime").absolute()
-RUNTIME_DIR.mkdir(exist_ok=True)
-KRES_CACHE_DIR = Path("etc/knot-resolver/cache").absolute()
-KRES_CACHE_DIR.mkdir(exist_ok=True)
+def kresd_cache_dir(config: KresConfig) -> Path:
+ return config.cache.storage.to_path()
-KRESD_EXECUTABLE = Path("/usr/sbin/kresd")
-GC_EXECUTABLE = Path("/usr/sbin/kres-cache-gc")
-# KRES_CACHE_DIR = Path("/var/lib/knot-resolver")
-KRESD_CONFIG_FILE = RUNTIME_DIR / "kresd.conf"
-KRESD_SUPERVISORD_ARGS = f"-c {str(KRESD_CONFIG_FILE.absolute())} -n -vvv"
-KRES_GC_SUPERVISORD_ARGS = f"-c {KRES_CACHE_DIR.absolute()} -d 1000"
+def kresd_config_file(_config: KresConfig, kres_id: KresID) -> Path:
+ return Path(f"kresd_{kres_id}.conf")
+
+
+def supervisord_config_file(_config: KresConfig) -> Path:
+ return Path("supervisord.conf")
+
-SUPERVISORD_CONFIG_FILE = RUNTIME_DIR / "supervisord.conf"
-SUPERVISORD_CONFIG_FILE_TMP = RUNTIME_DIR / "supervisord.conf.tmp"
-SUPERVISORD_PID_FILE = RUNTIME_DIR / "supervisord.pid"
-SUPERVISORD_SOCK = RUNTIME_DIR / "supervisord.sock"
-SUPERVISORD_LOGFILE = RUNTIME_DIR / "supervisord.log"
+def supervisord_config_file_tmp(_config: KresConfig) -> Path:
+ return Path("supervisord.conf.tmp")
-SUPERVISORD_SUBPROCESS_LOG_DIR = RUNTIME_DIR / "logs"
-SUPERVISORD_SUBPROCESS_LOG_DIR.mkdir(exist_ok=True)
-MANAGER_CONFIG_FILE = CONFIGURATION_DIR / "config.yml"
+def supervisord_log_file(_config: KresConfig) -> Path:
+ return Path("supervisord.log")
-LISTEN_SOCKET_PATH = RUNTIME_DIR / "manager.sock"
+def supervisord_pid_file(_config: KresConfig) -> Path:
+ return Path("supervisord.pid")
+
+
+def supervisord_sock_file(_config: KresConfig) -> Path:
+ return Path("supervisord.sock")
+
+
+def supervisord_subprocess_log_dir(_config: KresConfig) -> Path:
+ return Path("logs")
+
+
+WATCHDOG_INTERVAL: float = 5
"""
Used in KresdManager. It's a number of seconds in between system health checks.
"""
-WATCHDOG_INTERVAL: float = 5
class ManagementSchema(SchemaNode):
- listen: Listen = Listen({"unix-socket": "/tmp/manager.sock"})
+ # the default listen path here MUST use the default rundir
+ listen: Listen = Listen({"unix-socket": "./manager.sock"})
backend: BackendEnum = "auto"
rundir: AnyPath = AnyPath(".")
log_level: LogLevelEnum = "INFO"
res = alloc(_custom_name_id=True)
res.set_custom_str_representation(val)
return res
+
+
+def lookup_from_string(val: str) -> KresID:
+ for allocated_id in _used:
+ if str(allocated_id) == val:
+ return allocated_id
+
+ raise IndexError(f"ID with identifier '{val}' was not allocated")
import knot_resolver_manager.kresd_controller
from knot_resolver_manager import kres_id
from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.constants import KRESD_CONFIG_FILE, WATCHDOG_INTERVAL
-from knot_resolver_manager.exceptions import KresdManagerException
+from knot_resolver_manager.config_store import ConfigStore
+from knot_resolver_manager.constants import WATCHDOG_INTERVAL
from knot_resolver_manager.kresd_controller.interface import (
Subprocess,
SubprocessController,
SubprocessStatus,
SubprocessType,
)
-from knot_resolver_manager.utils.async_utils import writefile
-from knot_resolver_manager.utils.parsing import ParsedTree
+from knot_resolver_manager.utils.functional import Result
+from knot_resolver_manager.utils.types import NoneType
from .datamodel import KresConfig
"""
@staticmethod
- async def create(selected_controller: Optional[SubprocessController], config: KresConfig) -> "KresManager":
+ async def create(selected_controller: Optional[SubprocessController], config_store: ConfigStore) -> "KresManager":
"""
Creates new instance of KresManager.
"""
- inst = KresManager(config, _i_know_what_i_am_doing=True)
- await inst._async_init(selected_controller, config) # pylint: disable=protected-access
+ inst = KresManager(_i_know_what_i_am_doing=True)
+ await inst._async_init(selected_controller, config_store) # pylint: disable=protected-access
return inst
- async def _async_init(self, selected_controller: Optional[SubprocessController], config: KresConfig):
+ async def _async_init(self, selected_controller: Optional[SubprocessController], config_store: ConfigStore):
if selected_controller is None:
- self._controller = await knot_resolver_manager.kresd_controller.get_best_controller_implementation()
+ self._controller = await knot_resolver_manager.kresd_controller.get_best_controller_implementation(
+ config_store.get()
+ )
else:
self._controller = selected_controller
- await self._controller.initialize_controller()
+ await self._controller.initialize_controller(config_store.get())
self._watchdog_task = create_task(self._watchdog())
- await self.load_system_state()
- await self.apply_config(config)
+ await self._load_system_state()
+
+ await config_store.register_verifier(self.validate_config)
+ await config_store.register_on_change_callback(self.apply_config)
- def __init__(self, config: KresConfig, _i_know_what_i_am_doing: bool = False):
+ def __init__(self, _i_know_what_i_am_doing: bool = False):
if not _i_know_what_i_am_doing:
logger.error(
"Trying to create an instance of KresManager using normal constructor. Please use "
self._gc: Optional[Subprocess] = None
self._manager_lock = asyncio.Lock()
self._controller: SubprocessController
- self._last_used_config_raw: Optional[ParsedTree]
- self._last_used_config: KresConfig = config
self._watchdog_task: Optional["Future[None]"] = None
- async def load_system_state(self):
+ async def _load_system_state(self):
async with self._manager_lock:
await self._collect_already_running_children()
- async def _spawn_new_worker(self):
- subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, kres_id.alloc())
+ async def _spawn_new_worker(self, config: KresConfig):
+ subprocess = await self._controller.create_subprocess(config, SubprocessType.KRESD, kres_id.alloc())
await subprocess.start()
self._workers.append(subprocess)
else:
raise RuntimeError("unexpected subprocess type")
- async def _rolling_restart(self):
+ async def _rolling_restart(self, new_config: KresConfig):
for kresd in self._workers:
- await kresd.restart()
+ await kresd.apply_new_config(new_config)
await asyncio.sleep(1)
- async def _ensure_number_of_children(self, n: int):
+ async def _ensure_number_of_children(self, config: KresConfig, n: int):
# kill children that are not needed
while len(self._workers) > n:
await self._stop_a_worker()
# spawn new children if needed
while len(self._workers) < n:
- await self._spawn_new_worker()
+ await self._spawn_new_worker(config)
def _is_gc_running(self) -> bool:
return self._gc is not None
- async def _start_gc(self):
- subprocess = await self._controller.create_subprocess(SubprocessType.GC, kres_id.alloc())
+ async def _start_gc(self, config: KresConfig):
+ subprocess = await self._controller.create_subprocess(config, SubprocessType.GC, kres_id.alloc())
await subprocess.start()
self._gc = subprocess
await self._gc.stop()
self._gc = None
- async def _write_config(self, config: KresConfig):
- lua_config = config.render_lua()
- await writefile(KRESD_CONFIG_FILE, lua_config)
-
- async def apply_config(self, config: KresConfig):
+ async def validate_config(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
async with self._manager_lock:
- logger.debug("Writing new config to file...")
- await self._write_config(config)
-
logger.debug("Testing the new config with a canary process")
try:
- await self._spawn_new_worker()
+ # technically, this has the side effect of leaving a new process running
+ # but it's practically not a problem, because
+ # if it keeps running, the config is valid and others will soon join as well
+ # if it crashes and the startup fails, then well, it's not running anymore... :)
+ await self._spawn_new_worker(new)
except SubprocessError:
logger.error("kresd with the new config failed to start, rejecting config")
- last = self.get_last_used_config()
- if last is not None:
- await self._write_config(last)
- raise KresdManagerException("Canary kresd instance failed. Config is invalid.")
+ return Result.err("Canary kresd instance failed to start. Config is invalid.")
- logger.debug("Canary process test passed, Applying new config to all workers")
- self._last_used_config = config
- await self._ensure_number_of_children(config.server.workers)
- await self._rolling_restart()
+ logger.debug("Canary process test passed.")
+ return Result.ok(None)
+
+ async def apply_config(self, config: KresConfig):
+ async with self._manager_lock:
+ logger.debug("Applying new config to all workers")
+ await self._ensure_number_of_children(config, config.server.workers)
+ await self._rolling_restart(config)
if self._is_gc_running() != config.server.use_cache_gc:
if config.server.use_cache_gc:
logger.debug("Starting cache GC")
- await self._start_gc()
+ await self._start_gc(config)
else:
logger.debug("Stopping cache GC")
await self._stop_gc()
async def stop(self):
async with self._manager_lock:
- await self._ensure_number_of_children(0)
+ await self._ensure_number_of_children(KresConfig(), 0)
await self._controller.shutdown_controller()
if self._watchdog_task is not None:
self._watchdog_task.cancel()
- def get_last_used_config(self) -> KresConfig:
- return self._last_used_config
-
async def _instability_handler(self) -> None:
logger.error(
"Instability callback invoked. Something is wrong, no idea how to react."
await asyncio.sleep(WATCHDOG_INTERVAL)
# gather current state
- units = {u.id: u for u in await self._controller.get_subprocess_info()}
+ detected_subprocesses = await self._controller.get_subprocess_status()
worker_ids = [x.id for x in self._workers]
invoke_callback = False
for w in worker_ids:
- if w not in units:
+ if w not in detected_subprocesses:
logger.error("Expected to find subprocess with id '%s' in the system, but did not.", w)
invoke_callback = True
continue
- if units[w].status is SubprocessStatus.FAILED:
+ if detected_subprocesses[w] is SubprocessStatus.FAILED:
logger.error("Subprocess '%s' is failed.", w)
invoke_callback = True
continue
- if units[w].status is SubprocessStatus.UNKNOWN:
+ if detected_subprocesses[w] is SubprocessStatus.UNKNOWN:
logger.warning("Subprocess '%s' is in unknown state!", w)
if invoke_callback:
import logging
from typing import List, Optional
+from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.kresd_controller.interface import SubprocessController
logger = logging.getLogger(__name__)
Attempt to load systemd controllers.
"""
try:
- from knot_resolver_manager.kresd_controller.systemd import SystemdSubprocessController, SystemdPersistanceType
-
+ from knot_resolver_manager.kresd_controller.systemd import SystemdSubprocessController
from knot_resolver_manager.kresd_controller.systemd.dbus_api import SystemdType
_registered_controllers.extend(
[
SystemdSubprocessController(SystemdType.SYSTEM),
SystemdSubprocessController(SystemdType.SESSION),
- SystemdSubprocessController(SystemdType.SESSION, SystemdPersistanceType.TRANSIENT),
]
)
except ImportError:
logger.info("Failed to import modules related to systemd service manager")
-async def get_best_controller_implementation() -> SubprocessController:
+async def get_best_controller_implementation(config: KresConfig) -> SubprocessController:
logger.debug("Starting service manager auto-selection...")
if len(_registered_controllers) == 0:
raise LookupError("No service managers available!")
# check all controllers concurrently
- res = await asyncio.gather(*(cont.is_controller_available() for cont in _registered_controllers))
+ res = await asyncio.gather(*(cont.is_controller_available(config) for cont in _registered_controllers))
# take the first one on the list which is available
for avail, controller in zip(res, _registered_controllers):
return [str(controller) for controller in sorted(_registered_controllers, key=str)]
-async def get_controller_by_name(name: str) -> SubprocessController:
+async def get_controller_by_name(config: KresConfig, name: str) -> SubprocessController:
logger.debug("Subprocess controller selected manualy by the user, testing feasibility...")
controller: Optional[SubprocessController] = None
logger.error("Subprocess controller with name '%s' was not found", name)
raise LookupError(f"No subprocess controller named '{name}' found")
- if await controller.is_controller_available():
+ if await controller.is_controller_available(config):
logger.info("Selected controller '%s'", str(controller))
return controller
else:
-from dataclasses import dataclass
from enum import Enum, auto
-from typing import Iterable, List
+from typing import Dict, Iterable
+from knot_resolver_manager.constants import kresd_config_file
+from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.exceptions import SubprocessControllerException
from knot_resolver_manager.kres_id import KresID
+from knot_resolver_manager.utils.async_utils import writefile
class SubprocessType(Enum):
One SubprocessInstance corresponds to one manager's subprocess
"""
- @property
- def type(self) -> SubprocessType:
- raise NotImplementedError()
-
- @property
- def id(self) -> str:
- raise NotImplementedError()
-
- async def is_running(self) -> bool:
- raise NotImplementedError()
+ def __init__(self, config: KresConfig) -> None:
+ self._config = config
async def start(self) -> None:
- raise NotImplementedError()
+ # create config file
+ lua_config = self._config.render_lua()
+ await writefile(kresd_config_file(self._config, self.id), lua_config)
+ try:
+ await self._start()
+ except SubprocessControllerException as e:
+ kresd_config_file(self._config, self.id).unlink()
+ raise e
+
+ async def apply_new_config(self, new_config: KresConfig) -> None:
+ self._config = new_config
+ # update config file
+ lua_config = new_config.render_lua()
+ await writefile(kresd_config_file(new_config, self.id), lua_config)
+ # update runtime status
+ await self._restart()
async def stop(self) -> None:
- raise NotImplementedError()
-
- async def restart(self) -> None:
- raise NotImplementedError()
+ await self._stop()
+ kresd_config_file(self._config, self.id).unlink()
def __eq__(self, o: object) -> bool:
return isinstance(o, type(self)) and o.type == self.type and o.id == self.id
def __hash__(self) -> int:
return hash(type(self)) ^ hash(self.type) ^ hash(self.id)
+ async def _start(self) -> None:
+ raise NotImplementedError()
+
+ async def _stop(self) -> None:
+ raise NotImplementedError()
+
+ async def _restart(self) -> None:
+ raise NotImplementedError()
+
+ @property
+ def type(self) -> SubprocessType:
+ raise NotImplementedError()
+
+ @property
+ def id(self) -> KresID:
+ raise NotImplementedError()
+
class SubprocessStatus(Enum):
RUNNING = auto()
UNKNOWN = auto()
-@dataclass
-class SubprocessInfo:
- id: str
- status: SubprocessStatus
-
-
class SubprocessController:
"""
The common Subprocess Controller interface. This is what KresManager requires and what has to be implemented by all
controllers.
"""
- async def is_controller_available(self) -> bool:
+ async def is_controller_available(self, config: KresConfig) -> bool:
+ """
+ Returns bool, whether the controller is available with the given config
+ """
raise NotImplementedError()
- async def get_all_running_instances(self) -> Iterable[Subprocess]:
+ async def initialize_controller(self, config: KresConfig) -> None:
+ """
+ Should be called when we want to really start using the controller with a specific configuration
+ """
raise NotImplementedError()
- async def initialize_controller(self) -> None:
+ async def get_all_running_instances(self) -> Iterable[Subprocess]:
"""
- Should be called when we want to really start using the controller.
+
+ Must NOT be called before initialize_controller()
"""
raise NotImplementedError()
Called when the manager is gracefully shutting down. Allows us to stop
the service manager process or simply cleanup, so that we don't reuse
the same resources in a new run.
+
+ Must NOT be called before initialize_controller()
"""
raise NotImplementedError()
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
+ async def create_subprocess(
+ self, subprocess_config: KresConfig, subprocess_type: SubprocessType, id_hint: KresID
+ ) -> Subprocess:
"""
Return a Subprocess object which can be operated on. The subprocess is not
started or in any way active after this call. That has to be performed manually
"""
raise NotImplementedError()
- async def get_subprocess_info(self) -> List[SubprocessInfo]:
+ async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
"""
Get a status of running subprocesses as seen by the controller. This method actively polls
for information.
+
+ Must NOT be called before initialize_controller()
"""
raise NotImplementedError()
+import configparser
import logging
-from typing import Iterable, List, Set
+import os
+import signal
+from os import kill
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
+from xmlrpc.client import ServerProxy
+
+import supervisor.xmlrpc
+from jinja2 import Template
from knot_resolver_manager.compat.asyncio import to_thread
-from knot_resolver_manager.kres_id import KresID, alloc_from_string
+from knot_resolver_manager.compat.dataclasses import dataclass
+from knot_resolver_manager.constants import (
+ GC_EXECUTABLE,
+ KRESD_EXECUTABLE,
+ kresd_cache_dir,
+ kresd_config_file,
+ supervisord_config_file,
+ supervisord_config_file_tmp,
+ supervisord_log_file,
+ supervisord_pid_file,
+ supervisord_sock_file,
+ supervisord_subprocess_log_dir,
+)
+from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.kres_id import KresID, lookup_from_string
from knot_resolver_manager.kresd_controller.interface import (
Subprocess,
SubprocessController,
- SubprocessInfo,
+ SubprocessStatus,
SubprocessType,
)
-
-from .config import (
- SupervisordConfig,
- create_id,
- is_supervisord_available,
- is_supervisord_running,
- list_ids_from_existing_config,
- list_subprocesses,
- restart,
- start_supervisord,
- stop_supervisord,
- update_config,
+from knot_resolver_manager.utils.async_utils import (
+ call,
+ read_resource,
+ readfile,
+ wait_for_process_termination,
+ writefile,
)
logger = logging.getLogger(__name__)
+@dataclass
+class _Instance:
+ """
+ Data structure holding data for supervisord config template
+ """
+
+ logfile: str
+ id: str
+ workdir: str
+ command: str
+ environment: str
+
+
+def _get_command_based_on_type(config: KresConfig, i: "SupervisordSubprocess") -> str:
+ if i.type is SubprocessType.KRESD:
+ return f"{KRESD_EXECUTABLE} -c {kresd_config_file(config, i.id)} -n"
+ elif i.type is SubprocessType.GC:
+ return f"{GC_EXECUTABLE} -c {kresd_cache_dir(config)} -d 1000"
+ else:
+ raise NotImplementedError("This subprocess type is not supported")
+
+
+async def _write_config_file(config: KresConfig, instances: Set["SupervisordSubprocess"]):
+ @dataclass
+ class SupervisordConfig:
+ type: str
+ unix_http_server: str
+ pid_file: str
+ workdir: str
+ logfile: str
+
+ template = await read_resource(__package__, "supervisord.conf.j2")
+ assert template is not None
+ template = template.decode("utf8")
+ config_string = Template(template).render(
+ instances=[
+ _Instance(
+ type=i.type,
+ logfile=supervisord_subprocess_log_dir(config) / f"{i.id}.log",
+ id=str(i.id),
+ workdir=str(os.getcwd()),
+ command=_get_command_based_on_type(config, i),
+ environment=f"SYSTEMD_INSTANCE={i.id}",
+ )
+ for i in instances
+ ],
+ config={
+ SupervisordConfig(
+ unix_http_server=supervisord_sock_file(config),
+ pid_file=supervisord_pid_file(config),
+ workdir=str(config.server.management.rundir.to_path().absolute()),
+ log_file=supervisord_log_file(config),
+ )
+ },
+ )
+ await writefile(supervisord_config_file_tmp(config), config_string)
+ # atomically replace
+ os.rename(supervisord_config_file_tmp(config), supervisord_config_file(config))
+
+
+async def _start_supervisord(config: KresConfig):
+ await _write_config_file(config, set())
+ res = await call(f'supervisord --configuration="{supervisord_config_file(config).absolute()}"', shell=True)
+ assert res == 0
+
+
+async def _stop_supervisord(config: KresConfig):
+ pid = int(await readfile(supervisord_pid_file(config)))
+ kill(pid, signal.SIGINT)
+ await wait_for_process_termination(pid)
+
+
+async def _update_config(config: KresConfig, instances: Set["SupervisordSubprocess"]):
+ await _write_config_file(config, instances)
+ await call(f'supervisorctl -c "{supervisord_config_file(config).absolute()}" update', shell=True)
+
+
+async def _restart(config: KresConfig, id_: KresID):
+ await call(f'supervisorctl -c "{supervisord_config_file(config).absolute()}" restart {id_}', shell=True)
+
+
+async def _is_supervisord_available() -> bool:
+ i = await call("supervisorctl -h > /dev/null", shell=True, discard_output=True)
+ i += await call("supervisord -h > /dev/null", shell=True, discard_output=True)
+ return i == 0
+
+
+async def _get_supervisord_pid(config: KresConfig) -> Optional[int]:
+ if not Path(supervisord_pid_file(config)).exists():
+ return None
+
+ return int(await readfile(supervisord_pid_file(config)))
+
+
+def _is_process_runinng(pid: int) -> bool:
+ try:
+ # kill with signal 0 is a safe way to test that a process exists
+ kill(pid, 0)
+ return True
+ except ProcessLookupError:
+ return False
+
+
+async def _is_supervisord_running(config: KresConfig) -> bool:
+ pid = await _get_supervisord_pid(config)
+ if pid is None:
+ return False
+ elif not _is_process_runinng(pid):
+ supervisord_pid_file(config).unlink()
+ return False
+ else:
+ return True
+
+
+def _list_subprocesses(config: KresConfig) -> Dict[KresID, SubprocessStatus]:
+ proxy = ServerProxy(
+ "http://127.0.0.1",
+ transport=supervisor.xmlrpc.SupervisorTransport(
+ None, None, serverurl="unix://" + str(supervisord_sock_file(config))
+ ),
+ )
+ processes: Any = proxy.supervisor.getAllProcessInfo()
+
+ def convert(proc: Any) -> SubprocessStatus:
+ conversion_tbl = {
+ "FATAL": SubprocessStatus.FAILED,
+ "EXITED": SubprocessStatus.FAILED,
+ "RUNNING": SubprocessStatus.RUNNING,
+ }
+
+ if proc["statename"] in conversion_tbl:
+ status = conversion_tbl[proc["statename"]]
+ else:
+ status = SubprocessStatus.UNKNOWN
+ return status
+
+ return {lookup_from_string(pr["name"]): convert(pr) for pr in processes}
+
+
+async def _list_ids_from_existing_config(cfg: KresConfig) -> List[Tuple[SubprocessType, KresID]]:
+ config = await readfile(supervisord_config_file(cfg))
+ cp = configparser.ConfigParser()
+ cp.read_string(config)
+
+ res: List[Tuple[SubprocessType, KresID]] = []
+ for section in cp.sections():
+ if section.startswith("program:"):
+ program_id = section.replace("program:", "")
+ iid = lookup_from_string(program_id)
+ typ = SubprocessType[cp[section].get("type")]
+ res.append((typ, iid))
+ return res
+
+
class SupervisordSubprocess(Subprocess):
- def __init__(self, controller: "SupervisordSubprocessController", id_: KresID, type_: SubprocessType):
+ def __init__(
+ self, config: KresConfig, controller: "SupervisordSubprocessController", id_: KresID, type_: SubprocessType
+ ):
+ super().__init__(config)
self._controller: "SupervisordSubprocessController" = controller
self._id: KresID = id_
self._type: SubprocessType = type_
return self._type
@property
- def id(self) -> str:
- return create_id(self._type, self._id)
-
- async def is_running(self) -> bool:
- return self._controller.should_be_running(self)
+ def id(self) -> KresID:
+ return self._id
- async def start(self) -> None:
+ async def _start(self) -> None:
return await self._controller.start_subprocess(self)
- async def stop(self) -> None:
+ async def _stop(self) -> None:
return await self._controller.stop_subprocess(self)
- async def restart(self) -> None:
+ async def _restart(self) -> None:
return await self._controller.restart_subprocess(self)
+ def get_used_config(self) -> KresConfig:
+ return self._config
+
class SupervisordSubprocessController(SubprocessController):
def __init__(self):
self._running_instances: Set[SupervisordSubprocess] = set()
+ self._controller_config: Optional[KresConfig] = None
def __str__(self):
return "supervisord"
def should_be_running(self, subprocess: SupervisordSubprocess):
return subprocess in self._running_instances
- async def is_controller_available(self) -> bool:
- res = await is_supervisord_available()
+ async def is_controller_available(self, config: KresConfig) -> bool:
+ res = await _is_supervisord_available()
if not res:
logger.info("Failed to find usable supervisord.")
logger.debug("Detection - supervisord controller is available for use")
return res
- async def _update_config_with_real_state(self):
- running = await is_supervisord_running()
+ async def _update_config_with_real_state(self, config: KresConfig):
+ assert self._controller_config is not None
+
+ running = await _is_supervisord_running(config)
if running:
- ids = await list_ids_from_existing_config()
+ ids = await _list_ids_from_existing_config(config)
for tp, id_ in ids:
- self._running_instances.add(SupervisordSubprocess(self, alloc_from_string(id_), tp))
+ self._running_instances.add(SupervisordSubprocess(self._controller_config, self, id_, tp))
async def get_all_running_instances(self) -> Iterable[Subprocess]:
- await self._update_config_with_real_state()
+ assert self._controller_config is not None
+
+ await self._update_config_with_real_state(self._controller_config)
return iter(self._running_instances)
- def _create_config(self) -> SupervisordConfig:
- return SupervisordConfig(instances=self._running_instances) # type: ignore
+ async def initialize_controller(self, config: KresConfig) -> None:
+ self._controller_config = config
- async def initialize_controller(self) -> None:
- if not await is_supervisord_running():
- config = self._create_config()
- await start_supervisord(config)
+ if not await _is_supervisord_running(config):
+ await _start_supervisord(config)
async def shutdown_controller(self) -> None:
- await stop_supervisord()
+ assert self._controller_config is not None
+ await _stop_supervisord(self._controller_config)
async def start_subprocess(self, subprocess: SupervisordSubprocess):
+ assert self._controller_config is not None
assert subprocess not in self._running_instances
self._running_instances.add(subprocess)
- await update_config(self._create_config())
+ await _update_config(self._controller_config, self._running_instances)
async def stop_subprocess(self, subprocess: SupervisordSubprocess):
+ assert self._controller_config is not None
assert subprocess in self._running_instances
self._running_instances.remove(subprocess)
- await update_config(self._create_config())
+ await _update_config(self._controller_config, self._running_instances)
async def restart_subprocess(self, subprocess: SupervisordSubprocess):
+ assert self._controller_config is not None
assert subprocess in self._running_instances
- await restart(subprocess.id)
+ await _restart(self._controller_config, subprocess.id)
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
- return SupervisordSubprocess(self, id_hint, subprocess_type)
+ async def create_subprocess(
+ self, subprocess_config: KresConfig, subprocess_type: SubprocessType, id_hint: KresID
+ ) -> Subprocess:
+ return SupervisordSubprocess(subprocess_config, self, id_hint, subprocess_type)
- async def get_subprocess_info(self) -> List[SubprocessInfo]:
- return await to_thread(list_subprocesses)
+ async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
+ return await to_thread(_list_subprocesses)
+++ /dev/null
-import configparser
-import logging
-import os
-import signal
-from os import kill
-from pathlib import Path
-from typing import Any, List, Optional, Set, Tuple
-from xmlrpc.client import ServerProxy
-
-import supervisor.xmlrpc
-from jinja2 import Template
-
-from knot_resolver_manager.compat.dataclasses import dataclass
-from knot_resolver_manager.constants import (
- GC_EXECUTABLE,
- KRES_CACHE_DIR,
- KRES_GC_SUPERVISORD_ARGS,
- KRESD_EXECUTABLE,
- KRESD_SUPERVISORD_ARGS,
- SUPERVISORD_CONFIG_FILE,
- SUPERVISORD_CONFIG_FILE_TMP,
- SUPERVISORD_LOGFILE,
- SUPERVISORD_PID_FILE,
- SUPERVISORD_SOCK,
- SUPERVISORD_SUBPROCESS_LOG_DIR,
-)
-from knot_resolver_manager.kresd_controller.interface import (
- Subprocess,
- SubprocessInfo,
- SubprocessStatus,
- SubprocessType,
-)
-from knot_resolver_manager.utils.async_utils import (
- call,
- read_resource,
- readfile,
- wait_for_process_termination,
- writefile,
-)
-
-WATCHDOG_INTERVAL: int = 15
-
-logger = logging.getLogger(__name__)
-
-
-@dataclass
-class SupervisordConfig:
- instances: Set[Subprocess]
- unix_http_server: str = str(SUPERVISORD_SOCK.absolute())
- pid_file: str = str(SUPERVISORD_PID_FILE.absolute())
-
-
-async def _create_config_file(config: SupervisordConfig):
- template = await read_resource(__package__, "supervisord.conf.j2")
- assert template is not None
- template = template.decode("utf8")
- config_string = Template(template).render(
- config=config,
- gc_args=KRES_GC_SUPERVISORD_ARGS,
- kresd_args=KRESD_SUPERVISORD_ARGS,
- kresd_executable=KRESD_EXECUTABLE,
- gc_executable=GC_EXECUTABLE,
- cache_dir=KRES_CACHE_DIR,
- log_file=SUPERVISORD_LOGFILE,
- workdir=KRES_CACHE_DIR,
- log_dir=SUPERVISORD_SUBPROCESS_LOG_DIR,
- )
- await writefile(SUPERVISORD_CONFIG_FILE_TMP, config_string)
- # atomically replace
- os.rename(SUPERVISORD_CONFIG_FILE_TMP, SUPERVISORD_CONFIG_FILE)
-
-
-async def start_supervisord(config: SupervisordConfig):
- await _create_config_file(config)
- res = await call(f'supervisord --configuration="{SUPERVISORD_CONFIG_FILE.absolute()}"', shell=True)
- assert res == 0
-
-
-async def stop_supervisord():
- pid = int(await readfile(SUPERVISORD_PID_FILE))
- kill(pid, signal.SIGINT)
- await wait_for_process_termination(pid)
-
-
-async def update_config(config: SupervisordConfig):
- await _create_config_file(config)
- await call(f'supervisorctl -c "{SUPERVISORD_CONFIG_FILE.absolute()}" update', shell=True)
-
-
-async def restart(id_: str):
- await call(f'supervisorctl -c "{SUPERVISORD_CONFIG_FILE.absolute()}" restart {id_}', shell=True)
-
-
-async def is_supervisord_available() -> bool:
- i = await call("supervisorctl -h > /dev/null", shell=True, discard_output=True)
- i += await call("supervisord -h > /dev/null", shell=True, discard_output=True)
- return i == 0
-
-
-async def get_supervisord_pid() -> Optional[int]:
- if not Path(SUPERVISORD_PID_FILE).exists():
- return None
-
- return int(await readfile(SUPERVISORD_PID_FILE))
-
-
-def is_process_runinng(pid: int) -> bool:
- try:
- # kill with signal 0 is a safe way to test that a process exists
- kill(pid, 0)
- return True
- except ProcessLookupError:
- return False
-
-
-async def is_supervisord_running() -> bool:
- pid = await get_supervisord_pid()
- if pid is None:
- return False
- elif not is_process_runinng(pid):
- SUPERVISORD_PID_FILE.unlink()
- return False
- else:
- return True
-
-
-def list_subprocesses() -> List[SubprocessInfo]:
- proxy = ServerProxy(
- "http://127.0.0.1",
- transport=supervisor.xmlrpc.SupervisorTransport(None, None, serverurl="unix://" + str(SUPERVISORD_SOCK)),
- )
- processes: Any = proxy.supervisor.getAllProcessInfo()
-
- def convert(proc: Any) -> SubprocessInfo:
- conversion_tbl = {
- "FATAL": SubprocessStatus.FAILED,
- "EXITED": SubprocessStatus.FAILED,
- "RUNNING": SubprocessStatus.RUNNING,
- }
-
- if proc["statename"] in conversion_tbl:
- status = conversion_tbl[proc["statename"]]
- else:
- status = SubprocessStatus.UNKNOWN
-
- return SubprocessInfo(id=proc["name"], status=status)
-
- return [convert(pr) for pr in processes]
-
-
-def create_id(type_name: SubprocessType, id_: object) -> str:
- return f"{type_name.name}_{id_}"
-
-
-def parse_id(id_: str) -> Tuple[SubprocessType, str]:
- tp, id_ = id_.split("_", maxsplit=1)
- return (SubprocessType[tp], id_)
-
-
-async def list_ids_from_existing_config() -> List[Tuple[SubprocessType, str]]:
- config = await readfile(SUPERVISORD_CONFIG_FILE)
- cp = configparser.ConfigParser()
- cp.read_string(config)
-
- res: List[Tuple[SubprocessType, str]] = []
- for section in cp.sections():
- if section.startswith("program:"):
- program_id = section.replace("program:", "")
- res.append(parse_id(program_id))
- return res
[supervisord]
pidfile = {{ config.pid_file }}
-directory = {{ workdir }}
+directory = {{ config.workdir }}
nodaemon = false
-logfile = {{ log_file }}
+logfile = {{ config.log_file }}
logfile_maxbytes = 50MB
{# user=root #}
-{% for instance in config.instances %}
+{% for instance in instances %}
[program:{{ instance.id }}]
+type={{ instance.type }}
redirect_stderr=false
-stdout_logfile={{ log_dir / (instance.id + ".log") }}
-stderr_logfile={{ log_dir / (instance.id + ".log") }}
-
-{%- if instance.type.name == "KRESD" %}
-
-directory={{ workdir }}
-command={{ kresd_executable }} {{ kresd_args }}
-environment=SYSTEMD_INSTANCE={{ instance.id }}
-{%- elif instance.type.name == "GC" %}
-
-directory={{ workdir }}
-command={{ gc_executable}} {{ gc_args }}
-
-{%- else %}
-
-{# other subprocess types are not implemented, fail #}
-{{ 0 / 0 }}
-
-{% endif %}
+stdout_logfile={{ instance.logfile }}
+stderr_logfile={{ instance.logfile }}
+directory={{ instance.workdir }}
+command={{ instance.command }}
+environment={{ instance.environment }}
{%- endfor -%}
\ No newline at end of file
import logging
import os
-from enum import Enum, auto
-from typing import Iterable, List
+from typing import Dict, Iterable, List, Optional
from knot_resolver_manager import compat
from knot_resolver_manager.compat.asyncio import to_thread
-from knot_resolver_manager.kres_id import KresID, alloc_from_string
+from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.kres_id import KresID, alloc_from_string, lookup_from_string
from knot_resolver_manager.kresd_controller.interface import (
Subprocess,
SubprocessController,
- SubprocessInfo,
SubprocessStatus,
SubprocessType,
)
+from knot_resolver_manager.utils import phantom_use
from knot_resolver_manager.utils.async_utils import call
from . import dbus_api as systemd
logger = logging.getLogger(__name__)
-class SystemdPersistanceType(Enum):
- PERSISTENT = auto()
- TRANSIENT = auto()
-
-
class SystemdSubprocess(Subprocess):
def __init__(
self,
- type_: SubprocessType,
+ config: KresConfig,
+ typ: SubprocessType,
id_: KresID,
systemd_type: systemd.SystemdType,
- persistance_type: SystemdPersistanceType = SystemdPersistanceType.PERSISTENT,
):
- self._type = type_
+ super().__init__(config)
+ self._type = typ
self._id: KresID = id_
self._systemd_type = systemd_type
- self._persistance_type = persistance_type
@property
- def id(self):
+ def id(self) -> KresID:
+ return self._id
+
+ @property
+ def systemd_id(self) -> str:
if self._type is SubprocessType.GC:
return "kres-cache-gc.service"
else:
- sep = {SystemdPersistanceType.PERSISTENT: "@", SystemdPersistanceType.TRANSIENT: "_"}[
- self._persistance_type
- ]
- return f"kresd{sep}{self._id}.service"
+ return f"kresd_{self._id}.service"
@staticmethod
- def id_could_be_ours(unit_name: str) -> bool:
+ def is_unit_name_ours(unit_name: str) -> bool:
is_ours = unit_name == "kres-cache-gc.service"
- is_ours |= unit_name.startswith("kresd") and unit_name.endswith(".service")
+ is_ours |= unit_name.startswith("kresd_") and unit_name.endswith(".service")
return is_ours
@property
async def is_running(self) -> bool:
raise NotImplementedError()
- async def _on_unexpected_termination(self):
- logger.warning("Detected unexpected termination of unit %s", self.id)
-
- async def start(self):
- if self._persistance_type is SystemdPersistanceType.PERSISTENT:
- await compat.asyncio.to_thread(systemd.start_unit, self._systemd_type, self.id)
- elif self._persistance_type is SystemdPersistanceType.TRANSIENT:
- await compat.asyncio.to_thread(systemd.start_transient_unit, self._systemd_type, self.id, self._type)
+ async def _start(self):
+ await compat.asyncio.to_thread(
+ systemd.start_transient_kresd_unit, self._config, self._systemd_type, self.id, self._type
+ )
async def stop(self):
- await compat.asyncio.to_thread(systemd.stop_unit, self._systemd_type, self.id)
+ await compat.asyncio.to_thread(systemd.stop_unit, self._systemd_type, self.systemd_id)
- async def restart(self):
- await compat.asyncio.to_thread(systemd.restart_unit, self._systemd_type, self.id)
+ async def _restart(self):
+ await compat.asyncio.to_thread(systemd.restart_unit, self._systemd_type, self.systemd_id)
class SystemdSubprocessController(SubprocessController):
- def __init__(
- self,
- systemd_type: systemd.SystemdType,
- persistance_type: SystemdPersistanceType = SystemdPersistanceType.PERSISTENT,
- ):
+ def __init__(self, systemd_type: systemd.SystemdType):
self._systemd_type = systemd_type
- self._persistance_type = persistance_type
+ self._controller_config: Optional[KresConfig] = None
def __str__(self):
if self._systemd_type == systemd.SystemdType.SESSION:
- if self._persistance_type is SystemdPersistanceType.TRANSIENT:
- return "systemd-session-transient"
- else:
- return "systemd-session"
+ return "systemd-session"
elif self._systemd_type == systemd.SystemdType.SYSTEM:
return "systemd"
else:
raise NotImplementedError("unknown systemd type")
- async def is_controller_available(self) -> bool:
+ async def is_controller_available(self, config: KresConfig) -> bool:
+ # communication with systemd does not depend on the config, it's always the same,
+ # so we should just make sure that analysis tools do not complain
+ phantom_use(config)
+
# try to run systemctl (should be quite fast)
cmd = f"systemctl {'--user' if self._systemd_type == systemd.SystemdType.SESSION else ''} status"
ret = await call(cmd, shell=True, discard_output=True)
)
return False
+ # check that we run under root for non-session systemd
try:
- if self._persistance_type is SystemdPersistanceType.PERSISTENT and not await compat.asyncio.to_thread(
- systemd.can_load_unit, self._systemd_type, "kresd@1.service"
- ):
- logger.info("Systemd (%s) accessible, but no 'kresd@.service' unit detected.", self._systemd_type)
- return False
-
if self._systemd_type is systemd.SystemdType.SYSTEM and os.geteuid() != 0:
logger.info(
"Systemd (%s) looks functional, but we are not running as root. Assuming not enough privileges",
return False
async def get_all_running_instances(self) -> Iterable[Subprocess]:
+ assert self._controller_config is not None
+
res: List[SystemdSubprocess] = []
units = await compat.asyncio.to_thread(systemd.list_units, self._systemd_type)
for unit in units:
if unit.name.startswith("kresd") and unit.name.endswith(".service"):
iden = unit.name.replace("kresd", "")[1:].replace(".service", "")
- persistance_type = (
- SystemdPersistanceType.PERSISTENT if "@" in unit.name else SystemdPersistanceType.TRANSIENT
- )
if unit.state == "failed":
# if a unit is failed, remove it from the system by reseting its state
res.append(
SystemdSubprocess(
+ self._controller_config,
SubprocessType.KRESD,
alloc_from_string(iden),
self._systemd_type,
- persistance_type,
)
)
elif unit.name == "kres-cache-gc.service":
# we can't easily check, if the unit is transient or not without additional systemd call
# we ignore it for now and assume the default persistency state. It shouldn't cause any
# problems, because interactions with the process are done the same way in all cases
- res.append(SystemdSubprocess(SubprocessType.GC, alloc_from_string("gc"), self._systemd_type))
+ res.append(
+ SystemdSubprocess(
+ self._controller_config, SubprocessType.GC, alloc_from_string("gc"), self._systemd_type
+ )
+ )
return res
- async def initialize_controller(self) -> None:
- pass
+ async def initialize_controller(self, config: KresConfig) -> None:
+ self._controller_config = config
async def shutdown_controller(self) -> None:
pass
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
- return SystemdSubprocess(subprocess_type, id_hint, self._systemd_type, self._persistance_type)
+ async def create_subprocess(
+ self, subprocess_config: KresConfig, subprocess_type: SubprocessType, id_hint: KresID
+ ) -> Subprocess:
+ assert self._controller_config is not None
+ return SystemdSubprocess(subprocess_config, subprocess_type, id_hint, self._systemd_type)
- async def get_subprocess_info(self) -> List[SubprocessInfo]:
- def convert(u: systemd.Unit) -> SubprocessInfo:
- status_lookup_table = {"failed": SubprocessStatus.FAILED, "running": SubprocessStatus.RUNNING}
+ async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
+ assert self._controller_config is not None
- if u.state in status_lookup_table:
- status = status_lookup_table[u.state]
+ def convert(val: str) -> SubprocessStatus:
+ status_lookup_table = {"failed": SubprocessStatus.FAILED, "running": SubprocessStatus.RUNNING}
+ if val in status_lookup_table:
+ return status_lookup_table[val]
else:
- status = SubprocessStatus.UNKNOWN
-
- return SubprocessInfo(id=u.name, status=status)
+ return SubprocessStatus.UNKNOWN
- return list(map(convert, await to_thread(systemd.list_units, self._systemd_type)))
+ data: List[systemd.Unit] = await to_thread(systemd.list_units, self._systemd_type)
+ our_data = filter(lambda u: SystemdSubprocess.is_unit_name_ours(u.name), data)
+ return {lookup_from_string(u.name): convert(u.state) for u in our_data}
# pyright: reportMissingTypeStubs=false
import logging
-from dataclasses import dataclass
+import os
from enum import Enum, auto
from threading import Thread
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from pydbus.bus import SessionBus
from typing_extensions import Literal
-from knot_resolver_manager.constants import KRES_CACHE_DIR, KRESD_CONFIG_FILE, RUNTIME_DIR
+from knot_resolver_manager.compat.dataclasses import dataclass
+from knot_resolver_manager.constants import GC_EXECUTABLE, KRESD_EXECUTABLE, kresd_cache_dir, kresd_config_file
+from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
+from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.kresd_controller.interface import SubprocessType
logger = logging.getLogger(__name__)
job_path: Optional[str] = None
def _wait_for_job_completion_handler(loop: Any) -> Any:
- completed_jobs: Dict[str, str] = dict()
+ completed_jobs: Dict[str, str] = {}
def event_hander(_job_id: Any, path: Any, _unit: Any, state: Any):
nonlocal result_state
_wait_for_job_completion(systemd, job)
-def _kresd_unit_properties(unit_name: str) -> List[Tuple[str, str]]:
+def _kresd_unit_properties(config: KresConfig, kres_id: KresID) -> List[Tuple[str, str]]:
val: Any = [
("Description", GLib.Variant("s", "transient Knot Resolver unit started by Knot Resolver Manager")),
("Type", GLib.Variant("s", "notify")),
- ("WorkingDirectory", GLib.Variant("s", str(RUNTIME_DIR))),
+ ("WorkingDirectory", GLib.Variant("s", os.getcwd())),
(
"ExecStart",
GLib.Variant(
- "a(sasb)", [("/usr/bin/kresd", ["/usr/bin/kresd", "-c", str(KRESD_CONFIG_FILE), "-n"], False)]
+ "a(sasb)",
+ [
+ (
+ str(KRESD_EXECUTABLE),
+ [str(KRESD_EXECUTABLE), "-c", str(kresd_config_file(config, kres_id)), "-n"],
+ False,
+ )
+ ],
),
),
("TimeoutStopUSec", GLib.Variant("t", 10000000)),
("WatchdogUSec", GLib.Variant("t", 10000000)),
("Restart", GLib.Variant("s", "on-abnormal")),
("LimitNOFILE", GLib.Variant("t", 524288)),
- ("Environment", GLib.Variant("as", [f"SYSTEMD_INSTANCE={unit_name}"])),
+ ("Environment", GLib.Variant("as", [f"SYSTEMD_INSTANCE={kres_id}"])),
]
return val
-def _gc_unit_properties() -> Any:
+def _gc_unit_properties(config: KresConfig) -> Any:
val: Any = [
(
"Description",
GLib.Variant("s", "transient Knot Resolver Garbage Collector unit started by Knot Resolver Manager"),
),
("Type", GLib.Variant("s", "simple")),
- ("WorkingDirectory", GLib.Variant("s", str(RUNTIME_DIR))),
+ ("WorkingDirectory", GLib.Variant("s", str(config.server.management.rundir.to_path()))),
(
"ExecStart",
GLib.Variant(
"a(sasb)",
- [("/usr/bin/kres-cache-gc", ["/usr/bin/kres-cache-gc", "-c", str(KRES_CACHE_DIR), "-d", "1000"], True)],
+ [
+ (
+ str(GC_EXECUTABLE),
+ [str(GC_EXECUTABLE), "-c", str(kresd_cache_dir(config)), "-d", "1000"],
+ True,
+ )
+ ],
),
),
("Restart", GLib.Variant("s", "on-failure")),
return val
-def start_transient_unit(type_: SystemdType, unit_name: str, subprocess_type: SubprocessType):
- properties = {SubprocessType.KRESD: _kresd_unit_properties(unit_name), SubprocessType.GC: _gc_unit_properties()}[
- subprocess_type
- ]
+def start_transient_kresd_unit(
+ config: KresConfig, type_: SystemdType, kres_id: KresID, subprocess_type: SubprocessType
+):
+ name, properties = {
+ SubprocessType.KRESD: (f"kresd_{kres_id}.service", _kresd_unit_properties(config, kres_id)),
+ SubprocessType.GC: ("kres-cache-gc.service", _gc_unit_properties(config)),
+ }[subprocess_type]
systemd = _create_manager_proxy(type_)
def job():
- return systemd.StartTransientUnit(unit_name, "fail", properties, [])
+ return systemd.StartTransientUnit(name, "fail", properties, [])
- _wait_for_job_completion(systemd, job)
+ try:
+ _wait_for_job_completion(systemd, job)
+ except SubprocessControllerException as e:
+ raise SubprocessControllerException(f"Failed to start systemd transient service '{name}'") from e
def start_unit(type_: SystemdType, unit_name: str):
import asyncio
import logging
+import os
import sys
from http import HTTPStatus
from pathlib import Path
from aiohttp.web_response import json_response
from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
-from knot_resolver_manager.constants import MANAGER_CONFIG_FILE
+from knot_resolver_manager.config_store import ConfigStore
+from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.datamodel.types import Listen, ListenType
from knot_resolver_manager.exceptions import DataException, KresdManagerException, TreeException
# This is top-level class containing pretty much everything. Instead of global
# variables, we use instance attributes. That's why there are so many and it's
# ok.
- def __init__(self, manager: KresManager):
+ def __init__(self, store: ConfigStore):
+ # config store & server dynamic reconfiguration
+ self.config_store = store
- self.manager = manager
+ # HTTP server
self.app = Application(middlewares=[error_handler])
self.runner = AppRunner(self.app)
-
self.listen: Optional[Listen] = None
self.site: Union[NoneType, TCPSite, UnixSite] = None
self.listen_lock = asyncio.Lock()
await self._reconfigure_listen_address(config)
async def start(self):
- config = self.manager.get_last_used_config()
- self.setup_routes()
+ self._setup_routes()
await self.runner.setup()
- await self._reconfigure(config)
+ await self.config_store.register_on_change_callback(self._reconfigure)
async def wait_for_shutdown(self):
await self.shutdown_event.wait()
# parse the incoming data
document_path = request.match_info["path"]
- last: ParsedTree = self.manager.get_last_used_config().get_unparsed_data()
+ last: ParsedTree = self.config_store.get().get_unparsed_data()
new_partial: ParsedTree = parse(await request.text(), request.content_type)
config = last.update(document_path, new_partial)
config_validated = KresConfig(config)
# apply config
- await self._reconfigure(config_validated)
- await self.manager.apply_config(config_validated)
+ await self.config_store.update(config_validated)
# return success
return web.Response()
logger.info("Shutdown event triggered...")
return web.Response(text="Shutting down...")
- def setup_routes(self):
+ def _setup_routes(self):
self.app.add_routes(
[
web.get("/", self._handler_index),
_DEFAULT_SENTINEL = _DefaultSentinel()
-async def _init_manager(config: Union[Path, ParsedTree, _DefaultSentinel]) -> KresManager:
+async def _init_config_store(config: Union[Path, ParsedTree, _DefaultSentinel]) -> ConfigStore:
+ # Initial configuration of the manager
+ if isinstance(config, _DefaultSentinel):
+ # use default
+ config = DEFAULT_MANAGER_CONFIG_FILE
+ if isinstance(config, Path):
+ if not config.exists():
+ logger.error(
+ "Manager is configured to load config file at %s on startup, but the file does not exist.",
+ config,
+ )
+ sys.exit(1)
+ else:
+ logger.info("Loading initial configuration from %s", config)
+ config = parse_yaml(await readfile(config))
+
+ # validate the initial configuration
+ assert isinstance(config, ParsedTree)
+ logger.info("Validating initial configuration...")
+ config_validated = KresConfig(config)
+
+ return ConfigStore(config_validated)
+
+
+async def _init_manager(config_store: ConfigStore) -> KresManager:
"""
Called asynchronously when the application initializes.
"""
try:
- # Initial configuration of the manager
- if isinstance(config, _DefaultSentinel):
- # use default
- config = MANAGER_CONFIG_FILE
- if isinstance(config, Path):
- if not config.exists():
- logger.error(
- "Manager is configured to load config file at %s on startup, but the file does not exist.",
- config,
- )
- sys.exit(1)
- else:
- logger.info("Loading initial configuration from %s", config)
- config = parse_yaml(await readfile(config))
-
- # validate the initial configuration
- assert isinstance(config, ParsedTree)
- logger.info("Validating initial configuration...")
- config_validated = KresConfig(config)
-
# if configured, create a subprocess controller manually
controller: Optional[SubprocessController] = None
- if config_validated.server.management.backend != "auto":
- controller = await get_controller_by_name(config_validated.server.management.backend)
+ if config_store.get().server.management.backend != "auto":
+ controller = await get_controller_by_name(config_store.get(), config_store.get().server.management.backend)
# Create KresManager. This will perform autodetection of available service managers and
# select the most appropriate to use (or use the one configured directly)
- manager = await KresManager.create(controller, config_validated)
+ manager = await KresManager.create(controller, config_store)
logger.info("Initial configuration applied. Process manager initialized...")
return manager
sys.exit(1)
+def _set_working_directory(config: KresConfig):
+ os.chdir(config.server.management.rundir.to_path())
+
+
async def start_server(config: Union[Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL):
start_time = time()
# before starting server, initialize the subprocess controller etc.
- manager = await _init_manager(config)
+ config_store = await _init_config_store(config)
+ _set_working_directory(config_store.get())
+ manager = await _init_manager(config_store)
- server = Server(manager)
+ server = Server(config_store)
await server.start()
# stop the server gracefully and cleanup everything
if isinstance(e, exceptions): # pyright: reportUnnecessaryIsInstance=false
return default
else:
- raise e
+ raise e # pyright: reportGeneralTypeIssues=false
return f
return ignore_exceptions_optional(type(default), default, *exceptions)
+def phantom_use(var: Any) -> None: # pylint: disable=unused-argument
+ """
+ Function which consumes its argument, doing absolutely nothing with it. Useful
+ for convincing pylint that we need the variable even when it's unused.
+ """
+
+
__all__ = [
"CustomValueType",
"SchemaNode",
"""
def readfile_sync(path: Union[str, PurePath]):
- with open(path, "r") as f:
+ with open(path, "r", encoding="utf8") as f:
return f.read()
return await to_thread(readfile_sync, path)
"""
def writefile_sync(path: Union[str, PurePath], content: str):
- with open(path, "w") as f:
+ with open(path, "w", encoding="utf8") as f:
return f.write(content)
await to_thread(writefile_sync, path, content)
-from typing import Callable, Iterable, TypeVar
+from enum import Enum, auto
+from typing import Any, Callable, Generic, Iterable, TypeVar
T = TypeVar("T")
def all_matches(cond: Callable[[T], bool], arr: Iterable[T]) -> bool:
return foldl(lambda x, y: x and y, True, map(cond, arr))
+
+
+Succ = TypeVar("Succ")
+Err = TypeVar("Err")
+
+
+class _Status(Enum):
+ OK = auto()
+ ERROR = auto()
+
+
+class Result(Generic[Succ, Err]):
+ @staticmethod
+ def ok(succ: Succ) -> "Result[Succ, Any]":
+ return Result(_Status.OK, succ=succ)
+
+ @staticmethod
+ def err(err: Err) -> "Result[Any, Err]":
+ return Result(_Status.ERROR, err=err)
+
+ def __init__(self, status: _Status, succ: Succ = None, err: Err = None) -> None:
+ super().__init__()
+ self._status: _Status = status
+ self._succ: Succ = succ
+ self._err: Err = err
+
+ def unwrap(self) -> Succ:
+ assert self._status is _Status.OK
+ return self._succ
+
+ def unwrap_err(self) -> Err:
+ assert self._status is _Status.ERROR
+ return self._err
+
+ def is_ok(self) -> bool:
+ return self._status is _Status.OK
+
+ def is_err(self) -> bool:
+ return self._status is _Status.ERROR
+# pylint: disable=comparison-with-callable
+
+
import enum
import inspect
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
./poe test
"""
-# pylint configuration copied from apkg (https://gitlab.nic.cz/packaging/apkg/-/blob/master/pylintrc)
[tool.pylint."MESSAGES CONTROL"]
disable= [
"broad-except",
"raising-bad-type", # handled by type checker
"too-many-arguments", # sure, but how can we change the signatures to take less arguments? artificially create objects with arguments? That's stupid...
"no-member", # checked by pyright
+ "import-error", # checked by pyright (and pylint does not do it properly)
]
[tool.pylint.SIMILARITIES]