async def _async_init(self):
self._controller = await get_best_controller_implementation()
+ await self._controller.initialize_controller()
await self.load_system_state()
def __init__(self):
@staticmethod
async def get_all_running_instances() -> Iterable["BaseKresdController"]:
raise NotImplementedError()
+
+ @staticmethod
+ async def initialize_controller() -> None:
+ raise NotImplementedError()
--- /dev/null
+import logging
+from typing import Iterable
+
+from knot_resolver_manager.kresd_controller.base import BaseKresdController
+
+from .config import (
+ SupervisordConfig,
+ is_supervisord_available,
+ is_supervisord_running,
+ list_ids_from_existing_config,
+ restart,
+ start_supervisord,
+ update_config,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class SupervisordKresdController(BaseKresdController):
+ _config = SupervisordConfig()
+
+ async def is_running(self) -> bool:
+ return self.id in SupervisordKresdController._config.instances
+
+ async def start(self):
+ # note: O(n) test, but the number of instances will be very small
+ if self.id in SupervisordKresdController._config.instances:
+ raise RuntimeError("Can't start an instance with the same ID as already started instance")
+
+ SupervisordKresdController._config.instances.append(self.id)
+ await update_config(SupervisordKresdController._config)
+
+ async def stop(self):
+ # note: O(n) test, but the number of instances will be very small
+ if self.id not in SupervisordKresdController._config.instances:
+ raise RuntimeError("Can't stop an instance that is not started")
+
+ SupervisordKresdController._config.instances.remove(self.id)
+ await update_config(SupervisordKresdController._config)
+
+ async def restart(self):
+ # note: O(n) test, but the number of instances will be very small
+ if self.id not in SupervisordKresdController._config.instances:
+ raise RuntimeError("Can't restart an instance that is not started")
+
+ await restart(self.id)
+
+ @staticmethod
+ async def is_controller_available() -> bool:
+ return await is_supervisord_available()
+
+ @staticmethod
+ async def get_all_running_instances() -> Iterable["BaseKresdController"]:
+ running = await is_supervisord_running()
+ if running:
+ ids = await list_ids_from_existing_config()
+ return [SupervisordKresdController(id) for id in ids]
+ else:
+ return []
+
+ @staticmethod
+ async def initialize_controller() -> None:
+ if not await is_supervisord_running():
+ await start_supervisord(SupervisordKresdController._config)
--- /dev/null
+import os.path
+import signal
+from os import kill
+from typing import List
+
+from jinja2 import Template
+
+from knot_resolver_manager.compat.dataclasses import dataclass
+from knot_resolver_manager.utils.async_utils import call, readfile, wait_for_process_termination, writefile
+
+CONFIG_FILE = "/tmp/knot-resolver-manager-supervisord.conf"
+PID_FILE = "/tmp/knot-resolver-manager-supervisord.pid"
+SERVER_SOCK = "/tmp/knot-resolver-manager-supervisord.sock" # created pseudorandomly from ASCII codes for K.R.M:SU
+
+
+@dataclass
+class SupervisordConfig:
+ unix_http_server: str = SERVER_SOCK
+ instances: List[str] = []
+
+
+async def _create_config_file(config: SupervisordConfig):
+ path = os.path.realpath(__file__)
+ template = await readfile(path)
+ config_string = Template(template).render(config=config)
+ await writefile(CONFIG_FILE, config_string)
+
+
+async def start_supervisord(config: SupervisordConfig):
+ await _create_config_file(config)
+ await call(f'supervisord --configuration="{CONFIG_FILE}" --pidfile="{PID_FILE}"', shell=True)
+
+
+async def stop_supervisord():
+ pid = int(await readfile(PID_FILE))
+ kill(pid, signal.SIGINT)
+ await wait_for_process_termination(pid)
+
+
+async def update_config(config: SupervisordConfig):
+ await _create_config_file(config)
+ await call(f'supervisorctl -s "{SERVER_SOCK}" update')
+
+
+async def restart(id_: str):
+ await call(f'supervisorctl -s "{SERVER_SOCK}" restart {id_}')
+
+
+async def is_supervisord_available() -> bool:
+ i = await call("supervisorctl -h", shell=True)
+ i += await call("supervisord -h", shell=True)
+ return i == 0
+
+
+async def is_supervisord_running() -> bool:
+ pid = int(await readfile(PID_FILE))
+ try:
+ kill(pid, 0)
+ return True
+ except ProcessLookupError:
+ return False
+
+
+async def list_ids_from_existing_config() -> List[str]:
+ config = await readfile(CONFIG_FILE)
+ res: List[str] = []
+ for line in config.splitlines():
+ if line.startswith("[program:"):
+ id_ = line.replace("[program:", "").replace("]", "").strip()
+ res.append(id_)
+ return res
--- /dev/null
+[supervisord]
+unix_http_server = {{ config.unix_http_server }}
+
+{% for id in config.instances -%}
+
+[program:{{ id }}]
+directory=/var/lib/knot-resolver
+command=/usr/sbin/kresd -c /usr/lib/knot-resolver/distro-preconfig.lua -c /etc/knot-resolver/kresd.conf -n
+environment=SYSTEMD_INSTANCE={{ id }}
+
+{% endfor %}
\ No newline at end of file
iden = u.replace("kresd@", "").replace(".service", "")
res.append(SystemdKresdController(kresd_id=iden))
return res
+
+ @staticmethod
+ async def initialize_controller() -> None:
+ pass
--- /dev/null
+import os
+import time
+from asyncio import create_subprocess_exec, create_subprocess_shell
+from pathlib import PurePath
+from typing import List, Union
+
+from knot_resolver_manager.compat.asyncio import to_thread
+
+
+async def call(cmd: Union[str, bytes, List[str], List[bytes]], shell: bool = False) -> int:
+ """
+ custom async alternative to subprocess.call()
+ """
+ if shell:
+ if isinstance(cmd, list):
+ raise RuntimeError("can't use list of arguments with shell=True")
+ proc = await create_subprocess_shell(cmd)
+ else:
+ if not isinstance(cmd, list):
+ raise RuntimeError(
+ "Please use list of arguments, not a single string. It will prevent ambiguity when parsing"
+ )
+ proc = await create_subprocess_exec(*cmd)
+
+ return await proc.wait()
+
+
+async def readfile(path: Union[str, PurePath]) -> str:
+ """
+ asynchronously read whole file and return its content
+ """
+
+ def readfile_sync(path: Union[str, PurePath]):
+ with open(path, "r") as f:
+ return f.read()
+
+ return await to_thread(readfile_sync, path)
+
+
+async def writefile(path: Union[str, PurePath], content: str):
+ """
+ asynchronously set content of a file to a given string `content`.
+ """
+
+ def writefile_sync(path: Union[str, PurePath], content: str):
+ with open(path, "w") as f:
+ return f.write(content)
+
+ await to_thread(writefile_sync, path, content)
+
+
+async def wait_for_process_termination(pid: int, sleep_sec: float = 0):
+ """
+ will wait for any process (does not have to be a child process) given by its PID to terminate
+
+ sleep_sec configures the granularity, with which we should return
+ """
+
+ def wait_sync(pid: int, sleep_sec: float):
+ while True:
+ try:
+ os.kill(pid, 0)
+ if sleep_sec == 0:
+ os.sched_yield()
+ else:
+ time.sleep(sleep_sec)
+ except ProcessLookupError:
+ break
+
+ await to_thread(wait_sync, pid, sleep_sec)