From: Michael Tremer Date: Sat, 27 Sep 2025 16:07:58 +0000 (+0000) Subject: daemon: Move the queue into a separate object X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c0f89d831c5508f031c6f9ba379a6e47af5abecb;p=telemetry.git daemon: Move the queue into a separate object Signed-off-by: Michael Tremer --- diff --git a/Makefile.am b/Makefile.am index 2b6802c..b4e73ba 100644 --- a/Makefile.am +++ b/Makefile.am @@ -101,7 +101,9 @@ dist_collectyd_SOURCES = \ src/daemon/modules.c \ src/daemon/modules.h \ src/daemon/modules/loadavg.c \ - src/daemon/modules/loadavg.h + src/daemon/modules/loadavg.h \ + src/daemon/queue.c \ + src/daemon/queue.h collectyd_CPPFLAGS = \ $(AM_CPPFLAGS) diff --git a/src/daemon/daemon.c b/src/daemon/daemon.c index 3f41aa0..a81787c 100644 --- a/src/daemon/daemon.c +++ b/src/daemon/daemon.c @@ -21,8 +21,6 @@ #include #include #include -#include -#include #include #include @@ -31,24 +29,7 @@ #include "daemon.h" #include "module.h" #include "modules.h" - -#define FLUSH_QUEUE_INTERVAL 60000000 // 60s (for testing) - -struct collecty_queue_object { - STAILQ_ENTRY(collecty_queue_object) nodes; - - // Module - collecty_module* module; - - // Object - char* object; - - // Timestamp - struct timeval t; - - // Value - char* value; -}; +#include "queue.h" struct collecty_daemon { collecty_ctx* ctx; @@ -63,21 +44,12 @@ struct collecty_daemon { sd_event_source* sigint; sd_event_source* sighup; sd_event_source* modules_init; - sd_event_source* flush; } events; - // Write Queue - STAILQ_HEAD(queue, collecty_queue_object) queue; + // Queue + collecty_queue* queue; }; -static int collecty_daemon_flush(collecty_daemon* self) { - DEBUG(self->ctx, "Flushing the queue...\n"); - - // XXX TODO - - return 0; -} - static int collecty_daemon_modules_init(sd_event_source* source, void* data) { collecty_daemon* daemon = data; @@ -99,7 +71,7 @@ static int collecty_daemon_SIGHUP(sd_event_source* source, collecty_daemon* self = data; // Flush the queue - return collecty_daemon_flush(self); + return collecty_queue_flush(self->queue); } static int collecty_daemon_setup_loop(collecty_daemon* self) { @@ -152,69 +124,7 @@ static int collecty_daemon_setup_loop(collecty_daemon* self) { return 0; } -static void collecty_daemon_free_queue_object(struct collecty_queue_object* o) { - if (o->module) - collecty_module_unref(o->module); - if (o->object) - free(o->object); - if (o->value) - free(o->value); - free(o); -} - -static int collecty_daemon_flush_timer(sd_event_source* source, uint64_t usec, void* data) { - collecty_daemon* self = data; - int r; - - // Call again soon... - r = sd_event_source_set_time(source, usec + FLUSH_QUEUE_INTERVAL); - if (r < 0) - return r; - - // Enable the timer - r = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT); - if (r < 0) - return r; - - // Flush the queue - return collecty_daemon_flush(self); -} - -static int collecty_daemon_setup_queue(collecty_daemon* self) { - int r; - - // Initialize the queue - STAILQ_INIT(&self->queue); - - // Create a timer which regularly flushes the queue - r = sd_event_add_time_relative(self->loop, &self->events.flush, - CLOCK_MONOTONIC, FLUSH_QUEUE_INTERVAL, 0, collecty_daemon_flush_timer, self); - if (r < 0) { - ERROR(self->ctx, "Failed to set up the flush timer: %s\n", strerror(-r)); - return r; - } - - return 0; -} - static void collecty_daemon_free(collecty_daemon* self) { - struct collecty_queue_object* o = NULL; - - // Cleanup the queue - for (;;) { - o = STAILQ_FIRST(&self->queue); - if (!o) - break; - - // Remove the object from the queue - STAILQ_REMOVE_HEAD(&self->queue, nodes); - - // Free the object - collecty_daemon_free_queue_object(o); - } - - if (self->events.flush) - sd_event_source_unref(self->events.flush); if (self->events.modules_init) sd_event_source_unref(self->events.modules_init); if (self->events.sigterm) @@ -223,6 +133,8 @@ static void collecty_daemon_free(collecty_daemon* self) { sd_event_source_unref(self->events.sigint); if (self->events.sighup) sd_event_source_unref(self->events.sighup); + if (self->queue) + collecty_queue_unref(self->queue); if (self->ctx) collecty_ctx_unref(self->ctx); if (self->loop) @@ -251,7 +163,7 @@ int collecty_daemon_create(collecty_daemon** daemon, collecty_ctx* ctx) { goto ERROR; // Setup the write queue - r = collecty_daemon_setup_queue(self); + r = collecty_queue_create(&self->queue, self->ctx, self); if (r < 0) goto ERROR; @@ -307,61 +219,7 @@ ERROR: return 1; } -/* - Submits a new reading into the queue -*/ int collecty_daemon_submit(collecty_daemon* self, collecty_module* module, const char* object, const char* value) { - struct collecty_queue_object* o = NULL; - int r; - - // Check inputs - if (!value) - return -EINVAL; - - // Allocate some memory - o = calloc(1, sizeof(*self)); - if (!o) - return -errno; - - // Reference the module - o->module = collecty_module_ref(module); - - // Fetch the current timestamp - r = gettimeofday(&o->t, NULL); - if (r < 0) { - r = -errno; - goto ERROR; - } - - // Store the object - if (o->object) { - o->object = strdup(object); - if (!o->object) { - r = -errno; - goto ERROR; - } - } - - // Store the value - o->value = strdup(value); - if (!o->value) { - r = -errno; - goto ERROR; - } - - // Append the object to the queue - STAILQ_INSERT_TAIL(&self->queue, o, nodes); - - // Log action - DEBUG(self->ctx, "%s(%s) submitted: %s\n", - collecty_module_name(module), (o->object) ? o->object : "", o->value); - - return 0; - -ERROR: - if (o) - collecty_daemon_free_queue_object(o); - - return r; + return collecty_queue_submit(self->queue, module, object, value); } diff --git a/src/daemon/queue.c b/src/daemon/queue.c new file mode 100644 index 0000000..c340f1a --- /dev/null +++ b/src/daemon/queue.c @@ -0,0 +1,237 @@ +/*############################################################################# +# # +# collecty - A system statistics collection daemon for IPFire # +# Copyright (C) 2025 IPFire Development Team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +#############################################################################*/ + +#include +#include +#include +#include +#include + +#include "ctx.h" +#include "daemon.h" +#include "queue.h" + +#define HEARTBEAT 60000000 // 60s (for testing) + +struct collecty_queue_object { + STAILQ_ENTRY(collecty_queue_object) nodes; + + // Module + collecty_module* module; + + // Object + char* object; + + // Timestamp + struct timeval t; + + // Value + char* value; +}; + +struct collecty_queue { + collecty_ctx* ctx; + int nrefs; + + // Event Loop + sd_event* loop; + + // Events + struct { + sd_event_source* flush; + } events; + + // Queue + STAILQ_HEAD(queue, collecty_queue_object) queue; +}; + +static int collecty_queue_heartbeat(sd_event_source* source, uint64_t usec, void* data) { + collecty_queue* self = data; + int r; + + // Call again soon... + r = sd_event_source_set_time(source, usec + HEARTBEAT); + if (r < 0) + return r; + + // Enable the timer + r = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT); + if (r < 0) + return r; + + // Flush the queue + return collecty_queue_flush(self); +} + +static void collecty_queue_free_object(struct collecty_queue_object* o) { + if (o->module) + collecty_module_unref(o->module); + if (o->object) + free(o->object); + if (o->value) + free(o->value); + free(o); +} + +static void collecty_queue_free(collecty_queue* self) { + struct collecty_queue_object* o = NULL; + + // Cleanup the queue + for (;;) { + o = STAILQ_FIRST(&self->queue); + if (!o) + break; + + // Remove the object from the queue + STAILQ_REMOVE_HEAD(&self->queue, nodes); + + // Free the object + collecty_queue_free_object(o); + } + + if (self->events.flush) + sd_event_source_unref(self->events.flush); + if (self->ctx) + collecty_ctx_unref(self->ctx); + if (self->loop) + sd_event_unref(self->loop); + free(self); +} + +int collecty_queue_create(collecty_queue** queue, + collecty_ctx* ctx, collecty_daemon* daemon) { + collecty_queue* self = NULL; + int r; + + // Allocate some memory + self = calloc(1, sizeof(*self)); + if (!self) + return -errno; + + // Initialize the reference counter + self->nrefs = 1; + + // Store a reference to the context + self->ctx = collecty_ctx_ref(ctx); + + // Fetch a reference to the event loop + self->loop = collecty_daemon_loop(daemon); + + // Initialize the queue + STAILQ_INIT(&self->queue); + + // Create a timer which regularly flushes the queue + r = sd_event_add_time_relative(self->loop, &self->events.flush, + CLOCK_MONOTONIC, HEARTBEAT, 0, collecty_queue_heartbeat, self); + if (r < 0) { + ERROR(self->ctx, "Failed to set up the heartbeat timer: %s\n", strerror(-r)); + goto ERROR; + } + + // Return the pointer + *queue = self; + return 0; + +ERROR: + if (self) + collecty_queue_unref(self); + + return r; +} + +collecty_queue* collecty_queue_ref(collecty_queue* self) { + ++self->nrefs; + return self; +} + +collecty_queue* collecty_queue_unref(collecty_queue* self) { + if (--self->nrefs > 0) + return self; + + collecty_queue_free(self); + return NULL; +} + +/* + Submits a new reading into the queue +*/ +int collecty_queue_submit(collecty_queue* self, + collecty_module* module, const char* object, const char* value) { + struct collecty_queue_object* o = NULL; + int r; + + // Check inputs + if (!value) + return -EINVAL; + + // Allocate some memory + o = calloc(1, sizeof(*self)); + if (!o) + return -errno; + + // Reference the module + o->module = collecty_module_ref(module); + + // Fetch the current timestamp + r = gettimeofday(&o->t, NULL); + if (r < 0) { + r = -errno; + goto ERROR; + } + + // Store the object + if (o->object) { + o->object = strdup(object); + if (!o->object) { + r = -errno; + goto ERROR; + } + } + + // Store the value + o->value = strdup(value); + if (!o->value) { + r = -errno; + goto ERROR; + } + + // Append the object to the queue + STAILQ_INSERT_TAIL(&self->queue, o, nodes); + + // Log action + DEBUG(self->ctx, "%s(%s) submitted: %s\n", + collecty_module_name(module), (o->object) ? o->object : "", o->value); + + return 0; + +ERROR: + if (o) + collecty_queue_free_object(o); + + return r; +} + +int collecty_queue_flush(collecty_queue* self) { + DEBUG(self->ctx, "Flushing the queue...\n"); + + // XXX TODO + + return 0; +} diff --git a/src/daemon/queue.h b/src/daemon/queue.h new file mode 100644 index 0000000..72ae6cd --- /dev/null +++ b/src/daemon/queue.h @@ -0,0 +1,41 @@ +/*############################################################################# +# # +# collecty - A system statistics collection daemon for IPFire # +# Copyright (C) 2025 IPFire Development Team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +#############################################################################*/ + +#ifndef COLLECTY_QUEUE_H +#define COLLECTY_QUEUE_H + +typedef struct collecty_queue collecty_queue; + +#include "ctx.h" +#include "daemon.h" +#include "module.h" + +int collecty_queue_create(collecty_queue** queue, + collecty_ctx* ctx, collecty_daemon* daemon); + +collecty_queue* collecty_queue_ref(collecty_queue* self); +collecty_queue* collecty_queue_unref(collecty_queue* self); + +int collecty_queue_submit(collecty_queue* self, + collecty_module* module, const char* object, const char* value); + +int collecty_queue_flush(collecty_queue* self); + +#endif /* COLLECTY_QUEUE_H */