import logging
import os
+import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
from jinja2 import Template
-from knot_resolver.constants import KRES_CACHE_GC_EXECUTABLE, KRESD_EXECUTABLE
+from knot_resolver.constants import APPLE_SYS, KRES_CACHE_GC_EXECUTABLE, KRESD_EXECUTABLE, LINUX_SYS
from knot_resolver.controller.interface import KresID, SubprocessType
from knot_resolver.datamodel.config_schema import KresConfig, workers_max_count
from knot_resolver.datamodel.logging_schema import LogTargetEnum
logfile: Path
workdir: str
command: str
+ startsecs: int
environment: str
max_procs: int = 1
logfile=supervisord_subprocess_log_dir(config) / "gc.log",
workdir=cwd,
command=f"{KRES_CACHE_GC_EXECUTABLE} -c {kres_cache_dir(config)}{kres_cache_gc_args(config)}",
+ startsecs=0,
environment="",
)
logfile=supervisord_subprocess_log_dir(config) / "policy-loader.log",
workdir=cwd,
command=f"{KRESD_EXECUTABLE} -c {(policy_loader_config_file(config))} -c - -n",
+ startsecs=0,
environment="",
)
@staticmethod
def create_kresd_config(config: KresConfig) -> "ProcessTypeConfig":
cwd = str(os.getcwd())
+ startsecs = 3
+ environment = 'SYSTEMD_INSTANCE="%(process_num)d"'
+
+ if LINUX_SYS:
+ # Wait for NOTIFY message
+ startsecs = 60
+ environment += ",X-SUPERVISORD-TYPE=notify"
+ if APPLE_SYS:
+ # There is no need to wait for anything on macOS
+ # No NOTIFY message and only 1 kresd worker
+ startsecs = 0
+
return ProcessTypeConfig( # type: ignore[call-arg]
logfile=supervisord_subprocess_log_dir(config) / "kresd%(process_num)d.log",
workdir=cwd,
command=f"{KRESD_EXECUTABLE} -c {kresd_config_file_supervisord_pattern(config)} -n",
- environment='SYSTEMD_INSTANCE="%(process_num)d",X-SUPERVISORD-TYPE=notify',
+ startsecs=startsecs,
+ environment=environment,
max_procs=int(workers_max_count()) + 1, # +1 for the canary process
)
@staticmethod
def create_manager_config(_config: KresConfig) -> "ProcessTypeConfig":
- # read original command from /proc
- with open("/proc/self/cmdline", "rb") as f:
- args = [s.decode("utf-8") for s in f.read()[:-1].split(b"\0")]
+ if LINUX_SYS:
+ # read original command from /proc
+ with open("/proc/self/cmdline", "rb") as f:
+ args = [s.decode("utf-8") for s in f.read()[:-1].split(b"\0")]
+ else:
+ # other systems
+ args = [sys.executable] + sys.argv
# insert debugger when asked
if os.environ.get("KRES_DEBUG_MANAGER"):
args = args[:1] + ["-m", "debugpy", "--listen", "0.0.0.0:5678", "--wait-for-client"] + args[2:]
cmd = '"' + '" "'.join(args) + '"'
+ environment = "KRES_SUPRESS_LOG_PREFIX=true"
+ if LINUX_SYS:
+ environment += ",X-SUPERVISORD-TYPE=notify"
return ProcessTypeConfig( # type: ignore[call-arg]
workdir=user_constants().working_directory_on_startup,
command=cmd,
- environment="X-SUPERVISORD-TYPE=notify",
+ startsecs=600 if LINUX_SYS else 0,
+ environment=environment,
logfile=Path(""), # this will be ignored
)
logfile: Path
loglevel: Literal["critical", "error", "warn", "info", "debug", "trace", "blather"]
target: LogTargetEnum
+ linux_sys: bool
@staticmethod
def create(config: KresConfig) -> "SupervisordConfig":
logfile=Path("syslog" if config.logging.target == "syslog" else "/dev/null"),
loglevel=loglevel, # type: ignore[arg-type]
target=config.logging.target,
+ linux_sys=LINUX_SYS,
)
# ruff: noqa: SLF001
# pylint: disable=c-extension-no-member
-import os
-import signal
-import time
-from functools import partial
-from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+from knot_resolver.constants import LINUX_SYS
-from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
-from supervisor.medusa.asyncore_25 import compact_traceback
-from supervisor.process import Subprocess
-from supervisor.states import ProcessStates
-from supervisor.supervisord import Supervisor
+if LINUX_SYS:
+ import os
+ import signal
+ import time
+ from functools import partial
+ from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
-from knot_resolver.controller.supervisord.plugin import notify
+ from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
+ from supervisor.medusa.asyncore_25 import compact_traceback
+ from supervisor.process import Subprocess
+ from supervisor.states import ProcessStates
+ from supervisor.supervisord import Supervisor
-starting_processes: List[Subprocess] = []
+ from knot_resolver.controller.supervisord.plugin import notify
+ starting_processes: List[Subprocess] = []
-def is_type_notify(proc: Subprocess) -> bool:
- return proc.config.environment is not None and proc.config.environment.get("X-SUPERVISORD-TYPE", None) == "notify"
-
-
-class NotifySocketDispatcher:
- """
- See supervisor.dispatcher
- """
-
- def __init__(self, supervisor: Supervisor, fd: int):
- self._supervisor = supervisor
- self.fd = fd
- self.closed = False # True if close() has been called
+ def is_type_notify(proc: Subprocess) -> bool:
+ return (
+ proc.config.environment is not None and proc.config.environment.get("X-SUPERVISORD-TYPE", None) == "notify"
+ )
- def __repr__(self):
- return f"<{self.__class__.__name__} with fd={self.fd}>"
+ class NotifySocketDispatcher:
+ """
+ See supervisor.dispatcher
+ """
- def readable(self):
- return True
+ def __init__(self, supervisor: Supervisor, fd: int):
+ self._supervisor = supervisor
+ self.fd = fd
+ self.closed = False # True if close() has been called
- def writable(self):
- return False
+ def __repr__(self):
+ return f"<{self.__class__.__name__} with fd={self.fd}>"
- def handle_read_event(self):
- logger: Any = self._supervisor.options.logger
+ def readable(self):
+ return True
- res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
- if res is None:
- return # there was some junk
- pid, data = res
+ def writable(self):
+ return False
- # pylint: disable=undefined-loop-variable
- for proc in starting_processes:
- if proc.pid == pid:
- break
- else:
- logger.warn(f"ignoring ready notification from unregistered PID={pid}")
- return
+ def handle_read_event(self):
+ logger: Any = self._supervisor.options.logger
- if data.startswith(b"READY=1"):
- # handle case, when some process is really ready
+ res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
+ if res is None:
+ return # there was some junk
+ pid, data = res
- if is_type_notify(proc):
- proc._assertInState(ProcessStates.STARTING)
- proc.change_state(ProcessStates.RUNNING)
- logger.info(
- f"success: {proc.config.name} entered RUNNING state, process sent notification via $NOTIFY_SOCKET"
- )
+ # pylint: disable=undefined-loop-variable
+ for proc in starting_processes:
+ if proc.pid == pid:
+ break
else:
- logger.warn(f"ignoring READY notification from {proc.config.name}, which is not configured to send it")
+ logger.warn(f"ignoring ready notification from unregistered PID={pid}")
+ return
+
+ if data.startswith(b"READY=1"):
+ # handle case, when some process is really ready
+
+ if is_type_notify(proc):
+ proc._assertInState(ProcessStates.STARTING)
+ proc.change_state(ProcessStates.RUNNING)
+ logger.info(
+ f"success: {proc.config.name} entered RUNNING state, process sent notification via $NOTIFY_SOCKET"
+ )
+ else:
+ logger.warn(
+ f"ignoring READY notification from {proc.config.name}, which is not configured to send it"
+ )
+
+ elif data.startswith(b"STOPPING=1"):
+ # just accept the message, filter unwanted notifications and do nothing else
+
+ if is_type_notify(proc):
+ logger.info(
+ f"success: {proc.config.name} entered STOPPING state, process sent notification via $NOTIFY_SOCKET"
+ )
+ else:
+ logger.warn(
+ f"ignoring STOPPING notification from {proc.config.name}, which is not configured to send it"
+ )
- elif data.startswith(b"STOPPING=1"):
- # just accept the message, filter unwanted notifications and do nothing else
-
- if is_type_notify(proc):
- logger.info(
- f"success: {proc.config.name} entered STOPPING state, process sent notification via $NOTIFY_SOCKET"
- )
else:
- logger.warn(
- f"ignoring STOPPING notification from {proc.config.name}, which is not configured to send it"
- )
-
- else:
- # handle case, when we got something unexpected
- logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
- return
-
- def handle_write_event(self):
- raise ValueError("this dispatcher is not writable")
-
- def handle_error(self):
- _nil, t, v, tbinfo = compact_traceback()
-
- self._supervisor.options.logger.error(
- f"uncaptured python exception, closing notify socket {repr(self)} ({t}:{v} {tbinfo})"
- )
- self.close()
-
- def close(self):
- if not self.closed:
- os.close(self.fd)
- self.closed = True
+ # handle case, when we got something unexpected
+ logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
+ return
- def flush(self):
- pass
+ def handle_write_event(self):
+ raise ValueError("this dispatcher is not writable")
+ def handle_error(self):
+ _nil, t, v, tbinfo = compact_traceback()
-def keep_track_of_starting_processes(event: ProcessStateEvent) -> None:
- global starting_processes
-
- proc: Subprocess = event.process
-
- if isinstance(event, ProcessStateStartingEvent):
- # process is starting
- # if proc not in starting_processes:
- starting_processes.append(proc)
-
- else:
- # not starting
- starting_processes = [p for p in starting_processes if p.pid is not proc.pid]
-
-
-notify_dispatcher: Optional[NotifySocketDispatcher] = None
-
-
-def process_transition(slf: Subprocess) -> None:
- if not is_type_notify(slf):
- return slf
-
- # modified version of upstream process transition code
- if slf.state == ProcessStates.STARTING:
- if time.time() - slf.laststart > slf.config.startsecs:
- # STARTING -> STOPPING if the process has not sent ready notification
- # within proc.config.startsecs
- slf.config.options.logger.warn(
- f"process '{slf.config.name}' did not send ready notification within {slf.config.startsecs} secs, killing"
+ self._supervisor.options.logger.error(
+ f"uncaptured python exception, closing notify socket {repr(self)} ({t}:{v} {tbinfo})"
)
- slf.kill(signal.SIGKILL)
- slf.x_notifykilled = True # used in finish() function to set to FATAL state
- slf.laststart = time.time() + 1 # prevent immediate state transition to RUNNING from happening
-
- # return self for chaining
- return slf
+ self.close()
+ def close(self):
+ if not self.closed:
+ os.close(self.fd)
+ self.closed = True
-def subprocess_finish_tail(slf, pid, sts) -> Tuple[Any, Any, Any]:
- if getattr(slf, "x_notifykilled", False):
- # we want FATAL, not STOPPED state after timeout waiting for startup notification
- # why? because it's likely not gonna help to try starting the process up again if
- # it failed so early
- slf.change_state(ProcessStates.FATAL)
+ def flush(self):
+ pass
- # clear the marker value
- del slf.x_notifykilled
+ def keep_track_of_starting_processes(event: ProcessStateEvent) -> None:
+ global starting_processes
- # return for chaining
- return slf, pid, sts
+ proc: Subprocess = event.process
+ if isinstance(event, ProcessStateStartingEvent):
+ # process is starting
+ # if proc not in starting_processes:
+ starting_processes.append(proc)
-def supervisord_get_process_map(supervisord: Any, mp: Dict[Any, Any]) -> Dict[Any, Any]:
- global notify_dispatcher
- if notify_dispatcher is None:
- notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
- supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
-
- # add our dispatcher to the result
- assert notify_dispatcher.fd not in mp
- mp[notify_dispatcher.fd] = notify_dispatcher
+ else:
+ # not starting
+ starting_processes = [p for p in starting_processes if p.pid is not proc.pid]
+
+ notify_dispatcher: Optional[NotifySocketDispatcher] = None
+
+ def process_transition(slf: Subprocess) -> None:
+ if not is_type_notify(slf):
+ return slf
+
+ # modified version of upstream process transition code
+ if slf.state == ProcessStates.STARTING:
+ if time.time() - slf.laststart > slf.config.startsecs:
+ # STARTING -> STOPPING if the process has not sent ready notification
+ # within proc.config.startsecs
+ slf.config.options.logger.warn(
+ f"process '{slf.config.name}' did not send ready notification within {slf.config.startsecs} secs, killing"
+ )
+ slf.kill(signal.SIGKILL)
+ slf.x_notifykilled = True # used in finish() function to set to FATAL state
+ slf.laststart = time.time() + 1 # prevent immediate state transition to RUNNING from happening
- return mp
+ # return self for chaining
+ return slf
+ def subprocess_finish_tail(slf, pid, sts) -> Tuple[Any, Any, Any]:
+ if getattr(slf, "x_notifykilled", False):
+ # we want FATAL, not STOPPED state after timeout waiting for startup notification
+ # why? because it's likely not gonna help to try starting the process up again if
+ # it failed so early
+ slf.change_state(ProcessStates.FATAL)
-def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]:
- if is_type_notify(slf):
- slf.config.environment["NOTIFY_SOCKET"] = os.getcwd() + "/supervisor-notify-socket"
- return (slf, *args)
+ # clear the marker value
+ del slf.x_notifykilled
+ # return for chaining
+ return slf, pid, sts
-T = TypeVar("T")
-U = TypeVar("U")
+ def supervisord_get_process_map(supervisord: Any, mp: Dict[Any, Any]) -> Dict[Any, Any]:
+ global notify_dispatcher
+ if notify_dispatcher is None:
+ notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
+ supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
+ # add our dispatcher to the result
+ assert notify_dispatcher.fd not in mp
+ mp[notify_dispatcher.fd] = notify_dispatcher
-def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]:
- def wrapper(*args: Any, **kwargs: Any) -> T:
- res = first(*args, **kwargs)
- if isinstance(res, tuple):
- return second(*res)
- return second(res)
+ return mp
- return wrapper
+ def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]:
+ if is_type_notify(slf):
+ slf.config.environment["NOTIFY_SOCKET"] = os.getcwd() + "/supervisor-notify-socket"
+ return (slf, *args)
+ T = TypeVar("T")
+ U = TypeVar("U")
-def append(first: Callable[..., T], second: Callable[..., None]) -> Callable[..., T]:
- def wrapper(*args: Any, **kwargs: Any) -> T:
- res = first(*args, **kwargs)
- second(*args, **kwargs)
- return res
+ def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]:
+ def wrapper(*args: Any, **kwargs: Any) -> T:
+ res = first(*args, **kwargs)
+ if isinstance(res, tuple):
+ return second(*res)
+ return second(res)
- return wrapper
+ return wrapper
+ def append(first: Callable[..., T], second: Callable[..., None]) -> Callable[..., T]:
+ def wrapper(*args: Any, **kwargs: Any) -> T:
+ res = first(*args, **kwargs)
+ second(*args, **kwargs)
+ return res
-def monkeypatch(supervisord: Supervisor) -> None:
- """Inject ourselves into supervisord code"""
+ return wrapper
- # append notify socket handler to event loop
- supervisord.get_process_map = chain(supervisord.get_process_map, partial(supervisord_get_process_map, supervisord))
+ def monkeypatch(supervisord: Supervisor) -> None:
+ """Inject ourselves into supervisord code"""
- # prepend timeout handler to transition method
- Subprocess.transition = chain(process_transition, Subprocess.transition)
- Subprocess.finish = append(Subprocess.finish, subprocess_finish_tail)
+ # append notify socket handler to event loop
+ supervisord.get_process_map = chain(
+ supervisord.get_process_map, partial(supervisord_get_process_map, supervisord)
+ )
- # add environment variable $NOTIFY_SOCKET to starting processes
- Subprocess._spawn_as_child = chain(process_spawn_as_child_add_env, Subprocess._spawn_as_child)
+ # prepend timeout handler to transition method
+ Subprocess.transition = chain(process_transition, Subprocess.transition)
+ Subprocess.finish = append(Subprocess.finish, subprocess_finish_tail)
- # keep references to starting subprocesses
- subscribe(ProcessStateEvent, keep_track_of_starting_processes)
+ # add environment variable $NOTIFY_SOCKET to starting processes
+ Subprocess._spawn_as_child = chain(process_spawn_as_child_add_env, Subprocess._spawn_as_child)
+ # keep references to starting subprocesses
+ subscribe(ProcessStateEvent, keep_track_of_starting_processes)
-def inject(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
- monkeypatch(supervisord)
+ def inject(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
+ monkeypatch(supervisord)
- # this method is called by supervisord when loading the plugin,
- # it should return XML-RPC object, which we don't care about
- # That's why why are returning just None
- return None
+ # this method is called by supervisord when loading the plugin,
+ # it should return XML-RPC object, which we don't care about
+ # That's why why are returning just None
+ return None