#define ERROR_THRESHOLD 5
#define FLAPPING_THRESHOLD 3
+// Complain if collect() took longer than this time (in microseconds)
+#define RUNTIME_THRESHOLD 250000 // 250 milliseconds
+
// Define some default RRAs
static const collecty_rrd_rra default_rras[] = {
// Keep AVERAGE/MIN/MAX with a one minute resolution for two weeks
changes++;
}
- DEBUG(self->ctx, "CHANGES %d\n", changes);
-
return changes >= FLAPPING_THRESHOLD;
}
-static int collecty_source_error_detection(collecty_source* self, int result) {
+static int collecty_source_error_detection(collecty_source* self, int result, uint64_t runtime) {
enum collecty_source_state state;
+ int r;
// Move everything up in the array
for (unsigned int i = NUM_RESULTS - 1; i > 0; i--)
if (self->results.num < NUM_RESULTS)
self->results.num++;
+ // Complain if an iteration took too long
+ if (runtime >= RUNTIME_THRESHOLD) {
+ ERROR(self->ctx, "Heartbeat for %s stalled the event loop for %.2lfms\n",
+ collecty_source_name(self), (double)runtime / 1000);
+
+ // Decrease the priority for this source so it won't stall any other sources
+ r = sd_event_source_set_priority(self->events.heartbeat, SD_EVENT_PRIORITY_IDLE);
+ if (r < 0)
+ return r;
+
+ // Otherwise just log the runtime and reset the priority
+ } else {
+ // Log the runtime
+ DEBUG(self->ctx, "Heartbeat for %s took %.2lfms\n",
+ collecty_source_name(self), (double)runtime / 1000);
+
+ // Restore the normal priority since this iteration finished in time
+ r = sd_event_source_set_priority(self->events.heartbeat, SD_EVENT_PRIORITY_NORMAL);
+ if (r < 0)
+ return r;
+ }
+
// Log all recent results
if (result) {
for (unsigned int i = 0; i < self->results.num; i++) {
return 0;
}
+// Returns the current reading of the monotonic clock in microseconds.
+//
+// The absolute value has no defined epoch; use it only to compute the
+// difference between two readings. Returns 0 if the clock could not be read.
+static uint64_t collecty_source_elapsed_time(void) {
+	struct timespec ts = { 0 };
+	int r;
+
+	// Fetch the current time
+	r = clock_gettime(CLOCK_MONOTONIC, &ts);
+	if (r < 0)
+		return 0;
+
+	// Widen to 64 bit before multiplying so that the product cannot
+	// overflow on platforms where time_t is only 32 bit wide
+	return ((uint64_t)ts.tv_sec * 1000000) + ((uint64_t)ts.tv_nsec / 1000);
+}
+
static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, void* data) {
collecty_source* self = data;
uint64_t next_heartbeat;
+ uint64_t t_start;
+ uint64_t t_end;
int r;
- DEBUG(self->ctx, "Heartbeat called for %s\n", collecty_source_name(self));
+ // Store the start timestamp
+ t_start = collecty_source_elapsed_time();
// Call the collect method
r = self->impl->collect(self->ctx, self);
collecty_source_name(self), strerror(-r));
}
+ // Fetch the end timestamp
+ t_end = collecty_source_elapsed_time();
+
// Run error detection
- r = collecty_source_error_detection(self, r);
+ r = collecty_source_error_detection(self, r, t_end - t_start);
if (r < 0)
return r;