From: Vasek Sraier Date: Fri, 17 Jun 2022 13:03:43 +0000 (+0200) Subject: made supervisord sd_notify() plugin properly functional + supervisord config changes X-Git-Tag: v6.0.0a1~33^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bc3f97d970ea67f829b68dfabecb62b1aecb1369;p=thirdparty%2Fknot-resolver.git made supervisord sd_notify() plugin properly functional + supervisord config changes - X-SUPERVISORD-TYPE=notify in a process's environment should make the process behave similarly to Type=notify systemd service units - startsecs with the above means the time after which the process gets killed if it has not sent a ready notification --- diff --git a/manager/knot_resolver_manager/constants.py b/manager/knot_resolver_manager/constants.py index 7d181ef76..ef3097425 100644 --- a/manager/knot_resolver_manager/constants.py +++ b/manager/knot_resolver_manager/constants.py @@ -34,6 +34,10 @@ def kresd_config_file(_config: "KresConfig", kres_id: "KresID") -> Path: return Path(f"{kres_id}.conf") +def kresd_config_file_supervisord_pattern(_config: "KresConfig") -> Path: + return Path("kresd%(process_num)d.conf") + + def supervisord_config_file(_config: "KresConfig") -> Path: return Path("supervisord.conf") diff --git a/manager/knot_resolver_manager/kres_manager.py b/manager/knot_resolver_manager/kres_manager.py index 1c6f91f63..de6066153 100644 --- a/manager/knot_resolver_manager/kres_manager.py +++ b/manager/knot_resolver_manager/kres_manager.py @@ -175,7 +175,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes # if it keeps running, the config is valid and others will soon join as well # if it crashes and the startup fails, then well, it's not running anymore... :) await self._spawn_new_worker(new) - except SubprocessError: + except (SubprocessError, SubprocessControllerException): logger.error("kresd with the new config failed to start, rejecting config") return Result.err("Canary kresd instance failed to start. 
Config is invalid.") diff --git a/manager/knot_resolver_manager/kresd_controller/__init__.py b/manager/knot_resolver_manager/kresd_controller/__init__.py index 24967673e..07d68f76f 100644 --- a/manager/knot_resolver_manager/kresd_controller/__init__.py +++ b/manager/knot_resolver_manager/kresd_controller/__init__.py @@ -101,5 +101,5 @@ async def get_controller_by_name(config: KresConfig, name: str) -> SubprocessCon # run the imports on module load -try_systemd() try_supervisord() +try_systemd() diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py b/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py index 0f84ee5a9..2fa571f8d 100644 --- a/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py @@ -1,8 +1,9 @@ import logging from os import kill from pathlib import Path +from time import sleep from typing import Any, Dict, Iterable, Optional, Union, cast -from xmlrpc.client import ServerProxy +from xmlrpc.client import Fault, ServerProxy import supervisor.xmlrpc # type: ignore[import] @@ -40,7 +41,14 @@ async def _reload_supervisord(config: KresConfig) -> None: @async_in_a_thread def _stop_supervisord(config: KresConfig) -> None: supervisord = _create_supervisord_proxy(config) + pid = supervisord.getPID() supervisord.shutdown() + try: + while True: + kill(pid, 0) + sleep(0.1) + except ProcessLookupError: + pass # there is finally no supervisord process supervisord_config_file(config).unlink() @@ -126,21 +134,30 @@ class SupervisordSubprocess(Subprocess): super().__init__(config, base_id) self._controller: "SupervisordSubprocessController" = controller + def _name(self): + if self.type is SubprocessType.GC: + return str(self.id) + else: + return f"kresd:{self.id}" + @async_in_a_thread def _start(self) -> None: - supervisord = _create_supervisord_proxy(self._config) - supervisord.startProcess(str(self.id)) + try: + supervisord = 
_create_supervisord_proxy(self._config) + supervisord.startProcess(self._name()) + except Fault as e: + raise SubprocessControllerException(f"failed to start '{self.id}'") from e @async_in_a_thread def _stop(self) -> None: supervisord = _create_supervisord_proxy(self._config) - supervisord.stopProcess(str(self.id)) + supervisord.stopProcess(self._name()) @async_in_a_thread def _restart(self) -> None: supervisord = _create_supervisord_proxy(self._config) - supervisord.stopProcess(str(self.id)) - supervisord.startProcess(str(self.id)) + supervisord.stopProcess(self._name()) + supervisord.startProcess(self._name()) def get_used_config(self) -> KresConfig: return self._config diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py b/manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py index 10075e6a8..49af21c8f 100644 --- a/manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py @@ -1,5 +1,4 @@ import os -from typing import List from jinja2 import Template @@ -7,7 +6,7 @@ from knot_resolver_manager.compat.dataclasses import dataclass from knot_resolver_manager.constants import ( kres_gc_executable, kresd_cache_dir, - kresd_config_file, + kresd_config_file_supervisord_pattern, kresd_executable, supervisord_config_file, supervisord_config_file_tmp, @@ -43,48 +42,38 @@ class SupervisordKresID(KresID): raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}") -def _get_command_based_on_type(config: KresConfig, i: "SupervisordKresID") -> str: - if i.subprocess_type is SubprocessType.KRESD: - return f"{kresd_executable()} -c {kresd_config_file(config, i)} -n" - elif i.subprocess_type is SubprocessType.GC: - return f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000" - else: - raise NotImplementedError("This subprocess type is not supported") - - @dataclass -class _Instance: +class ProcessTypeConfig: """ 
Data structure holding data for supervisord config template """ - type: str logfile: str - id: str workdir: str command: str environment: str + max_procs: int = 1 @staticmethod - def create_list(config: KresConfig) -> List["_Instance"]: + def create_gc_config(config: KresConfig) -> "ProcessTypeConfig": cwd = str(os.getcwd()) + return ProcessTypeConfig( # type: ignore[call-arg] + logfile=supervisord_subprocess_log_dir(config) / "gc.log", + workdir=cwd, + command=f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000", + environment="", + ) - instances = [ - SupervisordKresID(SubprocessType.KRESD, i, _i_know_what_i_am_doing=True) - for i in range(1, int(config.max_workers) + 1) - ] + [SupervisordKresID(SubprocessType.GC, -1, _i_know_what_i_am_doing=True)] - - return [ - _Instance( # type: ignore[call-arg] - type=i.subprocess_type.name, - logfile=supervisord_subprocess_log_dir(config) / f"{i}.log", - id=str(i), - workdir=cwd, - command=_get_command_based_on_type(config, i), - environment=f"SYSTEMD_INSTANCE={i}", - ) - for i in instances - ] + @staticmethod + def create_kresd_config(config: KresConfig) -> "ProcessTypeConfig": + cwd = str(os.getcwd()) + return ProcessTypeConfig( # type: ignore[call-arg] + logfile=supervisord_subprocess_log_dir(config) / "kresd%(process_num)d.log", + workdir=cwd, + command=f"{kresd_executable()} -c {kresd_config_file_supervisord_pattern(config)} -n", + environment='SYSTEMD_INSTANCE="%(process_num)d",X-SUPERVISORD-TYPE=notify', + max_procs=config.max_workers, + ) @dataclass @@ -113,7 +102,8 @@ async def write_config_file(config: KresConfig) -> None: assert template is not None template = template.decode("utf8") config_string = Template(template).render( # pyright: reportUnknownMemberType=false - instances=_Instance.create_list(config), + gc=ProcessTypeConfig.create_gc_config(config), + kresd=ProcessTypeConfig.create_kresd_config(config), config=SupervisordConfig.create(config), ) await writefile(supervisord_config_file_tmp(config), 
config_string) diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/__init__.py b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/__init__.py index 2f9f7df87..96f98a486 100644 --- a/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/__init__.py +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/__init__.py @@ -1,10 +1,14 @@ +# type: ignore +# pylint: disable=protected-access +# pylint: disable=c-extension-no-member import os import signal -from typing import Any, Dict, List, Optional, Tuple +import time +from functools import partial +from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe from supervisor.medusa.asyncore_25 import compact_traceback -from supervisor.options import ServerOptions from supervisor.process import Subprocess from supervisor.states import ProcessStates from supervisor.supervisord import Supervisor @@ -14,6 +18,10 @@ from knot_resolver_manager.kresd_controller.supervisord.plugin import notify starting_processes: List[Subprocess] = [] +def is_type_notify(proc: Subprocess) -> bool: + return proc.config.environment is not None and proc.config.environment.get("X-SUPERVISORD-TYPE", None) == "notify" + + class NotifySocketDispatcher: """ See supervisor.dispatcher @@ -25,7 +33,7 @@ class NotifySocketDispatcher: self.closed = False # True if close() has been called def __repr__(self): - return "<%s with fd=%s>" % (self.__class__.__name__, self.fd) + return f"<{self.__class__.__name__} with fd={self.fd}>" def readable(self): return True @@ -34,37 +42,47 @@ class NotifySocketDispatcher: return False def handle_read_event(self): + logger: Any = self._supervisor.options.logger + res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd) if res is None: return None # there was some junk pid, data = res if data.startswith(b"READY=1"): - # some process is really ready - 
global starting_processes + # handle case, when some process is really ready + # pylint: disable=undefined-loop-variable for proc in starting_processes: if proc.pid == pid: break else: - print(f"[notify] we've got ready notification from some unknown pid={pid}") + logger.warn(f"ignoring ready notification from unregistered PID={pid}") return None - print("[notify] received ready notification, marking as RUNNING") - proc._assertInState(ProcessStates.STARTING) - proc.change_state(ProcessStates.RUNNING) + if is_type_notify(proc): + proc._assertInState(ProcessStates.STARTING) + proc.change_state(ProcessStates.RUNNING) + logger.info( + f"success: {proc.config.name} entered RUNNING state, process sent ready notification via $NOTIFY_SOCKET" + ) + else: + logger.warn(f"ignoring ready notification from {proc.config.name}, which is not configured to send it") + else: - # we got some junk - print(f"[notify] we've got some junk on the socket from pid={pid}: '{data!r}'") + # handle case, when we got something unexpected + logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'") return None def handle_write_event(self): raise ValueError("this dispatcher is not writable") def handle_error(self): - nil, t, v, tbinfo = compact_traceback() + _nil, t, v, tbinfo = compact_traceback() - print("uncaptured python exception, closing notify socket %s (%s:%s %s)" % (repr(self), t, v, tbinfo)) + self._supervisor.options.logger.error( + f"uncaptured python exception, closing notify socket {repr(self)} ({t}:{v} {tbinfo})" + ) self.close() def close(self): @@ -76,7 +94,7 @@ class NotifySocketDispatcher: pass -def on_process_state_change(event: ProcessStateEvent) -> None: +def keep_track_of_starting_processes(event: ProcessStateEvent) -> None: global starting_processes proc: Subprocess = event.process @@ -93,36 +111,80 @@ def on_process_state_change(event: ProcessStateEvent) -> None: notify_dispatcher: Optional[NotifySocketDispatcher] = None -def 
monkeypatch(supervisord: Supervisor) -> None: - """We inject ourselves into supervisord code: +def process_transition(slf: Subprocess) -> None: + if not is_type_notify(slf): + return slf + + # modified version of upstream process transition code + if slf.state == ProcessStates.STARTING: + if time.time() - slf.laststart > slf.config.startsecs: + # STARTING -> STOPPING if the process has not sent ready notification + # within proc.config.startsecs + slf.config.options.logger.warn( + f"process '{slf.config.name}' did not send ready notification within {slf.config.startsecs} secs, killing" + ) + slf.kill(signal.SIGKILL) + slf.laststart = time.time() # prevent further state transition from happening + + # return self for chaining + return slf - - inject our notify socket to the event loop - - subscribe to all state change events - """ - old: Any = supervisord.get_process_map +def supervisord_get_process_map(supervisord: Any, mp: Dict[Any, Any]) -> Dict[Any, Any]: + global notify_dispatcher + if notify_dispatcher is None: + notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket()) + supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop") - def get_process_map(*args: Any, **kwargs: Any) -> Dict[Any, Any]: - global notify_dispatcher - if notify_dispatcher is None: - notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket()) - supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop") + # add our dispatcher to the result + assert notify_dispatcher.fd not in mp + mp[notify_dispatcher.fd] = notify_dispatcher - # call the old method - res = old(*args, **kwargs) + return mp - # add our dispatcher to the result - assert notify_dispatcher.fd not in res - res[notify_dispatcher.fd] = notify_dispatcher - return res +def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]: + if is_type_notify(slf): + slf.config.environment["NOTIFY_SOCKET"] = 
"@knot-resolver-control-socket" + slf.config.options.logger.info("setting env") + return (slf, *args) - supervisord.get_process_map = get_process_map - # subscribe to events - subscribe(ProcessStateEvent, on_process_state_change) +T = TypeVar("T") +U = TypeVar("U") -def make_rpcinterface(supervisord: Supervisor, **config: Any) -> Any: +def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]: + def wrapper(*args: Any, **kwargs: Any) -> T: + res = first(*args, **kwargs) + if isinstance(res, tuple): + return second(*res) + else: + return second(res) + + return wrapper + + +def monkeypatch(supervisord: Supervisor) -> None: + """Inject ourselves into supervisord code""" + + # append notify socket handler to event loop + supervisord.get_process_map = chain(supervisord.get_process_map, partial(supervisord_get_process_map, supervisord)) + + # prepend timeout handler to transition method + Subprocess.transition = chain(process_transition, Subprocess.transition) + + # add environment variable $NOTIFY_SOCKET to starting processes + Subprocess._spawn_as_child = chain(process_spawn_as_child_add_env, Subprocess._spawn_as_child) + + # keep references to starting subprocesses + subscribe(ProcessStateEvent, keep_track_of_starting_processes) + + +def make_rpcinterface(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return monkeypatch(supervisord) + + # this method is called by supervisord when loading the plugin, + # it should return XML-RPC object, which we don't care about + # That's why we are returning just None return None diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2 b/manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2 index ee505369e..2887ae00e 100644 --- a/manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2 +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2 @@ -4,6 +4,7 @@ directory = {{ 
config.workdir }} nodaemon = false logfile = {{ config.logfile }} logfile_maxbytes = 50MB +loglevel = trace {# user=root #} [unix_http_server] @@ -19,18 +20,30 @@ supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin:make_rpcinterface -{% for instance in instances %} - -[program:{{ instance.id }}] -type={{ instance.type }} +[program:kresd] +process_name=%(program_name)s%(process_num)d +numprocs={{ kresd.max_procs }} redirect_stderr=false -stdout_logfile={{ instance.logfile }} -stderr_logfile={{ instance.logfile }} -directory={{ instance.workdir }} -command={{ instance.command }} +stdout_logfile={{ kresd.logfile }} +stderr_logfile={{ kresd.logfile }} +directory={{ kresd.workdir }} +command={{ kresd.command }} autostart=false autorestart=true -startsecs=10 # this is what makes start operation slow, it would be better replaced with something similar to systemd notify -environment={{ instance.environment }},NOTIFY_SOCKET="@knot-resolver-control-socket" +stopsignal=TERM +killasgroup=true +startsecs=1 +environment={{ kresd.environment }} -{%- endfor -%} \ No newline at end of file +[program:gc] +redirect_stderr=false +stdout_logfile={{ gc.logfile }} +stderr_logfile={{ gc.logfile }} +directory={{ gc.workdir }} +command={{ gc.command }} +autostart=false +autorestart=true +stopsignal=TERM +killasgroup=true +startsecs=1 +environment={{ gc.environment }} \ No newline at end of file diff --git a/manager/setup.py b/manager/setup.py index 3512111b4..cc67ae585 100644 --- a/manager/setup.py +++ b/manager/setup.py @@ -9,6 +9,7 @@ packages = \ 'knot_resolver_manager.datamodel.types', 'knot_resolver_manager.kresd_controller', 'knot_resolver_manager.kresd_controller.supervisord', + 'knot_resolver_manager.kresd_controller.supervisord.plugin', 'knot_resolver_manager.kresd_controller.systemd', 'knot_resolver_manager.utils']