]> git.ipfire.org Git - collecty.git/commitdiff
module: Implement batched committing of samples to the database
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 27 Sep 2025 17:58:06 +0000 (17:58 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 27 Sep 2025 17:58:55 +0000 (17:58 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
configure.ac
src/daemon/module.c
src/daemon/module.h
src/daemon/queue.c

index 7408325c323bd645eaa05599cca441e726a0f7ee..e6d761c4c18723d90f1ba40fb73f9efb6d57d6b8 100644 (file)
@@ -183,6 +183,11 @@ AC_SUBST([dbussystemservicedir], [$with_dbussystemservicedir])
 
 # ------------------------------------------------------------------------------
 
+AC_DEFINE_UNQUOTED([DATABASE_PATH], ["/var/lib/collecty"],
+       [The path where the RRD databases are being stored])
+
+# ------------------------------------------------------------------------------
+
 AC_CONFIG_FILES([
        Makefile
        po/Makefile.in
index da1d45fa11b6a7e4591eb72ce77bdf152fc4a11e..8b9cd6ae5dabcbdbffdcdbefb05256981c7ae91b 100644 (file)
 #############################################################################*/
 
 #include <errno.h>
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
+#include <rrd.h>
+
 #include <systemd/sd-event.h>
 
 #include "ctx.h"
@@ -221,6 +224,28 @@ const char* collecty_module_name(collecty_module* self) {
        return self->methods->name;
 }
 
+#define collecty_module_path(module, object, path) \
+       __collecty_module_path(module, object, path, sizeof(path))
+
+static int __collecty_module_path(collecty_module* self,
+               const char* object, char* path, size_t length) {
+       int r;
+
+       // Fetch the module name
+       const char* name = collecty_module_name(self);
+
+       if (object)
+               r = snprintf(path, length, "%s/%s-%s.rrd", DATABASE_PATH, name, object);
+       else
+               r = snprintf(path, length, "%s/%s.rrd", DATABASE_PATH, name);
+
+       // Handle errors
+       if (r < 0)
+               return -errno;
+
+       return 0;
+}
+
 /*
        Called when a module has some data to submit
 */
@@ -242,3 +267,26 @@ int collecty_module_submit(collecty_module* self,
        // Submit the data to the daemon
        return collecty_daemon_submit(self->daemon, self, object, value);
 }
+
+/*
+       Called to write all collected samples to disk
+*/
+int collecty_module_commit(collecty_module* self,
+               const char* object, unsigned int num_samples, const char** samples) {
+       char path[PATH_MAX];
+       int r;
+
+       // Make the path
+       r = collecty_module_path(self, object, path);
+       if (r < 0)
+               return r;
+
+       // Write the samples
+       r = rrd_update_r(path, NULL, num_samples, samples);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to write to %s: %s\n", path, rrd_get_error());
+               return -EFAULT;
+       }
+
+       return 0;
+}
index db0180a7df8992d1460d67a38d8ef6d11cec9f65..db8e70632d2ef39e14f0e4d9e477fba4c31864a4 100644 (file)
@@ -50,4 +50,7 @@ const char* collecty_module_name(collecty_module* self);
 int collecty_module_submit(collecty_module* self, const char* object,
        const char* format, ...) __attribute__((format(printf, 3, 4)));
 
+int collecty_module_commit(collecty_module* self,
+       const char* object, unsigned int num_samples, const char** samples);
+
 #endif /* COLLECTY_MODULE_H */
index d032ec0ceeed7292670dcc6cf3cc7120f7161ee4..5d5f54f7735f23c2cd11ccaee982131108e2ff45 100644 (file)
@@ -295,9 +295,29 @@ ERROR:
 }
 
 int collecty_queue_flush(collecty_queue* self) {
+       struct collecty_queue_object* o = NULL;
+       int r;
+
        DEBUG(self->ctx, "Flushing the queue...\n");
 
-       // XXX TODO
+       for (;;) {
+               o = STAILQ_FIRST(&self->queue);
+               if (!o)
+                       break;
+
+               // Call the module to write its data
+               r = collecty_module_commit(o->module, o->object, o->num_samples, (const char**)o->samples);
+               if (r < 0) {
+                       ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
+                               collecty_module_name(o->module), (o->object) ? o->object : NULL, strerror(-r));
+               }
+
+               // Remove the object from the queue
+               STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes);
+
+               // Free the object
+               collecty_queue_free_object(o);
+       }
 
        return 0;
 }