]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
changed supervisord controller so that we don't need to be rewriting config with...
authorVasek Sraier <git@vakabus.cz>
Sat, 26 Mar 2022 21:56:53 +0000 (22:56 +0100)
committerVaclav Sraier <vaclav.sraier@nic.cz>
Fri, 24 Jun 2022 13:22:07 +0000 (13:22 +0000)
manager/knot_resolver_manager/compat/asyncio.py
manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/datamodel/config_schema.py
manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/kresd_controller/__init__.py
manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py
manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py [new file with mode: 0644]
manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2
manager/pyproject.toml

index bae4b8df7f593be6b6af641a17f066b618bbc9d2..f9615c3e2e4629a0ef7bb46c203243928f5bec2d 100644 (file)
@@ -36,6 +36,13 @@ async def to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
         return res
 
 
+def async_in_a_thread(func: Callable[..., T]) -> Callable[..., Coroutine[None, None, T]]:
+    """
+    Decorator turning a blocking callable into a coroutine function.
+
+    Each call of the returned wrapper forwards its positional and keyword
+    arguments, together with `func`, to `to_thread` and awaits the outcome,
+    so the awaiting caller receives whatever `func` returns.
+    """
+    # imported locally so the decorator does not depend on a module-level import;
+    # wraps() keeps the name, docstring and other metadata of the decorated callable
+    from functools import wraps
+
+    @wraps(func)
+    async def wrapper(*args: Any, **kwargs: Any) -> T:
+        return await to_thread(func, *args, **kwargs)
+
+    return wrapper
+
+
 def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "asyncio.Task[T]":
     # version 3.8 and higher, call directly
     if sys.version_info.major >= 3 and sys.version_info.minor >= 8:
index 4ba851799634a3fb0653ccd9ea5ab485884c1f8e..7d181ef7649bea8a587fbafbe6cd072629cdb52a 100644 (file)
@@ -2,12 +2,12 @@ import logging
 from pathlib import Path
 from typing import TYPE_CHECKING, Optional
 
-from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.utils import which
 from knot_resolver_manager.utils.functional import Result
 
 if TYPE_CHECKING:
+    from knot_resolver_manager.config_store import ConfigStore
+    from knot_resolver_manager.datamodel.config_schema import KresConfig
     from knot_resolver_manager.kresd_controller.interface import KresID
 
 STARTUP_LOG_LEVEL = logging.DEBUG
@@ -15,6 +15,7 @@ DEFAULT_MANAGER_CONFIG_FILE = Path("/etc/knot-resolver/config.yml")
 MANAGER_FIX_ATTEMPT_MAX_COUNTER = 2
 FIX_COUNTER_DECREASE_INTERVAL_SEC = 30 * 60
 PID_FILE_NAME = "manager.pid"
+MAX_WORKERS = 256
 
 
 def kresd_executable() -> Path:
@@ -25,35 +26,35 @@ def kres_gc_executable() -> Path:
     return which.which("kres-cache-gc")
 
 
-def kresd_cache_dir(config: KresConfig) -> Path:
+def kresd_cache_dir(config: "KresConfig") -> Path:
     return config.cache.storage.to_path()
 
 
-def kresd_config_file(_config: KresConfig, kres_id: "KresID") -> Path:
+def kresd_config_file(_config: "KresConfig", kres_id: "KresID") -> Path:
     return Path(f"{kres_id}.conf")
 
 
-def supervisord_config_file(_config: KresConfig) -> Path:
+def supervisord_config_file(_config: "KresConfig") -> Path:
     return Path("supervisord.conf")
 
 
-def supervisord_config_file_tmp(_config: KresConfig) -> Path:
+def supervisord_config_file_tmp(_config: "KresConfig") -> Path:
     return Path("supervisord.conf.tmp")
 
 
-def supervisord_log_file(_config: KresConfig) -> Path:
+def supervisord_log_file(_config: "KresConfig") -> Path:
     return Path("supervisord.log")
 
 
-def supervisord_pid_file(_config: KresConfig) -> Path:
+def supervisord_pid_file(_config: "KresConfig") -> Path:
     return Path("supervisord.pid")
 
 
-def supervisord_sock_file(_config: KresConfig) -> Path:
+def supervisord_sock_file(_config: "KresConfig") -> Path:
     return Path("supervisord.sock")
 
 
-def supervisord_subprocess_log_dir(_config: KresConfig) -> Path:
+def supervisord_subprocess_log_dir(_config: "KresConfig") -> Path:
     return Path("logs")
 
 
@@ -68,7 +69,7 @@ class _UserConstants:
     Class for accessing constants, which are technically not constants as they are user configurable.
     """
 
-    def __init__(self, config_store: ConfigStore) -> None:
+    def __init__(self, config_store: "ConfigStore") -> None:
         self._config_store = config_store
 
     @property
@@ -79,7 +80,7 @@ class _UserConstants:
 _user_constants: Optional[_UserConstants] = None
 
 
-async def _deny_id_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+async def _deny_id_changes(config_old: "KresConfig", config_new: "KresConfig") -> Result[None, str]:
     if config_old.id != config_new.id:
         return Result.err(
             "/id: Based on the ID, the manager recognizes subprocesses,"
@@ -88,7 +89,7 @@ async def _deny_id_changes(config_old: KresConfig, config_new: KresConfig) -> Re
     return Result.ok(None)
 
 
-async def init_user_constants(config_store: ConfigStore) -> None:
+async def init_user_constants(config_store: "ConfigStore") -> None:
     global _user_constants
     _user_constants = _UserConstants(config_store)
 
index 569524b5002eefb2bdb398f81cd002898bce3b3e..b9adcf23670ff25abb3bd86c36012381b8a38dc7 100644 (file)
@@ -7,6 +7,7 @@ from typing import Any, Dict, List, Optional, Union
 from jinja2 import Environment, FileSystemLoader, Template
 from typing_extensions import Literal
 
+from knot_resolver_manager.constants import MAX_WORKERS
 from knot_resolver_manager.datamodel.cache_schema import CacheSchema
 from knot_resolver_manager.datamodel.dns64_schema import Dns64Schema
 from knot_resolver_manager.datamodel.dnssec_schema import DnssecSchema
@@ -90,6 +91,7 @@ class KresConfig(SchemaNode):
         hostname: Internal DNS resolver hostname. Default is machine hostname.
         rundir: Directory where the resolver can create files and which will be its cwd.
         workers: The number of running kresd (Knot Resolver daemon) workers. If set to 'auto', it is equal to number of CPUs available.
+        max_workers: The maximum number of workers allowed. Cannot be changed in runtime.
         management: Configuration of management HTTP API.
         webmgmt: Configuration of legacy web management endpoint.
         supervisor: Process supervisor configuration.
@@ -141,6 +143,7 @@ class KresConfig(SchemaNode):
     hostname: str
     rundir: UncheckedPath
     workers: IntPositive
+    max_workers: IntPositive = IntPositive(MAX_WORKERS)
     management: ManagementSchema
     webmgmt: Optional[WebmgmtSchema]
     supervisor: SupervisorSchema
index 040a6033f324da6c2ad335f98f740b460abceb0f..1c6f91f63ed9552ba5b0ef334078cdbc71db3c94 100644 (file)
@@ -53,6 +53,13 @@ class _FixCounter:
         return self._counter >= MANAGER_FIX_ATTEMPT_MAX_COUNTER
 
 
+async def _deny_max_worker_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+    """
+    Config verifier rejecting any change of `max_workers` while the manager is running.
+
+    Returns an error result when the old and the new configuration disagree
+    on `max_workers`, otherwise an ok result carrying `None`.
+    """
+    if config_old.max_workers != config_new.max_workers:
+        # the message has to name the option that is actually being compared here
+        return Result.err("Changing manager's `max_workers` during runtime is not allowed.")
+
+    return Result.ok(None)
+
+
 class KresManager:  # pylint: disable=too-many-instance-attributes
     """
     Core of the whole operation. Orchestrates individual instances under some
@@ -108,6 +115,9 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
         await config_store.register_verifier(self.validate_config)
         await config_store.register_on_change_callback(self.apply_config)
 
+        # register controller config change listeners
+        await config_store.register_verifier(_deny_max_worker_changes)
+
     async def _spawn_new_worker(self, config: KresConfig) -> None:
         subprocess = await self._controller.create_subprocess(config, SubprocessType.KRESD)
         await subprocess.start()
@@ -182,8 +192,8 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
         try:
             async with self._manager_lock:
                 logger.debug("Applying config to all workers")
-                await self._ensure_number_of_children(config, int(config.workers))
                 await self._rolling_restart(config)
+                await self._ensure_number_of_children(config, int(config.workers))
 
                 if self._is_gc_running() != config.cache.garbage_collector:
                     if config.cache.garbage_collector:
index c42169bc32f17102c27af931615c926bf6a3b6c3..24967673e5c45aa7a9d1cd2dd565d1c5104b8e2c 100644 (file)
@@ -29,7 +29,7 @@ def try_supervisord():
 
         _registered_controllers.append(SupervisordSubprocessController())
     except ImportError:
-        logger.info("Failed to import modules related to supervisord service manager")
+        logger.error("Failed to import modules related to supervisord service manager", exc_info=True)
 
 
 def try_systemd():
index 8844bedcb89baa2339d003e5bbcb8685accdb404..0f84ee5a9cc3573bae8965bb041c394c62b1318a 100644 (file)
@@ -1,29 +1,13 @@
-import configparser
 import logging
-import os
-import signal
 from os import kill
 from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Set, Union
+from typing import Any, Dict, Iterable, Optional, Union, cast
 from xmlrpc.client import ServerProxy
 
 import supervisor.xmlrpc  # type: ignore[import]
-from jinja2 import Template
-
-from knot_resolver_manager.compat.asyncio import to_thread
-from knot_resolver_manager.compat.dataclasses import dataclass
-from knot_resolver_manager.constants import (
-    kres_gc_executable,
-    kresd_cache_dir,
-    kresd_config_file,
-    kresd_executable,
-    supervisord_config_file,
-    supervisord_config_file_tmp,
-    supervisord_log_file,
-    supervisord_pid_file,
-    supervisord_sock_file,
-    supervisord_subprocess_log_dir,
-)
+
+from knot_resolver_manager.compat.asyncio import async_in_a_thread
+from knot_resolver_manager.constants import supervisord_config_file, supervisord_pid_file, supervisord_sock_file
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.exceptions import SubprocessControllerException
 from knot_resolver_manager.kresd_controller.interface import (
@@ -33,116 +17,31 @@ from knot_resolver_manager.kresd_controller.interface import (
     SubprocessStatus,
     SubprocessType,
 )
-from knot_resolver_manager.utils.async_utils import (
-    call,
-    read_resource,
-    readfile,
-    wait_for_process_termination,
-    writefile,
-)
+from knot_resolver_manager.kresd_controller.supervisord.config_file import SupervisordKresID, write_config_file
+from knot_resolver_manager.utils.async_utils import call, readfile
 
 logger = logging.getLogger(__name__)
 
 
-class SupervisordKresID(KresID):
-    @staticmethod
-    def from_string(val: str) -> "SupervisordKresID":
-        if val == "gc":
-            return SupervisordKresID.new(SubprocessType.GC, -1)
-        else:
-            val = val.replace("kresd", "")
-            return SupervisordKresID.new(SubprocessType.KRESD, int(val))
-
-    def __str__(self) -> str:
-        if self.subprocess_type is SubprocessType.GC:
-            return "gc"
-        elif self.subprocess_type is SubprocessType.KRESD:
-            return f"kresd{self._id}"
-        else:
-            raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
-
-
-@dataclass
-class _Instance:
-    """
-    Data structure holding data for supervisord config template
-    """
-
-    type: str
-    logfile: str
-    id: str
-    workdir: str
-    command: str
-    environment: str
-
-
-def _get_command_based_on_type(config: KresConfig, i: "SupervisordSubprocess") -> str:
-    if i.type is SubprocessType.KRESD:
-        return f"{kresd_executable()} -c {kresd_config_file(config, i.id)} -n"
-    elif i.type is SubprocessType.GC:
-        return f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000"
-    else:
-        raise NotImplementedError("This subprocess type is not supported")
-
-
-async def _write_config_file(config: KresConfig, instances: Set["SupervisordSubprocess"]) -> None:
-    @dataclass
-    class SupervisordConfig:
-        unix_http_server: str
-        pid_file: str
-        workdir: str
-        logfile: str
-
-    template = await read_resource(__package__, "supervisord.conf.j2")
-    assert template is not None
-    template = template.decode("utf8")
-    cwd = str(os.getcwd())
-    if not supervisord_subprocess_log_dir(config).exists():
-        supervisord_subprocess_log_dir(config).mkdir(exist_ok=True)
-    config_string = Template(template).render(  # pyright: reportUnknownMemberType=false
-        instances=[
-            _Instance(  # type: ignore[call-arg]
-                type=i.type.name,
-                logfile=supervisord_subprocess_log_dir(config) / f"{i.id}.log",
-                id=str(i.id),
-                workdir=cwd,
-                command=_get_command_based_on_type(config, i),
-                environment=f"SYSTEMD_INSTANCE={i.id}",
-            )
-            for i in instances
-        ],
-        config=SupervisordConfig(  # type: ignore[call-arg]
-            unix_http_server=supervisord_sock_file(config),
-            pid_file=supervisord_pid_file(config),
-            workdir=cwd,
-            logfile=supervisord_log_file(config),
-        ),
-    )
-    await writefile(supervisord_config_file_tmp(config), config_string)
-    # atomically replace
-    os.rename(supervisord_config_file_tmp(config), supervisord_config_file(config))
-
-
 async def _start_supervisord(config: KresConfig) -> None:
-    await _write_config_file(config, set())
+    await write_config_file(config)
     res = await call(f'supervisord --configuration="{supervisord_config_file(config).absolute()}"', shell=True)
     if res != 0:
         raise SubprocessControllerException(f"Supervisord exited with exit code {res}")
 
 
-async def _stop_supervisord(config: KresConfig) -> None:
-    pid = int(await readfile(supervisord_pid_file(config)))
-    kill(pid, signal.SIGTERM)
-    await wait_for_process_termination(pid)
-
-
-async def _update_config(config: KresConfig, instances: Set["SupervisordSubprocess"]) -> None:
-    await _write_config_file(config, instances)
-    await call(f'supervisorctl -c "{supervisord_config_file(config).absolute()}" update', shell=True)
+async def _reload_supervisord(config: KresConfig) -> None:
+    """
+    Rewrite the supervisord config file and run `supervisorctl ... update` against it.
+
+    Raises SubprocessControllerException when the command exits with a non-zero code.
+    """
+    await write_config_file(config)
+    command = f'supervisorctl --configuration="{supervisord_config_file(config).absolute()}" update'
+    res = await call(command, shell=True)
+    if res == 0:
+        return
+    raise SubprocessControllerException(f"Supervisord reload failed with exit code {res}")
 
 
-async def _restart(config: KresConfig, id_: KresID) -> None:
-    await call(f'supervisorctl -c "{supervisord_config_file(config).absolute()}" restart {id_}', shell=True)
+@async_in_a_thread
+def _stop_supervisord(config: KresConfig) -> None:
+    """
+    Shut supervisord down through its RPC proxy and delete its config file.
+
+    The body is synchronous; the decorator makes it awaitable for callers.
+    """
+    supervisord = _create_supervisord_proxy(config)
+    supervisord.shutdown()
+    # the config file is removed only after the shutdown request has been issued
+    supervisord_config_file(config).unlink()
 
 
 async def _is_supervisord_available() -> bool:
@@ -178,43 +77,40 @@ async def _is_supervisord_running(config: KresConfig) -> bool:
         return True
 
 
-def _list_subprocesses(config: KresConfig) -> Dict[KresID, SubprocessStatus]:
+def _create_supervisord_proxy(config: KresConfig) -> Any:
     proxy = ServerProxy(
         "http://127.0.0.1",
         transport=supervisor.xmlrpc.SupervisorTransport(
             None, None, serverurl="unix://" + str(supervisord_sock_file(config))
         ),
     )
-    processes: Any = proxy.supervisor.getAllProcessInfo()
+    return getattr(proxy, "supervisor")
+
+
+def _list_running_subprocesses(config: KresConfig) -> Dict[SupervisordKresID, SubprocessStatus]:
+    supervisord = _create_supervisord_proxy(config)
+    processes: Any = supervisord.getAllProcessInfo()
 
     def convert(proc: Any) -> SubprocessStatus:
         conversion_tbl = {
-            "FATAL": SubprocessStatus.FAILED,
-            "EXITED": SubprocessStatus.FAILED,
+            # "STOPPED": None,  # filtered out elsewhere
+            "STARTING": SubprocessStatus.RUNNING,
             "RUNNING": SubprocessStatus.RUNNING,
+            "BACKOFF": SubprocessStatus.RUNNING,
+            "STOPPING": SubprocessStatus.RUNNING,
+            "EXITED": SubprocessStatus.FAILED,
+            "FATAL": SubprocessStatus.FAILED,
+            "UNKNOWN": SubprocessStatus.UNKNOWN,
         }
 
         if proc["statename"] in conversion_tbl:
             status = conversion_tbl[proc["statename"]]
         else:
+            logger.warning(f"Unknown supervisord process state {proc['statename']}")
             status = SubprocessStatus.UNKNOWN
         return status
 
-    return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes}
-
-
-async def _list_ids_from_existing_config(cfg: KresConfig) -> List[SupervisordKresID]:
-    config = await readfile(supervisord_config_file(cfg))
-    cp = configparser.ConfigParser()
-    cp.read_string(config)
-
-    res: List[SupervisordKresID] = []
-    for section in cp.sections():
-        if section.startswith("program:"):
-            program_id = section.replace("program:", "")
-            kid = SupervisordKresID.from_string(program_id)
-            res.append(kid)
-    return res
+    return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes if pr["statename"] != "STOPPED"}
 
 
 class SupervisordSubprocess(Subprocess):
@@ -230,14 +126,21 @@ class SupervisordSubprocess(Subprocess):
             super().__init__(config, base_id)
         self._controller: "SupervisordSubprocessController" = controller
 
-    async def _start(self) -> None:
-        return await self._controller.start_subprocess(self)
+    @async_in_a_thread
+    def _start(self) -> None:
+        """Ask supervisord to start the process named after this subprocess' ID."""
+        rpc = _create_supervisord_proxy(self._config)
+        process_name = str(self.id)
+        rpc.startProcess(process_name)
 
-    async def _stop(self) -> None:
-        return await self._controller.stop_subprocess(self)
+    @async_in_a_thread
+    def _stop(self) -> None:
+        """Ask supervisord to stop the process named after this subprocess' ID."""
+        rpc = _create_supervisord_proxy(self._config)
+        process_name = str(self.id)
+        rpc.stopProcess(process_name)
 
-    async def _restart(self) -> None:
-        return await self._controller.restart_subprocess(self)
+    @async_in_a_thread
+    def _restart(self) -> None:
+        """Restart the process by issuing a stop request followed by a start request."""
+        rpc = _create_supervisord_proxy(self._config)
+        process_name = str(self.id)
+        # both requests go through the same proxy object, stop first
+        rpc.stopProcess(process_name)
+        rpc.startProcess(process_name)
 
     def get_used_config(self) -> KresConfig:
         return self._config
@@ -245,15 +148,11 @@ class SupervisordSubprocess(Subprocess):
 
 class SupervisordSubprocessController(SubprocessController):
     def __init__(self):
-        self._running_instances: Set[SupervisordSubprocess] = set()
         self._controller_config: Optional[KresConfig] = None
 
     def __str__(self):
         return "supervisord"
 
-    def should_be_running(self, subprocess: SupervisordSubprocess) -> bool:
-        return subprocess in self._running_instances
-
     async def is_controller_available(self, config: KresConfig) -> bool:
         res = await _is_supervisord_available()
         if not res:
@@ -262,50 +161,35 @@ class SupervisordSubprocessController(SubprocessController):
         logger.debug("Detection - supervisord controller is available for use")
         return res
 
-    async def _update_config_with_real_state(self, config: KresConfig) -> None:
-        assert self._controller_config is not None
-
-        running = await _is_supervisord_running(config)
-        if running:
-            ids = await _list_ids_from_existing_config(config)
-            for id_ in ids:
-                self._running_instances.add(SupervisordSubprocess(self._controller_config, self, id_))
-
     async def get_all_running_instances(self) -> Iterable[Subprocess]:
         assert self._controller_config is not None
 
-        await self._update_config_with_real_state(self._controller_config)
-        return iter(self._running_instances)
+        if await _is_supervisord_running(self._controller_config):
+            states = _list_running_subprocesses(self._controller_config)
+            return [
+                SupervisordSubprocess(self._controller_config, self, id_)
+                for id_ in states
+                if states[id_] == SubprocessStatus.RUNNING
+            ]
+        else:
+            return []
 
     async def initialize_controller(self, config: KresConfig) -> None:
         self._controller_config = config
 
         if not await _is_supervisord_running(config):
             await _start_supervisord(config)
+        else:
+            await _reload_supervisord(config)
 
     async def shutdown_controller(self) -> None:
         assert self._controller_config is not None
         await _stop_supervisord(self._controller_config)
 
-    async def start_subprocess(self, subprocess: SupervisordSubprocess) -> None:
-        assert self._controller_config is not None
-        assert subprocess not in self._running_instances
-        self._running_instances.add(subprocess)
-        await _update_config(self._controller_config, self._running_instances)
-
-    async def stop_subprocess(self, subprocess: SupervisordSubprocess) -> None:
-        assert self._controller_config is not None
-        assert subprocess in self._running_instances
-        self._running_instances.remove(subprocess)
-        await _update_config(self._controller_config, self._running_instances)
-
-    async def restart_subprocess(self, subprocess: SupervisordSubprocess) -> None:
-        assert self._controller_config is not None
-        assert subprocess in self._running_instances
-        await _restart(self._controller_config, subprocess.id)
-
     async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
         return SupervisordSubprocess(subprocess_config, self, subprocess_type)
 
-    async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
-        return await to_thread(_list_subprocesses, self._controller_config)
+    @async_in_a_thread
+    def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
+        """
+        Return the status of the subprocesses reported by supervisord, keyed by their ID.
+
+        The controller must already hold a configuration.
+        """
+        controller_config = self._controller_config
+        assert controller_config is not None
+        statuses = _list_running_subprocesses(controller_config)
+        # the helper is annotated with SupervisordKresID keys; widen them to the interface type
+        return cast(Dict[KresID, SubprocessStatus], statuses)
diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py b/manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py
new file mode 100644 (file)
index 0000000..10075e6
--- /dev/null
@@ -0,0 +1,121 @@
+import os
+from typing import List
+
+from jinja2 import Template
+
+from knot_resolver_manager.compat.dataclasses import dataclass
+from knot_resolver_manager.constants import (
+    kres_gc_executable,
+    kresd_cache_dir,
+    kresd_config_file,
+    kresd_executable,
+    supervisord_config_file,
+    supervisord_config_file_tmp,
+    supervisord_log_file,
+    supervisord_pid_file,
+    supervisord_sock_file,
+    supervisord_subprocess_log_dir,
+)
+from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.kresd_controller.interface import KresID, SubprocessType
+from knot_resolver_manager.utils.async_utils import read_resource, writefile
+
+
+class SupervisordKresID(KresID):
+    """
+    Subprocess ID whose string form doubles as the supervisord program name:
+    "gc" for the garbage collector and "kresd<N>" for a kresd worker.
+    """
+
+    # WARNING: be really careful with renaming. If the naming schema is changing,
+    # we should be able to parse the old one as well, otherwise updating manager will
+    # cause weird behavior
+
+    @staticmethod
+    def from_string(val: str) -> "SupervisordKresID":
+        """Parse a program name produced by `__str__` back into an ID."""
+        if val != "gc":
+            # whatever remains after dropping the "kresd" marker must be the numeric part
+            number = int(val.replace("kresd", ""))
+            return SupervisordKresID.new(SubprocessType.KRESD, number)
+        return SupervisordKresID.new(SubprocessType.GC, -1)
+
+    def __str__(self) -> str:
+        kind = self.subprocess_type
+        if kind is SubprocessType.KRESD:
+            return f"kresd{self._id}"
+        if kind is SubprocessType.GC:
+            return "gc"
+        raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
+
+
+def _get_command_based_on_type(config: KresConfig, i: "SupervisordKresID") -> str:
+    """
+    Build the command line supervisord should run for the given subprocess ID.
+
+    Raises NotImplementedError for a subprocess type other than KRESD or GC.
+    """
+    kind = i.subprocess_type
+    if kind is SubprocessType.GC:
+        return f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000"
+    if kind is SubprocessType.KRESD:
+        return f"{kresd_executable()} -c {kresd_config_file(config, i)} -n"
+    raise NotImplementedError("This subprocess type is not supported")
+
+
+@dataclass
+class _Instance:
+    """
+    One program entry handed to the supervisord config template
+    """
+
+    type: str
+    logfile: str
+    id: str
+    workdir: str
+    command: str
+    environment: str
+
+    @staticmethod
+    def create_list(config: KresConfig) -> List["_Instance"]:
+        """
+        Create entries for kresd workers 1..max_workers followed by a single GC entry.
+        """
+        cwd = str(os.getcwd())
+        log_dir = supervisord_subprocess_log_dir(config)
+
+        # all worker slots up to the configured maximum get an ID, GC goes last
+        ids: List[SupervisordKresID] = []
+        for number in range(1, int(config.max_workers) + 1):
+            ids.append(SupervisordKresID(SubprocessType.KRESD, number, _i_know_what_i_am_doing=True))
+        ids.append(SupervisordKresID(SubprocessType.GC, -1, _i_know_what_i_am_doing=True))
+
+        result: List["_Instance"] = []
+        for i in ids:
+            # NOTE(review): logfile receives a path object although the field is annotated as str
+            entry = _Instance(  # type: ignore[call-arg]
+                type=i.subprocess_type.name,
+                logfile=log_dir / f"{i}.log",
+                id=str(i),
+                workdir=cwd,
+                command=_get_command_based_on_type(config, i),
+                environment=f"SYSTEMD_INSTANCE={i}",
+            )
+            result.append(entry)
+        return result
+
+
+@dataclass
+class SupervisordConfig:
+    """
+    Global (not per-program) values handed to the supervisord config template
+    """
+
+    unix_http_server: str
+    pid_file: str
+    workdir: str
+    logfile: str
+
+    @staticmethod
+    def create(config: KresConfig) -> "SupervisordConfig":
+        """Collect the socket, PID file and log file locations plus the current working directory."""
+        workdir = str(os.getcwd())
+        sock = supervisord_sock_file(config)
+        pid = supervisord_pid_file(config)
+        log = supervisord_log_file(config)
+        return SupervisordConfig(  # type: ignore[call-arg]
+            unix_http_server=sock, pid_file=pid, workdir=workdir, logfile=log
+        )
+
+
+async def write_config_file(config: KresConfig) -> None:
+    """
+    Render the supervisord configuration from the packaged template and store it on disk.
+
+    The subprocess log directory is created when missing. The rendered text is
+    first written to the temporary config path and then moved over the real one.
+    """
+    # mkdir(exist_ok=True) already tolerates an existing directory, so a separate
+    # exists() check would only add a window for races
+    supervisord_subprocess_log_dir(config).mkdir(exist_ok=True)
+
+    template = await read_resource(__package__, "supervisord.conf.j2")
+    assert template is not None
+    template = template.decode("utf8")
+    config_string = Template(template).render(  # pyright: reportUnknownMemberType=false
+        instances=_Instance.create_list(config),
+        config=SupervisordConfig.create(config),
+    )
+    await writefile(supervisord_config_file_tmp(config), config_string)
+    # atomically replace (we don't technically need this right now, but better safe than sorry)
+    os.replace(supervisord_config_file_tmp(config), supervisord_config_file(config))
index 84c7c55c322599c75f3ca1f380e49edeb5214a45..103f889f79e853d992aecc1832df7c85d7c09e18 100644 (file)
@@ -28,5 +28,8 @@ stderr_logfile={{ instance.logfile }}
 directory={{ instance.workdir }}
 command={{ instance.command }}
 environment={{ instance.environment }}
+autostart=false
+autorestart=true
+startsecs=1  # this is what makes start operation slow, it would be better replaced with something similar to systemd notify
 
 {%- endfor -%}
\ No newline at end of file
index 643a926a16e5b6ab682735a0dd583e187f6a76c0..0f9d1454e3332117d862404d2650ddcf7dfb2ce7 100644 (file)
@@ -125,6 +125,7 @@ disable= [
     "unsupported-delete-operation", # checked by pyright
     "unsubscriptable-object", # checked by pyright
     "unsupported-membership-test", # checked by pyright
+    "invalid-overridden-method",  # hopefully checked by type checkers
 ]
 
 [tool.pylint.SIMILARITIES]