]> git.ipfire.org Git - telemetry.git/commitdiff
queue: Flush all data when we want to draw a graph
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 8 Oct 2025 20:06:35 +0000 (20:06 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 8 Oct 2025 20:06:35 +0000 (20:06 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/daemon/daemon.c
src/daemon/daemon.h
src/daemon/queue.c
src/daemon/queue.h
src/daemon/source.c

index 7405061b726c74f3d66522e19963eabdc0728e9a..c9d67be6e499f10f023a590a462cc4dd353c6d7b 100644 (file)
@@ -324,6 +324,11 @@ int collecty_daemon_submit(collecty_daemon* self,
        return collecty_queue_submit(self->queue, source, object, value);
 }
 
+int collecty_daemon_flush_source(
+               collecty_daemon* self, collecty_source* source, const char* object) {
+       return collecty_queue_flush_source(self->queue, source, object);
+}
+
 static int collecty_daemon_bus_version(sd_bus* bus, const char* path, const char* interface,
                const char* property, sd_bus_message* reply, void* data, sd_bus_error* error) {
        return sd_bus_message_append(reply, "s", PACKAGE_VERSION);
index 93d0e0a6bd39ece660d2875443c1b46e925f10bc..9f1fcd9569d16b3b5bcc9b28879df35316ed8779 100644 (file)
@@ -45,6 +45,9 @@ int collecty_daemon_run(collecty_daemon* self);
 int collecty_daemon_submit(collecty_daemon* self,
                collecty_source* source, const char* object, const char* value);
 
+int collecty_daemon_flush_source(
+       collecty_daemon* self, collecty_source* source, const char* object);
+
 // Bus
 extern const collecty_bus_implementation daemon_bus_impl;
 
index 8d91e8d3a677812bbc677173209a06656842307c..1147d1cf92508a3d1241767fcf729f397d5cb539 100644 (file)
@@ -64,6 +64,20 @@ struct collecty_queue {
        STAILQ_HEAD(queue, collecty_queue_object) queue;
 };
 
+static int collecty_queue_object_equals(
+               collecty_queue* self, struct collecty_queue_object* o, const char* object) {
+       // Objects match if they are both NULL
+       if (!o->object && !object)
+               return 1;
+
+       // If both have objects, we need to compare
+       if (o->object && object)
+               return collecty_string_equals(o->object, object);
+
+       // Otherwise we have no match
+       return 0;
+}
+
 static int collecty_queue_heartbeat(sd_event_source* source, uint64_t usec, void* data) {
        collecty_queue* self = data;
        int r;
@@ -200,14 +214,9 @@ static struct collecty_queue_object* collecty_queue_find_object(
                if (o->source != source)
                        continue;
 
-               // If both objects are NULL we have a match
-               if (!o->object && !object)
+               // Check if the object matches
+               if (collecty_queue_object_equals(self, o, object))
                        return o;
-
-               // If both have objects, we need to compare
-               else if (o->object && object)
-                       if (collecty_string_equals(o->object, object))
-                               return o;
        }
 
        // No match found
@@ -337,6 +346,25 @@ ERROR:
        return r;
 }
 
+static int collecty_queue_flush_object(collecty_queue* self, struct collecty_queue_object* o) {
+       int r;
+
+       // Call the source to write its data
+       r = collecty_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
+                       collecty_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
+       }
+
+       // Remove the object from the queue
+       STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes);
+
+       // Free the object
+       collecty_queue_free_object(o);
+
+       return 0;
+}
+
 int collecty_queue_flush(collecty_queue* self) {
        struct collecty_queue_object* o = NULL;
        int r;
@@ -348,18 +376,30 @@ int collecty_queue_flush(collecty_queue* self) {
                if (!o)
                        break;
 
-               // Call the source to write its data
-               r = collecty_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
-               if (r < 0) {
-                       ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
-                               collecty_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
-               }
+               // Flush the object
+               r = collecty_queue_flush_object(self, o);
+               if (r < 0)
+                       return r;
+       }
 
-               // Remove the object from the queue
-               STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes);
+       return 0;
+}
 
-               // Free the object
-               collecty_queue_free_object(o);
+int collecty_queue_flush_source(collecty_queue* self,
+               collecty_source* source, const char* object) {
+       struct collecty_queue_object* o = NULL;
+
+       STAILQ_FOREACH(o, &self->queue, nodes) {
+               // Continue if the source does not match
+               if (o->source != source)
+                       continue;
+
+               // Continue if the object does not match
+               if (!collecty_queue_object_equals(self, o, object))
+                       continue;
+
+               // Flush the object
+               return collecty_queue_flush_object(self, o);
        }
 
        return 0;
index 1a813d3daeb5072d187bb414fe4373b9fd71de48..9c775411cbbdba7454c0b71e7e996fec73e13346 100644 (file)
@@ -38,4 +38,7 @@ int collecty_queue_submit(collecty_queue* self,
 
 int collecty_queue_flush(collecty_queue* self);
 
+int collecty_queue_flush_source(collecty_queue* self,
+       collecty_source* source, const char* object);
+
 #endif /* COLLECTY_QUEUE_H */
index 5000da2839686ac5b91cc721755c7d7ee8483bbc..38feb365b49f8f17dd235789d469e9129160b203 100644 (file)
@@ -651,5 +651,6 @@ int collecty_source_render(collecty_source* self, collecty_args* args, const cha
                        return r;
        }
 
-       return 0;
+       // Commit any buffered data right now
+       return collecty_daemon_flush_source(self->daemon, self, object);
 }