From: Vasek Sraier Date: Wed, 25 May 2022 13:03:14 +0000 (+0200) Subject: manager: experimental implementation of supervisord extension to support sd_notify() X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fheads%2Fmanager-supervisord-systemd-monkeypatch;p=thirdparty%2Fknot-resolver.git manager: experimental implementation of supervisord extension to support sd_notify() contains: - python module written in C, because Python does not support socket auxiliary messages like SCM_CREDENTIALS - XML-RPC extension for supervisord, which actually does not do anything except for injecting code into supervisord internals --- diff --git a/manager/etc/knot-resolver/config.dev.yml b/manager/etc/knot-resolver/config.dev.yml index 2adfd9629..a744ca345 100644 --- a/manager/etc/knot-resolver/config.dev.yml +++ b/manager/etc/knot-resolver/config.dev.yml @@ -8,6 +8,8 @@ network: listen: - interface: 127.0.0.1@5353 server: + backend: supervisord + watchdog: false id: dev workers: 1 rundir: etc/knot-resolver/runtime diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/Makefile b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/Makefile new file mode 100644 index 000000000..dcc66dabd --- /dev/null +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/Makefile @@ -0,0 +1,11 @@ +# !!! Do not use this in production !!!!! 
# Subprocess objects for which supervisord emitted a "starting" event and no
# other state event since.  The notify dispatcher looks senders up in here by
# PID when a readiness datagram arrives.
starting_processes: "List[Subprocess]" = []


class NotifySocketDispatcher:
    """
    Read-only dispatcher for the sd_notify()-style notification socket.

    It exposes the methods supervisord calls on its dispatchers (see
    supervisor.dispatchers): readable/writable, the handle_* callbacks,
    close and flush.  Incoming datagrams are read through the ``notify``
    C extension, which returns the sender PID together with the payload.
    """

    def __init__(self, supervisor: "Supervisor", fd: int):
        self._supervisor = supervisor
        self.fd = fd
        self.closed = False  # True if close() has been called

    def __repr__(self):
        return "<%s with fd=%s>" % (self.__class__.__name__, self.fd)

    def readable(self):
        # Once closed, the descriptor must not be polled any more; reporting
        # "not readable" is how supervisord's own dispatchers signal that.
        return not self.closed

    def writable(self):
        return False

    @staticmethod
    def _is_ready_message(data: bytes) -> bool:
        """
        Return True if any line of the datagram starts with ``READY=1``.

        The sd_notify() state string is a newline-separated list of
        assignments, so ``READY=1`` is not guaranteed to be the first one.
        """
        return any(line.startswith(b"READY=1") for line in data.split(b"\n"))

    def handle_read_event(self):
        """
        Read one datagram and, if it is a readiness notification from a
        process we know to be starting, move that process to RUNNING.

        Always returns None.
        """
        res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
        if res is None:
            return None  # there was some junk
        pid, data = res

        if not self._is_ready_message(data):
            # we got some junk
            print(f"[notify] we've got some junk on the socket from pid={pid}: '{data!r}'")
            return None

        # some process is really ready, find out which one
        proc = next((p for p in starting_processes if p.pid == pid), None)
        if proc is None:
            print(f"[notify] we've got ready notification from some unknown pid={pid}")
            return None

        # An exception raised here would end up in handle_error(), which
        # closes the socket for good.  A late or repeated notification is not
        # worth that, so check the state instead of asserting it.
        if proc.get_state() != ProcessStates.STARTING:
            print(f"[notify] ignoring ready notification from pid={pid}, process is not STARTING")
            return None

        print("[notify] received ready notification, marking as RUNNING")
        proc.change_state(ProcessStates.RUNNING)
        return None

    def handle_write_event(self):
        raise ValueError("this dispatcher is not writable")

    def handle_error(self):
        """Report the exception currently being handled and close the socket."""
        _, t, v, tbinfo = compact_traceback()

        print("uncaptured python exception, closing notify socket %s (%s:%s %s)" % (repr(self), t, v, tbinfo))
        self.close()

    def close(self):
        """Close the descriptor; calling this more than once is harmless."""
        if not self.closed:
            # Flip the flag first: if os.close() raises, we must not try to
            # close the same descriptor number again later.
            self.closed = True
            os.close(self.fd)

    def flush(self):
        pass


def on_process_state_change(event: "ProcessStateEvent") -> None:
    """
    Keep ``starting_processes`` in sync with supervisord's state events.

    A "starting" event registers the process (once); any other state event
    removes it.
    """
    proc = event.process

    if isinstance(event, ProcessStateStartingEvent):
        # process is starting; do not register the same object twice
        if not any(p is proc for p in starting_processes):
            starting_processes.append(proc)
    else:
        # not starting any more.  Compare the objects themselves: comparing
        # PIDs with "is" tests int identity, not equality, and a PID does not
        # identify a process object as reliably as the object itself does.
        starting_processes[:] = [p for p in starting_processes if p is not proc]


# The single dispatcher instance; created lazily by monkeypatch().
notify_dispatcher: Optional[NotifySocketDispatcher] = None
supervisord.get_process_map = get_process_map + + # subscribe to events + subscribe(ProcessStateEvent, on_process_state_change) + + +def make_rpcinterface(supervisord: Supervisor, **config: Any) -> Any: + monkeypatch(supervisord) + return None diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/notifymodule.c b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/notifymodule.c new file mode 100644 index 000000000..4d647d5ae --- /dev/null +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/notifymodule.c @@ -0,0 +1,173 @@ +#define PY_SSIZE_T_CLEAN +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define CONTROL_SOCKET_NAME "knot-resolver-control-socket" +#define NOTIFY_SOCKET_NAME "NOTIFY_SOCKET" +#define MODULE_NAME "notify" +#define RECEIVE_BUFFER_SIZE 2048 + +static PyObject *NotifySocketError; + +static PyObject *init_control_socket(PyObject *self, PyObject *args) +{ + /* create socket */ + int controlfd = socket(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0); + if (controlfd == -1) { + PyErr_SetFromErrno(NotifySocketError); + return NULL; + } + + /* create address */ + struct sockaddr_un server_addr; + bzero(&server_addr, sizeof(server_addr)); + server_addr.sun_family = AF_UNIX; + server_addr.sun_path[0] = '\0'; // mark it as abstract namespace socket + strcpy(server_addr.sun_path + 1, CONTROL_SOCKET_NAME); + size_t addr_len = offsetof(struct sockaddr_un, sun_path) + + strlen(CONTROL_SOCKET_NAME) + 1; + + /* bind to the address */ + int res = bind(controlfd, (struct sockaddr *)&server_addr, addr_len); + if (res < 0) { + PyErr_SetFromErrno(NotifySocketError); + return NULL; + } + + /* make sure that we are send credentials */ + int data = (int)true; + res = setsockopt(controlfd, SOL_SOCKET, SO_PASSCRED, &data, + sizeof(data)); + if (res < 0) { + PyErr_SetFromErrno(NotifySocketError); + return NULL; + } + 
+ /* store the name of the socket in env to fake systemd */ + char *old_value = getenv(NOTIFY_SOCKET_NAME); + if (old_value != NULL) { + printf("[notify_socket] warning, running under systemd and overwriting $%s\n", + NOTIFY_SOCKET_NAME); + // fixme + } + + res = setenv(NOTIFY_SOCKET_NAME, "@" CONTROL_SOCKET_NAME, 1); + if (res < 0) { + PyErr_SetFromErrno(NotifySocketError); + return NULL; + } + + return PyLong_FromLong((long)controlfd); +} + +static PyObject *handle_control_socket_connection_event(PyObject *self, + PyObject *args) +{ + long controlfd; + if (!PyArg_ParseTuple(args, "i", &controlfd)) + return NULL; + + /* read command assuming it fits and it was sent all at once */ + // prepare space to read filedescriptors + struct msghdr msg; + msg.msg_name = NULL; + msg.msg_namelen = 0; + + // prepare a place to read the actual message + char place_for_data[RECEIVE_BUFFER_SIZE]; + bzero(&place_for_data, sizeof(place_for_data)); + struct iovec iov = { .iov_base = &place_for_data, + .iov_len = sizeof(place_for_data) }; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + char cmsg[CMSG_SPACE(sizeof(struct ucred))]; + msg.msg_control = cmsg; + msg.msg_controllen = sizeof(cmsg); + + /* Receive real plus ancillary data */ + int len = recvmsg(controlfd, &msg, 0); + if (len == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + Py_RETURN_NONE; + } else { + PyErr_SetFromErrno(NotifySocketError); + return NULL; + } + } + + /* read the sender pid */ + struct cmsghdr *cmsgp = CMSG_FIRSTHDR(&msg); + pid_t pid = -1; + while (cmsgp != NULL) { + if (cmsgp->cmsg_type == SCM_CREDENTIALS) { + assert(cmsgp->cmsg_len == + CMSG_LEN(sizeof(struct ucred))); + assert(cmsgp->cmsg_level == SOL_SOCKET); + + struct ucred cred; + memcpy(&cred, CMSG_DATA(cmsgp), sizeof(cred)); + pid = cred.pid; + } + cmsgp = CMSG_NXTHDR(&msg, cmsgp); + } + if (pid == -1) { + printf("[notify_socket] ignoring received data without credentials: %s\n", + place_for_data); + Py_RETURN_NONE; + } + + /* return 
received data as a tuple (pid, data bytes) */ + return Py_BuildValue("iy", pid, place_for_data); +} + +static PyMethodDef NotifyMethods[] = { + { "init_socket", init_control_socket, METH_VARARGS, + "Init notify socket. Returns it's file descriptor." }, + { "read_message", handle_control_socket_connection_event, METH_VARARGS, + "Reads datagram from notify socket. Returns tuple of PID and received bytes." }, + { NULL, NULL, 0, NULL } /* Sentinel */ +}; + +static struct PyModuleDef notifymodule = { + PyModuleDef_HEAD_INIT, MODULE_NAME, /* name of module */ + NULL, /* module documentation, may be NULL */ + -1, /* size of per-interpreter state of the module, + or -1 if the module keeps state in global variables. */ + NotifyMethods +}; + +PyMODINIT_FUNC PyInit_notify(void) +{ + PyObject *m; + + m = PyModule_Create(¬ifymodule); + if (m == NULL) + return NULL; + + NotifySocketError = + PyErr_NewException(MODULE_NAME ".error", NULL, NULL); + Py_XINCREF(NotifySocketError); + if (PyModule_AddObject(m, "error", NotifySocketError) < 0) { + Py_XDECREF(NotifySocketError); + Py_CLEAR(NotifySocketError); + Py_DECREF(m); + return NULL; + } + + return m; +} \ No newline at end of file diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2 b/manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2 index 84c7c55c3..1782e54b0 100644 --- a/manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2 +++ b/manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2 @@ -15,7 +15,8 @@ serverurl = unix://{{ config.unix_http_server }} [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface - +[rpcinterface:notify] +supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin:make_rpcinterface {% for instance in instances %} @@ -27,6 +28,7 @@ stdout_logfile={{ instance.logfile }} stderr_logfile={{ instance.logfile }} 
directory={{ instance.workdir }} command={{ instance.command }} -environment={{ instance.environment }} +environment={{ instance.environment }},NOTIFY_SOCKET="@knot-resolver-control-socket" +startsecs=10 {%- endfor -%} \ No newline at end of file