from pathlib import Path
from typing import TYPE_CHECKING, Optional
-from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.utils import which
from knot_resolver_manager.utils.functional import Result
if TYPE_CHECKING:
+ from knot_resolver_manager.config_store import ConfigStore
+ from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.kresd_controller.interface import KresID
STARTUP_LOG_LEVEL = logging.DEBUG
MANAGER_FIX_ATTEMPT_MAX_COUNTER = 2
FIX_COUNTER_DECREASE_INTERVAL_SEC = 30 * 60
PID_FILE_NAME = "manager.pid"
+MAX_WORKERS = 256
def kresd_executable() -> Path:
return which.which("kres-cache-gc")
-def kresd_cache_dir(config: KresConfig) -> Path:
+def kresd_cache_dir(config: "KresConfig") -> Path:
return config.cache.storage.to_path()
-def kresd_config_file(_config: KresConfig, kres_id: "KresID") -> Path:
+def kresd_config_file(_config: "KresConfig", kres_id: "KresID") -> Path:
return Path(f"{kres_id}.conf")
-def supervisord_config_file(_config: KresConfig) -> Path:
+def supervisord_config_file(_config: "KresConfig") -> Path:
return Path("supervisord.conf")
-def supervisord_config_file_tmp(_config: KresConfig) -> Path:
+def supervisord_config_file_tmp(_config: "KresConfig") -> Path:
return Path("supervisord.conf.tmp")
-def supervisord_log_file(_config: KresConfig) -> Path:
+def supervisord_log_file(_config: "KresConfig") -> Path:
return Path("supervisord.log")
-def supervisord_pid_file(_config: KresConfig) -> Path:
+def supervisord_pid_file(_config: "KresConfig") -> Path:
return Path("supervisord.pid")
-def supervisord_sock_file(_config: KresConfig) -> Path:
+def supervisord_sock_file(_config: "KresConfig") -> Path:
return Path("supervisord.sock")
-def supervisord_subprocess_log_dir(_config: KresConfig) -> Path:
+def supervisord_subprocess_log_dir(_config: "KresConfig") -> Path:
return Path("logs")
Class for accessing constants, which are technically not constants as they are user configurable.
"""
- def __init__(self, config_store: ConfigStore) -> None:
+ def __init__(self, config_store: "ConfigStore") -> None:
self._config_store = config_store
@property
_user_constants: Optional[_UserConstants] = None
-async def _deny_id_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+async def _deny_id_changes(config_old: "KresConfig", config_new: "KresConfig") -> Result[None, str]:
if config_old.id != config_new.id:
return Result.err(
"/id: Based on the ID, the manager recognizes subprocesses,"
return Result.ok(None)
-async def init_user_constants(config_store: ConfigStore) -> None:
+async def init_user_constants(config_store: "ConfigStore") -> None:
global _user_constants
_user_constants = _UserConstants(config_store)
-import configparser
import logging
-import os
-import signal
from os import kill
from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Set, Union
+from typing import Any, Dict, Iterable, Optional, Union, cast
from xmlrpc.client import ServerProxy
import supervisor.xmlrpc # type: ignore[import]
-from jinja2 import Template
-
-from knot_resolver_manager.compat.asyncio import to_thread
-from knot_resolver_manager.compat.dataclasses import dataclass
-from knot_resolver_manager.constants import (
- kres_gc_executable,
- kresd_cache_dir,
- kresd_config_file,
- kresd_executable,
- supervisord_config_file,
- supervisord_config_file_tmp,
- supervisord_log_file,
- supervisord_pid_file,
- supervisord_sock_file,
- supervisord_subprocess_log_dir,
-)
+
+from knot_resolver_manager.compat.asyncio import async_in_a_thread
+from knot_resolver_manager.constants import supervisord_config_file, supervisord_pid_file, supervisord_sock_file
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
from knot_resolver_manager.kresd_controller.interface import (
SubprocessStatus,
SubprocessType,
)
-from knot_resolver_manager.utils.async_utils import (
- call,
- read_resource,
- readfile,
- wait_for_process_termination,
- writefile,
-)
+from knot_resolver_manager.kresd_controller.supervisord.config_file import SupervisordKresID, write_config_file
+from knot_resolver_manager.utils.async_utils import call, readfile
logger = logging.getLogger(__name__)
-class SupervisordKresID(KresID):
- @staticmethod
- def from_string(val: str) -> "SupervisordKresID":
- if val == "gc":
- return SupervisordKresID.new(SubprocessType.GC, -1)
- else:
- val = val.replace("kresd", "")
- return SupervisordKresID.new(SubprocessType.KRESD, int(val))
-
- def __str__(self) -> str:
- if self.subprocess_type is SubprocessType.GC:
- return "gc"
- elif self.subprocess_type is SubprocessType.KRESD:
- return f"kresd{self._id}"
- else:
- raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
-
-
-@dataclass
-class _Instance:
- """
- Data structure holding data for supervisord config template
- """
-
- type: str
- logfile: str
- id: str
- workdir: str
- command: str
- environment: str
-
-
-def _get_command_based_on_type(config: KresConfig, i: "SupervisordSubprocess") -> str:
- if i.type is SubprocessType.KRESD:
- return f"{kresd_executable()} -c {kresd_config_file(config, i.id)} -n"
- elif i.type is SubprocessType.GC:
- return f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000"
- else:
- raise NotImplementedError("This subprocess type is not supported")
-
-
-async def _write_config_file(config: KresConfig, instances: Set["SupervisordSubprocess"]) -> None:
- @dataclass
- class SupervisordConfig:
- unix_http_server: str
- pid_file: str
- workdir: str
- logfile: str
-
- template = await read_resource(__package__, "supervisord.conf.j2")
- assert template is not None
- template = template.decode("utf8")
- cwd = str(os.getcwd())
- if not supervisord_subprocess_log_dir(config).exists():
- supervisord_subprocess_log_dir(config).mkdir(exist_ok=True)
- config_string = Template(template).render( # pyright: reportUnknownMemberType=false
- instances=[
- _Instance( # type: ignore[call-arg]
- type=i.type.name,
- logfile=supervisord_subprocess_log_dir(config) / f"{i.id}.log",
- id=str(i.id),
- workdir=cwd,
- command=_get_command_based_on_type(config, i),
- environment=f"SYSTEMD_INSTANCE={i.id}",
- )
- for i in instances
- ],
- config=SupervisordConfig( # type: ignore[call-arg]
- unix_http_server=supervisord_sock_file(config),
- pid_file=supervisord_pid_file(config),
- workdir=cwd,
- logfile=supervisord_log_file(config),
- ),
- )
- await writefile(supervisord_config_file_tmp(config), config_string)
- # atomically replace
- os.rename(supervisord_config_file_tmp(config), supervisord_config_file(config))
-
-
async def _start_supervisord(config: KresConfig) -> None:
- await _write_config_file(config, set())
+ await write_config_file(config)
res = await call(f'supervisord --configuration="{supervisord_config_file(config).absolute()}"', shell=True)
if res != 0:
raise SubprocessControllerException(f"Supervisord exited with exit code {res}")
-async def _stop_supervisord(config: KresConfig) -> None:
- pid = int(await readfile(supervisord_pid_file(config)))
- kill(pid, signal.SIGTERM)
- await wait_for_process_termination(pid)
-
-
-async def _update_config(config: KresConfig, instances: Set["SupervisordSubprocess"]) -> None:
- await _write_config_file(config, instances)
- await call(f'supervisorctl -c "{supervisord_config_file(config).absolute()}" update', shell=True)
+async def _reload_supervisord(config: KresConfig) -> None:
+ """
+ Regenerate the supervisord config file and tell supervisord to apply it.
+
+ Raises SubprocessControllerException when the `supervisorctl ... update`
+ command finishes with a non-zero exit code.
+ """
+ await write_config_file(config)
+ # the config file is passed as an absolute path, so the command does not depend on supervisorctl's own lookup
+ res = await call(f'supervisorctl --configuration="{supervisord_config_file(config).absolute()}" update', shell=True)
+ if res != 0:
+ raise SubprocessControllerException(f"Supervisord reload failed with exit code {res}")
-async def _restart(config: KresConfig, id_: KresID) -> None:
- await call(f'supervisorctl -c "{supervisord_config_file(config).absolute()}" restart {id_}', shell=True)
+@async_in_a_thread
+def _stop_supervisord(config: KresConfig) -> None:
+ """
+ Ask supervisord to shut down over its XML-RPC interface and delete its config file.
+
+ NOTE(review): the body is synchronous; `async_in_a_thread` presumably makes it
+ awaitable by running it in a worker thread - confirm in compat.asyncio.
+ """
+ supervisord = _create_supervisord_proxy(config)
+ supervisord.shutdown()
+ # the generated config file is removed right after the shutdown request returns
+ supervisord_config_file(config).unlink()
async def _is_supervisord_available() -> bool:
return True
-def _list_subprocesses(config: KresConfig) -> Dict[KresID, SubprocessStatus]:
+def _create_supervisord_proxy(config: KresConfig) -> Any:
proxy = ServerProxy(
"http://127.0.0.1",
transport=supervisor.xmlrpc.SupervisorTransport(
None, None, serverurl="unix://" + str(supervisord_sock_file(config))
),
)
- processes: Any = proxy.supervisor.getAllProcessInfo()
+ return getattr(proxy, "supervisor")
+
+
+def _list_running_subprocesses(config: KresConfig) -> Dict[SupervisordKresID, SubprocessStatus]:
+ supervisord = _create_supervisord_proxy(config)
+ processes: Any = supervisord.getAllProcessInfo()
def convert(proc: Any) -> SubprocessStatus:
conversion_tbl = {
- "FATAL": SubprocessStatus.FAILED,
- "EXITED": SubprocessStatus.FAILED,
+ # "STOPPED": None, # filtered out elsewhere
+ "STARTING": SubprocessStatus.RUNNING,
"RUNNING": SubprocessStatus.RUNNING,
+ "BACKOFF": SubprocessStatus.RUNNING,
+ "STOPPING": SubprocessStatus.RUNNING,
+ "EXITED": SubprocessStatus.FAILED,
+ "FATAL": SubprocessStatus.FAILED,
+ "UNKNOWN": SubprocessStatus.UNKNOWN,
}
if proc["statename"] in conversion_tbl:
status = conversion_tbl[proc["statename"]]
else:
+ logger.warning(f"Unknown supervisord process state {proc['statename']}")
status = SubprocessStatus.UNKNOWN
return status
- return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes}
-
-
-async def _list_ids_from_existing_config(cfg: KresConfig) -> List[SupervisordKresID]:
- config = await readfile(supervisord_config_file(cfg))
- cp = configparser.ConfigParser()
- cp.read_string(config)
-
- res: List[SupervisordKresID] = []
- for section in cp.sections():
- if section.startswith("program:"):
- program_id = section.replace("program:", "")
- kid = SupervisordKresID.from_string(program_id)
- res.append(kid)
- return res
+ return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes if pr["statename"] != "STOPPED"}
class SupervisordSubprocess(Subprocess):
super().__init__(config, base_id)
self._controller: "SupervisordSubprocessController" = controller
- async def _start(self) -> None:
- return await self._controller.start_subprocess(self)
+ @async_in_a_thread
+ def _start(self) -> None:
+ """Start this subprocess via supervisord's `startProcess`, addressed by the string form of its ID."""
+ # a fresh proxy is created for every call; nothing is cached on the instance
+ supervisord = _create_supervisord_proxy(self._config)
+ supervisord.startProcess(str(self.id))
- async def _stop(self) -> None:
- return await self._controller.stop_subprocess(self)
+ @async_in_a_thread
+ def _stop(self) -> None:
+ """Stop this subprocess via supervisord's `stopProcess`, addressed by the string form of its ID."""
+ # a fresh proxy is created for every call; nothing is cached on the instance
+ supervisord = _create_supervisord_proxy(self._config)
+ supervisord.stopProcess(str(self.id))
- async def _restart(self) -> None:
- return await self._controller.restart_subprocess(self)
+ @async_in_a_thread
+ def _restart(self) -> None:
+ """Restart this subprocess as an explicit `stopProcess` followed by `startProcess`."""
+ supervisord = _create_supervisord_proxy(self._config)
+ # NOTE(review): if stopProcess raises, startProcess below is never reached
+ supervisord.stopProcess(str(self.id))
+ supervisord.startProcess(str(self.id))
def get_used_config(self) -> KresConfig:
return self._config
class SupervisordSubprocessController(SubprocessController):
def __init__(self):
- self._running_instances: Set[SupervisordSubprocess] = set()
self._controller_config: Optional[KresConfig] = None
def __str__(self):
return "supervisord"
- def should_be_running(self, subprocess: SupervisordSubprocess) -> bool:
- return subprocess in self._running_instances
-
async def is_controller_available(self, config: KresConfig) -> bool:
res = await _is_supervisord_available()
if not res:
logger.debug("Detection - supervisord controller is available for use")
return res
- async def _update_config_with_real_state(self, config: KresConfig) -> None:
- assert self._controller_config is not None
-
- running = await _is_supervisord_running(config)
- if running:
- ids = await _list_ids_from_existing_config(config)
- for id_ in ids:
- self._running_instances.add(SupervisordSubprocess(self._controller_config, self, id_))
-
async def get_all_running_instances(self) -> Iterable[Subprocess]:
assert self._controller_config is not None
- await self._update_config_with_real_state(self._controller_config)
- return iter(self._running_instances)
+ if await _is_supervisord_running(self._controller_config):
+ states = _list_running_subprocesses(self._controller_config)
+ return [
+ SupervisordSubprocess(self._controller_config, self, id_)
+ for id_ in states
+ if states[id_] == SubprocessStatus.RUNNING
+ ]
+ else:
+ return []
async def initialize_controller(self, config: KresConfig) -> None:
self._controller_config = config
if not await _is_supervisord_running(config):
await _start_supervisord(config)
+ else:
+ await _reload_supervisord(config)
async def shutdown_controller(self) -> None:
assert self._controller_config is not None
await _stop_supervisord(self._controller_config)
- async def start_subprocess(self, subprocess: SupervisordSubprocess) -> None:
- assert self._controller_config is not None
- assert subprocess not in self._running_instances
- self._running_instances.add(subprocess)
- await _update_config(self._controller_config, self._running_instances)
-
- async def stop_subprocess(self, subprocess: SupervisordSubprocess) -> None:
- assert self._controller_config is not None
- assert subprocess in self._running_instances
- self._running_instances.remove(subprocess)
- await _update_config(self._controller_config, self._running_instances)
-
- async def restart_subprocess(self, subprocess: SupervisordSubprocess) -> None:
- assert self._controller_config is not None
- assert subprocess in self._running_instances
- await _restart(self._controller_config, subprocess.id)
-
async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
return SupervisordSubprocess(subprocess_config, self, subprocess_type)
- async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
- return await to_thread(_list_subprocesses, self._controller_config)
+ @async_in_a_thread
+ def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
+ """
+ Return the status of every subprocess reported by `_list_running_subprocesses`.
+
+ Requires the controller to have been given a config already (asserted below).
+ """
+ assert self._controller_config is not None
+ # the cast only widens the static key type from SupervisordKresID to KresID; the dict itself is returned unchanged
+ return cast(Dict[KresID, SubprocessStatus], _list_running_subprocesses(self._controller_config))
--- /dev/null
+import os
+from typing import List
+
+from jinja2 import Template
+
+from knot_resolver_manager.compat.dataclasses import dataclass
+from knot_resolver_manager.constants import (
+ kres_gc_executable,
+ kresd_cache_dir,
+ kresd_config_file,
+ kresd_executable,
+ supervisord_config_file,
+ supervisord_config_file_tmp,
+ supervisord_log_file,
+ supervisord_pid_file,
+ supervisord_sock_file,
+ supervisord_subprocess_log_dir,
+)
+from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.kresd_controller.interface import KresID, SubprocessType
+from knot_resolver_manager.utils.async_utils import read_resource, writefile
+
+
+class SupervisordKresID(KresID):
+    """
+    Subprocess ID whose string form doubles as the supervisord program name.
+
+    The garbage collector is named "gc"; kresd workers are named "kresd<N>".
+    """
+
+    # WARNING: be really careful with renaming. Whenever the naming schema changes,
+    # the old names have to stay parseable as well, otherwise updating the manager
+    # will cause weird behavior
+
+    @staticmethod
+    def from_string(val: str) -> "SupervisordKresID":
+        """Parse a program name back into an ID."""
+        if val == "gc":
+            return SupervisordKresID.new(SubprocessType.GC, -1)
+
+        # everything else is treated as a kresd worker name; int() raises
+        # ValueError when no number is left once "kresd" has been dropped
+        number = int(val.replace("kresd", ""))
+        return SupervisordKresID.new(SubprocessType.KRESD, number)
+
+    def __str__(self) -> str:
+        """Return the program name for this ID."""
+        kind = self.subprocess_type
+        if kind is SubprocessType.GC:
+            return "gc"
+        if kind is SubprocessType.KRESD:
+            return f"kresd{self._id}"
+        raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
+
+
+def _get_command_based_on_type(config: KresConfig, i: "SupervisordKresID") -> str:
+    """
+    Build the command line used to launch the subprocess identified by `i`.
+
+    kresd workers get their own config file and the `-n` flag; the garbage
+    collector is pointed at the cache directory with `-d 1000`.
+
+    Raises NotImplementedError for any other subprocess type.
+    """
+    kind = i.subprocess_type
+    if kind is SubprocessType.KRESD:
+        return f"{kresd_executable()} -c {kresd_config_file(config, i)} -n"
+    if kind is SubprocessType.GC:
+        return f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000"
+    raise NotImplementedError("This subprocess type is not supported")
+
+
+@dataclass
+class _Instance:
+    """
+    A single program entry passed to the supervisord config template
+    """
+
+    type: str
+    logfile: str
+    id: str
+    workdir: str
+    command: str
+    environment: str
+
+    @staticmethod
+    def create_list(config: KresConfig) -> List["_Instance"]:
+        """
+        Build the entries for kresd workers 1..config.max_workers, followed by
+        one entry for the garbage collector.
+        """
+        workdir = str(os.getcwd())
+        log_dir = supervisord_subprocess_log_dir(config)
+
+        ids = [
+            SupervisordKresID(SubprocessType.KRESD, n, _i_know_what_i_am_doing=True)
+            for n in range(1, int(config.max_workers) + 1)
+        ]
+        # the GC entry always comes last
+        ids.append(SupervisordKresID(SubprocessType.GC, -1, _i_know_what_i_am_doing=True))
+
+        entries: List["_Instance"] = []
+        for kid in ids:
+            entries.append(
+                _Instance(  # type: ignore[call-arg]
+                    type=kid.subprocess_type.name,
+                    logfile=log_dir / f"{kid}.log",
+                    id=str(kid),
+                    workdir=workdir,
+                    command=_get_command_based_on_type(config, kid),
+                    environment=f"SYSTEMD_INSTANCE={kid}",
+                )
+            )
+        return entries
+
+
+@dataclass
+class SupervisordConfig:
+    """
+    Global (not per-program) values passed to the supervisord config template
+    """
+
+    unix_http_server: str
+    pid_file: str
+    workdir: str
+    logfile: str
+
+    @staticmethod
+    def create(config: KresConfig) -> "SupervisordConfig":
+        """
+        Collect the socket, PID file and log file locations for `config`,
+        using the current working directory as supervisord's workdir.
+        """
+        sock_file = supervisord_sock_file(config)
+        pid_file = supervisord_pid_file(config)
+        log_file = supervisord_log_file(config)
+        return SupervisordConfig(  # type: ignore[call-arg]
+            unix_http_server=sock_file,
+            pid_file=pid_file,
+            workdir=str(os.getcwd()),
+            logfile=log_file,
+        )
+
+
+async def write_config_file(config: KresConfig) -> None:
+    """
+    Render the supervisord configuration for `config` and install it.
+
+    Steps:
+      1. make sure the subprocess log directory exists,
+      2. render the packaged "supervisord.conf.j2" template,
+      3. write the result to the temporary config path and rename it over
+         the real config path.
+
+    Raises FileExistsError when the log directory path exists but is not a
+    directory.
+    """
+    # mkdir(exist_ok=True) already tolerates an existing directory, so a separate
+    # exists() check is redundant and would only open a check-then-create race
+    supervisord_subprocess_log_dir(config).mkdir(exist_ok=True)
+
+    template = await read_resource(__package__, "supervisord.conf.j2")
+    assert template is not None
+    template = template.decode("utf8")
+    config_string = Template(template).render(  # pyright: reportUnknownMemberType=false
+        instances=_Instance.create_list(config),
+        config=SupervisordConfig.create(config),
+    )
+    await writefile(supervisord_config_file_tmp(config), config_string)
+    # atomically replace (we don't technically need this right now, but better safe than sorry)
+    os.rename(supervisord_config_file_tmp(config), supervisord_config_file(config))