]> git.ipfire.org Git - collecty.git/commitdiff
source: Complain if something stalls the event loop
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 4 Oct 2025 16:02:24 +0000 (16:02 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 4 Oct 2025 16:02:24 +0000 (16:02 +0000)
This source will also be de-prioritized so that other sources can work
better.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/daemon/source.c

index 8f014a5c8389a96610b0aba17c4bb6feef0ea7d1..479ed70ade8d513634f53e8d0d1049d57ee71add 100644 (file)
@@ -50,6 +50,9 @@
 #define ERROR_THRESHOLD                5
 #define FLAPPING_THRESHOLD     3
 
+// Complain if collect() took longer than this time
+#define RUNTIME_THRESHOLD      250000 // 250 milliseconds
+
 // Define some default RRAs
 static const collecty_rrd_rra default_rras[] = {
        // Keep AVERAGE/MIN/MAX with a one minute resolution for two weeks
@@ -143,13 +146,12 @@ static int collecty_source_is_flapping(collecty_source* self) {
                changes++;
        }
 
-       DEBUG(self->ctx, "CHANGES %d\n", changes);
-
        return changes >= FLAPPING_THRESHOLD;
 }
 
-static int collecty_source_error_detection(collecty_source* self, int result) {
+static int collecty_source_error_detection(collecty_source* self, int result, uint64_t runtime) {
        enum collecty_source_state state;
+       int r;
 
        // Move everything up in the array
        for (unsigned int i = NUM_RESULTS - 1; i > 0; i--)
@@ -162,6 +164,28 @@ static int collecty_source_error_detection(collecty_source* self, int result) {
        if (self->results.num < NUM_RESULTS)
                self->results.num++;
 
+       // Complain if an iteration took too long
+       if (runtime >= RUNTIME_THRESHOLD) {
+               ERROR(self->ctx, "Heartbeat for %s stalled the event loop for %.2lfms\n",
+                       collecty_source_name(self), (double)runtime / 1000);
+
+               // Decrease the priority for this source so it won't stall any other sources
+               r = sd_event_source_set_priority(self->events.heartbeat, SD_EVENT_PRIORITY_IDLE);
+               if (r < 0)
+                       return r;
+
+       // Otherwise just log the runtime and reset the priority
+       } else {
+               // Log the runtime
+               DEBUG(self->ctx, "Heartbeat for %s took %.2lfms\n",
+                       collecty_source_name(self), (double)runtime / 1000);
+
+               // Decrease the priority for this source so it won't stall any other sources
+               r = sd_event_source_set_priority(self->events.heartbeat, SD_EVENT_PRIORITY_NORMAL);
+               if (r < 0)
+                       return r;
+       }
+
        // Log all recent results
        if (result) {
                for (unsigned int i = 0; i < self->results.num; i++) {
@@ -215,12 +239,28 @@ static int collecty_source_error_detection(collecty_source* self, int result) {
        return 0;
 }
 
+static uint64_t collecty_source_elapsed_time(void) {
+       struct timespec ts = {};
+       int r;
+
+       // Fetch the current time
+       r = clock_gettime(CLOCK_MONOTONIC, &ts);
+       if (r < 0)
+               return 0;
+
+       // Return as µsec
+       return (ts.tv_sec * 1000000) + (ts.tv_nsec / 1000);
+}
+
 static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, void* data) {
        collecty_source* self = data;
        uint64_t next_heartbeat;
+       uint64_t t_start;
+       uint64_t t_end;
        int r;
 
-       DEBUG(self->ctx, "Heartbeat called for %s\n", collecty_source_name(self));
+       // Store the start timestamp
+       t_start = collecty_source_elapsed_time();
 
        // Call the collect method
        r = self->impl->collect(self->ctx, self);
@@ -229,8 +269,11 @@ static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, voi
                        collecty_source_name(self), strerror(-r));
        }
 
+       // Fetch the end timestamp
+       t_end = collecty_source_elapsed_time();
+
        // Run error detection
-       r = collecty_source_error_detection(self, r);
+       r = collecty_source_error_detection(self, r, t_end - t_start);
        if (r < 0)
                return r;