From 153c06623b175cc300dace3c8914ffe1512ec0e5 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Sat, 4 Oct 2025 15:09:15 +0000 Subject: [PATCH] sources: Store result codes and check for patterns That way we can disable any sources which constantly fail and we might as well detect flapping. Signed-off-by: Michael Tremer --- src/daemon/source.c | 138 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/src/daemon/source.c b/src/daemon/source.c index e4db4f1..1db74a7 100644 --- a/src/daemon/source.c +++ b/src/daemon/source.c @@ -43,6 +43,13 @@ // XXX We need to check whether it is a good idea to hardcode this here #define XFF 0.1 +// Store the last 10 return codes +#define NUM_RESULTS 10 + +// Thresholds to disable the source +#define ERROR_THRESHOLD 5 +#define FLAPPING_THRESHOLD 3 + // Define some default RRAs static const collecty_rrd_rra default_rras[] = { // Keep AVERAGE/MIN/MAX with a one minute resolution for two weeks @@ -80,8 +87,134 @@ struct collecty_source { struct { sd_event_source* heartbeat; } events; + + // State + enum collecty_source_state { + STATE_HEALTHY = 0, + STATE_FLAPPING, + STATE_ERROR, + STATE_DISABLED, + } state; + + // Results + struct { + // Stores the return codes + int r[NUM_RESULTS]; + + // Stores the total number of values we have + unsigned int num; + } results; }; +// Return the number of recent bad results +static int collecty_source_count_bad_results(collecty_source* self) { + int counter = 0; + + // Starting with the most recent result, count all bad results until a good one is found + for (unsigned int i = 0; i < self->results.num; i++) { + if (!self->results.r[i]) + break; + + // Increment the counter + counter++; + } + + return counter; +} + +static int collecty_source_is_flapping(collecty_source* self) { + int changes = 0; + int r; + + // If we have less than threshold results, we don't need to run this + if (self->results.num <= FLAPPING_THRESHOLD) + return 0; + + // Start with the most recent result + r = self->results.r[0]; + + // Check all further results and count the changes + for (unsigned int i = 1; i < self->results.num; i++) { + if (self->results.r[i] == r) + continue; + + // We have changed + r = self->results.r[i]; + changes++; + } + + DEBUG(self->ctx, "CHANGES %d\n", changes); + + return changes >= FLAPPING_THRESHOLD; +} + +static int collecty_source_error_detection(collecty_source* self, int result) { + enum collecty_source_state state; + + // Move everything up in the array + for (unsigned int i = NUM_RESULTS - 1; i > 0; i--) + self->results.r[i] = self->results.r[i - 1]; + + // Store the latest result + self->results.r[0] = result; + + // We now have more results + if (self->results.num < NUM_RESULTS) + self->results.num++; + + // Log all recent results + if (result) { + for (unsigned int i = 0; i < self->results.num; i++) { + DEBUG(self->ctx, "%s: result[%u] = %s\n", + collecty_source_name(self), i, strerror(-self->results.r[i])); + } + + // Print bad results + DEBUG(self->ctx, "%s: bad results: %d\n", + collecty_source_name(self), collecty_source_count_bad_results(self)); + } + + // If the last N attempts have been bad, we are in error state + if (collecty_source_count_bad_results(self) >= ERROR_THRESHOLD) + state = STATE_ERROR; + + // Check if we are flapping + else if (collecty_source_is_flapping(self)) + state = STATE_FLAPPING; + + // Otherwise we must be healthy + else + state = STATE_HEALTHY; + + // If the state has not changed, we are don + if (self->state == state) + return 0; + + // Store the new state + self->state = state; + + // Log the state change + switch (state) { + case STATE_HEALTHY: + DEBUG(self->ctx, "%s is now healthy\n", collecty_source_name(self)); + break; + + case STATE_FLAPPING: + ERROR(self->ctx, "%s is now flapping\n", collecty_source_name(self)); + break; + + case STATE_ERROR: + ERROR(self->ctx, "%s is now in error state\n", collecty_source_name(self)); + break; + + case STATE_DISABLED: + ERROR(self->ctx, "%s has been disabled\n", collecty_source_name(self)); + break; + } + + return 0; +} + static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, void* data) { collecty_source* self = data; int r; @@ -95,6 +228,11 @@ static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, voi collecty_source_name(self), strerror(-r)); } + // Run error detection + r = collecty_source_error_detection(self, r); + if (r < 0) + return r; + // Arm the timer again r = sd_event_source_set_time(self->events.heartbeat, usec + HEARTBEAT); if (r < 0) -- 2.47.3