/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#include "sd-event.h"
+#include "sd-varlink.h"
+
#include "alloc-util.h"
#include "errno-util.h"
+#include "fd-util.h"
#include "log.h"
+#include "path-util.h"
#include "pidref.h"
+#include "recurse-dir.h"
#include "set.h"
+#include "socket-util.h"
#include "string-util.h"
#include "varlink-internal.h"
#include "varlink-util.h"
trivial_compare_func,
sd_varlink,
sd_varlink_unref);
+
+static int varlink_finish_idle(Set *s) {
+ int r;
+
+ sd_varlink *vl;
+ bool fully_idle = true;
+ SET_FOREACH(vl, s) {
+ r = sd_varlink_is_idle(vl);
+ if (r < 0)
+ return r;
+ if (r == 0)
+ fully_idle = false;
+ else {
+ /* Idle? Then we can close the connection, and release some resources. */
+ assert_se(set_remove(s, vl) == vl);
+ vl = sd_varlink_close_unref(vl);
+ }
+ }
+
+ return fully_idle;
+}
+
+#define VARLINK_EXECUTE_SOCKETS_MAX 255
+
+ssize_t varlink_execute_directory(
+ const char *path,
+ const char *method,
+ sd_json_variant *parameters,
+ bool more,
+ usec_t timeout_usec,
+ sd_varlink_reply_t reply,
+ void *userdata) {
+
+ int r;
+
+ assert(path);
+ assert(method);
+
+ /* Invokes the specified method on all Varlink sockets in the specified directory. Any reply
+ * will be dispatched to the reply callback. Blocks until the last reply has come in.
+ *
+ * Returns how many sockets were contacted.
+ *
+ * Usecase for all of this: hook directories, where components can link their sockets into to get
+ * notified about certain system events. */
+
+ _cleanup_close_ int fd = open(path, O_RDONLY|O_CLOEXEC|O_DIRECTORY);
+ if (fd < 0)
+ return log_debug_errno(errno, "Failed to open '%s': %m", path);
+
+ _cleanup_free_ DirectoryEntries *dentries = NULL;
+ r = readdir_all(fd, RECURSE_DIR_IGNORE_DOT|RECURSE_DIR_ENSURE_TYPE, &dentries);
+ if (r < 0)
+ return log_debug_errno(r, "Failed to enumerate '%s': %m", path);
+
+ _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+ _cleanup_(set_freep) Set *links = NULL;
+ size_t t = 0;
+ FOREACH_ARRAY(dp, dentries->entries, dentries->n_entries) {
+ struct dirent *de = *dp;
+
+ if (de->d_type != DT_SOCK)
+ continue;
+
+ t++;
+
+ _cleanup_free_ char *j = path_join(path, de->d_name);
+ if (!j)
+ return log_oom_debug();
+
+ if (set_size(links) >= VARLINK_EXECUTE_SOCKETS_MAX) {
+ log_debug("Too many sockets (%zu) in directory, skipping '%s'.", t, j);
+ continue;
+ }
+
+ _cleanup_close_ int socket_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
+ if (socket_fd < 0)
+ return log_debug_errno(errno, "Failed to allocate AF_UNIX/SOCK_STREAM socket: %m");
+
+ r = connect_unix_path(socket_fd, fd, de->d_name);
+ if (r < 0) {
+ log_debug_errno(r, "Failed to connect to '%s', ignoring: %m", j);
+ continue;
+ }
+
+ if (!event) {
+ r = sd_event_new(&event);
+ if (r < 0)
+ return log_debug_errno(r, "Failed to allocate event loop: %m");
+ }
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *link = NULL;
+ r = sd_varlink_connect_fd(&link, socket_fd);
+ if (r < 0)
+ return log_debug_errno(r, "Failed to allocate Varlink connection: %m");
+
+ TAKE_FD(socket_fd);
+
+ r = sd_varlink_attach_event(link, event, /* priority= */ 0);
+ if (r < 0)
+ return log_debug_errno(r, "Failed to attach varlink connection to event loop: %m");
+
+ sd_varlink_set_userdata(link, userdata);
+
+ r = sd_varlink_bind_reply(link, reply);
+ if (r < 0)
+ return log_debug_errno(r, "Failed to bind reply callback: %m");
+
+ r = sd_varlink_set_description(link, j);
+ if (r < 0)
+ return log_debug_errno(r, "Failed to set description: %m");
+
+ r = sd_varlink_set_relative_timeout(link, timeout_usec);
+ if (r < 0)
+ return r;
+
+ if (more)
+ r = sd_varlink_observe(link, method, parameters);
+ else
+ r = sd_varlink_invoke(link, method, parameters);
+ if (r < 0)
+ return log_debug_errno(r, "Failed to enqueue message on Varlink connection: %m");
+
+ if (set_ensure_consume(&links, &varlink_hash_ops, TAKE_PTR(link)) < 0)
+ return log_oom_debug();
+ }
+
+ size_t c = set_size(links);
+
+ for (;;) {
+ if (event) {
+ int state = sd_event_get_state(event);
+ if (state < 0)
+ return state;
+ if (state == SD_EVENT_FINISHED) {
+ int x;
+ r = sd_event_get_exit_code(event, &x);
+ if (r < 0)
+ return r;
+ if (x != 0)
+ return x;
+
+ break;
+ }
+ }
+
+ r = varlink_finish_idle(links);
+ if (r < 0)
+ return r;
+ if (r > 0)
+ break; /* idle, we are done */
+
+ assert(event);
+
+ r = sd_event_run(event, /* timeout= */ UINT64_MAX);
+ if (r < 0)
+ return r;
+ }
+
+ return (ssize_t) c;
+}
#include <poll.h>
#include <pthread.h>
#include <sys/socket.h>
+#include <sys/stat.h>
#include <unistd.h>
#include "sd-event.h"
#include "io-util.h"
#include "json-util.h"
#include "memfd-util.h"
+#include "path-util.h"
#include "rm-rf.h"
#include "socket-util.h"
#include "tests.h"
ASSERT_OK(-pthread_join(t, NULL));
}
+typedef struct ExecDirServer {
+ sd_varlink_server *server;
+ sd_event *event;
+ const char *name;
+ pthread_t thread;
+} ExecDirServer;
+
+static int method_execute_dir_ping(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ ExecDirServer *srv = ASSERT_PTR(userdata);
+
+ return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("name", srv->name));
+}
+
+static void on_execute_dir_disconnect(sd_varlink_server *s, sd_varlink *link, void *userdata) {
+ ExecDirServer *srv = ASSERT_PTR(userdata);
+
+ /* Only one client (from varlink_execute_directory()) connects per server — once it's gone, we're done. */
+ ASSERT_OK(sd_event_exit(srv->event, 0));
+}
+
+static void *execute_dir_server_thread(void *arg) {
+ ExecDirServer *srv = arg;
+
+ ASSERT_OK(sd_event_loop(srv->event));
+ return NULL;
+}
+
+static int execute_dir_reply(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ size_t *count = ASSERT_PTR(userdata);
+
+ ASSERT_NULL(error_id);
+ ASSERT_NOT_NULL(sd_json_variant_by_key(parameters, "name"));
+
+ (*count)++;
+ return 0;
+}
+
+TEST(execute_directory) {
+ _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
+ static const char * const names[] = { "alpha", "beta", "gamma" };
+ ExecDirServer servers[ELEMENTSOF(names)] = {};
+ size_t reply_count = 0;
+
+ ASSERT_OK(mkdtemp_malloc("/tmp/varlink-execdir-XXXXXX", &tmpdir));
+
+ for (size_t i = 0; i < ELEMENTSOF(names); i++) {
+ ExecDirServer *eds = servers + i;
+ servers[i].name = names[i];
+
+ _cleanup_free_ char *j = ASSERT_PTR(path_join(tmpdir, names[i]));
+
+ ASSERT_OK(sd_event_new(&eds->event));
+ ASSERT_OK(varlink_server_new(&eds->server,
+ SD_VARLINK_SERVER_INHERIT_USERDATA,
+ eds));
+ ASSERT_OK(sd_varlink_server_bind_method(eds->server, "io.test.ExecDirPing", method_execute_dir_ping));
+ ASSERT_OK(sd_varlink_server_bind_disconnect(eds->server, on_execute_dir_disconnect));
+ ASSERT_OK(sd_varlink_server_listen_address(eds->server, j, 0600));
+ ASSERT_OK(sd_varlink_server_attach_event(eds->server, eds->event, 0));
+
+ ASSERT_OK(-pthread_create(&eds->thread, NULL, execute_dir_server_thread, eds));
+ }
+
+ ASSERT_OK_EQ(varlink_execute_directory(
+ tmpdir,
+ "io.test.ExecDirPing",
+ /* parameters= */ NULL,
+ /* more= */ false,
+ /* timeout_usec= */ USEC_INFINITY,
+ execute_dir_reply,
+ &reply_count), (ssize_t) ELEMENTSOF(names));
+ ASSERT_EQ(reply_count, (unsigned) ELEMENTSOF(names));
+
+ FOREACH_ELEMENT(eds, servers) {
+ ASSERT_OK(-pthread_join(eds->thread, NULL));
+ eds->server = sd_varlink_server_unref(eds->server);
+ eds->event = sd_event_unref(eds->event);
+ }
+
+ /* Calling the helper against a non-existent directory must fail. */
+ _cleanup_free_ char *nope = NULL;
+ ASSERT_OK(asprintf(&nope, "%s/does-not-exist", tmpdir));
+ ASSERT_FAIL(varlink_execute_directory(
+ nope,
+ "io.test.ExecDirPing",
+ /* parameters= */ NULL,
+ /* more= */ false,
+ /* timeout_usec= */ USEC_INFINITY,
+ execute_dir_reply,
+ &reply_count));
+
+ /* An empty directory must simply return 0 and not invoke the reply callback. */
+ _cleanup_free_ char *empty = ASSERT_PTR(path_join(tmpdir, "empty"));
+ ASSERT_OK_ERRNO(mkdir(empty, 0755));
+
+ size_t count_before = reply_count;
+ ASSERT_OK_ZERO(varlink_execute_directory(
+ empty,
+ "io.test.ExecDirPing",
+ /* parameters= */ NULL,
+ /* more= */ false,
+ /* timeout_usec= */ USEC_INFINITY,
+ execute_dir_reply,
+ &reply_count));
+ ASSERT_EQ(reply_count, count_before);
+}
+
DEFINE_TEST_MAIN(LOG_DEBUG);