]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: experimental implementation of supervisord extension to support sd_notify() manager-supervisord-systemd-monkeypatch
authorVasek Sraier <git@vakabus.cz>
Wed, 25 May 2022 13:03:14 +0000 (15:03 +0200)
committerVasek Sraier <git@vakabus.cz>
Wed, 25 May 2022 13:03:14 +0000 (15:03 +0200)
contains:
- python module written in C, because Python does not support socket auxiliary messages like SCM_CREDENTIALS
- XML-RPC extension for supervisord, which actually does not do anything except for injecting code into supervisord internals

manager/etc/knot-resolver/config.dev.yml
manager/knot_resolver_manager/kresd_controller/supervisord/plugin/Makefile [new file with mode: 0644]
manager/knot_resolver_manager/kresd_controller/supervisord/plugin/__init__.py [new file with mode: 0644]
manager/knot_resolver_manager/kresd_controller/supervisord/plugin/notifymodule.c [new file with mode: 0644]
manager/knot_resolver_manager/kresd_controller/supervisord/supervisord.conf.j2

index 2adfd962949515f1553cfa462e4ab2f5dde1329e..a744ca345134c67dd75aab06530e46449505ed3d 100644 (file)
@@ -8,6 +8,8 @@ network:
   listen:
     - interface: 127.0.0.1@5353
 server:
+  backend: supervisord
+  watchdog: false
   id: dev
   workers: 1
   rundir: etc/knot-resolver/runtime
diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/Makefile b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/Makefile
new file mode 100644 (file)
index 0000000..dcc66da
--- /dev/null
@@ -0,0 +1,11 @@
+# !!! Do not use this in production !!!!!
+# This makefile is a hack to simplify initial development
+#
+# Use variant of this instead:
+# https://docs.python.org/3/extending/building.html#building
+
+notify.so: notify.o
+       gcc -shared notify.o -L/usr/local/lib -o notify.so
+
+notify.o: notifymodule.c
+       gcc -DNDEBUG -g -O3 -Wall -Wstrict-prototypes -fPIC -I/usr/local/include -I/usr/include/python3.10 -c notifymodule.c -o notify.o
\ No newline at end of file
diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/__init__.py b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/__init__.py
new file mode 100644 (file)
index 0000000..2f9f7df
--- /dev/null
@@ -0,0 +1,128 @@
+import os
+import signal
+from typing import Any, Dict, List, Optional, Tuple
+
+from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
+from supervisor.medusa.asyncore_25 import compact_traceback
+from supervisor.options import ServerOptions
+from supervisor.process import Subprocess
+from supervisor.states import ProcessStates
+from supervisor.supervisord import Supervisor
+
+from knot_resolver_manager.kresd_controller.supervisord.plugin import notify
+
+starting_processes: List[Subprocess] = []
+
+
+class NotifySocketDispatcher:
+    """
+    See supervisor.dispatcher
+    """
+
+    def __init__(self, supervisor: Supervisor, fd: int):
+        self._supervisor = supervisor
+        self.fd = fd
+        self.closed = False  # True if close() has been called
+
+    def __repr__(self):
+        return "<%s with fd=%s>" % (self.__class__.__name__, self.fd)
+
+    def readable(self):
+        return True
+
+    def writable(self):
+        return False
+
+    def handle_read_event(self):
+        res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
+        if res is None:
+            return None  # there was some junk
+        pid, data = res
+
+        if data.startswith(b"READY=1"):
+            # some process is really ready
+            global starting_processes
+
+            for proc in starting_processes:
+                if proc.pid == pid:
+                    break
+            else:
+                print(f"[notify] we've got ready notification from some unknown pid={pid}")
+                return None
+
+            print("[notify] received ready notification, marking as RUNNING")
+            proc._assertInState(ProcessStates.STARTING)
+            proc.change_state(ProcessStates.RUNNING)
+        else:
+            # we got some junk
+            print(f"[notify] we've got some junk on the socket from pid={pid}: '{data!r}'")
+            return None
+
+    def handle_write_event(self):
+        raise ValueError("this dispatcher is not writable")
+
+    def handle_error(self):
+        nil, t, v, tbinfo = compact_traceback()
+
+        print("uncaptured python exception, closing notify socket %s (%s:%s %s)" % (repr(self), t, v, tbinfo))
+        self.close()
+
+    def close(self):
+        if not self.closed:
+            os.close(self.fd)
+            self.closed = True
+
+    def flush(self):
+        pass
+
+
+def on_process_state_change(event: ProcessStateEvent) -> None:
+    global starting_processes
+
+    proc: Subprocess = event.process
+
+    if isinstance(event, ProcessStateStartingEvent):
+        # process is starting
+        # if proc not in starting_processes:
+        starting_processes.append(proc)
+    else:
+        # not starting
+        starting_processes = [p for p in starting_processes if p.pid is not proc.pid]
+
+
+notify_dispatcher: Optional[NotifySocketDispatcher] = None
+
+
+def monkeypatch(supervisord: Supervisor) -> None:
+    """We inject ourselves into supervisord code:
+
+    - inject our notify socket to the event loop
+    - subscribe to all state change events
+    """
+
+    old: Any = supervisord.get_process_map
+
+    def get_process_map(*args: Any, **kwargs: Any) -> Dict[Any, Any]:
+        global notify_dispatcher
+        if notify_dispatcher is None:
+            notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
+            supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
+
+        # call the old method
+        res = old(*args, **kwargs)
+
+        # add our dispatcher to the result
+        assert notify_dispatcher.fd not in res
+        res[notify_dispatcher.fd] = notify_dispatcher
+
+        return res
+
+    supervisord.get_process_map = get_process_map
+
+    # subscribe to events
+    subscribe(ProcessStateEvent, on_process_state_change)
+
+
+def make_rpcinterface(supervisord: Supervisor, **config: Any) -> Any:
+    monkeypatch(supervisord)
+    return None
diff --git a/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/notifymodule.c b/manager/knot_resolver_manager/kresd_controller/supervisord/plugin/notifymodule.c
new file mode 100644 (file)
index 0000000..4d647d5
--- /dev/null
@@ -0,0 +1,173 @@
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <stddef.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#define CONTROL_SOCKET_NAME "knot-resolver-control-socket"
+#define NOTIFY_SOCKET_NAME "NOTIFY_SOCKET"
+#define MODULE_NAME "notify"
+#define RECEIVE_BUFFER_SIZE 2048
+
+static PyObject *NotifySocketError;
+
+static PyObject *init_control_socket(PyObject *self, PyObject *args)
+{
+       /* create socket */
+       int controlfd = socket(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0);
+       if (controlfd == -1) {
+               PyErr_SetFromErrno(NotifySocketError);
+               return NULL;
+       }
+
+       /* create address */
+       struct sockaddr_un server_addr;
+       bzero(&server_addr, sizeof(server_addr));
+       server_addr.sun_family = AF_UNIX;
+       server_addr.sun_path[0] = '\0'; // mark it as abstract namespace socket
+       strcpy(server_addr.sun_path + 1, CONTROL_SOCKET_NAME);
+       size_t addr_len = offsetof(struct sockaddr_un, sun_path) +
+                         strlen(CONTROL_SOCKET_NAME) + 1;
+
+       /* bind to the address */
+       int res = bind(controlfd, (struct sockaddr *)&server_addr, addr_len);
+       if (res < 0) {
+               PyErr_SetFromErrno(NotifySocketError);
+               return NULL;
+       }
+
+       /* make sure that we are send credentials */
+       int data = (int)true;
+       res = setsockopt(controlfd, SOL_SOCKET, SO_PASSCRED, &data,
+                        sizeof(data));
+       if (res < 0) {
+               PyErr_SetFromErrno(NotifySocketError);
+               return NULL;
+       }
+
+       /* store the name of the socket in env to fake systemd */
+       char *old_value = getenv(NOTIFY_SOCKET_NAME);
+       if (old_value != NULL) {
+               printf("[notify_socket] warning, running under systemd and overwriting $%s\n",
+                      NOTIFY_SOCKET_NAME);
+               // fixme
+       }
+
+       res = setenv(NOTIFY_SOCKET_NAME, "@" CONTROL_SOCKET_NAME, 1);
+       if (res < 0) {
+               PyErr_SetFromErrno(NotifySocketError);
+               return NULL;
+       }
+
+       return PyLong_FromLong((long)controlfd);
+}
+
+static PyObject *handle_control_socket_connection_event(PyObject *self,
+                                                       PyObject *args)
+{
+       long controlfd;
+       if (!PyArg_ParseTuple(args, "i", &controlfd))
+               return NULL;
+
+       /* read command assuming it fits and it was sent all at once */
+       // prepare space to read filedescriptors
+       struct msghdr msg;
+       msg.msg_name = NULL;
+       msg.msg_namelen = 0;
+
+       // prepare a place to read the actual message
+       char place_for_data[RECEIVE_BUFFER_SIZE];
+       bzero(&place_for_data, sizeof(place_for_data));
+       struct iovec iov = { .iov_base = &place_for_data,
+                            .iov_len = sizeof(place_for_data) };
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+
+       char cmsg[CMSG_SPACE(sizeof(struct ucred))];
+       msg.msg_control = cmsg;
+       msg.msg_controllen = sizeof(cmsg);
+
+       /* Receive real plus ancillary data */
+       int len = recvmsg(controlfd, &msg, 0);
+       if (len == -1) {
+               if (errno == EWOULDBLOCK || errno == EAGAIN) {
+                       Py_RETURN_NONE;
+               } else {
+                       PyErr_SetFromErrno(NotifySocketError);
+                       return NULL;
+               }
+       }
+
+       /* read the sender pid */
+       struct cmsghdr *cmsgp = CMSG_FIRSTHDR(&msg);
+       pid_t pid = -1;
+       while (cmsgp != NULL) {
+               if (cmsgp->cmsg_type == SCM_CREDENTIALS) {
+                       assert(cmsgp->cmsg_len ==
+                              CMSG_LEN(sizeof(struct ucred)));
+                       assert(cmsgp->cmsg_level == SOL_SOCKET);
+
+                       struct ucred cred;
+                       memcpy(&cred, CMSG_DATA(cmsgp), sizeof(cred));
+                       pid = cred.pid;
+               }
+               cmsgp = CMSG_NXTHDR(&msg, cmsgp);
+       }
+       if (pid == -1) {
+               printf("[notify_socket] ignoring received data without credentials: %s\n",
+                      place_for_data);
+               Py_RETURN_NONE;
+       }
+
+       /* return received data as a tuple (pid, data bytes) */
+       return Py_BuildValue("iy", pid, place_for_data);
+}
+
+static PyMethodDef NotifyMethods[] = {
+       { "init_socket", init_control_socket, METH_VARARGS,
+         "Init notify socket. Returns it's file descriptor." },
+       { "read_message", handle_control_socket_connection_event, METH_VARARGS,
+         "Reads datagram from notify socket. Returns tuple of PID and received bytes." },
+       { NULL, NULL, 0, NULL } /* Sentinel */
+};
+
+static struct PyModuleDef notifymodule = {
+       PyModuleDef_HEAD_INIT, MODULE_NAME, /* name of module */
+       NULL, /* module documentation, may be NULL */
+       -1, /* size of per-interpreter state of the module,
+           or -1 if the module keeps state in global variables. */
+       NotifyMethods
+};
+
+PyMODINIT_FUNC PyInit_notify(void)
+{
+       PyObject *m;
+
+       m = PyModule_Create(&notifymodule);
+       if (m == NULL)
+               return NULL;
+
+       NotifySocketError =
+               PyErr_NewException(MODULE_NAME ".error", NULL, NULL);
+       Py_XINCREF(NotifySocketError);
+       if (PyModule_AddObject(m, "error", NotifySocketError) < 0) {
+               Py_XDECREF(NotifySocketError);
+               Py_CLEAR(NotifySocketError);
+               Py_DECREF(m);
+               return NULL;
+       }
+
+       return m;
+}
\ No newline at end of file
index 84c7c55c322599c75f3ca1f380e49edeb5214a45..1782e54b0ff40c3b1e0793c87d9890e9be2288d6 100644 (file)
@@ -15,7 +15,8 @@ serverurl = unix://{{ config.unix_http_server }}
 [rpcinterface:supervisor]
 supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
 
-
+[rpcinterface:notify]
+supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin:make_rpcinterface
 
 
 {% for instance in instances %}
@@ -27,6 +28,7 @@ stdout_logfile={{ instance.logfile }}
 stderr_logfile={{ instance.logfile }}
 directory={{ instance.workdir }}
 command={{ instance.command }}
-environment={{ instance.environment }}
+environment={{ instance.environment }},NOTIFY_SOCKET="@knot-resolver-control-socket"
+startsecs=10
 
 {%- endfor -%}
\ No newline at end of file