From 8a0cb98e322c9cc572b6d17fb4aba2bc5aa42e9d Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Sat, 4 Oct 2025 16:02:24 +0000 Subject: [PATCH] source: Complain if something stalls the event loop This source will also be de-prioritized so that other sources can work better. Signed-off-by: Michael Tremer --- src/daemon/source.c | 53 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/src/daemon/source.c b/src/daemon/source.c index 8f014a5..479ed70 100644 --- a/src/daemon/source.c +++ b/src/daemon/source.c @@ -50,6 +50,9 @@ #define ERROR_THRESHOLD 5 #define FLAPPING_THRESHOLD 3 +// Complain if collect() took longer than this time +#define RUNTIME_THRESHOLD 250000 // 250 milliseconds + // Define some default RRAs static const collecty_rrd_rra default_rras[] = { // Keep AVERAGE/MIN/MAX with a one minute resolution for two weeks @@ -143,13 +146,12 @@ static int collecty_source_is_flapping(collecty_source* self) { changes++; } - DEBUG(self->ctx, "CHANGES %d\n", changes); - return changes >= FLAPPING_THRESHOLD; } -static int collecty_source_error_detection(collecty_source* self, int result) { +static int collecty_source_error_detection(collecty_source* self, int result, uint64_t runtime) { enum collecty_source_state state; + int r; // Move everything up in the array for (unsigned int i = NUM_RESULTS - 1; i > 0; i--) @@ -162,6 +164,28 @@ static int collecty_source_error_detection(collecty_source* self, int result) { if (self->results.num < NUM_RESULTS) self->results.num++; + // Complain if an iteration took too long + if (runtime >= RUNTIME_THRESHOLD) { + ERROR(self->ctx, "Heartbeat for %s stalled the event loop for %.2lfms\n", + collecty_source_name(self), (double)runtime / 1000); + + // Decrease the priority for this source so it won't stall any other sources + r = sd_event_source_set_priority(self->events.heartbeat, SD_EVENT_PRIORITY_IDLE); + if (r < 0) + return r; + + // Otherwise just log the runtime and reset 
the priority + } else { + // Log the runtime + DEBUG(self->ctx, "Heartbeat for %s took %.2lfms\n", + collecty_source_name(self), (double)runtime / 1000); + + // Reset the priority of this source to normal as this iteration finished in time + r = sd_event_source_set_priority(self->events.heartbeat, SD_EVENT_PRIORITY_NORMAL); + if (r < 0) + return r; + } + // Log all recent results if (result) { for (unsigned int i = 0; i < self->results.num; i++) { @@ -215,12 +239,28 @@ static int collecty_source_error_detection(collecty_source* self, int result) { return 0; } +static uint64_t collecty_source_elapsed_time(void) { + struct timespec ts = {}; + int r; + + // Fetch the current time + r = clock_gettime(CLOCK_MONOTONIC, &ts); + if (r < 0) + return 0; + + // Return as µsec + return (ts.tv_sec * 1000000) + (ts.tv_nsec / 1000); +} + static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, void* data) { collecty_source* self = data; uint64_t next_heartbeat; + uint64_t t_start; + uint64_t t_end; int r; - DEBUG(self->ctx, "Heartbeat called for %s\n", collecty_source_name(self)); + // Store the start timestamp + t_start = collecty_source_elapsed_time(); // Call the collect method r = self->impl->collect(self->ctx, self); @@ -229,8 +269,11 @@ static int collecty_source_heartbeat(sd_event_source* source, uint64_t usec, voi collecty_source_name(self), strerror(-r)); } + // Fetch the end timestamp + t_end = collecty_source_elapsed_time(); + // Run error detection - r = collecty_source_error_detection(self, r); + r = collecty_source_error_detection(self, r, t_end - t_start); if (r < 0) return r; -- 2.47.3