]> git.ipfire.org Git - collecty.git/commitdiff
sources: Store result codes and check for patterns
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 4 Oct 2025 15:09:15 +0000 (15:09 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 4 Oct 2025 15:09:15 +0000 (15:09 +0000)
That way we can disable any sources which constantly fail and we might
as well detect flapping.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/daemon/source.c

index e4db4f19f83b018e18066bbbf9ec03a664dc8901..1db74a731c5b4e04c26c95b763dfeba1ce2bd56c 100644 (file)
 // XXX We need to check whether it is a good idea to hardcode this here
 #define XFF 0.1
 
+// Store the last 10 return codes
+#define NUM_RESULTS    10
+
+// Thresholds to disable the source
+#define ERROR_THRESHOLD                5
+#define FLAPPING_THRESHOLD     3
+
 // Define some default RRAs
 static const collecty_rrd_rra default_rras[] = {
        // Keep AVERAGE/MIN/MAX with a one minute resolution for two weeks
@@ -80,8 +87,134 @@ struct collecty_source {
        struct {
                sd_event_source* heartbeat;
        } events;
+
+       // State
+       enum collecty_source_state {
+               STATE_HEALTHY = 0,
+               STATE_FLAPPING,
+               STATE_ERROR,
+               STATE_DISABLED,
+       } state;
+
+       // Results
+       struct {
+               // Stores the return codes
+               int r[NUM_RESULTS];
+
+               // Stores the total number of values we have
+               unsigned int num;
+       } results;
 };
 
+// Return the number of recent bad results
+static int collecty_source_count_bad_results(collecty_source* self) {
+       int counter = 0;
+
+       // Starting with the most recent result, count all bad results until a good one is found
+       for (unsigned int i = 0; i < self->results.num; i++) {
+               if (!self->results.r[i])
+                       break;
+
+               // Increment the counter
+               counter++;
+       }
+
+       return counter;
+}
+
+static int collecty_source_is_flapping(collecty_source* self) {
+       int changes = 0;
+       int r;
+
+       // If we have less than threshold results, we don't need to run this
+       if (self->results.num <= FLAPPING_THRESHOLD)
+               return 0;
+
+       // Start with the most recent result
+       r = self->results.r[0];
+
+       // Check all further results and count the changes
+       for (unsigned int i = 1; i < self->results.num; i++) {
+               if (self->results.r[i] == r)
+                       continue;
+
+               // We have changed
+               r = self->results.r[i];
+               changes++;
+       }
+
+       DEBUG(self->ctx, "CHANGES %d\n", changes);
+
+       return changes >= FLAPPING_THRESHOLD;
+}
+
+static int collecty_source_error_detection(collecty_source* self, int result) {
+       enum collecty_source_state state;
+
+       // Move everything up in the array
+       for (unsigned int i = NUM_RESULTS - 1; i > 0; i--)
+               self->results.r[i] = self->results.r[i - 1];
+
+       // Store the latest result
+       self->results.r[0] = result;
+
+       // We now have more results
+       if (self->results.num < NUM_RESULTS)
+               self->results.num++;
+
+       // Log all recent results
+       if (result) {
+               for (unsigned int i = 0; i < self->results.num; i++) {
+                       DEBUG(self->ctx, "%s: result[%u] = %s\n",
+                               collecty_source_name(self), i, strerror(-self->results.r[i]));
+               }
+
+               // Print bad results
+               DEBUG(self->ctx, "%s: bad results: %d\n",
+                       collecty_source_name(self), collecty_source_count_bad_results(self));
+       }
+
+       // If the last N attempts have been bad, we are in error state
+       if (collecty_source_count_bad_results(self) >= ERROR_THRESHOLD)
+               state = STATE_ERROR;
+
+       // Check if we are flapping
+       else if (collecty_source_is_flapping(self))
+               state = STATE_FLAPPING;
+
+       // Otherwise we must be healthy
+       else
+               state = STATE_HEALTHY;
+
+       // If the state has not changed, we are don
+       if (self->state == state)
+               return 0;
+
+       // Store the new state
+       self->state = state;
+
+       // Log the state change
+       switch (state) {
+               case STATE_HEALTHY:
+                       DEBUG(self->ctx, "%s is now healthy\n", collecty_source_name(self));
+                       break;
+
+               case STATE_FLAPPING:
+                       ERROR(self->ctx, "%s is now flapping\n", collecty_source_name(self));
+                       break;
+
+               case STATE_ERROR:
+                       ERROR(self->ctx, "%s is now in error state\n", collecty_source_name(self));
+                       break;
+
+               case STATE_DISABLED:
+                       ERROR(self->ctx, "%s has been disabled\n", collecty_source_name(self));
+                       break;
+       }
+
+       return 0;
+}
+
 static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, void* data) {
        collecty_source* self = data;
        int r;
@@ -95,6 +228,11 @@ static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, voi
                        collecty_source_name(self), strerror(-r));
        }
 
+       // Run error detection
+       r = collecty_source_error_detection(self, r);
+       if (r < 0)
+               return r;
+
        // Arm the timer again
        r = sd_event_source_set_time(self->events.heartbeat, usec + HEARTBEAT);
        if (r < 0)