-id: dev
rundir: etc/knot-resolver/runtime
workers: 1
management:
Class for accessing constants, which are technically not constants as they are user configurable.
"""
- def __init__(self, config_store: "ConfigStore") -> None:
+ def __init__(self, config_store: "ConfigStore", working_directory_on_startup: str) -> None:
self._config_store = config_store
-
- @property
- def ID(self) -> str:
- return str(self._config_store.get().id)
+ self.working_directory_on_startup = working_directory_on_startup
_user_constants: Optional[_UserConstants] = None
-async def _deny_id_changes(config_old: "KresConfig", config_new: "KresConfig") -> Result[None, str]:
- if config_old.id != config_new.id:
- return Result.err(
- "/id: Based on the ID, the manager recognizes subprocesses,"
- " so it is not possible to change it while services are running."
- )
- return Result.ok(None)
-
-
-async def init_user_constants(config_store: "ConfigStore") -> None:
+async def init_user_constants(config_store: "ConfigStore", working_directory_on_startup: str) -> None:
global _user_constants
- _user_constants = _UserConstants(config_store)
-
- await config_store.register_verifier(_deny_id_changes)
+ _user_constants = _UserConstants(config_store, working_directory_on_startup)
def user_constants() -> _UserConstants:
Knot Resolver declarative configuration.
---
- id: System-wide unique identifier of this instance. Used for grouping logs and tagging workers.
nsid: Name Server Identifier (RFC 5001) which allows DNS clients to request resolver to send back its NSID along with the reply to a DNS request.
hostname: Internal DNS resolver hostname. Default is machine hostname.
rundir: Directory where the resolver can create files and which will be it's cwd.
lua: Custom Lua configuration.
"""
- id: IDPattern
nsid: Optional[str] = None
hostname: Optional[str] = None
rundir: UncheckedPath = UncheckedPath(".")
_PREVIOUS_SCHEMA = Raw
- id: IDPattern
nsid: Optional[str]
hostname: str
rundir: UncheckedPath
from typing import Iterable, List
+class CancelStartupExecInsteadException(Exception):
+ """
+ Exception used for terminating system startup and instead
+ causing an exec of something else. Could be used by subprocess
+ controllers such as supervisord to allow them to run as top-level
+ process in a process tree.
+ """
+ def __init__(self, exec_args: List[str], *args: object) -> None:
+ self.exec_args = exec_args
+ super().__init__(*args)
+
+
class KresManagerException(Exception):
"""
Base class for all custom exceptions we use in our code
import logging
-from os import kill
+from os import execl, kill
from pathlib import Path
from time import sleep
-from typing import Any, Dict, Iterable, Optional, Union, cast
+from typing import Any, Dict, Iterable, NoReturn, Optional, Union, cast
from xmlrpc.client import Fault, ServerProxy
import supervisor.xmlrpc # type: ignore[import]
from knot_resolver_manager.compat.asyncio import async_in_a_thread
from knot_resolver_manager.constants import supervisord_config_file, supervisord_pid_file, supervisord_sock_file
from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.exceptions import SubprocessControllerException
+from knot_resolver_manager.exceptions import CancelStartupExecInsteadException, SubprocessControllerException
from knot_resolver_manager.kresd_controller.interface import (
KresID,
Subprocess,
raise SubprocessControllerException(f"Supervisord exited with exit code {res}")
+async def _exec_supervisord(config: KresConfig) -> NoReturn:
+ logger.debug("Writing supervisord config")
+ await write_config_file(config)
+ logger.debug("Execing supervisord")
+ raise CancelStartupExecInsteadException([str(which.which("supervisord")), "supervisord", "--configuration", str(supervisord_config_file(config).absolute()), "-n"])
+
+
async def _reload_supervisord(config: KresConfig) -> None:
await write_config_file(config)
res = await call(f'supervisorctl --configuration="{supervisord_config_file(config).absolute()}" update', shell=True)
status = SubprocessStatus.UNKNOWN
return status
+ # there will be a manager process as well, but we don't want to report anything on ourselves
+ processes = [pr for pr in processes if pr["name"] != "manager"]
+
+ # convert all the names
return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes if pr["statename"] != "STOPPED"}
self._controller_config = config
if not await _is_supervisord_running(config):
- await _start_supervisord(config)
+ #await _start_supervisord(config)
+ await _exec_supervisord(config)
else:
await _reload_supervisord(config)
supervisord_pid_file,
supervisord_sock_file,
supervisord_subprocess_log_dir,
+ user_constants,
)
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.kresd_controller.interface import KresID, SubprocessType
max_procs=config.max_workers,
)
+ @staticmethod
+ def create_manager_config(config: KresConfig) -> "ProcessTypeConfig":
+ # read original command from /proc
+ with open("/proc/self/cmdline", 'rb') as f:
+ args = [s.decode('utf-8') for s in f.read()[:-1].split(b'\0')]
+ cmd = '"' + '" "'.join(args) + '"'
+
+ return ProcessTypeConfig( # type: ignore[call-arg]
+ workdir=user_constants().working_directory_on_startup,
+ command=cmd,
+ environment='X-SUPERVISORD-TYPE=notify',
+ logfile="" # this will be ignored
+ )
+
@dataclass
class SupervisordConfig:
config_string = Template(template).render( # pyright: reportUnknownMemberType=false
gc=ProcessTypeConfig.create_gc_config(config),
kresd=ProcessTypeConfig.create_kresd_config(config),
+ manager=ProcessTypeConfig.create_manager_config(config),
config=SupervisordConfig.create(config),
)
await writefile(supervisord_config_file_tmp(config), config_string)
nodaemon = false
logfile = {{ config.logfile }}
logfile_maxbytes = 50MB
-loglevel = trace
+loglevel = info
{# user=root #}
[unix_http_server]
[rpcinterface:fast]
supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin.fast_rpcinterface:make_main_rpcinterface
+[program:manager]
+redirect_stderr=false
+stdout_logfile=/proc/self/fd/1
+stdout_logfile_maxbytes=0
+directory={{ manager.workdir }}
+command={{ manager.command }}
+stopsignal=SIGINT
+killasgroup=true
+autorestart=true
+autostart=true
+startsecs=5
+environment={{ manager.environment }}
[program:kresd]
process_name=%(program_name)s%(process_num)d
from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME, init_user_constants
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.datamodel.management_schema import ManagementSchema
-from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException
+from knot_resolver_manager.exceptions import CancelStartupExecInsteadException, DataException, KresManagerException, SchemaException
from knot_resolver_manager.kresd_controller import get_best_controller_implementation
from knot_resolver_manager.utils.async_utils import readfile
from knot_resolver_manager.utils.functional import Result
from knot_resolver_manager.utils.parsing import ParsedTree, parse, parse_yaml
from knot_resolver_manager.utils.types import NoneType
+from knot_resolver_manager.utils.systemd_notify import systemd_notify
from .kres_manager import KresManager
async def _init_config_store(config: ParsedTree) -> ConfigStore:
config_validated = await _load_config(config)
config_store = ConfigStore(config_validated)
- await init_user_constants(config_store)
return config_store
async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_FILE) -> int:
start_time = time()
+ working_directory_on_startup = os.getcwd()
manager: Optional[KresManager] = None
# Block signals during initialization to force their processing once everything is ready
# After the working directory is set, we can initialize proper config store with a newly parsed configuration.
config_store = await _init_config_store(config_raw)
+ # Some "constants" need to be loaded from the initial config, some need to be stored from the initial run conditions
+ await init_user_constants(config_store, working_directory_on_startup)
+
# This behaviour described above with paths means, that we MUST NOT allow `rundir` change after initialization.
# It would cause strange problems because every other path configuration depends on it. Therefore, we have to
# add a check to the config store, which disallows changes.
# After we have loaded the configuration, we can start worring about subprocess management.
manager = await _init_manager(config_store, server)
+
+ except CancelStartupExecInsteadException as e:
+ # if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would
+ # be a subprocess manager like supervisord, which wants to make sure the manager runs under supervisord in
+ # the process tree. So now we stop everything, and exec what we are told to. We are assuming, that the thing
+ # we'll exec will invoke us again.
+ logger.info("Exec requested with arguments: %s", str(e.exec_args))
+
+ # unblock signals, this could actually terminate us straight away
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, Server.all_handled_signals())
+
+ # run exit functions
+ atexit._run_exitfuncs()
+
+ # and finally exec what we we told to exec
+ os.execl(*e.exec_args)
+
except KresManagerException as e:
+ # We caught an error with a pretty error message. Just print it and exit.
logger.error(e)
return 1
+
except BaseException:
logger.error("Uncaught generic exception during manager inicialization...", exc_info=True)
return 1
logger.info(f"Manager fully initialized and running in {round(time() - start_time, 3)} seconds")
+ # notify systemd/anything compatible that we are ready
+ systemd_notify(READY="1")
+
await server.wait_for_shutdown()
# Ok, now we are tearing everything down.
--- /dev/null
+import enum
+import logging
+import os
+import socket
+
+logger = logging.getLogger(__name__)
+
+
+class _Status(enum.Enum):
+ NOT_INITIALIZED = 1
+ FUNCTIONAL = 2
+ FAILED = 3
+
+
+_status = _Status.NOT_INITIALIZED
+_socket = None
+
+
+def systemd_notify(**values: str) -> None:
+ global _status
+ global _socket
+
+ if _status is _Status.NOT_INITIALIZED:
+ socket_addr = os.getenv("NOTIFY_SOCKET")
+ os.unsetenv("NOTIFY_SOCKET")
+ if socket_addr is None:
+ _status = _Status.FAILED
+ return
+ if socket_addr.startswith("@"):
+ socket_addr = socket_addr.replace("@", "\0", 1)
+
+ try:
+ _socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ _socket.connect(socket_addr)
+ _status = _Status.FUNCTIONAL
+ except Exception:
+ _socket = None
+ _status = _Status.FAILED
+ logger.warning(f"Failed to connect to $NOTIFY_SOCKET at '{socket_addr}'", exc_info=True)
+ return
+
+ elif _status is _Status.FAILED:
+ return
+
+ if _status is _Status.FUNCTIONAL:
+ assert _socket is not None
+ payload = "\n".join((f"{key}={value}" for key, value in values.items()))
+ try:
+ _socket.send(payload.encode("utf8"))
+ except Exception:
+ logger.warning("Failed to send notification to systemd", exc_info=True)
+ _status = _Status.FAILED
+ _socket.close()
+ _socket = None