]> git.ipfire.org Git - telemetry.git/commitdiff
command: Add a helper object to run a command
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 6 Oct 2025 16:16:11 +0000 (16:16 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 6 Oct 2025 16:16:11 +0000 (16:16 +0000)
This is designed to run any shell commands and parse the output. This
has to be done asynchronously so that we won't block the event loop.
That slightly adds some complications.

The command will be executed and events will be installed with the event
loop to read any stdout/stderr and store it as well as receive a signal
when the process terminates.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Makefile.am
src/daemon/command.c [new file with mode: 0644]
src/daemon/command.h [new file with mode: 0644]
src/daemon/source.c
src/daemon/source.h

index 127645f83db6a43bc6be7aba18226af263c4f576..d8d56a84b08dc92e193256b01fd56e6773b7aef4 100644 (file)
@@ -95,6 +95,8 @@ dist_collectyd_SOURCES = \
        src/daemon/bus.c \
        src/daemon/bus.h \
        src/daemon/colors.h \
+       src/daemon/command.c \
+       src/daemon/command.h \
        src/daemon/ctx.c \
        src/daemon/ctx.h \
        src/daemon/daemon.c \
diff --git a/src/daemon/command.c b/src/daemon/command.c
new file mode 100644 (file)
index 0000000..4481f9e
--- /dev/null
@@ -0,0 +1,492 @@
+/*#############################################################################
+#                                                                             #
+# collecty - A system statistics collection daemon for IPFire                 #
+# Copyright (C) 2025 IPFire Development Team                                  #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+#############################################################################*/
+
+#include <errno.h>
+#include <fcntl.h>
+#include <linux/sched.h>
+#include <sched.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <syscall.h>
+#include <sys/prctl.h>
+#include <unistd.h>
+
+#include "command.h"
+#include "ctx.h"
+#include "util.h"
+
+struct collecty_command {
+       collecty_ctx* ctx;
+       int nrefs;
+
+       // Daemon
+       collecty_daemon* daemon;
+
+       // Loop
+       sd_event* loop;
+
+       // pidfd
+       int pidfd;
+
+       // Standard Output/Error
+       struct {
+               // Pipes
+               int pipes[2];
+
+               // Stream
+               FILE* f;
+
+               // Buffer
+               char* buffer;
+               size_t length;
+       } stdout, stderr;
+
+       // Callbacks
+       struct {
+               // On success
+               collecty_command_success_callback on_success;
+               void* on_success_data;
+       } callbacks;
+
+       // Events
+       struct {
+               sd_event_source* stdout;
+               sd_event_source* stderr;
+               sd_event_source* exit;
+       } events;
+};
+
+static inline int clone3(struct clone_args* args, size_t size) {
+       return syscall(__NR_clone3, args, size);
+}
+
+static void collecty_command_close_pipe(int fds[2]) {
+       for (unsigned int i = 0; i < 2; i++) {
+               if (fds[i] >= 0) {
+                       close(fds[i]);
+                       fds[i] = -EBADF;
+               }
+       }
+}
+
+static void collecty_command_free(collecty_command* self) {
+       // Close pipes
+       collecty_command_close_pipe(self->stdout.pipes);
+       collecty_command_close_pipe(self->stderr.pipes);
+
+       // Close streams
+       if (self->stdout.f)
+               fclose(self->stdout.f);
+       if (self->stderr.f)
+               fclose(self->stderr.f);
+
+       // Free buffers
+       if (self->stdout.buffer)
+               free(self->stdout.buffer);
+       if (self->stderr.buffer)
+               free(self->stderr.buffer);
+
+       // Free events
+       if (self->events.exit)
+               sd_event_source_unref(self->events.exit);
+       if (self->events.stdout)
+               sd_event_source_unref(self->events.stdout);
+       if (self->events.stderr)
+               sd_event_source_unref(self->events.stderr);
+
+       if (self->loop)
+               sd_event_unref(self->loop);
+       if (self->daemon)
+               collecty_daemon_unref(self->daemon);
+       if (self->ctx)
+               collecty_ctx_unref(self->ctx);
+       free(self);
+}
+
+int collecty_command_create(collecty_command** command,
+               collecty_ctx* ctx, collecty_daemon* daemon) {
+       collecty_command* self = NULL;
+       int r;
+
+       // Allocate some memory
+       self = calloc(1, sizeof(*self));
+       if (!self)
+               return -errno;
+
+       // Initialize the reference counter
+       self->nrefs = 1;
+
+       // Keep a reference to the context
+       self->ctx = collecty_ctx_ref(ctx);
+
+       // Store a reference to the daemon
+       self->daemon = collecty_daemon_ref(daemon);
+
+       // Fetch a reference to the event loop
+       self->loop = collecty_daemon_loop(daemon);
+
+       // Initialize pidfd
+       self->pidfd = -EBADF;
+
+       // Initialize stdout
+       r = pipe2(self->stdout.pipes, O_CLOEXEC);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to create stdout pipes: %m\n");
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Open a memory stream for stdout
+       self->stdout.f = open_memstream(&self->stdout.buffer, &self->stdout.length);
+       if (!self->stdout.f) {
+               ERROR(self->ctx, "Failed to open stream for stdout: %m\n");
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Initialize stderr
+       r = pipe2(self->stderr.pipes, O_CLOEXEC);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to create stderr pipes: %m\n");
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Open a memory stream for stderr
+       self->stderr.f = open_memstream(&self->stderr.buffer, &self->stderr.length);
+       if (!self->stderr.f) {
+               ERROR(self->ctx, "Failed to open stream for stderr: %m\n");
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Return pointer
+       *command = self;
+       return 0;
+
+ERROR:
+       if (self)
+               collecty_command_unref(self);
+
+       return r;
+}
+
+collecty_command* collecty_command_ref(collecty_command* self) {
+       ++self->nrefs;
+       return self;
+}
+
+collecty_command* collecty_command_unref(collecty_command* self) {
+       if (--self->nrefs > 0)
+               return self;
+
+       collecty_command_free(self);
+       return NULL;
+}
+
+void collecty_command_on_success(collecty_command* self,
+               collecty_command_success_callback callback, void* data) {
+       self->callbacks.on_success = callback;
+       self->callbacks.on_success_data = data;
+}
+
+static int collecty_command_output(collecty_command* self, int fd, unsigned int events, FILE* f) {
+       ssize_t bytes_written = 0;
+       ssize_t bytes_read = 0;
+       char buffer[4096];
+
+       for (;;) {
+               // Read a block from the pipe
+               bytes_read = read(fd, buffer, sizeof(buffer));
+               if (bytes_read < 0) {
+                       switch (errno) {
+                               case EAGAIN:
+                                       goto END;
+
+                               default:
+                                       return -errno;
+                       }
+               }
+
+               if (bytes_read == 0)
+                       goto END;
+
+               // Don't write if we don't have an output buffer
+               if (unlikely(!f))
+                       continue;
+
+               // Write the buffer to the output
+               bytes_written = fwrite(buffer, 1, bytes_read, f);
+               if (bytes_written < bytes_read) {
+                       ERROR(self->ctx, "Failed to write output: %m\n");
+                       return -errno;
+               }
+       }
+
+END:
+       // Flush after we have updated the buffer
+       if (unlikely(f))
+               fflush(f);
+
+       return 0;
+}
+
+static int collecty_command_stdout(sd_event_source* source, int fd, unsigned int events, void* data) {
+       collecty_command* self = data;
+
+       return collecty_command_output(self, fd, events, self->stdout.f);
+}
+
+static int collecty_command_stderr(sd_event_source* source, int fd, unsigned int events, void* data) {
+       collecty_command* self = data;
+
+       return collecty_command_output(self, fd, events, self->stderr.f);
+}
+
+static int collecty_command_get_pipe_to_read(collecty_command* self, int (*fds)[2]) {
+       // Give the variables easier names to avoid confusion
+       int* fd_read  = &(*fds)[0];
+       int* fd_write = &(*fds)[1];
+
+       // Close the write end of the pipe
+       if (*fd_write >= 0) {
+               close(*fd_write);
+               *fd_write = -EBADF;
+       }
+
+       // Return the read end
+       if (*fd_read >= 0)
+               return *fd_read;
+
+       return -EBADF;
+}
+
+static int collecty_command_log_stderr(const char* line, const size_t length, void* data) {
+       collecty_command* self = data;
+
+       // Send the line to the logger
+       ERROR(self->ctx, "  stderr: %s\n", line);
+
+       return 0;
+}
+
+static int collecty_command_exited(sd_event_source* source, const siginfo_t* si, void* data) {
+       collecty_command* self = data;
+       int rc = 0;
+       int r = 0;
+
+       // Drain standard output
+       if (self->stdout.pipes[0] >= 0) {
+               r = collecty_command_stdout(source, self->stdout.pipes[0], EPOLLIN, data);
+               if (r < 0) {
+                       ERROR(self->ctx, "Failed to drain stdout: %s\n", strerror(-r));
+                       return r;
+               }
+       }
+
+       // Drain standard error
+       if (self->stderr.pipes[0] >= 0) {
+               r = collecty_command_stderr(source, self->stderr.pipes[0], EPOLLIN, data);
+               if (r < 0) {
+                       ERROR(self->ctx, "Failed to drain stderr: %s\n", strerror(-r));
+                       return r;
+               }
+       }
+
+       switch (si->si_code) {
+               case CLD_EXITED:
+                       DEBUG(self->ctx, "Process has exited with status code %d\n", si->si_status);
+
+                       // Store the exit code
+                       rc = si->si_status;
+
+                       // Log stderr
+                       if (rc) {
+                               r = collecty_fwalk(self->stderr.f, collecty_command_log_stderr, self);
+                               if (r < 0)
+                                       goto ERROR;
+                       }
+                       break;
+
+               case CLD_KILLED:
+                       ERROR(self->ctx, "Process has been killed by signal %d\n", si->si_signo);
+
+                       // Store the exit code
+                       rc = 139;
+                       break;
+
+               case CLD_DUMPED:
+                       ERROR(self->ctx, "The child process terminated abnormally with status "
+                               "code %d\n", si->si_status);
+
+                       // Store the exit code
+                       rc = 128 + si->si_status;
+                       break;
+       }
+
+       // Skip any further processing if the command did not succeed
+       if (rc)
+               goto ERROR;
+
+       // Call the callback
+       if (self->callbacks.on_success)
+               r = self->callbacks.on_success(self->ctx, rc, self->stdout.buffer,
+                               self->stdout.length, self->callbacks.on_success_data);
+
+ERROR:
+       // Drop the extra reference
+       collecty_command_unref(self);
+
+       return r;
+}
+
+static int collecty_command_parent(collecty_command* self) {
+       int fd = -EBADF;
+       int r;
+
+       // Register the PID file descriptor
+       r = sd_event_add_child_pidfd(self->loop, &self->events.exit, self->pidfd, WEXITED,
+                       collecty_command_exited, collecty_command_ref(self));
+       if (r < 0) {
+               DEBUG(self->ctx, "Failed to register the child process with the event loop: %s\n", strerror(-r));
+               return r;
+       }
+
+       // Prepare standard output for reading
+       fd = collecty_command_get_pipe_to_read(self, &self->stdout.pipes);
+       if (fd >= 0) {
+               // Add the file descriptor to the event loop
+               r = sd_event_add_io(self->loop, &self->events.stdout,
+                               fd, EPOLLIN|EPOLLHUP|EPOLLET, collecty_command_stdout, self);
+               if (r < 0) {
+                       ERROR(self->ctx, "Failed to register stdout for reading: %s\n", strerror(-r));
+                       return r;
+               }
+       }
+
+       // Prepare standard error for reading
+       fd = collecty_command_get_pipe_to_read(self, &self->stderr.pipes);
+       if (fd >= 0) {
+               // Add the file descriptor to the event loop
+               r = sd_event_add_io(self->loop, &self->events.stderr,
+                               fd, EPOLLIN|EPOLLHUP|EPOLLET, collecty_command_stderr, self);
+               if (r < 0) {
+                       ERROR(self->ctx, "Failed to register stderr for reading: %s\n", strerror(-r));
+                       return r;
+               }
+       }
+
+       DEBUG(self->ctx, "Parent has finished\n");
+
+       return r;
+}
+
+static int collecty_command_child(collecty_command* self, const char** argv) {
+       int fd = -EBADF;
+       int r;
+
+       // Die with parent
+       r = prctl(PR_SET_PDEATHSIG, SIGKILL, 0, 0, 0);
+       if (r < 0) {
+               ERROR(self->ctx, "Could not configure to die with parent: %m\n");
+               return -errno;
+       }
+
+       // Open /dev/null
+       fd = open("/dev/null", O_RDONLY);
+       if (fd < 0) {
+               ERROR(self->ctx, "Failed to open /dev/null: %m\n");
+               return -errno;
+       }
+
+       // Connect stdin to /dev/null
+       r = dup2(fd, STDIN_FILENO);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to connect /dev/null to stdin: %m\n");
+               return -errno;
+       }
+
+       // Close /dev/null
+       close(fd);
+
+       // Connect stdout to the pipe
+       r = dup2(self->stdout.pipes[1], STDOUT_FILENO);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to connect stdout: %m\n");
+               return -errno;
+       }
+
+       // Connect stderr to the pipe
+       r = dup2(self->stderr.pipes[1], STDERR_FILENO);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to connect stderr: %m\n");
+               return -errno;
+       }
+
+       // Close all pipes
+       collecty_command_close_pipe(self->stdout.pipes);
+       collecty_command_close_pipe(self->stderr.pipes);
+
+       // Execute the command
+
+       // Execute the command
+       r = execvp(argv[0], (char**)argv);
+       if (r < 0)
+               return -errno;
+
+       // We should never get here
+       abort();
+}
+
+int collecty_command_execute(collecty_command* self, const char** argv) {
+       struct clone_args args = {
+               .flags       = CLONE_PIDFD,
+               .exit_signal = SIGCHLD,
+               .pidfd       = (long long unsigned int)&self->pidfd,
+       };
+       pid_t pid;
+       int r;
+
+       // Array must have some content
+       if (!argv || !*argv)
+               return -EINVAL;
+
+       // Log action
+       DEBUG(self->ctx, "Executing command:\n");
+       for (unsigned int i = 0; argv[i]; i++)
+               DEBUG(self->ctx, "    argv[%d] = %s\n", i, argv[i]);
+
+       // Fork this process
+       pid = clone3(&args, sizeof(args));
+       if (pid < 0) {
+               ERROR(self->ctx, "clone3() failed: %m\n");
+               return -errno;
+       }
+
+       // Child process
+       if (pid == 0) {
+               r = collecty_command_child(self, argv);
+               _exit(r);
+       }
+
+       // Parent process
+       return collecty_command_parent(self);
+}
diff --git a/src/daemon/command.h b/src/daemon/command.h
new file mode 100644 (file)
index 0000000..1864e31
--- /dev/null
@@ -0,0 +1,44 @@
+/*#############################################################################
+#                                                                             #
+# collecty - A system statistics collection daemon for IPFire                 #
+# Copyright (C) 2025 IPFire Development Team                                  #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+#############################################################################*/
+
+#ifndef COLLECTY_COMMAND_H
+#define COLLECTY_COMMAND_H
+
+typedef struct collecty_command collecty_command;
+
+#include "ctx.h"
+#include "daemon.h"
+
+int collecty_command_create(collecty_command** command,
+       collecty_ctx* ctx, collecty_daemon* daemon);
+
+collecty_command* collecty_command_ref(collecty_command* self);
+collecty_command* collecty_command_unref(collecty_command* self);
+
+typedef int (*collecty_command_success_callback)(collecty_ctx* ctx,
+       int rc, const char* output, const size_t length, void* data);
+
+// Called if the command has exited successfully
+void collecty_command_on_success(collecty_command* self,
+       collecty_command_success_callback callback, void* data);
+
+int collecty_command_execute(collecty_command* self, const char** argv);
+
+#endif /* COLLECTY_COMMAND_H */
index 9256cf28b83a687b3ab57af7f8840bebb34ea689..cd075032c84248dbe4284e7a739e7581cd0f4b49 100644 (file)
@@ -30,6 +30,7 @@
 #include <systemd/sd-event.h>
 
 #include "args.h"
+#include "command.h"
 #include "ctx.h"
 #include "daemon.h"
 #include "source.h"
@@ -436,6 +437,10 @@ const char* collecty_source_name(collecty_source* self) {
        return self->impl->name;
 }
 
+int collecty_source_create_command(collecty_source* self, collecty_command** command) {
+       return collecty_command_create(command, self->ctx, self->daemon);
+}
+
 #define collecty_source_path(source, object, path) \
        __collecty_source_path(source, object, path, sizeof(path))
 
index feec90ad9fa1a5cef10f3586f2ad8dd8ddfeda9f..dfe34e43019d822e00b67920d770b9b80f7a94fe 100644 (file)
@@ -24,6 +24,7 @@
 typedef struct collecty_source collecty_source;
 
 #include "args.h"
+#include "command.h"
 #include "ctx.h"
 #include "daemon.h"
 
@@ -78,6 +79,8 @@ collecty_source* collecty_source_unref(collecty_source* self);
 
 const char* collecty_source_name(collecty_source* self);
 
+int collecty_source_create_command(collecty_source* self, collecty_command** command);
+
 int collecty_source_submit(collecty_source* self, const char* object,
        const char* format, ...) __attribute__((format(printf, 3, 4)));