]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: initial support for inverted process tree (manager running under a subproces...
authorVasek Sraier <git@vakabus.cz>
Thu, 7 Jul 2022 10:53:36 +0000 (12:53 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Jul 2022 13:04:38 +0000 (13:04 +0000)
manager/etc/knot-resolver/config.dev.yml
manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/datamodel/config_schema.py
manager/knot_resolver_manager/exceptions.py
manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py
manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py
manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2
manager/knot_resolver_manager/server.py
manager/knot_resolver_manager/utils/systemd_notify.py [new file with mode: 0644]

index d4176ef27f6606315eaf18e40ed52eeaa018b764..02fe1250a5dd2018989dad54fa715c2ae859152c 100644 (file)
@@ -1,4 +1,3 @@
-id: dev
 rundir: etc/knot-resolver/runtime
 workers: 1
 management:
index ef30974250bbb8a074d11e531cbc9e8fbef74cad..4c2194c93f765b0fa9d2e45e1791fece752eb2f1 100644 (file)
@@ -73,31 +73,17 @@ class _UserConstants:
     Class for accessing constants, which are technically not constants as they are user configurable.
     """
 
-    def __init__(self, config_store: "ConfigStore") -> None:
+    def __init__(self, config_store: "ConfigStore", working_directory_on_startup: str) -> None:
         self._config_store = config_store
-
-    @property
-    def ID(self) -> str:
-        return str(self._config_store.get().id)
+        self.working_directory_on_startup = working_directory_on_startup
 
 
 _user_constants: Optional[_UserConstants] = None
 
 
-async def _deny_id_changes(config_old: "KresConfig", config_new: "KresConfig") -> Result[None, str]:
-    if config_old.id != config_new.id:
-        return Result.err(
-            "/id: Based on the ID, the manager recognizes subprocesses,"
-            " so it is not possible to change it while services are running."
-        )
-    return Result.ok(None)
-
-
-async def init_user_constants(config_store: "ConfigStore") -> None:
+async def init_user_constants(config_store: "ConfigStore", working_directory_on_startup: str) -> None:
     global _user_constants
-    _user_constants = _UserConstants(config_store)
-
-    await config_store.register_verifier(_deny_id_changes)
+    _user_constants = _UserConstants(config_store, working_directory_on_startup)
 
 
 def user_constants() -> _UserConstants:
index 86fab7a250ddc9adf9940d1699cf8966edbd243c..75d733b14844d19e18beee3c9ddb2cfe44ccae45 100644 (file)
@@ -85,7 +85,6 @@ class KresConfig(SchemaNode):
         Knot Resolver declarative configuration.
 
         ---
-        id: System-wide unique identifier of this instance. Used for grouping logs and tagging workers.
         nsid: Name Server Identifier (RFC 5001) which allows DNS clients to request resolver to send back its NSID along with the reply to a DNS request.
         hostname: Internal DNS resolver hostname. Default is machine hostname.
         rundir: Directory where the resolver can create files and which will be it's cwd.
@@ -110,7 +109,6 @@ class KresConfig(SchemaNode):
         lua: Custom Lua configuration.
         """
 
-        id: IDPattern
         nsid: Optional[str] = None
         hostname: Optional[str] = None
         rundir: UncheckedPath = UncheckedPath(".")
@@ -136,7 +134,6 @@ class KresConfig(SchemaNode):
 
     _PREVIOUS_SCHEMA = Raw
 
-    id: IDPattern
     nsid: Optional[str]
     hostname: str
     rundir: UncheckedPath
index c899dba93fe61be72383d22c1dba62ceb76ebaaa..0509038318039a3db176910c97f9352dcb5a9119 100644 (file)
@@ -1,6 +1,18 @@
 from typing import Iterable, List
 
 
+class CancelStartupExecInsteadException(Exception):
+    """
+    Exception used for terminating system startup and instead
+    causing an exec of something else. Could be used by subprocess
+    controllers such as supervisord to allow them to run as top-level
+    process in a process tree.
+    """
+    def __init__(self, exec_args: List[str], *args: object) -> None:
+        self.exec_args = exec_args
+        super().__init__(*args)
+
+
 class KresManagerException(Exception):
     """
     Base class for all custom exceptions we use in our code
index 57942ca2322aeecfd37884b1348eee6b34d74f26..dcc4d0299efcee7fc01b63ecd098f40ff5d6cb76 100644 (file)
@@ -1,8 +1,8 @@
 import logging
-from os import kill
+from os import execl, kill
 from pathlib import Path
 from time import sleep
-from typing import Any, Dict, Iterable, Optional, Union, cast
+from typing import Any, Dict, Iterable, NoReturn, Optional, Union, cast
 from xmlrpc.client import Fault, ServerProxy
 
 import supervisor.xmlrpc  # type: ignore[import]
@@ -10,7 +10,7 @@ import supervisor.xmlrpc  # type: ignore[import]
 from knot_resolver_manager.compat.asyncio import async_in_a_thread
 from knot_resolver_manager.constants import supervisord_config_file, supervisord_pid_file, supervisord_sock_file
 from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.exceptions import SubprocessControllerException
+from knot_resolver_manager.exceptions import CancelStartupExecInsteadException, SubprocessControllerException
 from knot_resolver_manager.kresd_controller.interface import (
     KresID,
     Subprocess,
@@ -34,6 +34,13 @@ async def _start_supervisord(config: KresConfig) -> None:
         raise SubprocessControllerException(f"Supervisord exited with exit code {res}")
 
 
+async def _exec_supervisord(config: KresConfig) -> NoReturn:
+    logger.debug("Writing supervisord config")
+    await write_config_file(config)
+    logger.debug("Execing supervisord")
+    raise CancelStartupExecInsteadException([str(which.which("supervisord")), "supervisord", "--configuration", str(supervisord_config_file(config).absolute()), "-n"])
+
+
 async def _reload_supervisord(config: KresConfig) -> None:
     await write_config_file(config)
     res = await call(f'supervisorctl --configuration="{supervisord_config_file(config).absolute()}" update', shell=True)
@@ -138,6 +145,10 @@ def _list_running_subprocesses(config: KresConfig) -> Dict[SupervisordKresID, Su
             status = SubprocessStatus.UNKNOWN
         return status
 
+    # there will be a manager process as well, but we don't want to report anything on ourselves
+    processes = [pr for pr in processes if pr["name"] != "manager"]
+
+    # convert all the names
     return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes if pr["statename"] != "STOPPED"}
 
 
@@ -216,7 +227,8 @@ class SupervisordSubprocessController(SubprocessController):
         self._controller_config = config
 
         if not await _is_supervisord_running(config):
-            await _start_supervisord(config)
+            #await _start_supervisord(config)
+            await _exec_supervisord(config)
         else:
             await _reload_supervisord(config)
 
index 49af21c8fb0502827eafd6b0a55b04c76992f701..346ebf5a94eea3ca76915cb505c55cdb8941dc42 100644 (file)
@@ -14,6 +14,7 @@ from knot_resolver_manager.constants import (
     supervisord_pid_file,
     supervisord_sock_file,
     supervisord_subprocess_log_dir,
+    user_constants,
 )
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.kresd_controller.interface import KresID, SubprocessType
@@ -75,6 +76,20 @@ class ProcessTypeConfig:
             max_procs=config.max_workers,
         )
 
+    @staticmethod
+    def create_manager_config(config: KresConfig) -> "ProcessTypeConfig":
+        # read original command from /proc
+        with open("/proc/self/cmdline", 'rb') as f:
+            args = [s.decode('utf-8') for s in f.read()[:-1].split(b'\0')]
+        cmd = '"' + '" "'.join(args) + '"'
+
+        return ProcessTypeConfig(  # type: ignore[call-arg]
+            workdir=user_constants().working_directory_on_startup,
+            command=cmd,
+            environment='X-SUPERVISORD-TYPE=notify',
+            logfile=""  # this will be ignored
+        )
+
 
 @dataclass
 class SupervisordConfig:
@@ -104,6 +119,7 @@ async def write_config_file(config: KresConfig) -> None:
     config_string = Template(template).render(  # pyright: reportUnknownMemberType=false
         gc=ProcessTypeConfig.create_gc_config(config),
         kresd=ProcessTypeConfig.create_kresd_config(config),
+        manager=ProcessTypeConfig.create_manager_config(config),
         config=SupervisordConfig.create(config),
     )
     await writefile(supervisord_config_file_tmp(config), config_string)
index 19f11a26e56869fa7aa209b586e2166f549e9516..c78ecbed2cbb6ba6f0fb117d70671f39eab097b2 100644 (file)
@@ -4,7 +4,7 @@ directory = {{ config.workdir }}
 nodaemon = false
 logfile = {{ config.logfile }}
 logfile_maxbytes = 50MB
-loglevel = trace
+loglevel = info
 {# user=root #}
 
 [unix_http_server]
@@ -22,6 +22,18 @@ supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervi
 [rpcinterface:fast]
 supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin.fast_rpcinterface:make_main_rpcinterface
 
+[program:manager]
+redirect_stderr=false
+stdout_logfile=/proc/self/fd/1
+stdout_logfile_maxbytes=0
+directory={{ manager.workdir }}
+command={{ manager.command }}
+stopsignal=SIGINT
+killasgroup=true
+autorestart=true
+autostart=true
+startsecs=5
+environment={{ manager.environment }}
 
 [program:kresd]
 process_name=%(program_name)s%(process_num)d
index 6ad64c582f3041722ece239fe526f3b58f14a728..bbca9e183895b1059877093227487ec8ed663552 100644 (file)
@@ -22,12 +22,13 @@ from knot_resolver_manager.config_store import ConfigStore
 from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME, init_user_constants
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.datamodel.management_schema import ManagementSchema
-from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException
+from knot_resolver_manager.exceptions import CancelStartupExecInsteadException, DataException, KresManagerException, SchemaException
 from knot_resolver_manager.kresd_controller import get_best_controller_implementation
 from knot_resolver_manager.utils.async_utils import readfile
 from knot_resolver_manager.utils.functional import Result
 from knot_resolver_manager.utils.parsing import ParsedTree, parse, parse_yaml
 from knot_resolver_manager.utils.types import NoneType
+from knot_resolver_manager.utils.systemd_notify import systemd_notify
 
 from .kres_manager import KresManager
 
@@ -304,7 +305,6 @@ async def _load_config(config: ParsedTree) -> KresConfig:
 async def _init_config_store(config: ParsedTree) -> ConfigStore:
     config_validated = await _load_config(config)
     config_store = ConfigStore(config_validated)
-    await init_user_constants(config_store)
     return config_store
 
 
@@ -392,6 +392,7 @@ async def _sigterm_while_shutting_down():
 
 async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_FILE) -> int:
     start_time = time()
+    working_directory_on_startup = os.getcwd()
     manager: Optional[KresManager] = None
 
     # Block signals during initialization to force their processing once everything is ready
@@ -422,6 +423,9 @@ async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_
         # After the working directory is set, we can initialize proper config store with a newly parsed configuration.
         config_store = await _init_config_store(config_raw)
 
+        # Some "constants" need to be loaded from the initial config, some need to be stored from the initial run conditions
+        await init_user_constants(config_store, working_directory_on_startup)
+
         # This behaviour described above with paths means, that we MUST NOT allow `rundir` change after initialization.
         # It would cause strange problems because every other path configuration depends on it. Therefore, we have to
         # add a check to the config store, which disallows changes.
@@ -440,9 +444,28 @@ async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_
 
         # After we have loaded the configuration, we can start worring about subprocess management.
         manager = await _init_manager(config_store, server)
+
+    except CancelStartupExecInsteadException as e:
+        # if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would
+        # be a subprocess manager like supervisord, which wants to make sure the manager runs under supervisord in
+        # the process tree. So now we stop everything, and exec what we are told to. We are assuming, that the thing
+        # we'll exec will invoke us again.
+        logger.info("Exec requested with arguments: %s", str(e.exec_args))
+
+        # unblock signals, this could actually terminate us straight away
+        signal.pthread_sigmask(signal.SIG_UNBLOCK, Server.all_handled_signals())
+
+        # run exit functions
+        atexit._run_exitfuncs()
+
+        # and finally exec what we we told to exec
+        os.execl(*e.exec_args)
+
     except KresManagerException as e:
+        # We caught an error with a pretty error message. Just print it and exit.
         logger.error(e)
         return 1
+
     except BaseException:
         logger.error("Uncaught generic exception during manager inicialization...", exc_info=True)
         return 1
@@ -466,6 +489,9 @@ async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_
 
     logger.info(f"Manager fully initialized and running in {round(time() - start_time, 3)} seconds")
 
+    # notify systemd/anything compatible that we are ready
+    systemd_notify(READY="1")
+
     await server.wait_for_shutdown()
 
     # Ok, now we are tearing everything down.
diff --git a/manager/knot_resolver_manager/utils/systemd_notify.py b/manager/knot_resolver_manager/utils/systemd_notify.py
new file mode 100644 (file)
index 0000000..44e8dee
--- /dev/null
@@ -0,0 +1,54 @@
+import enum
+import logging
+import os
+import socket
+
+logger = logging.getLogger(__name__)
+
+
+class _Status(enum.Enum):
+    NOT_INITIALIZED = 1
+    FUNCTIONAL = 2
+    FAILED = 3
+
+
+_status = _Status.NOT_INITIALIZED
+_socket = None
+
+
+def systemd_notify(**values: str) -> None:
+    global _status
+    global _socket
+
+    if _status is _Status.NOT_INITIALIZED:
+        socket_addr = os.getenv("NOTIFY_SOCKET")
+        os.unsetenv("NOTIFY_SOCKET")
+        if socket_addr is None:
+            _status = _Status.FAILED
+            return
+        if socket_addr.startswith("@"):
+            socket_addr = socket_addr.replace("@", "\0", 1)
+
+        try:
+            _socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+            _socket.connect(socket_addr)
+            _status = _Status.FUNCTIONAL
+        except Exception:
+            _socket = None
+            _status = _Status.FAILED
+            logger.warning(f"Failed to connect to $NOTIFY_SOCKET at '{socket_addr}'", exc_info=True)
+            return
+
+    elif _status is _Status.FAILED:
+        return
+
+    if _status is _Status.FUNCTIONAL:
+        assert _socket is not None
+        payload = "\n".join((f"{key}={value}" for key, value in values.items()))
+        try:
+            _socket.send(payload.encode("utf8"))
+        except Exception:
+            logger.warning("Failed to send notification to systemd", exc_info=True)
+            _status = _Status.FAILED
+            _socket.close()
+            _socket = None