// XXX We need to check whether it is a good idea to hardcode this here
#define XFF 0.1
+// Store the last 10 return codes
+#define NUM_RESULTS 10
+
+// Thresholds to disable the source
+#define ERROR_THRESHOLD 5
+#define FLAPPING_THRESHOLD 3
+
// Define some default RRAs
static const collecty_rrd_rra default_rras[] = {
// Keep AVERAGE/MIN/MAX with a one minute resolution for two weeks
struct {
sd_event_source* heartbeat;
} events;
+
+ // State
+ enum collecty_source_state {
+ STATE_HEALTHY = 0,
+ STATE_FLAPPING,
+ STATE_ERROR,
+ STATE_DISABLED,
+ } state;
+
+ // Results
+ struct {
+ // Stores the return codes
+ int r[NUM_RESULTS];
+
+ // Stores the total number of values we have
+ unsigned int num;
+ } results;
};
+// Return the number of recent bad results
+static int collecty_source_count_bad_results(collecty_source* self) {
+ int counter = 0;
+
+ // Starting with the most recent result, count all bad results until a good one is found
+ for (unsigned int i = 0; i < self->results.num; i++) {
+ if (!self->results.r[i])
+ break;
+
+ // Increment the counter
+ counter++;
+ }
+
+ return counter;
+}
+
+static int collecty_source_is_flapping(collecty_source* self) {
+ int changes = 0;
+ int r;
+
+ // If we have less than threshold results, we don't need to run this
+ if (self->results.num <= FLAPPING_THRESHOLD)
+ return 0;
+
+ // Start with the most recent result
+ r = self->results.r[0];
+
+ // Check all further results and count the changes
+ for (unsigned int i = 1; i < self->results.num; i++) {
+ if (self->results.r[i] == r)
+ continue;
+
+ // We have changed
+ r = self->results.r[i];
+ changes++;
+ }
+
+ DEBUG(self->ctx, "CHANGES %d\n", changes);
+
+ return changes >= FLAPPING_THRESHOLD;
+}
+
+static int collecty_source_error_detection(collecty_source* self, int result) {
+ enum collecty_source_state state;
+
+ // Move everything up in the array
+ for (unsigned int i = NUM_RESULTS - 1; i > 0; i--)
+ self->results.r[i] = self->results.r[i - 1];
+
+ // Store the latest result
+ self->results.r[0] = result;
+
+ // We now have more results
+ if (self->results.num < NUM_RESULTS)
+ self->results.num++;
+
+ // Log all recent results
+ if (result) {
+ for (unsigned int i = 0; i < self->results.num; i++) {
+ DEBUG(self->ctx, "%s: result[%u] = %s\n",
+ collecty_source_name(self), i, strerror(-self->results.r[i]));
+ }
+
+ // Print bad results
+ DEBUG(self->ctx, "%s: bad results: %d\n",
+ collecty_source_name(self), collecty_source_count_bad_results(self));
+ }
+
+ // If the last N attempts have been bad, we are in error state
+ if (collecty_source_count_bad_results(self) >= ERROR_THRESHOLD)
+ state = STATE_ERROR;
+
+ // Check if we are flapping
+ else if (collecty_source_is_flapping(self))
+ state = STATE_FLAPPING;
+
+ // Otherwise we must be healthy
+ else
+ state = STATE_HEALTHY;
+
+ // If the state has not changed, we are don
+ if (self->state == state)
+ return 0;
+
+ // Store the new state
+ self->state = state;
+
+ // Log the state change
+ switch (state) {
+ case STATE_HEALTHY:
+ DEBUG(self->ctx, "%s is now healthy\n", collecty_source_name(self));
+ break;
+
+ case STATE_FLAPPING:
+ ERROR(self->ctx, "%s is now flapping\n", collecty_source_name(self));
+ break;
+
+ case STATE_ERROR:
+ ERROR(self->ctx, "%s is now in error state\n", collecty_source_name(self));
+ break;
+
+ case STATE_DISABLED:
+ ERROR(self->ctx, "%s has been disabled\n", collecty_source_name(self));
+ break;
+ }
+
+ return 0;
+}
+
static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, void* data) {
collecty_source* self = data;
int r;
collecty_source_name(self), strerror(-r));
}
+ // Run error detection
+ r = collecty_source_error_detection(self, r);
+ if (r < 0)
+ return r;
+
// Arm the timer again
r = sd_event_source_set_time(self->events.heartbeat, usec + HEARTBEAT);
if (r < 0)