From: Michael Tremer Date: Sat, 27 Sep 2025 16:54:52 +0000 (+0000) Subject: queue: Store multiple samples in one queue object X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=83c1dcbe6a18ef4519352f12a59c5ea180765bc6;p=collecty.git queue: Store multiple samples in one queue object Signed-off-by: Michael Tremer --- diff --git a/src/daemon/daemon.c b/src/daemon/daemon.c index a81787c..f49f349 100644 --- a/src/daemon/daemon.c +++ b/src/daemon/daemon.c @@ -221,5 +221,9 @@ ERROR: int collecty_daemon_submit(collecty_daemon* self, collecty_module* module, const char* object, const char* value) { + // Log action + DEBUG(self->ctx, "%s(%s) submitted: %s\n", + collecty_module_name(module), (object) ? object : "", value); + return collecty_queue_submit(self->queue, module, object, value); } diff --git a/src/daemon/queue.c b/src/daemon/queue.c index e284881..162d2ee 100644 --- a/src/daemon/queue.c +++ b/src/daemon/queue.c @@ -42,8 +42,9 @@ struct collecty_queue_object { // Timestamp struct timeval t; - // Value - char* value; + // Samples + char** samples; + unsigned int num_samples; }; struct collecty_queue { @@ -89,12 +90,16 @@ static int collecty_queue_exit(sd_event_source* source, void* data) { } static void collecty_queue_free_object(struct collecty_queue_object* o) { + if (o->samples) { + for (unsigned int i = 0; i < o->num_samples; i++) + free(o->samples[i]); + free(o->samples); + } + if (o->module) collecty_module_unref(o->module); if (o->object) free(o->object); - if (o->value) - free(o->value); free(o); } @@ -186,18 +191,70 @@ collecty_queue* collecty_queue_unref(collecty_queue* self) { return NULL; } +static struct collecty_queue_object* collecty_queue_find_object( + collecty_queue* self, collecty_module* module, const char* object) { + struct collecty_queue_object* o = NULL; + + STAILQ_FOREACH(o, &self->queue, nodes) { + // The module must match + if (o->module != module) + continue; + + // If both objects are NULL we have a match + if (!o->object && !object) + return o; + + // If both have objects, we need to compare + else if (o->object && object) + if (strcmp(o->object, object) == 0) + return o; + } + + // No match found + return NULL; +} + +static int collecty_queue_object_append_sample(collecty_queue* self, collecty_module* module, + const char* object, struct collecty_queue_object* o, const char* sample) { + char** samples = NULL; + char* s = NULL; + + // Increase the size of the array + samples = reallocarray(o->samples, o->num_samples + 1, sizeof(*o->samples)); + if (!samples) + return -errno; + + // Copy the sample to the heap + s = strdup(sample); + if (!s) + return -errno; + + // Assign the sample + samples[o->num_samples++] = s; + + // Replace the array + o->samples = samples; + + return 0; +} + /* Submits a new reading into the queue */ int collecty_queue_submit(collecty_queue* self, - collecty_module* module, const char* object, const char* value) { + collecty_module* module, const char* object, const char* sample) { struct collecty_queue_object* o = NULL; int r; // Check inputs - if (!value) + if (!sample) return -EINVAL; + // Check if we can append the sample + o = collecty_queue_find_object(self, module, object); + if (o) + return collecty_queue_object_append_sample(self, module, object, o, sample); + // Allocate some memory o = calloc(1, sizeof(*self)); if (!o) @@ -222,20 +279,14 @@ int collecty_queue_submit(collecty_queue* self, } } - // Store the value - o->value = strdup(value); - if (!o->value) { - r = -errno; + // Store the sample + r = collecty_queue_object_append_sample(self, module, object, o, sample); + if (r < 0) goto ERROR; - } // Append the object to the queue STAILQ_INSERT_TAIL(&self->queue, o, nodes); - // Log action - DEBUG(self->ctx, "%s(%s) submitted: %s\n", - collecty_module_name(module), (o->object) ? o->object : "", o->value); - return 0; ERROR: diff --git a/src/daemon/queue.h b/src/daemon/queue.h index 72ae6cd..9844c3e 100644 --- a/src/daemon/queue.h +++ b/src/daemon/queue.h @@ -34,7 +34,7 @@ collecty_queue* collecty_queue_ref(collecty_queue* self); collecty_queue* collecty_queue_unref(collecty_queue* self); int collecty_queue_submit(collecty_queue* self, - collecty_module* module, const char* object, const char* value); + collecty_module* module, const char* object, const char* sample); int collecty_queue_flush(collecty_queue* self);