From: Michael Tremer Date: Wed, 8 Oct 2025 20:06:35 +0000 (+0000) Subject: queue: Flush all data when we want to draw a graph X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6e4c3ac5bbb9afe1818af3599e279621bae9f546;p=telemetry.git queue: Flush all data when we want to draw a graph Signed-off-by: Michael Tremer --- diff --git a/src/daemon/daemon.c b/src/daemon/daemon.c index 7405061..c9d67be 100644 --- a/src/daemon/daemon.c +++ b/src/daemon/daemon.c @@ -324,6 +324,11 @@ int collecty_daemon_submit(collecty_daemon* self, return collecty_queue_submit(self->queue, source, object, value); } +int collecty_daemon_flush_source( + collecty_daemon* self, collecty_source* source, const char* object) { + return collecty_queue_flush_source(self->queue, source, object); +} + static int collecty_daemon_bus_version(sd_bus* bus, const char* path, const char* interface, const char* property, sd_bus_message* reply, void* data, sd_bus_error* error) { return sd_bus_message_append(reply, "s", PACKAGE_VERSION); diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index 93d0e0a..9f1fcd9 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -45,6 +45,9 @@ int collecty_daemon_run(collecty_daemon* self); int collecty_daemon_submit(collecty_daemon* self, collecty_source* source, const char* object, const char* value); +int collecty_daemon_flush_source( + collecty_daemon* self, collecty_source* source, const char* object); + // Bus extern const collecty_bus_implementation daemon_bus_impl; diff --git a/src/daemon/queue.c b/src/daemon/queue.c index 8d91e8d..1147d1c 100644 --- a/src/daemon/queue.c +++ b/src/daemon/queue.c @@ -64,6 +64,20 @@ struct collecty_queue { STAILQ_HEAD(queue, collecty_queue_object) queue; }; +static int collecty_queue_object_equals( + collecty_queue* self, struct collecty_queue_object* o, const char* object) { + // Objects match if they are both NULL + if (!o->object && !object) + return 1; + + // If both have objects, we need to compare + if (o->object && object) + return collecty_string_equals(o->object, object); + + // Otherwise we have no match + return 0; +} + static int collecty_queue_heartbeat(sd_event_source* source, uint64_t usec, void* data) { collecty_queue* self = data; int r; @@ -200,14 +214,9 @@ static struct collecty_queue_object* collecty_queue_find_object( if (o->source != source) continue; - // If both objects are NULL we have a match - if (!o->object && !object) + // Check if the object matches + if (collecty_queue_object_equals(self, o, object)) return o; - - // If both have objects, we need to compare - else if (o->object && object) - if (collecty_string_equals(o->object, object)) - return o; } // No match found @@ -337,6 +346,25 @@ ERROR: return r; } +static int collecty_queue_flush_object(collecty_queue* self, struct collecty_queue_object* o) { + int r; + + // Call the source to write its data + r = collecty_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples); + if (r < 0) { + ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n", + collecty_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r)); + } + + // Remove the object from the queue + STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes); + + // Free the object + collecty_queue_free_object(o); + + return 0; +} + int collecty_queue_flush(collecty_queue* self) { struct collecty_queue_object* o = NULL; int r; @@ -348,18 +376,30 @@ int collecty_queue_flush(collecty_queue* self) { if (!o) break; - // Call the source to write its data - r = collecty_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples); - if (r < 0) { - ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n", - collecty_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r)); - } + // Flush the object + r = collecty_queue_flush_object(self, o); + if (r < 0) + return r; + } - // Remove the object from the queue - STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes); + return 0; +} - // Free the object - collecty_queue_free_object(o); +int collecty_queue_flush_source(collecty_queue* self, + collecty_source* source, const char* object) { + struct collecty_queue_object* o = NULL; + + STAILQ_FOREACH(o, &self->queue, nodes) { + // Continue if the source does not match + if (o->source != source) + continue; + + // Continue if the object does not match + if (!collecty_queue_object_equals(self, o, object)) + continue; + + // Flush the object + return collecty_queue_flush_object(self, o); } return 0; diff --git a/src/daemon/queue.h b/src/daemon/queue.h index 1a813d3..9c77541 100644 --- a/src/daemon/queue.h +++ b/src/daemon/queue.h @@ -38,4 +38,7 @@ int collecty_queue_submit(collecty_queue* self, int collecty_queue_flush(collecty_queue* self); +int collecty_queue_flush_source(collecty_queue* self, + collecty_source* source, const char* object); + #endif /* COLLECTY_QUEUE_H */ diff --git a/src/daemon/source.c b/src/daemon/source.c index 5000da2..38feb36 100644 --- a/src/daemon/source.c +++ b/src/daemon/source.c @@ -651,5 +651,6 @@ int collecty_source_render(collecty_source* self, collecty_args* args, const cha return r; } - return 0; + // Commit any buffered data right now + return collecty_daemon_flush_source(self->daemon, self, object); }