import logging
from os import kill
from pathlib import Path
+from time import sleep
from typing import Any, Dict, Iterable, Optional, Union, cast
-from xmlrpc.client import ServerProxy
+from xmlrpc.client import Fault, ServerProxy
import supervisor.xmlrpc # type: ignore[import]
@async_in_a_thread
def _stop_supervisord(config: KresConfig) -> None:
supervisord = _create_supervisord_proxy(config)
+ pid = supervisord.getPID()
supervisord.shutdown()
+ try:
+ while True:
+ kill(pid, 0)
+ sleep(0.1)
+ except ProcessLookupError:
+ pass # there is finally no supervisord process
supervisord_config_file(config).unlink()
super().__init__(config, base_id)
self._controller: "SupervisordSubprocessController" = controller
+ def _name(self):
+ if self.type is SubprocessType.GC:
+ return str(self.id)
+ else:
+ return f"kresd:{self.id}"
+
@async_in_a_thread
def _start(self) -> None:
- supervisord = _create_supervisord_proxy(self._config)
- supervisord.startProcess(str(self.id))
+ try:
+ supervisord = _create_supervisord_proxy(self._config)
+ supervisord.startProcess(self._name())
+ except Fault as e:
+ raise SubprocessControllerException(f"failed to start '{self.id}'") from e
@async_in_a_thread
def _stop(self) -> None:
supervisord = _create_supervisord_proxy(self._config)
- supervisord.stopProcess(str(self.id))
+ supervisord.stopProcess(self._name())
@async_in_a_thread
def _restart(self) -> None:
supervisord = _create_supervisord_proxy(self._config)
- supervisord.stopProcess(str(self.id))
- supervisord.startProcess(str(self.id))
+ supervisord.stopProcess(self._name())
+ supervisord.startProcess(self._name())
def get_used_config(self) -> KresConfig:
return self._config
import os
-from typing import List
from jinja2 import Template
from knot_resolver_manager.constants import (
kres_gc_executable,
kresd_cache_dir,
- kresd_config_file,
+ kresd_config_file_supervisord_pattern,
kresd_executable,
supervisord_config_file,
supervisord_config_file_tmp,
raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
-def _get_command_based_on_type(config: KresConfig, i: "SupervisordKresID") -> str:
- if i.subprocess_type is SubprocessType.KRESD:
- return f"{kresd_executable()} -c {kresd_config_file(config, i)} -n"
- elif i.subprocess_type is SubprocessType.GC:
- return f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000"
- else:
- raise NotImplementedError("This subprocess type is not supported")
-
-
@dataclass
-class _Instance:
+class ProcessTypeConfig:
"""
Data structure holding data for supervisord config template
"""
- type: str
logfile: str
- id: str
workdir: str
command: str
environment: str
+ max_procs: int = 1
@staticmethod
- def create_list(config: KresConfig) -> List["_Instance"]:
+ def create_gc_config(config: KresConfig) -> "ProcessTypeConfig":
cwd = str(os.getcwd())
+ return ProcessTypeConfig( # type: ignore[call-arg]
+ logfile=supervisord_subprocess_log_dir(config) / "gc.log",
+ workdir=cwd,
+ command=f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000",
+ environment="",
+ )
- instances = [
- SupervisordKresID(SubprocessType.KRESD, i, _i_know_what_i_am_doing=True)
- for i in range(1, int(config.max_workers) + 1)
- ] + [SupervisordKresID(SubprocessType.GC, -1, _i_know_what_i_am_doing=True)]
-
- return [
- _Instance( # type: ignore[call-arg]
- type=i.subprocess_type.name,
- logfile=supervisord_subprocess_log_dir(config) / f"{i}.log",
- id=str(i),
- workdir=cwd,
- command=_get_command_based_on_type(config, i),
- environment=f"SYSTEMD_INSTANCE={i}",
- )
- for i in instances
- ]
+ @staticmethod
+ def create_kresd_config(config: KresConfig) -> "ProcessTypeConfig":
+ cwd = str(os.getcwd())
+ return ProcessTypeConfig( # type: ignore[call-arg]
+ logfile=supervisord_subprocess_log_dir(config) / "kresd%(process_num)d.log",
+ workdir=cwd,
+ command=f"{kresd_executable()} -c {kresd_config_file_supervisord_pattern(config)} -n",
+ environment='SYSTEMD_INSTANCE="%(process_num)d",X-SUPERVISORD-TYPE=notify',
+ max_procs=config.max_workers,
+ )
@dataclass
assert template is not None
template = template.decode("utf8")
config_string = Template(template).render( # pyright: reportUnknownMemberType=false
- instances=_Instance.create_list(config),
+ gc=ProcessTypeConfig.create_gc_config(config),
+ kresd=ProcessTypeConfig.create_kresd_config(config),
config=SupervisordConfig.create(config),
)
await writefile(supervisord_config_file_tmp(config), config_string)
+# type: ignore
+# pylint: disable=protected-access
+# pylint: disable=c-extension-no-member
import os
import signal
-from typing import Any, Dict, List, Optional, Tuple
+import time
+from functools import partial
+from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
from supervisor.medusa.asyncore_25 import compact_traceback
-from supervisor.options import ServerOptions
from supervisor.process import Subprocess
from supervisor.states import ProcessStates
from supervisor.supervisord import Supervisor
starting_processes: List[Subprocess] = []
+def is_type_notify(proc: Subprocess) -> bool:
+ return proc.config.environment is not None and proc.config.environment.get("X-SUPERVISORD-TYPE", None) == "notify"
+
+
class NotifySocketDispatcher:
"""
See supervisor.dispatcher
self.closed = False # True if close() has been called
def __repr__(self):
- return "<%s with fd=%s>" % (self.__class__.__name__, self.fd)
+ return f"<{self.__class__.__name__} with fd={self.fd}>"
def readable(self):
return True
return False
def handle_read_event(self):
+ logger: Any = self._supervisor.options.logger
+
res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
if res is None:
return None # there was some junk
pid, data = res
if data.startswith(b"READY=1"):
- # some process is really ready
- global starting_processes
+ # handle case, when some process is really ready
+ # pylint: disable=undefined-loop-variable
for proc in starting_processes:
if proc.pid == pid:
break
else:
- print(f"[notify] we've got ready notification from some unknown pid={pid}")
+ logger.warn(f"ignoring ready notification from unregistered PID={pid}")
return None
- print("[notify] received ready notification, marking as RUNNING")
- proc._assertInState(ProcessStates.STARTING)
- proc.change_state(ProcessStates.RUNNING)
+ if is_type_notify(proc):
+ proc._assertInState(ProcessStates.STARTING)
+ proc.change_state(ProcessStates.RUNNING)
+ logger.info(
+ f"success: {proc.config.name} entered RUNNING state, process sent ready notification via $NOTIFY_SOCKET"
+ )
+ else:
+ logger.warn(f"ignoring ready notification from {proc.config.name}, which is not configured to send it")
+
else:
- # we got some junk
- print(f"[notify] we've got some junk on the socket from pid={pid}: '{data!r}'")
+ # handle case, when we got something unexpected
+ logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
return None
def handle_write_event(self):
raise ValueError("this dispatcher is not writable")
def handle_error(self):
- nil, t, v, tbinfo = compact_traceback()
+ _nil, t, v, tbinfo = compact_traceback()
- print("uncaptured python exception, closing notify socket %s (%s:%s %s)" % (repr(self), t, v, tbinfo))
+ self._supervisor.options.logger.error(
+ f"uncaptured python exception, closing notify socket {repr(self)} ({t}:{v} {tbinfo})"
+ )
self.close()
def close(self):
pass
-def on_process_state_change(event: ProcessStateEvent) -> None:
+def keep_track_of_starting_processes(event: ProcessStateEvent) -> None:
global starting_processes
proc: Subprocess = event.process
notify_dispatcher: Optional[NotifySocketDispatcher] = None
-def monkeypatch(supervisord: Supervisor) -> None:
- """We inject ourselves into supervisord code:
+def process_transition(slf: Subprocess) -> None:
+ if not is_type_notify(slf):
+ return slf
+
+ # modified version of upstream process transition code
+ if slf.state == ProcessStates.STARTING:
+ if time.time() - slf.laststart > slf.config.startsecs:
+ # STARTING -> STOPPING if the process has not sent ready notification
+ # within proc.config.startsecs
+ slf.config.options.logger.warn(
+ f"process '{slf.config.name}' did not send ready notification within {slf.config.startsecs} secs, killing"
+ )
+ slf.kill(signal.SIGKILL)
+ slf.laststart = time.time() # prevent further state transition from happening
+
+ # return self for chaining
+ return slf
- - inject our notify socket to the event loop
- - subscribe to all state change events
- """
- old: Any = supervisord.get_process_map
+def supervisord_get_process_map(supervisord: Any, mp: Dict[Any, Any]) -> Dict[Any, Any]:
+ global notify_dispatcher
+ if notify_dispatcher is None:
+ notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
+ supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
- def get_process_map(*args: Any, **kwargs: Any) -> Dict[Any, Any]:
- global notify_dispatcher
- if notify_dispatcher is None:
- notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
- supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
+ # add our dispatcher to the result
+ assert notify_dispatcher.fd not in mp
+ mp[notify_dispatcher.fd] = notify_dispatcher
- # call the old method
- res = old(*args, **kwargs)
+ return mp
- # add our dispatcher to the result
- assert notify_dispatcher.fd not in res
- res[notify_dispatcher.fd] = notify_dispatcher
- return res
+def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]:
+ if is_type_notify(slf):
+ slf.config.environment["NOTIFY_SOCKET"] = "@knot-resolver-control-socket"
+ slf.config.options.logger.info("setting env")
+ return (slf, *args)
- supervisord.get_process_map = get_process_map
- # subscribe to events
- subscribe(ProcessStateEvent, on_process_state_change)
+T = TypeVar("T")
+U = TypeVar("U")
-def make_rpcinterface(supervisord: Supervisor, **config: Any) -> Any:
+def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]:
+ def wrapper(*args: Any, **kwargs: Any) -> T:
+ res = first(*args, **kwargs)
+ if isinstance(res, tuple):
+ return second(*res)
+ else:
+ return second(res)
+
+ return wrapper
+
+
+def monkeypatch(supervisord: Supervisor) -> None:
+ """Inject ourselves into supervisord code"""
+
+ # append notify socket handler to event loopo
+ supervisord.get_process_map = chain(supervisord.get_process_map, partial(supervisord_get_process_map, supervisord))
+
+ # prepend timeout handler to transition method
+ Subprocess.transition = chain(process_transition, Subprocess.transition)
+
+ # add environment variable $NOTIFY_SOCKET to starting processes
+ Subprocess._spawn_as_child = chain(process_spawn_as_child_add_env, Subprocess._spawn_as_child)
+
+ # keep references to starting subprocesses
+ subscribe(ProcessStateEvent, keep_track_of_starting_processes)
+
+
+def make_rpcinterface(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
monkeypatch(supervisord)
+
+ # this method is called by supervisord when loading the plugin,
+ # it should return XML-RPC object, which we don't care about
+ # That's why why are returning just None
return None