controller/supervisord: platform portable config
author    Aleš Mrázek <ales.mrazek@nic.cz>
          Mon, 10 Nov 2025 20:31:33 +0000 (21:31 +0100)
committer Aleš Mrázek <ales.mrazek@nic.cz>
          Wed, 26 Nov 2025 17:28:05 +0000 (18:28 +0100)
python/knot_resolver/controller/supervisord/config_file.py
python/knot_resolver/controller/supervisord/plugin/sd_notify.py
python/knot_resolver/controller/supervisord/supervisord.conf.j2
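
The change keys off a new pair of platform flags, LINUX_SYS and APPLE_SYS, imported from knot_resolver.constants. That module is not touched by this commit; a plausible minimal sketch of such flags (an assumption for illustration, not the project's actual definition) would be:

```python
# Hypothetical sketch only: the real LINUX_SYS/APPLE_SYS live in
# knot_resolver/constants.py, which is outside this diff.
import sys

LINUX_SYS: bool = sys.platform.startswith("linux")
APPLE_SYS: bool = sys.platform == "darwin"
```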

diff --git a/python/knot_resolver/controller/supervisord/config_file.py b/python/knot_resolver/controller/supervisord/config_file.py
index ccfc520d4b374d231e96725a8ec5416c2f78b1d9..9d02ba6cbfe4dc934aee588b5a426be349a473af 100644
@@ -1,12 +1,13 @@
 import logging
 import os
+import sys
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Literal
 
 from jinja2 import Template
 
-from knot_resolver.constants import KRES_CACHE_GC_EXECUTABLE, KRESD_EXECUTABLE
+from knot_resolver.constants import APPLE_SYS, KRES_CACHE_GC_EXECUTABLE, KRESD_EXECUTABLE, LINUX_SYS
 from knot_resolver.controller.interface import KresID, SubprocessType
 from knot_resolver.datamodel.config_schema import KresConfig, workers_max_count
 from knot_resolver.datamodel.logging_schema import LogTargetEnum
@@ -82,6 +83,7 @@ class ProcessTypeConfig:
     logfile: Path
     workdir: str
     command: str
+    startsecs: int
     environment: str
     max_procs: int = 1
 
@@ -92,6 +94,7 @@ class ProcessTypeConfig:
             logfile=supervisord_subprocess_log_dir(config) / "gc.log",
             workdir=cwd,
             command=f"{KRES_CACHE_GC_EXECUTABLE} -c {kres_cache_dir(config)}{kres_cache_gc_args(config)}",
+            startsecs=0,
             environment="",
         )
 
@@ -102,25 +105,43 @@ class ProcessTypeConfig:
             logfile=supervisord_subprocess_log_dir(config) / "policy-loader.log",
             workdir=cwd,
             command=f"{KRESD_EXECUTABLE} -c {(policy_loader_config_file(config))} -c - -n",
+            startsecs=0,
             environment="",
         )
 
     @staticmethod
     def create_kresd_config(config: KresConfig) -> "ProcessTypeConfig":
         cwd = str(os.getcwd())
+        startsecs = 3
+        environment = 'SYSTEMD_INSTANCE="%(process_num)d"'
+
+        if LINUX_SYS:
+            # Wait for NOTIFY message
+            startsecs = 60
+            environment += ",X-SUPERVISORD-TYPE=notify"
+        if APPLE_SYS:
+            # There is no need to wait for anything on macOS
+            # No NOTIFY message and only 1 kresd worker
+            startsecs = 0
+
         return ProcessTypeConfig(  # type: ignore[call-arg]
             logfile=supervisord_subprocess_log_dir(config) / "kresd%(process_num)d.log",
             workdir=cwd,
             command=f"{KRESD_EXECUTABLE} -c {kresd_config_file_supervisord_pattern(config)} -n",
-            environment='SYSTEMD_INSTANCE="%(process_num)d",X-SUPERVISORD-TYPE=notify',
+            startsecs=startsecs,
+            environment=environment,
             max_procs=int(workers_max_count()) + 1,  # +1 for the canary process
         )
 
     @staticmethod
     def create_manager_config(_config: KresConfig) -> "ProcessTypeConfig":
-        # read original command from /proc
-        with open("/proc/self/cmdline", "rb") as f:
-            args = [s.decode("utf-8") for s in f.read()[:-1].split(b"\0")]
+        if LINUX_SYS:
+            # read original command from /proc
+            with open("/proc/self/cmdline", "rb") as f:
+                args = [s.decode("utf-8") for s in f.read()[:-1].split(b"\0")]
+        else:
+            # other systems
+            args = [sys.executable] + sys.argv
 
         # insert debugger when asked
         if os.environ.get("KRES_DEBUG_MANAGER"):
@@ -130,11 +151,15 @@ class ProcessTypeConfig:
             args = args[:1] + ["-m", "debugpy", "--listen", "0.0.0.0:5678", "--wait-for-client"] + args[2:]
 
         cmd = '"' + '" "'.join(args) + '"'
+        environment = "KRES_SUPRESS_LOG_PREFIX=true"
+        if LINUX_SYS:
+            environment += ",X-SUPERVISORD-TYPE=notify"
 
         return ProcessTypeConfig(  # type: ignore[call-arg]
             workdir=user_constants().working_directory_on_startup,
             command=cmd,
-            environment="X-SUPERVISORD-TYPE=notify",
+            startsecs=600 if LINUX_SYS else 0,
+            environment=environment,
             logfile=Path(""),  # this will be ignored
         )
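
The manager command line is now recovered differently per platform: /proc/self/cmdline exists only on Linux, so other systems fall back to the running interpreter plus sys.argv. A standalone sketch of the same split (illustrative only, mirroring the code above):

```python
# /proc/self/cmdline is a Linux-only file of NUL-separated arguments with a
# trailing NUL (hence the [:-1]); elsewhere, re-invoking the interpreter
# with sys.argv is a close approximation of the original command.
import sys
from typing import List

def current_cmdline() -> List[str]:
    try:
        with open("/proc/self/cmdline", "rb") as f:
            return [s.decode("utf-8") for s in f.read()[:-1].split(b"\0")]
    except OSError:
        return [sys.executable] + sys.argv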
 
@@ -147,6 +172,7 @@ class SupervisordConfig:
     logfile: Path
     loglevel: Literal["critical", "error", "warn", "info", "debug", "trace", "blather"]
     target: LogTargetEnum
+    linux_sys: bool
 
     @staticmethod
     def create(config: KresConfig) -> "SupervisordConfig":
@@ -170,6 +196,7 @@ class SupervisordConfig:
             logfile=Path("syslog" if config.logging.target == "syslog" else "/dev/null"),
             loglevel=loglevel,  # type: ignore[arg-type]
             target=config.logging.target,
+            linux_sys=LINUX_SYS,
         )
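
The environment strings assembled above use supervisord's comma-separated KEY=value syntax; supervisord parses them into a dict, which is what the sd_notify plugin later queries for the X-SUPERVISORD-TYPE marker. A simplified parser to illustrate the mapping (supervisord's own parser additionally handles quoting and %(...) expansion):

```python
# Simplified illustration of how an environment= string maps to the dict
# the plugin reads via proc.config.environment.get("X-SUPERVISORD-TYPE").
def parse_environment(spec: str) -> "dict[str, str]":
    pairs = (item.split("=", 1) for item in spec.split(",") if item)
    return {key.strip(): value.strip().strip('"') for key, value in pairs}

env = parse_environment('SYSTEMD_INSTANCE="%(process_num)d",X-SUPERVISORD-TYPE=notify')
assert env["X-SUPERVISORD-TYPE"] == "notify"
```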
 
 
diff --git a/python/knot_resolver/controller/supervisord/plugin/sd_notify.py b/python/knot_resolver/controller/supervisord/plugin/sd_notify.py
index f1558d94d7bf486af3a0333a2d679b4438f182ac..cfdb622d6b38c977149f40a25290b2390c28fc44 100644
 # ruff: noqa: SLF001
 # pylint: disable=c-extension-no-member
 
-import os
-import signal
-import time
-from functools import partial
-from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+from knot_resolver.constants import LINUX_SYS
 
-from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
-from supervisor.medusa.asyncore_25 import compact_traceback
-from supervisor.process import Subprocess
-from supervisor.states import ProcessStates
-from supervisor.supervisord import Supervisor
+if LINUX_SYS:
+    import os
+    import signal
+    import time
+    from functools import partial
+    from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
 
-from knot_resolver.controller.supervisord.plugin import notify
+    from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
+    from supervisor.medusa.asyncore_25 import compact_traceback
+    from supervisor.process import Subprocess
+    from supervisor.states import ProcessStates
+    from supervisor.supervisord import Supervisor
 
-starting_processes: List[Subprocess] = []
+    from knot_resolver.controller.supervisord.plugin import notify
 
+    starting_processes: List[Subprocess] = []
 
-def is_type_notify(proc: Subprocess) -> bool:
-    return proc.config.environment is not None and proc.config.environment.get("X-SUPERVISORD-TYPE", None) == "notify"
-
-
-class NotifySocketDispatcher:
-    """
-    See supervisor.dispatcher
-    """
-
-    def __init__(self, supervisor: Supervisor, fd: int):
-        self._supervisor = supervisor
-        self.fd = fd
-        self.closed = False  # True if close() has been called
+    def is_type_notify(proc: Subprocess) -> bool:
+        return (
+            proc.config.environment is not None and proc.config.environment.get("X-SUPERVISORD-TYPE", None) == "notify"
+        )
 
-    def __repr__(self):
-        return f"<{self.__class__.__name__} with fd={self.fd}>"
+    class NotifySocketDispatcher:
+        """
+        See supervisor.dispatcher
+        """
 
-    def readable(self):
-        return True
+        def __init__(self, supervisor: Supervisor, fd: int):
+            self._supervisor = supervisor
+            self.fd = fd
+            self.closed = False  # True if close() has been called
 
-    def writable(self):
-        return False
+        def __repr__(self):
+            return f"<{self.__class__.__name__} with fd={self.fd}>"
 
-    def handle_read_event(self):
-        logger: Any = self._supervisor.options.logger
+        def readable(self):
+            return True
 
-        res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
-        if res is None:
-            return  # there was some junk
-        pid, data = res
+        def writable(self):
+            return False
 
-        # pylint: disable=undefined-loop-variable
-        for proc in starting_processes:
-            if proc.pid == pid:
-                break
-        else:
-            logger.warn(f"ignoring ready notification from unregistered PID={pid}")
-            return
+        def handle_read_event(self):
+            logger: Any = self._supervisor.options.logger
 
-        if data.startswith(b"READY=1"):
-            # handle case, when some process is really ready
+            res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
+            if res is None:
+                return  # there was some junk
+            pid, data = res
 
-            if is_type_notify(proc):
-                proc._assertInState(ProcessStates.STARTING)
-                proc.change_state(ProcessStates.RUNNING)
-                logger.info(
-                    f"success: {proc.config.name} entered RUNNING state, process sent notification via $NOTIFY_SOCKET"
-                )
+            # pylint: disable=undefined-loop-variable
+            for proc in starting_processes:
+                if proc.pid == pid:
+                    break
             else:
-                logger.warn(f"ignoring READY notification from {proc.config.name}, which is not configured to send it")
+                logger.warn(f"ignoring ready notification from unregistered PID={pid}")
+                return
+
+            if data.startswith(b"READY=1"):
+                # handle case, when some process is really ready
+
+                if is_type_notify(proc):
+                    proc._assertInState(ProcessStates.STARTING)
+                    proc.change_state(ProcessStates.RUNNING)
+                    logger.info(
+                        f"success: {proc.config.name} entered RUNNING state, process sent notification via $NOTIFY_SOCKET"
+                    )
+                else:
+                    logger.warn(
+                        f"ignoring READY notification from {proc.config.name}, which is not configured to send it"
+                    )
+
+            elif data.startswith(b"STOPPING=1"):
+                # just accept the message, filter unwanted notifications and do nothing else
+
+                if is_type_notify(proc):
+                    logger.info(
+                        f"success: {proc.config.name} entered STOPPING state, process sent notification via $NOTIFY_SOCKET"
+                    )
+                else:
+                    logger.warn(
+                        f"ignoring STOPPING notification from {proc.config.name}, which is not configured to send it"
+                    )
 
-        elif data.startswith(b"STOPPING=1"):
-            # just accept the message, filter unwanted notifications and do nothing else
-
-            if is_type_notify(proc):
-                logger.info(
-                    f"success: {proc.config.name} entered STOPPING state, process sent notification via $NOTIFY_SOCKET"
-                )
             else:
-                logger.warn(
-                    f"ignoring STOPPING notification from {proc.config.name}, which is not configured to send it"
-                )
-
-        else:
-            # handle case, when we got something unexpected
-            logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
-            return
-
-    def handle_write_event(self):
-        raise ValueError("this dispatcher is not writable")
-
-    def handle_error(self):
-        _nil, t, v, tbinfo = compact_traceback()
-
-        self._supervisor.options.logger.error(
-            f"uncaptured python exception, closing notify socket {repr(self)} ({t}:{v} {tbinfo})"
-        )
-        self.close()
-
-    def close(self):
-        if not self.closed:
-            os.close(self.fd)
-            self.closed = True
+                # handle case, when we got something unexpected
+                logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
+                return
 
-    def flush(self):
-        pass
+        def handle_write_event(self):
+            raise ValueError("this dispatcher is not writable")
 
+        def handle_error(self):
+            _nil, t, v, tbinfo = compact_traceback()
 
-def keep_track_of_starting_processes(event: ProcessStateEvent) -> None:
-    global starting_processes
-
-    proc: Subprocess = event.process
-
-    if isinstance(event, ProcessStateStartingEvent):
-        # process is starting
-        # if proc not in starting_processes:
-        starting_processes.append(proc)
-
-    else:
-        # not starting
-        starting_processes = [p for p in starting_processes if p.pid is not proc.pid]
-
-
-notify_dispatcher: Optional[NotifySocketDispatcher] = None
-
-
-def process_transition(slf: Subprocess) -> None:
-    if not is_type_notify(slf):
-        return slf
-
-    # modified version of upstream process transition code
-    if slf.state == ProcessStates.STARTING:
-        if time.time() - slf.laststart > slf.config.startsecs:
-            # STARTING -> STOPPING if the process has not sent ready notification
-            # within proc.config.startsecs
-            slf.config.options.logger.warn(
-                f"process '{slf.config.name}' did not send ready notification within {slf.config.startsecs} secs, killing"
+            self._supervisor.options.logger.error(
+                f"uncaptured python exception, closing notify socket {repr(self)} ({t}:{v} {tbinfo})"
             )
-            slf.kill(signal.SIGKILL)
-            slf.x_notifykilled = True  # used in finish() function to set to FATAL state
-            slf.laststart = time.time() + 1  # prevent immediate state transition to RUNNING from happening
-
-    # return self for chaining
-    return slf
+            self.close()
 
+        def close(self):
+            if not self.closed:
+                os.close(self.fd)
+                self.closed = True
 
-def subprocess_finish_tail(slf, pid, sts) -> Tuple[Any, Any, Any]:
-    if getattr(slf, "x_notifykilled", False):
-        # we want FATAL, not STOPPED state after timeout waiting for startup notification
-        # why? because it's likely not gonna help to try starting the process up again if
-        # it failed so early
-        slf.change_state(ProcessStates.FATAL)
+        def flush(self):
+            pass
 
-        # clear the marker value
-        del slf.x_notifykilled
+    def keep_track_of_starting_processes(event: ProcessStateEvent) -> None:
+        global starting_processes
 
-    # return for chaining
-    return slf, pid, sts
+        proc: Subprocess = event.process
 
+        if isinstance(event, ProcessStateStartingEvent):
+            # process is starting
+            # if proc not in starting_processes:
+            starting_processes.append(proc)
 
-def supervisord_get_process_map(supervisord: Any, mp: Dict[Any, Any]) -> Dict[Any, Any]:
-    global notify_dispatcher
-    if notify_dispatcher is None:
-        notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
-        supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
-
-    # add our dispatcher to the result
-    assert notify_dispatcher.fd not in mp
-    mp[notify_dispatcher.fd] = notify_dispatcher
+        else:
+            # not starting
+            starting_processes = [p for p in starting_processes if p.pid is not proc.pid]
+
+    notify_dispatcher: Optional[NotifySocketDispatcher] = None
+
+    def process_transition(slf: Subprocess) -> None:
+        if not is_type_notify(slf):
+            return slf
+
+        # modified version of upstream process transition code
+        if slf.state == ProcessStates.STARTING:
+            if time.time() - slf.laststart > slf.config.startsecs:
+                # STARTING -> STOPPING if the process has not sent ready notification
+                # within proc.config.startsecs
+                slf.config.options.logger.warn(
+                    f"process '{slf.config.name}' did not send ready notification within {slf.config.startsecs} secs, killing"
+                )
+                slf.kill(signal.SIGKILL)
+                slf.x_notifykilled = True  # used in finish() function to set to FATAL state
+                slf.laststart = time.time() + 1  # prevent immediate state transition to RUNNING from happening
 
-    return mp
+        # return self for chaining
+        return slf
 
+    def subprocess_finish_tail(slf, pid, sts) -> Tuple[Any, Any, Any]:
+        if getattr(slf, "x_notifykilled", False):
+            # we want FATAL, not STOPPED state after timeout waiting for startup notification
+            # why? because it's likely not gonna help to try starting the process up again if
+            # it failed so early
+            slf.change_state(ProcessStates.FATAL)
 
-def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]:
-    if is_type_notify(slf):
-        slf.config.environment["NOTIFY_SOCKET"] = os.getcwd() + "/supervisor-notify-socket"
-    return (slf, *args)
+            # clear the marker value
+            del slf.x_notifykilled
 
+        # return for chaining
+        return slf, pid, sts
 
-T = TypeVar("T")
-U = TypeVar("U")
+    def supervisord_get_process_map(supervisord: Any, mp: Dict[Any, Any]) -> Dict[Any, Any]:
+        global notify_dispatcher
+        if notify_dispatcher is None:
+            notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
+            supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
 
+        # add our dispatcher to the result
+        assert notify_dispatcher.fd not in mp
+        mp[notify_dispatcher.fd] = notify_dispatcher
 
-def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]:
-    def wrapper(*args: Any, **kwargs: Any) -> T:
-        res = first(*args, **kwargs)
-        if isinstance(res, tuple):
-            return second(*res)
-        return second(res)
+        return mp
 
-    return wrapper
+    def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]:
+        if is_type_notify(slf):
+            slf.config.environment["NOTIFY_SOCKET"] = os.getcwd() + "/supervisor-notify-socket"
+        return (slf, *args)
 
+    T = TypeVar("T")
+    U = TypeVar("U")
 
-def append(first: Callable[..., T], second: Callable[..., None]) -> Callable[..., T]:
-    def wrapper(*args: Any, **kwargs: Any) -> T:
-        res = first(*args, **kwargs)
-        second(*args, **kwargs)
-        return res
+    def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]:
+        def wrapper(*args: Any, **kwargs: Any) -> T:
+            res = first(*args, **kwargs)
+            if isinstance(res, tuple):
+                return second(*res)
+            return second(res)
 
-    return wrapper
+        return wrapper
 
+    def append(first: Callable[..., T], second: Callable[..., None]) -> Callable[..., T]:
+        def wrapper(*args: Any, **kwargs: Any) -> T:
+            res = first(*args, **kwargs)
+            second(*args, **kwargs)
+            return res
 
-def monkeypatch(supervisord: Supervisor) -> None:
-    """Inject ourselves into supervisord code"""
+        return wrapper
 
-    # append notify socket handler to event loop
-    supervisord.get_process_map = chain(supervisord.get_process_map, partial(supervisord_get_process_map, supervisord))
+    def monkeypatch(supervisord: Supervisor) -> None:
+        """Inject ourselves into supervisord code"""
 
-    # prepend timeout handler to transition method
-    Subprocess.transition = chain(process_transition, Subprocess.transition)
-    Subprocess.finish = append(Subprocess.finish, subprocess_finish_tail)
+        # append notify socket handler to event loop
+        supervisord.get_process_map = chain(
+            supervisord.get_process_map, partial(supervisord_get_process_map, supervisord)
+        )
 
-    # add environment variable $NOTIFY_SOCKET to starting processes
-    Subprocess._spawn_as_child = chain(process_spawn_as_child_add_env, Subprocess._spawn_as_child)
+        # prepend timeout handler to transition method
+        Subprocess.transition = chain(process_transition, Subprocess.transition)
+        Subprocess.finish = append(Subprocess.finish, subprocess_finish_tail)
 
-    # keep references to starting subprocesses
-    subscribe(ProcessStateEvent, keep_track_of_starting_processes)
+        # add environment variable $NOTIFY_SOCKET to starting processes
+        Subprocess._spawn_as_child = chain(process_spawn_as_child_add_env, Subprocess._spawn_as_child)
 
+        # keep references to starting subprocesses
+        subscribe(ProcessStateEvent, keep_track_of_starting_processes)
 
-def inject(supervisord: Supervisor, **_config: Any) -> Any:  # pylint: disable=useless-return
-    monkeypatch(supervisord)
+    def inject(supervisord: Supervisor, **_config: Any) -> Any:  # pylint: disable=useless-return
+        monkeypatch(supervisord)
 
-    # this method is called by supervisord when loading the plugin,
-    # it should return XML-RPC object, which we don't care about
-    # That's why we are returning just None
-    return None
+        # this method is called by supervisord when loading the plugin,
+        # it should return XML-RPC object, which we don't care about
+        # That's why we are returning just None
+        return None
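
For context, the dispatcher above consumes datagrams in the systemd sd_notify format: a child started with $NOTIFY_SOCKET in its environment sends short messages such as READY=1 or STOPPING=1 to that unix socket (kresd does this through its systemd integration). A minimal client-side sketch of the convention, not the code kresd actually uses:

```python
# Minimal sketch of the notify convention consumed by NotifySocketDispatcher.
import os
import socket

def sd_notify(state: bytes = b"READY=1") -> None:
    addr = os.environ.get("NOTIFY_SOCKET")
    if not addr:
        return  # not running under a notify-aware supervisor
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        sock.connect(addr)
        sock.send(state)
```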
diff --git a/python/knot_resolver/controller/supervisord/supervisord.conf.j2 b/python/knot_resolver/controller/supervisord/supervisord.conf.j2
index 846f3110aad4631f516819b2e282b285b6fe5f30..09b61f9ac27b5173e12b46a435f45630bf91dfd6 100644
@@ -26,8 +26,11 @@ target = {{ config.target }}
 [rpcinterface:manager_integration]
 supervisor.rpcinterface_factory = knot_resolver.controller.supervisord.plugin.manager_integration:inject
 
+{# sd_notify is supported only on Linux based systems #}
+{% if config.linux_sys -%}
 [rpcinterface:sd_notify]
 supervisor.rpcinterface_factory = knot_resolver.controller.supervisord.plugin.sd_notify:inject
+{%- endif %}
 
 {# Extensions for actual API control #}
 [rpcinterface:supervisor]
@@ -49,8 +52,8 @@ autostart=true
    i.e. it might take lots of time currently, if the user configured very large rulesets (e.g. huge RPZ).
    Let's permit it lots of time, assuming that useful work is being done.
 #}
-startsecs=600
-environment={{ manager.environment }},KRES_SUPRESS_LOG_PREFIX=true
+startsecs={{ manager.startsecs }}
+environment={{ manager.environment }}
 stdout_logfile=NONE
 stderr_logfile=NONE
 
@@ -63,7 +66,7 @@ autostart=false
 autorestart=true
 stopsignal=TERM
 killasgroup=true
-startsecs=60
+startsecs={{ kresd.startsecs }}
 environment={{ kresd.environment }}
 stdout_logfile=NONE
 stderr_logfile=NONE
@@ -74,8 +77,8 @@ command={{ loader.command }}
 autostart=false
 stopsignal=TERM
 killasgroup=true
-startsecs=0
 exitcodes=0
+startsecs={{ loader.startsecs }}
 environment={{ loader.environment }}
 stdout_logfile=NONE
 stderr_logfile=NONE
@@ -88,7 +91,7 @@ autostart=false
 autorestart=true
 stopsignal=TERM
 killasgroup=true
-startsecs=0
+startsecs={{ gc.startsecs }}
 environment={{ gc.environment }}
 stdout_logfile=NONE
 stderr_logfile=NONE
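
The linux_sys flag passed through SupervisordConfig decides whether the Linux-only sd_notify rpcinterface section is emitted at all. A self-contained sketch of that conditional, using an inline template rather than the project's rendering code:

```python
# Demonstrates the {% if config.linux_sys %} guard above; the sd_notify
# plugin section only appears in the rendered config on Linux.
from jinja2 import Template

snippet = Template(
    "{% if config.linux_sys -%}\n"
    "[rpcinterface:sd_notify]\n"
    "supervisor.rpcinterface_factory = knot_resolver.controller.supervisord.plugin.sd_notify:inject\n"
    "{%- endif %}"
)
print(repr(snippet.render(config={"linux_sys": True})))   # section present
print(repr(snippet.render(config={"linux_sys": False})))  # empty string
```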