From: Michael Tremer Date: Sat, 27 Sep 2025 17:58:06 +0000 (+0000) Subject: module: Implement batched committing of samples to the database X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=51574feefb0d11a5407e816c3f3f76be8fb7b4ec;p=telemetry.git module: Implement batched committing of samples to the database Signed-off-by: Michael Tremer --- diff --git a/configure.ac b/configure.ac index 7408325..e6d761c 100644 --- a/configure.ac +++ b/configure.ac @@ -183,6 +183,11 @@ AC_SUBST([dbussystemservicedir], [$with_dbussystemservicedir]) # ------------------------------------------------------------------------------ +AC_DEFINE_UNQUOTED([DATABASE_PATH], ["/var/lib/collecty"], + [The path where the RRD databases are being stored]) + +# ------------------------------------------------------------------------------ + AC_CONFIG_FILES([ Makefile po/Makefile.in diff --git a/src/daemon/module.c b/src/daemon/module.c index da1d45f..8b9cd6a 100644 --- a/src/daemon/module.c +++ b/src/daemon/module.c @@ -19,10 +19,13 @@ #############################################################################*/ #include +#include #include #include #include +#include + #include #include "ctx.h" @@ -221,6 +224,28 @@ const char* collecty_module_name(collecty_module* self) { return self->methods->name; } +#define collecty_module_path(module, object, path) \ + __collecty_module_path(module, object, path, sizeof(path)) + +static int __collecty_module_path(collecty_module* self, + const char* object, char* path, size_t length) { + int r; + + // Fetch the module name + const char* name = collecty_module_name(self); + + if (object) + r = snprintf(path, length, "%s/%s-%s.rrd", DATABASE_PATH, name, object); + else + r = snprintf(path, length, "%s/%s.rrd", DATABASE_PATH, name); + + // Handle errors + if (r < 0) + return -errno; + + return 0; +} + /* Called when a module has some data to submit */ @@ -242,3 +267,26 @@ int collecty_module_submit(collecty_module* self, // Submit the data to the daemon return collecty_daemon_submit(self->daemon, self, object, value); } + +/* + Called to write all collected samples to disk +*/ +int collecty_module_commit(collecty_module* self, + const char* object, unsigned int num_samples, const char** samples) { + char path[PATH_MAX]; + int r; + + // Make the path + r = collecty_module_path(self, object, path); + if (r < 0) + return r; + + // Write the samples + r = rrd_update_r(path, NULL, num_samples, samples); + if (r < 0) { + ERROR(self->ctx, "Failed to write to %s: %s\n", path, rrd_get_error()); + return -EFAULT; + } + + return 0; +} diff --git a/src/daemon/module.h b/src/daemon/module.h index db0180a..db8e706 100644 --- a/src/daemon/module.h +++ b/src/daemon/module.h @@ -50,4 +50,7 @@ const char* collecty_module_name(collecty_module* self); int collecty_module_submit(collecty_module* self, const char* object, const char* format, ...) __attribute__((format(printf, 3, 4))); +int collecty_module_commit(collecty_module* self, + const char* object, unsigned int num_samples, const char** samples); + #endif /* COLLECTY_MODULE_H */ diff --git a/src/daemon/queue.c b/src/daemon/queue.c index d032ec0..5d5f54f 100644 --- a/src/daemon/queue.c +++ b/src/daemon/queue.c @@ -295,9 +295,29 @@ ERROR: } int collecty_queue_flush(collecty_queue* self) { + struct collecty_queue_object* o = NULL; + int r; + DEBUG(self->ctx, "Flushing the queue...\n"); - // XXX TODO + for (;;) { + o = STAILQ_FIRST(&self->queue); + if (!o) + break; + + // Call the module to write its data + r = collecty_module_commit(o->module, o->object, o->num_samples, (const char**)o->samples); + if (r < 0) { + ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n", + collecty_module_name(o->module), (o->object) ? o->object : NULL, strerror(-r)); + } + + // Remove the object from the queue + STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes); + + // Free the object + collecty_queue_free_object(o); + } return 0; }