]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
varlink-util: add generic code that calls out to a 'hook' directory of sockets 41815/head
authorLennart Poettering <lennart@amutable.com>
Thu, 23 Apr 2026 07:29:20 +0000 (09:29 +0200)
committerLennart Poettering <lennart@amutable.com>
Sun, 26 Apr 2026 09:29:52 +0000 (11:29 +0200)
src/libsystemd/sd-varlink/varlink-util.c
src/libsystemd/sd-varlink/varlink-util.h
src/test/test-varlink.c

index 8b61627c562c9daee4c71704924a68f015469dab..475bec40d844f45a3fb48f7a43406dd28e1fc962 100644 (file)
@@ -1,10 +1,17 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
+#include "sd-event.h"
+#include "sd-varlink.h"
+
 #include "alloc-util.h"
 #include "errno-util.h"
+#include "fd-util.h"
 #include "log.h"
+#include "path-util.h"
 #include "pidref.h"
+#include "recurse-dir.h"
 #include "set.h"
+#include "socket-util.h"
 #include "string-util.h"
 #include "varlink-internal.h"
 #include "varlink-util.h"
@@ -214,3 +221,164 @@ DEFINE_HASH_OPS_WITH_VALUE_DESTRUCTOR(
                 trivial_compare_func,
                 sd_varlink,
                 sd_varlink_unref);
+
+static int varlink_finish_idle(Set *s) {
+        int r;
+
+        sd_varlink *vl;
+        bool fully_idle = true;
+        SET_FOREACH(vl, s) {
+                r = sd_varlink_is_idle(vl);
+                if (r < 0)
+                        return r;
+                if (r == 0)
+                        fully_idle = false;
+                else {
+                        /* Idle? Then we can close the connection, and release some resources. */
+                        assert_se(set_remove(s, vl) == vl);
+                        vl = sd_varlink_close_unref(vl);
+                }
+        }
+
+        return fully_idle;
+}
+
+#define VARLINK_EXECUTE_SOCKETS_MAX 255
+
+ssize_t varlink_execute_directory(
+                const char *path,
+                const char *method,
+                sd_json_variant *parameters,
+                bool more,
+                usec_t timeout_usec,
+                sd_varlink_reply_t reply,
+                void *userdata) {
+
+        int r;
+
+        assert(path);
+        assert(method);
+
+        /* Invokes the specified method on all Varlink sockets in the specified directory. Any reply
+         * will be dispatched to the reply callback. Blocks until the last reply has come in.
+         *
+         * Returns how many sockets were contacted.
+         *
+         * Usecase for all of this: hook directories, where components can link their sockets into to get
+         * notified about certain system events. */
+
+        _cleanup_close_ int fd = open(path, O_RDONLY|O_CLOEXEC|O_DIRECTORY);
+        if (fd < 0)
+                return log_debug_errno(errno, "Failed to open '%s': %m", path);
+
+        _cleanup_free_ DirectoryEntries *dentries = NULL;
+        r = readdir_all(fd, RECURSE_DIR_IGNORE_DOT|RECURSE_DIR_ENSURE_TYPE, &dentries);
+        if (r < 0)
+                return log_debug_errno(r, "Failed to enumerate '%s': %m", path);
+
+        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+        _cleanup_(set_freep) Set *links = NULL;
+        size_t t = 0;
+        FOREACH_ARRAY(dp, dentries->entries, dentries->n_entries) {
+                struct dirent *de = *dp;
+
+                if (de->d_type != DT_SOCK)
+                        continue;
+
+                t++;
+
+                _cleanup_free_ char *j = path_join(path, de->d_name);
+                if (!j)
+                        return log_oom_debug();
+
+                if (set_size(links) >= VARLINK_EXECUTE_SOCKETS_MAX) {
+                        log_debug("Too many sockets (%zu) in directory, skipping '%s'.", t, j);
+                        continue;
+                }
+
+                _cleanup_close_ int socket_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
+                if (socket_fd < 0)
+                        return log_debug_errno(errno, "Failed to allocate AF_UNIX/SOCK_STREAM socket: %m");
+
+                r = connect_unix_path(socket_fd, fd, de->d_name);
+                if (r < 0) {
+                        log_debug_errno(r, "Failed to connect to '%s', ignoring: %m", j);
+                        continue;
+                }
+
+                if (!event) {
+                        r = sd_event_new(&event);
+                        if (r < 0)
+                                return log_debug_errno(r, "Failed to allocate event loop: %m");
+                }
+
+                _cleanup_(sd_varlink_unrefp) sd_varlink *link = NULL;
+                r = sd_varlink_connect_fd(&link, socket_fd);
+                if (r < 0)
+                        return log_debug_errno(r, "Failed to allocate Varlink connection: %m");
+
+                TAKE_FD(socket_fd);
+
+                r = sd_varlink_attach_event(link, event, /* priority= */ 0);
+                if (r < 0)
+                        return log_debug_errno(r, "Failed to attach varlink connection to event loop: %m");
+
+                sd_varlink_set_userdata(link, userdata);
+
+                r = sd_varlink_bind_reply(link, reply);
+                if (r < 0)
+                        return log_debug_errno(r, "Failed to bind reply callback: %m");
+
+                r = sd_varlink_set_description(link, j);
+                if (r < 0)
+                        return log_debug_errno(r, "Failed to set description: %m");
+
+                r = sd_varlink_set_relative_timeout(link, timeout_usec);
+                if (r < 0)
+                        return r;
+
+                if (more)
+                        r = sd_varlink_observe(link, method, parameters);
+                else
+                        r = sd_varlink_invoke(link, method, parameters);
+                if (r < 0)
+                        return log_debug_errno(r, "Failed to enqueue message on Varlink connection: %m");
+
+                if (set_ensure_consume(&links, &varlink_hash_ops, TAKE_PTR(link)) < 0)
+                        return log_oom_debug();
+        }
+
+        size_t c = set_size(links);
+
+        for (;;) {
+                if (event) {
+                        int state = sd_event_get_state(event);
+                        if (state < 0)
+                                return state;
+                        if (state == SD_EVENT_FINISHED) {
+                                int x;
+                                r = sd_event_get_exit_code(event, &x);
+                                if (r < 0)
+                                        return r;
+                                if (x != 0)
+                                        return x;
+
+                                break;
+                        }
+                }
+
+                r = varlink_finish_idle(links);
+                if (r < 0)
+                        return r;
+                if (r > 0)
+                        break; /* idle, we are done */
+
+                assert(event);
+
+                r = sd_event_run(event, /* timeout= */ UINT64_MAX);
+                if (r < 0)
+                        return r;
+        }
+
+        return (ssize_t) c;
+}
index ba0f23225356acef83375c3cc6690c16f91f29bf..d6ecb03c545332a2402d3fd72530e77d5c3dd93d 100644 (file)
@@ -29,3 +29,5 @@ int varlink_server_new(
 int varlink_check_privileged_peer(sd_varlink *vl);
 
 extern const struct hash_ops varlink_hash_ops;
+
+ssize_t varlink_execute_directory(const char *path, const char *method, sd_json_variant *parameters, bool more, usec_t timeout_usec, sd_varlink_reply_t reply, void *userdata);
index 1bbc87c32c0f942dd9350d57836eafc106af4eac..8cdbfafaa2ae9b2ad92cba05e84d14e12f71fd63 100644 (file)
@@ -4,6 +4,7 @@
 #include <poll.h>
 #include <pthread.h>
 #include <sys/socket.h>
+#include <sys/stat.h>
 #include <unistd.h>
 
 #include "sd-event.h"
@@ -14,6 +15,7 @@
 #include "io-util.h"
 #include "json-util.h"
 #include "memfd-util.h"
+#include "path-util.h"
 #include "rm-rf.h"
 #include "socket-util.h"
 #include "tests.h"
@@ -936,4 +938,111 @@ TEST(upgrade_pipelining) {
         ASSERT_OK(-pthread_join(t, NULL));
 }
 
+typedef struct ExecDirServer {
+        sd_varlink_server *server;
+        sd_event *event;
+        const char *name;
+        pthread_t thread;
+} ExecDirServer;
+
+static int method_execute_dir_ping(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        ExecDirServer *srv = ASSERT_PTR(userdata);
+
+        return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("name", srv->name));
+}
+
+static void on_execute_dir_disconnect(sd_varlink_server *s, sd_varlink *link, void *userdata) {
+        ExecDirServer *srv = ASSERT_PTR(userdata);
+
+        /* Only one client (from varlink_execute_directory()) connects per server — once it's gone, we're done. */
+        ASSERT_OK(sd_event_exit(srv->event, 0));
+}
+
+static void *execute_dir_server_thread(void *arg) {
+        ExecDirServer *srv = arg;
+
+        ASSERT_OK(sd_event_loop(srv->event));
+        return NULL;
+}
+
+static int execute_dir_reply(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        size_t *count = ASSERT_PTR(userdata);
+
+        ASSERT_NULL(error_id);
+        ASSERT_NOT_NULL(sd_json_variant_by_key(parameters, "name"));
+
+        (*count)++;
+        return 0;
+}
+
+TEST(execute_directory) {
+        _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
+        static const char * const names[] = { "alpha", "beta", "gamma" };
+        ExecDirServer servers[ELEMENTSOF(names)] = {};
+        size_t reply_count = 0;
+
+        ASSERT_OK(mkdtemp_malloc("/tmp/varlink-execdir-XXXXXX", &tmpdir));
+
+        for (size_t i = 0; i < ELEMENTSOF(names); i++) {
+                ExecDirServer *eds = servers + i;
+                servers[i].name = names[i];
+
+                _cleanup_free_ char *j = ASSERT_PTR(path_join(tmpdir, names[i]));
+
+                ASSERT_OK(sd_event_new(&eds->event));
+                ASSERT_OK(varlink_server_new(&eds->server,
+                                             SD_VARLINK_SERVER_INHERIT_USERDATA,
+                                             eds));
+                ASSERT_OK(sd_varlink_server_bind_method(eds->server, "io.test.ExecDirPing", method_execute_dir_ping));
+                ASSERT_OK(sd_varlink_server_bind_disconnect(eds->server, on_execute_dir_disconnect));
+                ASSERT_OK(sd_varlink_server_listen_address(eds->server, j, 0600));
+                ASSERT_OK(sd_varlink_server_attach_event(eds->server, eds->event, 0));
+
+                ASSERT_OK(-pthread_create(&eds->thread, NULL, execute_dir_server_thread, eds));
+        }
+
+        ASSERT_OK_EQ(varlink_execute_directory(
+                                     tmpdir,
+                                     "io.test.ExecDirPing",
+                                     /* parameters= */ NULL,
+                                     /* more= */ false,
+                                     /* timeout_usec= */ USEC_INFINITY,
+                                     execute_dir_reply,
+                                     &reply_count), (ssize_t) ELEMENTSOF(names));
+        ASSERT_EQ(reply_count, (unsigned) ELEMENTSOF(names));
+
+        FOREACH_ELEMENT(eds, servers) {
+                ASSERT_OK(-pthread_join(eds->thread, NULL));
+                eds->server = sd_varlink_server_unref(eds->server);
+                eds->event = sd_event_unref(eds->event);
+        }
+
+        /* Calling the helper against a non-existent directory must fail. */
+        _cleanup_free_ char *nope = NULL;
+        ASSERT_OK(asprintf(&nope, "%s/does-not-exist", tmpdir));
+        ASSERT_FAIL(varlink_execute_directory(
+                                    nope,
+                                    "io.test.ExecDirPing",
+                                    /* parameters= */ NULL,
+                                    /* more= */ false,
+                                    /* timeout_usec= */ USEC_INFINITY,
+                                    execute_dir_reply,
+                                    &reply_count));
+
+        /* An empty directory must simply return 0 and not invoke the reply callback. */
+        _cleanup_free_ char *empty = ASSERT_PTR(path_join(tmpdir, "empty"));
+        ASSERT_OK_ERRNO(mkdir(empty, 0755));
+
+        size_t count_before = reply_count;
+        ASSERT_OK_ZERO(varlink_execute_directory(
+                                       empty,
+                                       "io.test.ExecDirPing",
+                                       /* parameters= */ NULL,
+                                       /* more= */ false,
+                                       /* timeout_usec= */ USEC_INFINITY,
+                                       execute_dir_reply,
+                                       &reply_count));
+        ASSERT_EQ(reply_count, count_before);
+}
+
 DEFINE_TEST_MAIN(LOG_DEBUG);