#include "module.h"
#include "modules.h"
+#define FLUSH_QUEUE_INTERVAL 60000000 // 60s (for testing)
+
struct collecty_queue_object {
STAILQ_ENTRY(collecty_queue_object) nodes;
sd_event_source* sigterm;
sd_event_source* sigint;
sd_event_source* modules_init;
+ sd_event_source* flush;
} events;
// Write Queue
free(o);
}
+static int collecty_daemon_flush(sd_event_source* source, uint64_t usec, void* data) {
+ collecty_daemon* self = data;
+ int r;
+
+ DEBUG(self->ctx, "Flushing the queue...\n");
+
+ // TODO
+
+ // Call again soon...
+ r = sd_event_source_set_time(source, usec + FLUSH_QUEUE_INTERVAL);
+ if (r < 0)
+ return r;
+
+ // Enable the timer
+ r = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
static int collecty_daemon_setup_queue(collecty_daemon* self) {
+ int r;
+
// Initialize the queue
STAILQ_INIT(&self->queue);
+ // Create a timer which regularly flushes the queue
+ r = sd_event_add_time_relative(self->loop, &self->events.flush,
+ CLOCK_MONOTONIC, FLUSH_QUEUE_INTERVAL, 0, collecty_daemon_flush, self);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to set up the flush timer: %s\n", strerror(-r));
+ return r;
+ }
+
return 0;
}
collecty_daemon_free_queue_object(o);
}
+ if (self->events.flush)
+ sd_event_source_unref(self->events.flush);
if (self->events.modules_init)
sd_event_source_unref(self->events.modules_init);
if (self->events.sigterm)