]> git.ipfire.org Git - telemetry.git/commitdiff
command: Use the buffer implementation for internal buffering
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 8 Oct 2025 15:46:33 +0000 (15:46 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 8 Oct 2025 15:46:33 +0000 (15:46 +0000)
In order to use the file parser, we convert the output of the command to
a file object which then provides a nice and simple parser.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/daemon/command.c
src/daemon/command.h
src/daemon/sources/unbound.c

index 5fdd2fe6886b99e97ac3a9666a09808991d89ad0..99b5a869f165a1b500ef1b18ecb366caab8367b7 100644 (file)
 #include <sys/prctl.h>
 #include <unistd.h>
 
+#include "buffer.h"
 #include "command.h"
 #include "ctx.h"
+#include "file.h"
 #include "time.h"
 #include "util.h"
 
 // By default, commands are being killed after 30 seconds
 #define DEFAULT_TIMEOUT        SEC_TO_USEC(30)
 
+typedef struct collecty_command_output {
+       // Pipes
+       int pipes[2];
+
+       // Buffer
+       collecty_buffer* buffer;
+} collecty_command_output;
+
 struct collecty_command {
        collecty_ctx* ctx;
        int nrefs;
@@ -54,17 +64,8 @@ struct collecty_command {
        int pidfd;
 
        // Standard Output/Error
-       struct {
-               // Pipes
-               int pipes[2];
-
-               // Stream
-               FILE* f;
-
-               // Buffer
-               char* buffer;
-               size_t length;
-       } stdout, stderr;
+       collecty_command_output stdout;
+       collecty_command_output stderr;
 
        // Callbacks
        struct {
@@ -104,17 +105,11 @@ static void collecty_command_free(collecty_command* self) {
        collecty_command_close_pipe(self->stdout.pipes);
        collecty_command_close_pipe(self->stderr.pipes);
 
-       // Close streams
-       if (self->stdout.f)
-               fclose(self->stdout.f);
-       if (self->stderr.f)
-               fclose(self->stderr.f);
-
        // Free buffers
        if (self->stdout.buffer)
-               free(self->stdout.buffer);
+               collecty_buffer_unref(self->stdout.buffer);
        if (self->stderr.buffer)
-               free(self->stderr.buffer);
+               collecty_buffer_unref(self->stderr.buffer);
 
        // Free events
        if (self->events.timeout)
@@ -135,6 +130,26 @@ static void collecty_command_free(collecty_command* self) {
        free(self);
 }
 
+static int collecty_command_init_output(collecty_command* self, collecty_command_output* output) {
+       int r;
+
+       // Create pipes
+       r = pipe2(output->pipes, O_CLOEXEC);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to create pipes: %m\n");
+               return -errno;
+       }
+
+       // Create a buffer to write any data to
+       r = collecty_buffer_create(&output->buffer, self->ctx);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to create output buffer: %s\n", strerror(-r));
+               return r;
+       }
+
+       return 0;
+}
+
 int collecty_command_create(collecty_command** command,
                collecty_ctx* ctx, collecty_daemon* daemon) {
        collecty_command* self = NULL;
@@ -164,36 +179,14 @@ int collecty_command_create(collecty_command** command,
        self->pidfd = -EBADF;
 
        // Initialize stdout
-       r = pipe2(self->stdout.pipes, O_CLOEXEC);
-       if (r < 0) {
-               ERROR(self->ctx, "Failed to create stdout pipes: %m\n");
-               r = -errno;
-               goto ERROR;
-       }
-
-       // Open a memory stream for stdout
-       self->stdout.f = open_memstream(&self->stdout.buffer, &self->stdout.length);
-       if (!self->stdout.f) {
-               ERROR(self->ctx, "Failed to open stream for stdout: %m\n");
-               r = -errno;
+       r = collecty_command_init_output(self, &self->stdout);
+       if (r < 0)
                goto ERROR;
-       }
 
        // Initialize stderr
-       r = pipe2(self->stderr.pipes, O_CLOEXEC);
-       if (r < 0) {
-               ERROR(self->ctx, "Failed to create stderr pipes: %m\n");
-               r = -errno;
-               goto ERROR;
-       }
-
-       // Open a memory stream for stderr
-       self->stderr.f = open_memstream(&self->stderr.buffer, &self->stderr.length);
-       if (!self->stderr.f) {
-               ERROR(self->ctx, "Failed to open stream for stderr: %m\n");
-               r = -errno;
+       r = collecty_command_init_output(self, &self->stderr);
+       if (r < 0)
                goto ERROR;
-       }
 
        // Return pointer
        *command = self;
@@ -229,10 +222,11 @@ void collecty_command_on_success(collecty_command* self,
        self->callbacks.on_success_data = data;
 }
 
-static int collecty_command_output(collecty_command* self, int fd, unsigned int events, FILE* f) {
-       ssize_t bytes_written = 0;
+static int collecty_command_write(collecty_command* self,
+               int fd, unsigned int events, collecty_command_output* output) {
        ssize_t bytes_read = 0;
        char buffer[4096];
+       int r;
 
        for (;;) {
                // Read a block from the pipe
@@ -240,46 +234,37 @@ static int collecty_command_output(collecty_command* self, int fd, unsigned int
                if (bytes_read < 0) {
                        switch (errno) {
                                case EAGAIN:
-                                       goto END;
+                                       return 0;
 
                                default:
                                        return -errno;
                        }
                }
 
+               // Terminate if we have read nothing
                if (bytes_read == 0)
-                       goto END;
+                       return 0;
 
-               // Don't write if we don't have an output buffer
-               if (unlikely(!f))
-                       continue;
+               // Write to the buffer
+               r = collecty_buffer_write(output->buffer, buffer, bytes_read);
+               if (r < 0)
+                       return r;
 
-               // Write the buffer to the output
-               bytes_written = fwrite(buffer, 1, bytes_read, f);
-               if (bytes_written < bytes_read) {
-                       ERROR(self->ctx, "Failed to write output: %m\n");
-                       return -errno;
-               }
        }
 
-END:
-       // Flush after we have updated the buffer
-       if (unlikely(f))
-               fflush(f);
-
        return 0;
 }
 
 static int collecty_command_stdout(sd_event_source* source, int fd, unsigned int events, void* data) {
        collecty_command* self = data;
 
-       return collecty_command_output(self, fd, events, self->stdout.f);
+       return collecty_command_write(self, fd, events, &self->stdout);
 }
 
 static int collecty_command_stderr(sd_event_source* source, int fd, unsigned int events, void* data) {
        collecty_command* self = data;
 
-       return collecty_command_output(self, fd, events, self->stderr.f);
+       return collecty_command_write(self, fd, events, &self->stderr);
 }
 
 static int collecty_command_get_pipe_to_read(collecty_command* self, int (*fds)[2]) {
@@ -300,7 +285,8 @@ static int collecty_command_get_pipe_to_read(collecty_command* self, int (*fds)[
        return -EBADF;
 }
 
-static int collecty_command_log_stderr(const char* line, const size_t length, void* data) {
+static int collecty_command_log_stderr(collecty_ctx* ctx, collecty_file* file,
+               unsigned long lineno, char* line, size_t length, void* data) {
        collecty_command* self = data;
 
        // Send the line to the logger
@@ -311,6 +297,8 @@ static int collecty_command_log_stderr(const char* line, const size_t length, vo
 
 static int collecty_command_exited(sd_event_source* source, const siginfo_t* si, void* data) {
        collecty_command* self = data;
+       collecty_file* stdout = NULL;
+       collecty_file* stderr = NULL;
        int rc = 0;
        int r = 0;
 
@@ -341,7 +329,13 @@ static int collecty_command_exited(sd_event_source* source, const siginfo_t* si,
 
                        // Log stderr
                        if (rc) {
-                               r = collecty_fwalk(self->stderr.f, collecty_command_log_stderr, self);
+                               // Open a file handle
+                               r = collecty_file_open_buffer(&stderr, self->ctx, self->stderr.buffer);
+                               if (r < 0)
+                                       goto ERROR;
+
+                               // Log all lines
+                               r = collecty_file_walk(stderr, collecty_command_log_stderr, self);
                                if (r < 0)
                                        goto ERROR;
                        }
@@ -368,11 +362,23 @@ static int collecty_command_exited(sd_event_source* source, const siginfo_t* si,
                goto ERROR;
 
        // Call the callback
-       if (self->callbacks.on_success)
-               r = self->callbacks.on_success(self->ctx, rc, self->stdout.buffer,
-                               self->stdout.length, self->callbacks.on_success_data);
+       if (self->callbacks.on_success) {
+               // Open stdout as a file
+               r = collecty_file_open_buffer(&stdout, self->ctx, self->stdout.buffer);
+               if (r < 0)
+                       goto ERROR;
+
+               // Call the callback
+               r = self->callbacks.on_success(self->ctx,
+                       rc, stdout, self->callbacks.on_success_data);
+       }
 
 ERROR:
+       if (stderr)
+               collecty_file_unref(stderr);
+       if (stdout)
+               collecty_file_unref(stdout);
+
        // Drop the extra reference
        collecty_command_unref(self);
 
index 46bfa1ed427e956a03b4cc1ac2fe11823e86be67..b5d96b7ed68172c07e972ab93fc08c62a7e5244f 100644 (file)
@@ -24,6 +24,7 @@
 typedef struct collecty_command collecty_command;
 
 #include "ctx.h"
+#include "file.h"
 #include "daemon.h"
 
 int collecty_command_create(collecty_command** command,
@@ -35,8 +36,8 @@ collecty_command* collecty_command_unref(collecty_command* self);
 // Timeout
 void collecty_command_set_timeout(collecty_command* self, uint64_t timeout);
 
-typedef int (*collecty_command_success_callback)(collecty_ctx* ctx,
-       int rc, const char* output, const size_t length, void* data);
+typedef int (*collecty_command_success_callback)
+       (collecty_ctx* ctx, int rc, collecty_file* stdout, void* data);
 
 // Called if the command has exited successfully
 void collecty_command_on_success(collecty_command* self,
index df53aad721a2b93bd36ba9ebcff4d77ebad27ea1..ae2ef211a1a9ecd7560e98fc650e0d315a2563c5 100644 (file)
 #include "../util.h"
 #include "unbound.h"
 
-typedef struct collecty_unbound_stats {
-       unsigned long queries;
-       unsigned long cachehits;
-       unsigned long cachemiss;
-       unsigned long prefetch;
-       unsigned long rec_replies;
-       double rec_time_avg;
-       unsigned long rec_time_median;
-} collecty_unbound_stats;
-
-static int unbound_parse(const char* line, const size_t length, void* data) {
-       collecty_unbound_stats* stats = data;
-       int r;
-
-       // Parse queries
-       r = sscanf(line, "total.num.queries=%lu", &stats->queries);
-       if (r == 1)
-               return 0;
-
-       // Parse cachehits
-       r = sscanf(line, "total.num.cachehits=%lu", &stats->cachehits);
-       if (r == 1)
-               return 0;
-
-       // Parse cachemiss
-       r = sscanf(line, "total.num.cachemiss=%lu", &stats->cachemiss);
-       if (r == 1)
-               return 0;
-
-       // Parse prefetch
-       r = sscanf(line, "total.num.prefetch=%lu", &stats->prefetch);
-       if (r == 1)
-               return 0;
-
-       // Parse rec_replies
-       r = sscanf(line, "total.num.recursivereplies=%lu", &stats->rec_replies);
-       if (r == 1)
-               return 0;
-
-       // Parse rec_time_avg
-       r = sscanf(line, "total.recursion.time.avg=%lf", &stats->rec_time_avg);
-       if (r == 1)
-               return 0;
-
-       // Parse rec_time_median
-       r = sscanf(line, "total.recursion.time.median=%lu", &stats->rec_time_median);
-       if (r == 1)
-               return 0;
-
-       return 0;
-}
-
 static int unbound_on_success(collecty_ctx* ctx,
-               int rc, const char* output, const size_t length, void* data) {
+               int rc, collecty_file* stdout, void* data) {
        collecty_source* source = data;
        int r;
 
        // Collect stats
-       collecty_unbound_stats stats = {};
+       unsigned long queries = 0;
+       unsigned long cachehits = 0;
+       unsigned long cachemiss = 0;
+       unsigned long prefetch = 0;
+       unsigned long rec_replies = 0;
+       double rec_time_avg = 0;
+       unsigned long rec_time_median = 0;
+
+       collecty_file_parser parser[] = {
+               { "total.num.queries=%lu",           &queries },
+               { "total.num.cachehits=%lu",         &cachehits },
+               { "total.num.cachemiss=%lu",         &cachemiss },
+               { "total.num.prefetch=%lu",          &prefetch },
+               { "total.num.recursivereplies=%lu",  &rec_replies },
+               { "total.recursion.time.avg=%lf",    &rec_time_avg },
+               { "total.recursion.time.median=%lu", &rec_time_median },
+               { NULL },
+       };
 
        // Parse the output
-       r = collecty_fwalk_buffer(output, length, unbound_parse, &stats);
+       r = collecty_file_parse(stdout, parser);
        if (r < 0)
                return r;
 
        // Submit values
        return collecty_source_submit(source, NULL, "%lu:%lu:%lu:%lu:%lu:%lf:%lu",
-               stats.queries, stats.cachehits, stats.cachemiss, stats.prefetch,
-               stats.rec_replies, stats.rec_time_avg, stats.rec_time_median);
+               queries, cachehits, cachemiss, prefetch, rec_replies, rec_time_avg, rec_time_median);
 }
 
 static int unbound_heartbeat(collecty_ctx* ctx, collecty_source* source) {