From: Lennart Poettering Date: Thu, 23 Apr 2026 07:29:20 +0000 (+0200) Subject: varlink-util: add generic code that calls out to a 'hook' directory of sockets X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0a8560eed873a5f89487630a19db550fdbee3c15;p=thirdparty%2Fsystemd.git varlink-util: add generic code that calls out to a 'hook' directory of sockets --- diff --git a/src/libsystemd/sd-varlink/varlink-util.c b/src/libsystemd/sd-varlink/varlink-util.c index 8b61627c562..475bec40d84 100644 --- a/src/libsystemd/sd-varlink/varlink-util.c +++ b/src/libsystemd/sd-varlink/varlink-util.c @@ -1,10 +1,17 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ +#include "sd-event.h" +#include "sd-varlink.h" + #include "alloc-util.h" #include "errno-util.h" +#include "fd-util.h" #include "log.h" +#include "path-util.h" #include "pidref.h" +#include "recurse-dir.h" #include "set.h" +#include "socket-util.h" #include "string-util.h" #include "varlink-internal.h" #include "varlink-util.h" @@ -214,3 +221,164 @@ DEFINE_HASH_OPS_WITH_VALUE_DESTRUCTOR( trivial_compare_func, sd_varlink, sd_varlink_unref); + +static int varlink_finish_idle(Set *s) { + int r; + + sd_varlink *vl; + bool fully_idle = true; + SET_FOREACH(vl, s) { + r = sd_varlink_is_idle(vl); + if (r < 0) + return r; + if (r == 0) + fully_idle = false; + else { + /* Idle? Then we can close the connection, and release some resources. */ + assert_se(set_remove(s, vl) == vl); + vl = sd_varlink_close_unref(vl); + } + } + + return fully_idle; +} + +#define VARLINK_EXECUTE_SOCKETS_MAX 255 + +ssize_t varlink_execute_directory( + const char *path, + const char *method, + sd_json_variant *parameters, + bool more, + usec_t timeout_usec, + sd_varlink_reply_t reply, + void *userdata) { + + int r; + + assert(path); + assert(method); + + /* Invokes the specified method on all Varlink sockets in the specified directory. Any reply + * will be dispatched to the reply callback. Blocks until the last reply has come in. + * + * Returns how many sockets were contacted. + * + * Usecase for all of this: hook directories, where components can link their sockets into to get + * notified about certain system events. */ + + _cleanup_close_ int fd = open(path, O_RDONLY|O_CLOEXEC|O_DIRECTORY); + if (fd < 0) + return log_debug_errno(errno, "Failed to open '%s': %m", path); + + _cleanup_free_ DirectoryEntries *dentries = NULL; + r = readdir_all(fd, RECURSE_DIR_IGNORE_DOT|RECURSE_DIR_ENSURE_TYPE, &dentries); + if (r < 0) + return log_debug_errno(r, "Failed to enumerate '%s': %m", path); + + _cleanup_(sd_event_unrefp) sd_event *event = NULL; + _cleanup_(set_freep) Set *links = NULL; + size_t t = 0; + FOREACH_ARRAY(dp, dentries->entries, dentries->n_entries) { + struct dirent *de = *dp; + + if (de->d_type != DT_SOCK) + continue; + + t++; + + _cleanup_free_ char *j = path_join(path, de->d_name); + if (!j) + return log_oom_debug(); + + if (set_size(links) >= VARLINK_EXECUTE_SOCKETS_MAX) { + log_debug("Too many sockets (%zu) in directory, skipping '%s'.", t, j); + continue; + } + + _cleanup_close_ int socket_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (socket_fd < 0) + return log_debug_errno(errno, "Failed to allocate AF_UNIX/SOCK_STREAM socket: %m"); + + r = connect_unix_path(socket_fd, fd, de->d_name); + if (r < 0) { + log_debug_errno(r, "Failed to connect to '%s', ignoring: %m", j); + continue; + } + + if (!event) { + r = sd_event_new(&event); + if (r < 0) + return log_debug_errno(r, "Failed to allocate event loop: %m"); + } + + _cleanup_(sd_varlink_unrefp) sd_varlink *link = NULL; + r = sd_varlink_connect_fd(&link, socket_fd); + if (r < 0) + return log_debug_errno(r, "Failed to allocate Varlink connection: %m"); + + TAKE_FD(socket_fd); + + r = sd_varlink_attach_event(link, event, /* priority= */ 0); + if (r < 0) + return log_debug_errno(r, "Failed to attach varlink connection to event loop: %m"); + + sd_varlink_set_userdata(link, userdata); + + r = sd_varlink_bind_reply(link, reply); + if (r < 0) + return log_debug_errno(r, "Failed to bind reply callback: %m"); + + r = sd_varlink_set_description(link, j); + if (r < 0) + return log_debug_errno(r, "Failed to set description: %m"); + + r = sd_varlink_set_relative_timeout(link, timeout_usec); + if (r < 0) + return r; + + if (more) + r = sd_varlink_observe(link, method, parameters); + else + r = sd_varlink_invoke(link, method, parameters); + if (r < 0) + return log_debug_errno(r, "Failed to enqueue message on Varlink connection: %m"); + + if (set_ensure_consume(&links, &varlink_hash_ops, TAKE_PTR(link)) < 0) + return log_oom_debug(); + } + + size_t c = set_size(links); + + for (;;) { + if (event) { + int state = sd_event_get_state(event); + if (state < 0) + return state; + if (state == SD_EVENT_FINISHED) { + int x; + r = sd_event_get_exit_code(event, &x); + if (r < 0) + return r; + if (x != 0) + return x; + + break; + } + } + + r = varlink_finish_idle(links); + if (r < 0) + return r; + if (r > 0) + break; /* idle, we are done */ + + assert(event); + + r = sd_event_run(event, /* timeout= */ UINT64_MAX); + if (r < 0) + return r; + } + + return (ssize_t) c; +} diff --git a/src/libsystemd/sd-varlink/varlink-util.h b/src/libsystemd/sd-varlink/varlink-util.h index ba0f2322535..d6ecb03c545 100644 --- a/src/libsystemd/sd-varlink/varlink-util.h +++ b/src/libsystemd/sd-varlink/varlink-util.h @@ -29,3 +29,5 @@ int varlink_server_new( int varlink_check_privileged_peer(sd_varlink *vl); extern const struct hash_ops varlink_hash_ops; + +ssize_t varlink_execute_directory(const char *path, const char *method, sd_json_variant *parameters, bool more, usec_t timeout_usec, sd_varlink_reply_t reply, void *userdata); diff --git a/src/test/test-varlink.c b/src/test/test-varlink.c index 1bbc87c32c0..8cdbfafaa2a 100644 --- a/src/test/test-varlink.c +++ b/src/test/test-varlink.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "sd-event.h" @@ -14,6 +15,7 @@ #include "io-util.h" #include "json-util.h" #include "memfd-util.h" +#include "path-util.h" #include "rm-rf.h" #include "socket-util.h" #include "tests.h" @@ -936,4 +938,111 @@ TEST(upgrade_pipelining) { ASSERT_OK(-pthread_join(t, NULL)); } +typedef struct ExecDirServer { + sd_varlink_server *server; + sd_event *event; + const char *name; + pthread_t thread; +} ExecDirServer; + +static int method_execute_dir_ping(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { + ExecDirServer *srv = ASSERT_PTR(userdata); + + return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("name", srv->name)); +} + +static void on_execute_dir_disconnect(sd_varlink_server *s, sd_varlink *link, void *userdata) { + ExecDirServer *srv = ASSERT_PTR(userdata); + + /* Only one client (from varlink_execute_directory()) connects per server — once it's gone, we're done. */ + ASSERT_OK(sd_event_exit(srv->event, 0)); +} + +static void *execute_dir_server_thread(void *arg) { + ExecDirServer *srv = arg; + + ASSERT_OK(sd_event_loop(srv->event)); + return NULL; +} + +static int execute_dir_reply(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) { + size_t *count = ASSERT_PTR(userdata); + + ASSERT_NULL(error_id); + ASSERT_NOT_NULL(sd_json_variant_by_key(parameters, "name")); + + (*count)++; + return 0; +} + +TEST(execute_directory) { + _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL; + static const char * const names[] = { "alpha", "beta", "gamma" }; + ExecDirServer servers[ELEMENTSOF(names)] = {}; + size_t reply_count = 0; + + ASSERT_OK(mkdtemp_malloc("/tmp/varlink-execdir-XXXXXX", &tmpdir)); + + for (size_t i = 0; i < ELEMENTSOF(names); i++) { + ExecDirServer *eds = servers + i; + servers[i].name = names[i]; + + _cleanup_free_ char *j = ASSERT_PTR(path_join(tmpdir, names[i])); + + ASSERT_OK(sd_event_new(&eds->event)); + ASSERT_OK(varlink_server_new(&eds->server, + SD_VARLINK_SERVER_INHERIT_USERDATA, + eds)); + ASSERT_OK(sd_varlink_server_bind_method(eds->server, "io.test.ExecDirPing", method_execute_dir_ping)); + ASSERT_OK(sd_varlink_server_bind_disconnect(eds->server, on_execute_dir_disconnect)); + ASSERT_OK(sd_varlink_server_listen_address(eds->server, j, 0600)); + ASSERT_OK(sd_varlink_server_attach_event(eds->server, eds->event, 0)); + + ASSERT_OK(-pthread_create(&eds->thread, NULL, execute_dir_server_thread, eds)); + } + + ASSERT_OK_EQ(varlink_execute_directory( + tmpdir, + "io.test.ExecDirPing", + /* parameters= */ NULL, + /* more= */ false, + /* timeout_usec= */ USEC_INFINITY, + execute_dir_reply, + &reply_count), (ssize_t) ELEMENTSOF(names)); + ASSERT_EQ(reply_count, (unsigned) ELEMENTSOF(names)); + + FOREACH_ELEMENT(eds, servers) { + ASSERT_OK(-pthread_join(eds->thread, NULL)); + eds->server = sd_varlink_server_unref(eds->server); + eds->event = sd_event_unref(eds->event); + } + + /* Calling the helper against a non-existent directory must fail. */ + _cleanup_free_ char *nope = NULL; + ASSERT_OK(asprintf(&nope, "%s/does-not-exist", tmpdir)); + ASSERT_FAIL(varlink_execute_directory( + nope, + "io.test.ExecDirPing", + /* parameters= */ NULL, + /* more= */ false, + /* timeout_usec= */ USEC_INFINITY, + execute_dir_reply, + &reply_count)); + + /* An empty directory must simply return 0 and not invoke the reply callback. */ + _cleanup_free_ char *empty = ASSERT_PTR(path_join(tmpdir, "empty")); + ASSERT_OK_ERRNO(mkdir(empty, 0755)); + + size_t count_before = reply_count; + ASSERT_OK_ZERO(varlink_execute_directory( + empty, + "io.test.ExecDirPing", + /* parameters= */ NULL, + /* more= */ false, + /* timeout_usec= */ USEC_INFINITY, + execute_dir_reply, + &reply_count)); + ASSERT_EQ(reply_count, count_before); +} + DEFINE_TEST_MAIN(LOG_DEBUG);