From: Michael Tremer Date: Wed, 8 Oct 2025 15:46:33 +0000 (+0000) Subject: command: Use the buffer implementation for internal buffering X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2c498291bffa04a853dcf0b4048314b4d7d1e6fa;p=telemetry.git command: Use the buffer implementation for internal buffering In order to use the file parser, we convert the output of the command to a file object which then provides a nice and simple parser. Signed-off-by: Michael Tremer --- diff --git a/src/daemon/command.c b/src/daemon/command.c index 5fdd2fe..99b5a86 100644 --- a/src/daemon/command.c +++ b/src/daemon/command.c @@ -29,14 +29,24 @@ #include #include +#include "buffer.h" #include "command.h" #include "ctx.h" +#include "file.h" #include "time.h" #include "util.h" // By default, commands are being killed after 30 seconds #define DEFAULT_TIMEOUT SEC_TO_USEC(30) +typedef struct collecty_command_output { + // Pipes + int pipes[2]; + + // Buffer + collecty_buffer* buffer; +} collecty_command_output; + struct collecty_command { collecty_ctx* ctx; int nrefs; @@ -54,17 +64,8 @@ struct collecty_command { int pidfd; // Standard Output/Error - struct { - // Pipes - int pipes[2]; - - // Stream - FILE* f; - - // Buffer - char* buffer; - size_t length; - } stdout, stderr; + collecty_command_output stdout; + collecty_command_output stderr; // Callbacks struct { @@ -104,17 +105,11 @@ static void collecty_command_free(collecty_command* self) { collecty_command_close_pipe(self->stdout.pipes); collecty_command_close_pipe(self->stderr.pipes); - // Close streams - if (self->stdout.f) - fclose(self->stdout.f); - if (self->stderr.f) - fclose(self->stderr.f); - // Free buffers if (self->stdout.buffer) - free(self->stdout.buffer); + collecty_buffer_unref(self->stdout.buffer); if (self->stderr.buffer) - free(self->stderr.buffer); + collecty_buffer_unref(self->stderr.buffer); // Free events if (self->events.timeout) @@ -135,6 +130,26 @@ static void collecty_command_free(collecty_command* self) { free(self); } +static int collecty_command_init_output(collecty_command* self, collecty_command_output* output) { + int r; + + // Create pipes + r = pipe2(output->pipes, O_CLOEXEC); + if (r < 0) { + ERROR(self->ctx, "Failed to create pipes: %m\n"); + return -errno; + } + + // Create a buffer to write any data to + r = collecty_buffer_create(&output->buffer, self->ctx); + if (r < 0) { + ERROR(self->ctx, "Failed to create output buffer: %s\n", strerror(-r)); + return r; + } + + return 0; +} + int collecty_command_create(collecty_command** command, collecty_ctx* ctx, collecty_daemon* daemon) { collecty_command* self = NULL; @@ -164,36 +179,14 @@ int collecty_command_create(collecty_command** command, self->pidfd = -EBADF; // Initialize stdout - r = pipe2(self->stdout.pipes, O_CLOEXEC); - if (r < 0) { - ERROR(self->ctx, "Failed to create stdout pipes: %m\n"); - r = -errno; - goto ERROR; - } - - // Open a memory stream for stdout - self->stdout.f = open_memstream(&self->stdout.buffer, &self->stdout.length); - if (!self->stdout.f) { - ERROR(self->ctx, "Failed to open stream for stdout: %m\n"); - r = -errno; + r = collecty_command_init_output(self, &self->stdout); + if (r < 0) goto ERROR; - } // Initialize stderr - r = pipe2(self->stderr.pipes, O_CLOEXEC); - if (r < 0) { - ERROR(self->ctx, "Failed to create stderr pipes: %m\n"); - r = -errno; - goto ERROR; - } - - // Open a memory stream for stderr - self->stderr.f = open_memstream(&self->stderr.buffer, &self->stderr.length); - if (!self->stderr.f) { - ERROR(self->ctx, "Failed to open stream for stderr: %m\n"); - r = -errno; + r = collecty_command_init_output(self, &self->stderr); + if (r < 0) goto ERROR; - } // Return pointer *command = self; @@ -229,10 +222,11 @@ void collecty_command_on_success(collecty_command* self, self->callbacks.on_success_data = data; } -static int collecty_command_output(collecty_command* self, int fd, unsigned int events, FILE* f) { - ssize_t bytes_written = 0; +static int collecty_command_write(collecty_command* self, + int fd, unsigned int events, collecty_command_output* output) { ssize_t bytes_read = 0; char buffer[4096]; + int r; for (;;) { // Read a block from the pipe @@ -240,46 +234,37 @@ static int collecty_command_output(collecty_command* self, int fd, unsigned int if (bytes_read < 0) { switch (errno) { case EAGAIN: - goto END; + return 0; default: return -errno; } } + // Terminate if we have read nothing if (bytes_read == 0) - goto END; + return 0; - // Don't write if we don't have an output buffer - if (unlikely(!f)) - continue; + // Write to the buffer + r = collecty_buffer_write(output->buffer, buffer, bytes_read); + if (r < 0) + return r; - // Write the buffer to the output - bytes_written = fwrite(buffer, 1, bytes_read, f); - if (bytes_written < bytes_read) { - ERROR(self->ctx, "Failed to write output: %m\n"); - return -errno; - } } -END: - // Flush after we have updated the buffer - if (unlikely(f)) - fflush(f); - return 0; } static int collecty_command_stdout(sd_event_source* source, int fd, unsigned int events, void* data) { collecty_command* self = data; - return collecty_command_output(self, fd, events, self->stdout.f); + return collecty_command_write(self, fd, events, &self->stdout); } static int collecty_command_stderr(sd_event_source* source, int fd, unsigned int events, void* data) { collecty_command* self = data; - return collecty_command_output(self, fd, events, self->stderr.f); + return collecty_command_write(self, fd, events, &self->stderr); } static int collecty_command_get_pipe_to_read(collecty_command* self, int (*fds)[2]) { @@ -300,7 +285,8 @@ static int collecty_command_get_pipe_to_read(collecty_command* self, int (*fds)[ return -EBADF; } -static int collecty_command_log_stderr(const char* line, const size_t length, void* data) { +static int collecty_command_log_stderr(collecty_ctx* ctx, collecty_file* file, + unsigned long lineno, char* line, size_t length, void* data) { collecty_command* self = data; // Send the line to the logger @@ -311,6 +297,8 @@ static int collecty_command_log_stderr(const char* line, const size_t length, vo static int collecty_command_exited(sd_event_source* source, const siginfo_t* si, void* data) { collecty_command* self = data; + collecty_file* stdout = NULL; + collecty_file* stderr = NULL; int rc = 0; int r = 0; @@ -341,7 +329,13 @@ static int collecty_command_exited(sd_event_source* source, const siginfo_t* si, // Log stderr if (rc) { - r = collecty_fwalk(self->stderr.f, collecty_command_log_stderr, self); + // Open a file handle + r = collecty_file_open_buffer(&stderr, self->ctx, self->stderr.buffer); + if (r < 0) + goto ERROR; + + // Log all lines + r = collecty_file_walk(stderr, collecty_command_log_stderr, self); if (r < 0) goto ERROR; } @@ -368,11 +362,23 @@ static int collecty_command_exited(sd_event_source* source, const siginfo_t* si, goto ERROR; // Call the callback - if (self->callbacks.on_success) - r = self->callbacks.on_success(self->ctx, rc, self->stdout.buffer, - self->stdout.length, self->callbacks.on_success_data); + if (self->callbacks.on_success) { + // Open stdout as a file + r = collecty_file_open_buffer(&stdout, self->ctx, self->stdout.buffer); + if (r < 0) + goto ERROR; + + // Call the callback + r = self->callbacks.on_success(self->ctx, + rc, stdout, self->callbacks.on_success_data); + } ERROR: + if (stderr) + collecty_file_unref(stderr); + if (stdout) + collecty_file_unref(stdout); + // Drop the extra reference collecty_command_unref(self); diff --git a/src/daemon/command.h b/src/daemon/command.h index 46bfa1e..b5d96b7 100644 --- a/src/daemon/command.h +++ b/src/daemon/command.h @@ -24,6 +24,7 @@ typedef struct collecty_command collecty_command; #include "ctx.h" +#include "file.h" #include "daemon.h" int collecty_command_create(collecty_command** command, @@ -35,8 +36,8 @@ collecty_command* collecty_command_unref(collecty_command* self); // Timeout void collecty_command_set_timeout(collecty_command* self, uint64_t timeout); -typedef int (*collecty_command_success_callback)(collecty_ctx* ctx, - int rc, const char* output, const size_t length, void* data); +typedef int (*collecty_command_success_callback) + (collecty_ctx* ctx, int rc, collecty_file* stdout, void* data); // Called if the command has exited successfully void collecty_command_on_success(collecty_command* self, diff --git a/src/daemon/sources/unbound.c b/src/daemon/sources/unbound.c index df53aad..ae2ef21 100644 --- a/src/daemon/sources/unbound.c +++ b/src/daemon/sources/unbound.c @@ -26,75 +26,39 @@ #include "../util.h" #include "unbound.h" -typedef struct collecty_unbound_stats { - unsigned long queries; - unsigned long cachehits; - unsigned long cachemiss; - unsigned long prefetch; - unsigned long rec_replies; - double rec_time_avg; - unsigned long rec_time_median; -} collecty_unbound_stats; - -static int unbound_parse(const char* line, const size_t length, void* data) { - collecty_unbound_stats* stats = data; - int r; - - // Parse queries - r = sscanf(line, "total.num.queries=%lu", &stats->queries); - if (r == 1) - return 0; - - // Parse cachehits - r = sscanf(line, "total.num.cachehits=%lu", &stats->cachehits); - if (r == 1) - return 0; - - // Parse cachemiss - r = sscanf(line, "total.num.cachemiss=%lu", &stats->cachemiss); - if (r == 1) - return 0; - - // Parse prefetch - r = sscanf(line, "total.num.prefetch=%lu", &stats->prefetch); - if (r == 1) - return 0; - - // Parse rec_replies - r = sscanf(line, "total.num.recursivereplies=%lu", &stats->rec_replies); - if (r == 1) - return 0; - - // Parse rec_time_avg - r = sscanf(line, "total.recursion.time.avg=%lf", &stats->rec_time_avg); - if (r == 1) - return 0; - - // Parse rec_time_median - r = sscanf(line, "total.recursion.time.median=%lu", &stats->rec_time_median); - if (r == 1) - return 0; - - return 0; -} - static int unbound_on_success(collecty_ctx* ctx, - int rc, const char* output, const size_t length, void* data) { + int rc, collecty_file* stdout, void* data) { collecty_source* source = data; int r; // Collect stats - collecty_unbound_stats stats = {}; + unsigned long queries = 0; + unsigned long cachehits = 0; + unsigned long cachemiss = 0; + unsigned long prefetch = 0; + unsigned long rec_replies = 0; + double rec_time_avg = 0; + unsigned long rec_time_median = 0; + + collecty_file_parser parser[] = { + { "total.num.queries=%lu", &queries }, + { "total.num.cachehits=%lu", &cachehits }, + { "total.num.cachemiss=%lu", &cachemiss }, + { "total.num.prefetch=%lu", &prefetch }, + { "total.num.recursivereplies=%lu", &rec_replies }, + { "total.recursion.time.avg=%lf", &rec_time_avg }, + { "total.recursion.time.median=%lu", &rec_time_median }, + { NULL }, + }; // Parse the output - r = collecty_fwalk_buffer(output, length, unbound_parse, &stats); + r = collecty_file_parse(stdout, parser); if (r < 0) return r; // Submit values return collecty_source_submit(source, NULL, "%lu:%lu:%lu:%lu:%lu:%lf:%lu", - stats.queries, stats.cachehits, stats.cachemiss, stats.prefetch, - stats.rec_replies, stats.rec_time_avg, stats.rec_time_median); + queries, cachehits, cachemiss, prefetch, rec_replies, rec_time_avg, rec_time_median); } static int unbound_heartbeat(collecty_ctx* ctx, collecty_source* source) {