--- /dev/null
+import os
+import signal
+from typing import Any, Dict, List, Optional, Tuple
+
+from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
+from supervisor.medusa.asyncore_25 import compact_traceback
+from supervisor.options import ServerOptions
+from supervisor.process import Subprocess
+from supervisor.states import ProcessStates
+from supervisor.supervisord import Supervisor
+
+from knot_resolver_manager.kresd_controller.supervisord.plugin import notify
+
+starting_processes: List[Subprocess] = []
+
+
+class NotifySocketDispatcher:
+ """
+ See supervisor.dispatcher
+ """
+
+ def __init__(self, supervisor: Supervisor, fd: int):
+ self._supervisor = supervisor
+ self.fd = fd
+ self.closed = False # True if close() has been called
+
+ def __repr__(self):
+ return "<%s with fd=%s>" % (self.__class__.__name__, self.fd)
+
+ def readable(self):
+ return True
+
+ def writable(self):
+ return False
+
+ def handle_read_event(self):
+ res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
+ if res is None:
+ return None # there was some junk
+ pid, data = res
+
+ if data.startswith(b"READY=1"):
+ # some process is really ready
+ global starting_processes
+
+ for proc in starting_processes:
+ if proc.pid == pid:
+ break
+ else:
+ print(f"[notify] we've got ready notification from some unknown pid={pid}")
+ return None
+
+ print("[notify] received ready notification, marking as RUNNING")
+ proc._assertInState(ProcessStates.STARTING)
+ proc.change_state(ProcessStates.RUNNING)
+ else:
+ # we got some junk
+ print(f"[notify] we've got some junk on the socket from pid={pid}: '{data!r}'")
+ return None
+
+ def handle_write_event(self):
+ raise ValueError("this dispatcher is not writable")
+
+ def handle_error(self):
+ nil, t, v, tbinfo = compact_traceback()
+
+ print("uncaptured python exception, closing notify socket %s (%s:%s %s)" % (repr(self), t, v, tbinfo))
+ self.close()
+
+ def close(self):
+ if not self.closed:
+ os.close(self.fd)
+ self.closed = True
+
+ def flush(self):
+ pass
+
+
+def on_process_state_change(event: ProcessStateEvent) -> None:
+ global starting_processes
+
+ proc: Subprocess = event.process
+
+ if isinstance(event, ProcessStateStartingEvent):
+ # process is starting
+ # if proc not in starting_processes:
+ starting_processes.append(proc)
+ else:
+ # not starting
+ starting_processes = [p for p in starting_processes if p.pid is not proc.pid]
+
+
+notify_dispatcher: Optional[NotifySocketDispatcher] = None
+
+
+def monkeypatch(supervisord: Supervisor) -> None:
+ """We inject ourselves into supervisord code:
+
+ - inject our notify socket to the event loop
+ - subscribe to all state change events
+ """
+
+ old: Any = supervisord.get_process_map
+
+ def get_process_map(*args: Any, **kwargs: Any) -> Dict[Any, Any]:
+ global notify_dispatcher
+ if notify_dispatcher is None:
+ notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
+ supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
+
+ # call the old method
+ res = old(*args, **kwargs)
+
+ # add our dispatcher to the result
+ assert notify_dispatcher.fd not in res
+ res[notify_dispatcher.fd] = notify_dispatcher
+
+ return res
+
+ supervisord.get_process_map = get_process_map
+
+ # subscribe to events
+ subscribe(ProcessStateEvent, on_process_state_change)
+
+
+def make_rpcinterface(supervisord: Supervisor, **config: Any) -> Any:
+ monkeypatch(supervisord)
+ return None
--- /dev/null
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <stddef.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#define CONTROL_SOCKET_NAME "knot-resolver-control-socket"
+#define NOTIFY_SOCKET_NAME "NOTIFY_SOCKET"
+#define MODULE_NAME "notify"
+#define RECEIVE_BUFFER_SIZE 2048
+
+static PyObject *NotifySocketError;
+
+static PyObject *init_control_socket(PyObject *self, PyObject *args)
+{
+ /* create socket */
+ int controlfd = socket(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0);
+ if (controlfd == -1) {
+ PyErr_SetFromErrno(NotifySocketError);
+ return NULL;
+ }
+
+ /* create address */
+ struct sockaddr_un server_addr;
+ bzero(&server_addr, sizeof(server_addr));
+ server_addr.sun_family = AF_UNIX;
+ server_addr.sun_path[0] = '\0'; // mark it as abstract namespace socket
+ strcpy(server_addr.sun_path + 1, CONTROL_SOCKET_NAME);
+ size_t addr_len = offsetof(struct sockaddr_un, sun_path) +
+ strlen(CONTROL_SOCKET_NAME) + 1;
+
+ /* bind to the address */
+ int res = bind(controlfd, (struct sockaddr *)&server_addr, addr_len);
+ if (res < 0) {
+ PyErr_SetFromErrno(NotifySocketError);
+ return NULL;
+ }
+
+ /* make sure that we are send credentials */
+ int data = (int)true;
+ res = setsockopt(controlfd, SOL_SOCKET, SO_PASSCRED, &data,
+ sizeof(data));
+ if (res < 0) {
+ PyErr_SetFromErrno(NotifySocketError);
+ return NULL;
+ }
+
+ /* store the name of the socket in env to fake systemd */
+ char *old_value = getenv(NOTIFY_SOCKET_NAME);
+ if (old_value != NULL) {
+ printf("[notify_socket] warning, running under systemd and overwriting $%s\n",
+ NOTIFY_SOCKET_NAME);
+ // fixme
+ }
+
+ res = setenv(NOTIFY_SOCKET_NAME, "@" CONTROL_SOCKET_NAME, 1);
+ if (res < 0) {
+ PyErr_SetFromErrno(NotifySocketError);
+ return NULL;
+ }
+
+ return PyLong_FromLong((long)controlfd);
+}
+
+static PyObject *handle_control_socket_connection_event(PyObject *self,
+ PyObject *args)
+{
+ long controlfd;
+ if (!PyArg_ParseTuple(args, "i", &controlfd))
+ return NULL;
+
+ /* read command assuming it fits and it was sent all at once */
+ // prepare space to read filedescriptors
+ struct msghdr msg;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+
+ // prepare a place to read the actual message
+ char place_for_data[RECEIVE_BUFFER_SIZE];
+ bzero(&place_for_data, sizeof(place_for_data));
+ struct iovec iov = { .iov_base = &place_for_data,
+ .iov_len = sizeof(place_for_data) };
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ char cmsg[CMSG_SPACE(sizeof(struct ucred))];
+ msg.msg_control = cmsg;
+ msg.msg_controllen = sizeof(cmsg);
+
+ /* Receive real plus ancillary data */
+ int len = recvmsg(controlfd, &msg, 0);
+ if (len == -1) {
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ Py_RETURN_NONE;
+ } else {
+ PyErr_SetFromErrno(NotifySocketError);
+ return NULL;
+ }
+ }
+
+ /* read the sender pid */
+ struct cmsghdr *cmsgp = CMSG_FIRSTHDR(&msg);
+ pid_t pid = -1;
+ while (cmsgp != NULL) {
+ if (cmsgp->cmsg_type == SCM_CREDENTIALS) {
+ assert(cmsgp->cmsg_len ==
+ CMSG_LEN(sizeof(struct ucred)));
+ assert(cmsgp->cmsg_level == SOL_SOCKET);
+
+ struct ucred cred;
+ memcpy(&cred, CMSG_DATA(cmsgp), sizeof(cred));
+ pid = cred.pid;
+ }
+ cmsgp = CMSG_NXTHDR(&msg, cmsgp);
+ }
+ if (pid == -1) {
+ printf("[notify_socket] ignoring received data without credentials: %s\n",
+ place_for_data);
+ Py_RETURN_NONE;
+ }
+
+ /* return received data as a tuple (pid, data bytes) */
+ return Py_BuildValue("iy", pid, place_for_data);
+}
+
+static PyMethodDef NotifyMethods[] = {
+ { "init_socket", init_control_socket, METH_VARARGS,
+ "Init notify socket. Returns it's file descriptor." },
+ { "read_message", handle_control_socket_connection_event, METH_VARARGS,
+ "Reads datagram from notify socket. Returns tuple of PID and received bytes." },
+ { NULL, NULL, 0, NULL } /* Sentinel */
+};
+
+static struct PyModuleDef notifymodule = {
+ PyModuleDef_HEAD_INIT, MODULE_NAME, /* name of module */
+ NULL, /* module documentation, may be NULL */
+ -1, /* size of per-interpreter state of the module,
+ or -1 if the module keeps state in global variables. */
+ NotifyMethods
+};
+
+PyMODINIT_FUNC PyInit_notify(void)
+{
+ PyObject *m;
+
+ m = PyModule_Create(¬ifymodule);
+ if (m == NULL)
+ return NULL;
+
+ NotifySocketError =
+ PyErr_NewException(MODULE_NAME ".error", NULL, NULL);
+ Py_XINCREF(NotifySocketError);
+ if (PyModule_AddObject(m, "error", NotifySocketError) < 0) {
+ Py_XDECREF(NotifySocketError);
+ Py_CLEAR(NotifySocketError);
+ Py_DECREF(m);
+ return NULL;
+ }
+
+ return m;
+}
\ No newline at end of file