]> git.ipfire.org Git - collecty.git/commitdiff
queue: Store multiple samples in one queue object
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 27 Sep 2025 16:54:52 +0000 (16:54 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 27 Sep 2025 16:54:52 +0000 (16:54 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/daemon/daemon.c
src/daemon/queue.c
src/daemon/queue.h

index a81787c08821f161a5c1a8a391a8198f344aedd8..f49f349b5e43fea5bec3c838c2dd1947418ee3dc 100644 (file)
@@ -221,5 +221,9 @@ ERROR:
 
 int collecty_daemon_submit(collecty_daemon* self,
                collecty_module* module, const char* object, const char* value) {
+       // Log action
+       DEBUG(self->ctx, "%s(%s) submitted: %s\n",
+               collecty_module_name(module), (object) ? object : "", value);
+
        return collecty_queue_submit(self->queue, module, object, value);
 }
index e2848816a52953227797357247e3da22420ff76e..162d2ee6dabd77f8272491674d49caba35575311 100644 (file)
@@ -42,8 +42,9 @@ struct collecty_queue_object {
        // Timestamp
        struct timeval t;
 
-       // Value
-       char* value;
+       // Samples
+       char** samples;
+       unsigned int num_samples;
 };
 
 struct collecty_queue {
@@ -89,12 +90,16 @@ static int collecty_queue_exit(sd_event_source* source, void* data) {
 }
 
 static void collecty_queue_free_object(struct collecty_queue_object* o) {
+       if (o->samples) {
+               for (unsigned int i = 0; i < o->num_samples; i++)
+                       free(o->samples[i]);
+               free(o->samples);
+       }
+
        if (o->module)
                collecty_module_unref(o->module);
        if (o->object)
                free(o->object);
-       if (o->value)
-               free(o->value);
        free(o);
 }
 
@@ -186,18 +191,70 @@ collecty_queue* collecty_queue_unref(collecty_queue* self) {
        return NULL;
 }
 
+static struct collecty_queue_object* collecty_queue_find_object(
+               collecty_queue* self, collecty_module* module, const char* object) {
+       struct collecty_queue_object* o = NULL;
+
+       STAILQ_FOREACH(o, &self->queue, nodes) {
+               // The module must match
+               if (o->module != module)
+                       continue;
+
+               // If both objects are NULL we have a match
+               if (!o->object && !object)
+                       return o;
+
+               // If both have objects, we need to compare
+               else if (o->object && object)
+                       if (strcmp(o->object, object) == 0)
+                               return o;
+       }
+
+       // No match found
+       return NULL;
+}
+
+static int collecty_queue_object_append_sample(collecty_queue* self, collecty_module* module,
+               const char* object, struct collecty_queue_object* o, const char* sample) {
+       char** samples = NULL;
+       char* s = NULL;
+
+       // Increase the size of the array
+       samples = reallocarray(o->samples, o->num_samples + 1, sizeof(*o->samples));
+       if (!samples)
+               return -errno;
+
+       // Copy the sample to the heap
+       s = strdup(sample);
+       if (!s)
+               return -errno;
+
+       // Assign the sample
+       samples[o->num_samples++] = s;
+
+       // Replace the array
+       o->samples = samples;
+
+       return 0;
+}
+
 /*
        Submits a new reading into the queue
 */
 int collecty_queue_submit(collecty_queue* self,
-               collecty_module* module, const char* object, const char* value) {
+               collecty_module* module, const char* object, const char* sample) {
        struct collecty_queue_object* o = NULL;
        int r;
 
        // Check inputs
-       if (!value)
+       if (!sample)
                return -EINVAL;
 
+       // Check if we can append the sample
+       o = collecty_queue_find_object(self, module, object);
+       if (o)
+               return collecty_queue_object_append_sample(self, module, object, o, sample);
+
        // Allocate some memory
        o = calloc(1, sizeof(*self));
        if (!o)
@@ -222,20 +279,14 @@ int collecty_queue_submit(collecty_queue* self,
                }
        }
 
-       // Store the value
-       o->value = strdup(value);
-       if (!o->value) {
-               r = -errno;
+       // Store the sample
+       r = collecty_queue_object_append_sample(self, module, object, o, sample);
+       if (r < 0)
                goto ERROR;
-       }
 
        // Append the object to the queue
        STAILQ_INSERT_TAIL(&self->queue, o, nodes);
 
-       // Log action
-       DEBUG(self->ctx, "%s(%s) submitted: %s\n",
-               collecty_module_name(module), (o->object) ? o->object : "", o->value);
-
        return 0;
 
 ERROR:
index 72ae6cd4f017b57148d1346d9441a978fcf9b7bc..9844c3eff45277ca73f42710d1df3aaf6c866bfe 100644 (file)
@@ -34,7 +34,7 @@ collecty_queue* collecty_queue_ref(collecty_queue* self);
 collecty_queue* collecty_queue_unref(collecty_queue* self);
 
 int collecty_queue_submit(collecty_queue* self,
-       collecty_module* module, const char* object, const char* value);
+       collecty_module* module, const char* object, const char* sample);
 
 int collecty_queue_flush(collecty_queue* self);