]> git.ipfire.org Git - telemetry.git/commitdiff
daemon: Move the queue into a separate object
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 27 Sep 2025 16:07:58 +0000 (16:07 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 27 Sep 2025 16:07:58 +0000 (16:07 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Makefile.am
src/daemon/daemon.c
src/daemon/queue.c [new file with mode: 0644]
src/daemon/queue.h [new file with mode: 0644]

index 2b6802c8dcc57032976a03179aaa98695cfc43ac..b4e73bad26c0edc3e238a03c7fe7b2e811e11f60 100644 (file)
@@ -101,7 +101,9 @@ dist_collectyd_SOURCES = \
        src/daemon/modules.c \
        src/daemon/modules.h \
        src/daemon/modules/loadavg.c \
-       src/daemon/modules/loadavg.h
+       src/daemon/modules/loadavg.h \
+       src/daemon/queue.c \
+       src/daemon/queue.h
 
 collectyd_CPPFLAGS = \
        $(AM_CPPFLAGS)
index 3f41aa02f9b3ffb62e8be482f3d7c3d5bad5849b..a81787c08821f161a5c1a8a391a8198f344aedd8 100644 (file)
@@ -21,8 +21,6 @@
 #include <errno.h>
 #include <stdlib.h>
 #include <string.h>
-#include <sys/queue.h>
-#include <sys/time.h>
 
 #include <systemd/sd-daemon.h>
 #include <systemd/sd-event.h>
 #include "daemon.h"
 #include "module.h"
 #include "modules.h"
-
-#define FLUSH_QUEUE_INTERVAL 60000000 // 60s (for testing)
-
-struct collecty_queue_object {
-       STAILQ_ENTRY(collecty_queue_object) nodes;
-
-       // Module
-       collecty_module* module;
-
-       // Object
-       char* object;
-
-       // Timestamp
-       struct timeval t;
-
-       // Value
-       char* value;
-};
+#include "queue.h"
 
 struct collecty_daemon {
        collecty_ctx* ctx;
@@ -63,21 +44,12 @@ struct collecty_daemon {
                sd_event_source* sigint;
                sd_event_source* sighup;
                sd_event_source* modules_init;
-               sd_event_source* flush;
        } events;
 
-       // Write Queue
-       STAILQ_HEAD(queue, collecty_queue_object) queue;
+       // Queue
+       collecty_queue* queue;
 };
 
-static int collecty_daemon_flush(collecty_daemon* self) {
-       DEBUG(self->ctx, "Flushing the queue...\n");
-
-       // XXX TODO
-
-       return 0;
-}
-
 static int collecty_daemon_modules_init(sd_event_source* source, void* data) {
        collecty_daemon* daemon = data;
 
@@ -99,7 +71,7 @@ static int collecty_daemon_SIGHUP(sd_event_source* source,
        collecty_daemon* self = data;
 
        // Flush the queue
-       return collecty_daemon_flush(self);
+       return collecty_queue_flush(self->queue);
 }
 
 static int collecty_daemon_setup_loop(collecty_daemon* self) {
@@ -152,69 +124,7 @@ static int collecty_daemon_setup_loop(collecty_daemon* self) {
        return 0;
 }
 
-static void collecty_daemon_free_queue_object(struct collecty_queue_object* o) {
-       if (o->module)
-               collecty_module_unref(o->module);
-       if (o->object)
-               free(o->object);
-       if (o->value)
-               free(o->value);
-       free(o);
-}
-
-static int collecty_daemon_flush_timer(sd_event_source* source, uint64_t usec, void* data) {
-       collecty_daemon* self = data;
-       int r;
-
-       // Call again soon...
-       r = sd_event_source_set_time(source, usec + FLUSH_QUEUE_INTERVAL);
-       if (r < 0)
-               return r;
-
-       // Enable the timer
-       r = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT);
-       if (r < 0)
-               return r;
-
-       // Flush the queue
-       return collecty_daemon_flush(self);
-}
-
-static int collecty_daemon_setup_queue(collecty_daemon* self) {
-       int r;
-
-       // Initialize the queue
-       STAILQ_INIT(&self->queue);
-
-       // Create a timer which regularly flushes the queue
-       r = sd_event_add_time_relative(self->loop, &self->events.flush,
-               CLOCK_MONOTONIC, FLUSH_QUEUE_INTERVAL, 0, collecty_daemon_flush_timer, self);
-       if (r < 0) {
-               ERROR(self->ctx, "Failed to set up the flush timer: %s\n", strerror(-r));
-               return r;
-       }
-
-       return 0;
-}
-
 static void collecty_daemon_free(collecty_daemon* self) {
-       struct collecty_queue_object* o = NULL;
-
-       // Cleanup the queue
-       for (;;) {
-               o = STAILQ_FIRST(&self->queue);
-               if (!o)
-                       break;
-
-               // Remove the object from the queue
-               STAILQ_REMOVE_HEAD(&self->queue, nodes);
-
-               // Free the object
-               collecty_daemon_free_queue_object(o);
-       }
-
-       if (self->events.flush)
-               sd_event_source_unref(self->events.flush);
        if (self->events.modules_init)
                sd_event_source_unref(self->events.modules_init);
        if (self->events.sigterm)
@@ -223,6 +133,8 @@ static void collecty_daemon_free(collecty_daemon* self) {
                sd_event_source_unref(self->events.sigint);
        if (self->events.sighup)
                sd_event_source_unref(self->events.sighup);
+       if (self->queue)
+               collecty_queue_unref(self->queue);
        if (self->ctx)
                collecty_ctx_unref(self->ctx);
        if (self->loop)
@@ -251,7 +163,7 @@ int collecty_daemon_create(collecty_daemon** daemon, collecty_ctx* ctx) {
                goto ERROR;
 
        // Setup the write queue
-       r = collecty_daemon_setup_queue(self);
+       r = collecty_queue_create(&self->queue, self->ctx, self);
        if (r < 0)
                goto ERROR;
 
@@ -307,61 +219,7 @@ ERROR:
        return 1;
 }
 
-/*
-       Submits a new reading into the queue
-*/
 int collecty_daemon_submit(collecty_daemon* self,
                collecty_module* module, const char* object, const char* value) {
-       struct collecty_queue_object* o = NULL;
-       int r;
-
-       // Check inputs
-       if (!value)
-               return -EINVAL;
-
-       // Allocate some memory
-       o = calloc(1, sizeof(*self));
-       if (!o)
-               return -errno;
-
-       // Reference the module
-       o->module = collecty_module_ref(module);
-
-       // Fetch the current timestamp
-       r = gettimeofday(&o->t, NULL);
-       if (r < 0) {
-               r = -errno;
-               goto ERROR;
-       }
-
-       // Store the object
-       if (o->object) {
-               o->object = strdup(object);
-               if (!o->object) {
-                       r = -errno;
-                       goto ERROR;
-               }
-       }
-
-       // Store the value
-       o->value = strdup(value);
-       if (!o->value) {
-               r = -errno;
-               goto ERROR;
-       }
-
-       // Append the object to the queue
-       STAILQ_INSERT_TAIL(&self->queue, o, nodes);
-
-       // Log action
-       DEBUG(self->ctx, "%s(%s) submitted: %s\n",
-               collecty_module_name(module), (o->object) ? o->object : "", o->value);
-
-       return 0;
-
-ERROR:
-       if (o)
-               collecty_daemon_free_queue_object(o);
-
-       return r;
+       return collecty_queue_submit(self->queue, module, object, value);
 }
diff --git a/src/daemon/queue.c b/src/daemon/queue.c
new file mode 100644 (file)
index 0000000..c340f1a
--- /dev/null
@@ -0,0 +1,237 @@
+/*#############################################################################
+#                                                                             #
+# collecty - A system statistics collection daemon for IPFire                 #
+# Copyright (C) 2025 IPFire Development Team                                  #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+#############################################################################*/
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <sys/time.h>
+
+#include "ctx.h"
+#include "daemon.h"
+#include "queue.h"
+
+#define HEARTBEAT 60000000 // 60s (for testing)
+
+struct collecty_queue_object {
+       STAILQ_ENTRY(collecty_queue_object) nodes;
+
+       // Module
+       collecty_module* module;
+
+       // Object
+       char* object;
+
+       // Timestamp
+       struct timeval t;
+
+       // Value
+       char* value;
+};
+
+struct collecty_queue {
+       collecty_ctx* ctx;
+       int nrefs;
+
+       // Event Loop
+       sd_event* loop;
+
+       // Events
+       struct {
+               sd_event_source* flush;
+       } events;
+
+       // Queue
+       STAILQ_HEAD(queue, collecty_queue_object) queue;
+};
+
+static int collecty_queue_heartbeat(sd_event_source* source, uint64_t usec, void* data) {
+       collecty_queue* self = data;
+       int r;
+
+       // Call again soon...
+       r = sd_event_source_set_time(source, usec + HEARTBEAT);
+       if (r < 0)
+               return r;
+
+       // Enable the timer
+       r = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT);
+       if (r < 0)
+               return r;
+
+       // Flush the queue
+       return collecty_queue_flush(self);
+}
+
+static void collecty_queue_free_object(struct collecty_queue_object* o) {
+       if (o->module)
+               collecty_module_unref(o->module);
+       if (o->object)
+               free(o->object);
+       if (o->value)
+               free(o->value);
+       free(o);
+}
+
+static void collecty_queue_free(collecty_queue* self) {
+       struct collecty_queue_object* o = NULL;
+
+       // Cleanup the queue
+       for (;;) {
+               o = STAILQ_FIRST(&self->queue);
+               if (!o)
+                       break;
+
+               // Remove the object from the queue
+               STAILQ_REMOVE_HEAD(&self->queue, nodes);
+
+               // Free the object
+               collecty_queue_free_object(o);
+       }
+
+       if (self->events.flush)
+               sd_event_source_unref(self->events.flush);
+       if (self->ctx)
+               collecty_ctx_unref(self->ctx);
+       if (self->loop)
+               sd_event_unref(self->loop);
+       free(self);
+}
+
+int collecty_queue_create(collecty_queue** queue,
+               collecty_ctx* ctx, collecty_daemon* daemon) {
+       collecty_queue* self = NULL;
+       int r;
+
+       // Allocate some memory
+       self = calloc(1, sizeof(*self));
+       if (!self)
+               return -errno;
+
+       // Initialize the reference counter
+       self->nrefs = 1;
+
+       // Store a reference to the context
+       self->ctx = collecty_ctx_ref(ctx);
+
+       // Fetch a reference to the event loop
+       self->loop = collecty_daemon_loop(daemon);
+
+       // Initialize the queue
+       STAILQ_INIT(&self->queue);
+
+       // Create a timer which regularly flushes the queue
+       r = sd_event_add_time_relative(self->loop, &self->events.flush,
+               CLOCK_MONOTONIC, HEARTBEAT, 0, collecty_queue_heartbeat, self);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to set up the heartbeat timer: %s\n", strerror(-r));
+               goto ERROR;
+       }
+
+       // Return the pointer
+       *queue = self;
+       return 0;
+
+ERROR:
+       if (self)
+               collecty_queue_unref(self);
+
+       return r;
+}
+
+collecty_queue* collecty_queue_ref(collecty_queue* self) {
+       ++self->nrefs;
+       return self;
+}
+
+collecty_queue* collecty_queue_unref(collecty_queue* self) {
+       if (--self->nrefs > 0)
+               return self;
+
+       collecty_queue_free(self);
+       return NULL;
+}
+
+/*
+       Submits a new reading into the queue
+*/
+int collecty_queue_submit(collecty_queue* self,
+               collecty_module* module, const char* object, const char* value) {
+       struct collecty_queue_object* o = NULL;
+       int r;
+
+       // Check inputs
+       if (!value)
+               return -EINVAL;
+
+       // Allocate some memory
+       o = calloc(1, sizeof(*self));
+       if (!o)
+               return -errno;
+
+       // Reference the module
+       o->module = collecty_module_ref(module);
+
+       // Fetch the current timestamp
+       r = gettimeofday(&o->t, NULL);
+       if (r < 0) {
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Store the object
+       if (o->object) {
+               o->object = strdup(object);
+               if (!o->object) {
+                       r = -errno;
+                       goto ERROR;
+               }
+       }
+
+       // Store the value
+       o->value = strdup(value);
+       if (!o->value) {
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Append the object to the queue
+       STAILQ_INSERT_TAIL(&self->queue, o, nodes);
+
+       // Log action
+       DEBUG(self->ctx, "%s(%s) submitted: %s\n",
+               collecty_module_name(module), (o->object) ? o->object : "", o->value);
+
+       return 0;
+
+ERROR:
+       if (o)
+               collecty_queue_free_object(o);
+
+       return r;
+}
+
+int collecty_queue_flush(collecty_queue* self) {
+       DEBUG(self->ctx, "Flushing the queue...\n");
+
+       // XXX TODO
+
+       return 0;
+}
diff --git a/src/daemon/queue.h b/src/daemon/queue.h
new file mode 100644 (file)
index 0000000..72ae6cd
--- /dev/null
@@ -0,0 +1,41 @@
+/*#############################################################################
+#                                                                             #
+# collecty - A system statistics collection daemon for IPFire                 #
+# Copyright (C) 2025 IPFire Development Team                                  #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+#############################################################################*/
+
+#ifndef COLLECTY_QUEUE_H
+#define COLLECTY_QUEUE_H
+
+typedef struct collecty_queue collecty_queue;
+
+#include "ctx.h"
+#include "daemon.h"
+#include "module.h"
+
+int collecty_queue_create(collecty_queue** queue,
+               collecty_ctx* ctx, collecty_daemon* daemon);
+
+collecty_queue* collecty_queue_ref(collecty_queue* self);
+collecty_queue* collecty_queue_unref(collecty_queue* self);
+
+int collecty_queue_submit(collecty_queue* self,
+       collecty_module* module, const char* object, const char* value);
+
+int collecty_queue_flush(collecty_queue* self);
+
+#endif /* COLLECTY_QUEUE_H */