]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
made supervisord sd_notify() plugin properly functional + supervisord config changes
authorVasek Sraier <git@vakabus.cz>
Fri, 17 Jun 2022 13:03:43 +0000 (15:03 +0200)
committerVaclav Sraier <vaclav.sraier@nic.cz>
Fri, 24 Jun 2022 13:22:07 +0000 (13:22 +0000)
- X-SUPERVISORD-TYPE=notify in a process's environment should make the process behave similarly to Type=notify systemd service units
- startsecs combined with the above is the time limit after which the process gets killed if it has not sent a ready notification

manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/kresd_controller/__init__.py
manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py
manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py
manager/knot_resolver_manager/kresd_controller/supervisord/plugin/__init__.py
manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2
manager/setup.py

index 7d181ef7649bea8a587fbafbe6cd072629cdb52a..ef30974250bbb8a074d11e531cbc9e8fbef74cad 100644 (file)
@@ -34,6 +34,10 @@ def kresd_config_file(_config: "KresConfig", kres_id: "KresID") -> Path:
     return Path(f"{kres_id}.conf")
 
 
+def kresd_config_file_supervisord_pattern(_config: "KresConfig") -> Path:
+    return Path("kresd%(process_num)d.conf")
+
+
 def supervisord_config_file(_config: "KresConfig") -> Path:
     return Path("supervisord.conf")
 
index 1c6f91f63ed9552ba5b0ef334078cdbc71db3c94..de6066153e3544534ae73c8c6307fc14413360e4 100644 (file)
@@ -175,7 +175,7 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
                 #   if it keeps running, the config is valid and others will soon join as well
                 #   if it crashes and the startup fails, then well, it's not running anymore... :)
                 await self._spawn_new_worker(new)
-            except SubprocessError:
+            except (SubprocessError, SubprocessControllerException):
                 logger.error("kresd with the new config failed to start, rejecting config")
                 return Result.err("Canary kresd instance failed to start. Config is invalid.")
 
index 24967673e5c45aa7a9d1cd2dd565d1c5104b8e2c..07d68f76f2cd1027022a138ff05d4d2ec5c9c7a7 100644 (file)
@@ -101,5 +101,5 @@ async def get_controller_by_name(config: KresConfig, name: str) -> SubprocessCon
 
 
 # run the imports on module load
-try_systemd()
 try_supervisord()
+try_systemd()
index 0f84ee5a9cc3573bae8965bb041c394c62b1318a..2fa571f8dabd7c1f91721656a4cafbaa5023203a 100644 (file)
@@ -1,8 +1,9 @@
 import logging
 from os import kill
 from pathlib import Path
+from time import sleep
 from typing import Any, Dict, Iterable, Optional, Union, cast
-from xmlrpc.client import ServerProxy
+from xmlrpc.client import Fault, ServerProxy
 
 import supervisor.xmlrpc  # type: ignore[import]
 
@@ -40,7 +41,14 @@ async def _reload_supervisord(config: KresConfig) -> None:
 @async_in_a_thread
 def _stop_supervisord(config: KresConfig) -> None:
     supervisord = _create_supervisord_proxy(config)
+    pid = supervisord.getPID()
     supervisord.shutdown()
+    try:
+        while True:
+            kill(pid, 0)
+            sleep(0.1)
+    except ProcessLookupError:
+        pass  # there is finally no supervisord process
     supervisord_config_file(config).unlink()
 
 
@@ -126,21 +134,30 @@ class SupervisordSubprocess(Subprocess):
             super().__init__(config, base_id)
         self._controller: "SupervisordSubprocessController" = controller
 
+    def _name(self):
+        if self.type is SubprocessType.GC:
+            return str(self.id)
+        else:
+            return f"kresd:{self.id}"
+
     @async_in_a_thread
     def _start(self) -> None:
-        supervisord = _create_supervisord_proxy(self._config)
-        supervisord.startProcess(str(self.id))
+        try:
+            supervisord = _create_supervisord_proxy(self._config)
+            supervisord.startProcess(self._name())
+        except Fault as e:
+            raise SubprocessControllerException(f"failed to start '{self.id}'") from e
 
     @async_in_a_thread
     def _stop(self) -> None:
         supervisord = _create_supervisord_proxy(self._config)
-        supervisord.stopProcess(str(self.id))
+        supervisord.stopProcess(self._name())
 
     @async_in_a_thread
     def _restart(self) -> None:
         supervisord = _create_supervisord_proxy(self._config)
-        supervisord.stopProcess(str(self.id))
-        supervisord.startProcess(str(self.id))
+        supervisord.stopProcess(self._name())
+        supervisord.startProcess(self._name())
 
     def get_used_config(self) -> KresConfig:
         return self._config
index 10075e6a85bb5d2c6f977d93e61e5f86ae18380b..49af21c8fb0502827eafd6b0a55b04c76992f701 100644 (file)
@@ -1,5 +1,4 @@
 import os
-from typing import List
 
 from jinja2 import Template
 
@@ -7,7 +6,7 @@ from knot_resolver_manager.compat.dataclasses import dataclass
 from knot_resolver_manager.constants import (
     kres_gc_executable,
     kresd_cache_dir,
-    kresd_config_file,
+    kresd_config_file_supervisord_pattern,
     kresd_executable,
     supervisord_config_file,
     supervisord_config_file_tmp,
@@ -43,48 +42,38 @@ class SupervisordKresID(KresID):
             raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
 
 
-def _get_command_based_on_type(config: KresConfig, i: "SupervisordKresID") -> str:
-    if i.subprocess_type is SubprocessType.KRESD:
-        return f"{kresd_executable()} -c {kresd_config_file(config, i)} -n"
-    elif i.subprocess_type is SubprocessType.GC:
-        return f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000"
-    else:
-        raise NotImplementedError("This subprocess type is not supported")
-
-
 @dataclass
-class _Instance:
+class ProcessTypeConfig:
     """
     Data structure holding data for supervisord config template
     """
 
-    type: str
     logfile: str
-    id: str
     workdir: str
     command: str
     environment: str
+    max_procs: int = 1
 
     @staticmethod
-    def create_list(config: KresConfig) -> List["_Instance"]:
+    def create_gc_config(config: KresConfig) -> "ProcessTypeConfig":
         cwd = str(os.getcwd())
+        return ProcessTypeConfig(  # type: ignore[call-arg]
+            logfile=supervisord_subprocess_log_dir(config) / "gc.log",
+            workdir=cwd,
+            command=f"{kres_gc_executable()} -c {kresd_cache_dir(config)} -d 1000",
+            environment="",
+        )
 
-        instances = [
-            SupervisordKresID(SubprocessType.KRESD, i, _i_know_what_i_am_doing=True)
-            for i in range(1, int(config.max_workers) + 1)
-        ] + [SupervisordKresID(SubprocessType.GC, -1, _i_know_what_i_am_doing=True)]
-
-        return [
-            _Instance(  # type: ignore[call-arg]
-                type=i.subprocess_type.name,
-                logfile=supervisord_subprocess_log_dir(config) / f"{i}.log",
-                id=str(i),
-                workdir=cwd,
-                command=_get_command_based_on_type(config, i),
-                environment=f"SYSTEMD_INSTANCE={i}",
-            )
-            for i in instances
-        ]
+    @staticmethod
+    def create_kresd_config(config: KresConfig) -> "ProcessTypeConfig":
+        cwd = str(os.getcwd())
+        return ProcessTypeConfig(  # type: ignore[call-arg]
+            logfile=supervisord_subprocess_log_dir(config) / "kresd%(process_num)d.log",
+            workdir=cwd,
+            command=f"{kresd_executable()} -c {kresd_config_file_supervisord_pattern(config)} -n",
+            environment='SYSTEMD_INSTANCE="%(process_num)d",X-SUPERVISORD-TYPE=notify',
+            max_procs=config.max_workers,
+        )
 
 
 @dataclass
@@ -113,7 +102,8 @@ async def write_config_file(config: KresConfig) -> None:
     assert template is not None
     template = template.decode("utf8")
     config_string = Template(template).render(  # pyright: reportUnknownMemberType=false
-        instances=_Instance.create_list(config),
+        gc=ProcessTypeConfig.create_gc_config(config),
+        kresd=ProcessTypeConfig.create_kresd_config(config),
         config=SupervisordConfig.create(config),
     )
     await writefile(supervisord_config_file_tmp(config), config_string)
index 2f9f7df8703651a6618da71360cad5ba9b08435f..96f98a4866e2bbf54b05c13d81b87423e54d2c83 100644 (file)
@@ -1,10 +1,14 @@
+# type: ignore
+# pylint: disable=protected-access
+# pylint: disable=c-extension-no-member
 import os
 import signal
-from typing import Any, Dict, List, Optional, Tuple
+import time
+from functools import partial
+from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
 
 from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
 from supervisor.medusa.asyncore_25 import compact_traceback
-from supervisor.options import ServerOptions
 from supervisor.process import Subprocess
 from supervisor.states import ProcessStates
 from supervisor.supervisord import Supervisor
@@ -14,6 +18,10 @@ from knot_resolver_manager.kresd_controller.supervisord.plugin import notify
 starting_processes: List[Subprocess] = []
 
 
+def is_type_notify(proc: Subprocess) -> bool:
+    return proc.config.environment is not None and proc.config.environment.get("X-SUPERVISORD-TYPE", None) == "notify"
+
+
 class NotifySocketDispatcher:
     """
     See supervisor.dispatcher
@@ -25,7 +33,7 @@ class NotifySocketDispatcher:
         self.closed = False  # True if close() has been called
 
     def __repr__(self):
-        return "<%s with fd=%s>" % (self.__class__.__name__, self.fd)
+        return f"<{self.__class__.__name__} with fd={self.fd}>"
 
     def readable(self):
         return True
@@ -34,37 +42,47 @@ class NotifySocketDispatcher:
         return False
 
     def handle_read_event(self):
+        logger: Any = self._supervisor.options.logger
+
         res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
         if res is None:
             return None  # there was some junk
         pid, data = res
 
         if data.startswith(b"READY=1"):
-            # some process is really ready
-            global starting_processes
+            # handle case, when some process is really ready
 
+            # pylint: disable=undefined-loop-variable
             for proc in starting_processes:
                 if proc.pid == pid:
                     break
             else:
-                print(f"[notify] we've got ready notification from some unknown pid={pid}")
+                logger.warn(f"ignoring ready notification from unregistered PID={pid}")
                 return None
 
-            print("[notify] received ready notification, marking as RUNNING")
-            proc._assertInState(ProcessStates.STARTING)
-            proc.change_state(ProcessStates.RUNNING)
+            if is_type_notify(proc):
+                proc._assertInState(ProcessStates.STARTING)
+                proc.change_state(ProcessStates.RUNNING)
+                logger.info(
+                    f"success: {proc.config.name} entered RUNNING state, process sent ready notification via $NOTIFY_SOCKET"
+                )
+            else:
+                logger.warn(f"ignoring ready notification from {proc.config.name}, which is not configured to send it")
+
         else:
-            # we got some junk
-            print(f"[notify] we've got some junk on the socket from pid={pid}: '{data!r}'")
+            # handle case, when we got something unexpected
+            logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
             return None
 
     def handle_write_event(self):
         raise ValueError("this dispatcher is not writable")
 
     def handle_error(self):
-        nil, t, v, tbinfo = compact_traceback()
+        _nil, t, v, tbinfo = compact_traceback()
 
-        print("uncaptured python exception, closing notify socket %s (%s:%s %s)" % (repr(self), t, v, tbinfo))
+        self._supervisor.options.logger.error(
+            f"uncaptured python exception, closing notify socket {repr(self)} ({t}:{v} {tbinfo})"
+        )
         self.close()
 
     def close(self):
@@ -76,7 +94,7 @@ class NotifySocketDispatcher:
         pass
 
 
-def on_process_state_change(event: ProcessStateEvent) -> None:
+def keep_track_of_starting_processes(event: ProcessStateEvent) -> None:
     global starting_processes
 
     proc: Subprocess = event.process
@@ -93,36 +111,80 @@ def on_process_state_change(event: ProcessStateEvent) -> None:
 notify_dispatcher: Optional[NotifySocketDispatcher] = None
 
 
-def monkeypatch(supervisord: Supervisor) -> None:
-    """We inject ourselves into supervisord code:
+def process_transition(slf: Subprocess) -> None:
+    if not is_type_notify(slf):
+        return slf
+
+    # modified version of upstream process transition code
+    if slf.state == ProcessStates.STARTING:
+        if time.time() - slf.laststart > slf.config.startsecs:
+            # STARTING -> STOPPING if the process has not sent ready notification
+            # within proc.config.startsecs
+            slf.config.options.logger.warn(
+                f"process '{slf.config.name}' did not send ready notification within {slf.config.startsecs} secs, killing"
+            )
+            slf.kill(signal.SIGKILL)
+            slf.laststart = time.time()  # prevent further state transition from happening
+
+    # return self for chaining
+    return slf
 
-    - inject our notify socket to the event loop
-    - subscribe to all state change events
-    """
 
-    old: Any = supervisord.get_process_map
+def supervisord_get_process_map(supervisord: Any, mp: Dict[Any, Any]) -> Dict[Any, Any]:
+    global notify_dispatcher
+    if notify_dispatcher is None:
+        notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
+        supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
 
-    def get_process_map(*args: Any, **kwargs: Any) -> Dict[Any, Any]:
-        global notify_dispatcher
-        if notify_dispatcher is None:
-            notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
-            supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
+    # add our dispatcher to the result
+    assert notify_dispatcher.fd not in mp
+    mp[notify_dispatcher.fd] = notify_dispatcher
 
-        # call the old method
-        res = old(*args, **kwargs)
+    return mp
 
-        # add our dispatcher to the result
-        assert notify_dispatcher.fd not in res
-        res[notify_dispatcher.fd] = notify_dispatcher
 
-        return res
+def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]:
+    if is_type_notify(slf):
+        slf.config.environment["NOTIFY_SOCKET"] = "@knot-resolver-control-socket"
+        slf.config.options.logger.info("setting env")
+    return (slf, *args)
 
-    supervisord.get_process_map = get_process_map
 
-    # subscribe to events
-    subscribe(ProcessStateEvent, on_process_state_change)
+T = TypeVar("T")
+U = TypeVar("U")
 
 
-def make_rpcinterface(supervisord: Supervisor, **config: Any) -> Any:
+def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]:
+    def wrapper(*args: Any, **kwargs: Any) -> T:
+        res = first(*args, **kwargs)
+        if isinstance(res, tuple):
+            return second(*res)
+        else:
+            return second(res)
+
+    return wrapper
+
+
+def monkeypatch(supervisord: Supervisor) -> None:
+    """Inject ourselves into supervisord code"""
+
+    # append notify socket handler to event loop
+    supervisord.get_process_map = chain(supervisord.get_process_map, partial(supervisord_get_process_map, supervisord))
+
+    # prepend timeout handler to transition method
+    Subprocess.transition = chain(process_transition, Subprocess.transition)
+
+    # add environment variable $NOTIFY_SOCKET to starting processes
+    Subprocess._spawn_as_child = chain(process_spawn_as_child_add_env, Subprocess._spawn_as_child)
+
+    # keep references to starting subprocesses
+    subscribe(ProcessStateEvent, keep_track_of_starting_processes)
+
+
+def make_rpcinterface(supervisord: Supervisor, **_config: Any) -> Any:  # pylint: disable=useless-return
     monkeypatch(supervisord)
+
+    # this method is called by supervisord when loading the plugin,
+    # it should return an XML-RPC object, which we don't care about.
+    # That's why we are returning just None
     return None
index ee505369ef0f403bfe487a81ebe261d6b0107be0..2887ae00e8a270742df6d6649c7b787201ad2e08 100644 (file)
@@ -4,6 +4,7 @@ directory = {{ config.workdir }}
 nodaemon = false
 logfile = {{ config.logfile }}
 logfile_maxbytes = 50MB
+loglevel = trace
 {# user=root #}
 
 [unix_http_server]
@@ -19,18 +20,30 @@ supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
 supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin:make_rpcinterface
 
 
-{% for instance in instances %}
-
-[program:{{ instance.id }}]
-type={{ instance.type }}
+[program:kresd]
+process_name=%(program_name)s%(process_num)d
+numprocs={{ kresd.max_procs }}
 redirect_stderr=false
-stdout_logfile={{ instance.logfile }}
-stderr_logfile={{ instance.logfile }}
-directory={{ instance.workdir }}
-command={{ instance.command }}
+stdout_logfile={{ kresd.logfile }}
+stderr_logfile={{ kresd.logfile }}
+directory={{ kresd.workdir }}
+command={{ kresd.command }}
 autostart=false
 autorestart=true
-startsecs=10  # this is what makes start operation slow, it would be better replaced with something similar to systemd notify
-environment={{ instance.environment }},NOTIFY_SOCKET="@knot-resolver-control-socket"
+stopsignal=TERM
+killasgroup=true
+startsecs=1
+environment={{ kresd.environment }}
 
-{%- endfor -%}
\ No newline at end of file
+[program:gc]
+redirect_stderr=false
+stdout_logfile={{ gc.logfile }}
+stderr_logfile={{ gc.logfile }}
+directory={{ gc.workdir }}
+command={{ gc.command }}
+autostart=false
+autorestart=true
+stopsignal=TERM
+killasgroup=true
+startsecs=1
+environment={{ gc.environment }}
\ No newline at end of file
index 3512111b40255fee2fb10fe6b1dcc7ac7ecb6249..cc67ae585f28d98da866d0dcfab3b7941ee21f22 100644 (file)
@@ -9,6 +9,7 @@ packages = \
  'knot_resolver_manager.datamodel.types',
  'knot_resolver_manager.kresd_controller',
  'knot_resolver_manager.kresd_controller.supervisord',
+ 'knot_resolver_manager.kresd_controller.supervisord.plugin',
  'knot_resolver_manager.kresd_controller.systemd',
  'knot_resolver_manager.utils']