src/daemon/modules.c \
src/daemon/modules.h \
src/daemon/modules/loadavg.c \
- src/daemon/modules/loadavg.h
+ src/daemon/modules/loadavg.h \
+ src/daemon/queue.c \
+ src/daemon/queue.h
collectyd_CPPFLAGS = \
$(AM_CPPFLAGS)
#include <errno.h>
#include <stdlib.h>
#include <string.h>
-#include <sys/queue.h>
-#include <sys/time.h>
#include <systemd/sd-daemon.h>
#include <systemd/sd-event.h>
#include "daemon.h"
#include "module.h"
#include "modules.h"
-
-#define FLUSH_QUEUE_INTERVAL 60000000 // 60s (for testing)
-
-struct collecty_queue_object {
- STAILQ_ENTRY(collecty_queue_object) nodes;
-
- // Module
- collecty_module* module;
-
- // Object
- char* object;
-
- // Timestamp
- struct timeval t;
-
- // Value
- char* value;
-};
+#include "queue.h"
struct collecty_daemon {
collecty_ctx* ctx;
sd_event_source* sigint;
sd_event_source* sighup;
sd_event_source* modules_init;
- sd_event_source* flush;
} events;
- // Write Queue
- STAILQ_HEAD(queue, collecty_queue_object) queue;
+ // Queue
+ collecty_queue* queue;
};
-static int collecty_daemon_flush(collecty_daemon* self) {
- DEBUG(self->ctx, "Flushing the queue...\n");
-
- // XXX TODO
-
- return 0;
-}
-
static int collecty_daemon_modules_init(sd_event_source* source, void* data) {
collecty_daemon* daemon = data;
collecty_daemon* self = data;
// Flush the queue
- return collecty_daemon_flush(self);
+ return collecty_queue_flush(self->queue);
}
static int collecty_daemon_setup_loop(collecty_daemon* self) {
return 0;
}
-static void collecty_daemon_free_queue_object(struct collecty_queue_object* o) {
- if (o->module)
- collecty_module_unref(o->module);
- if (o->object)
- free(o->object);
- if (o->value)
- free(o->value);
- free(o);
-}
-
-static int collecty_daemon_flush_timer(sd_event_source* source, uint64_t usec, void* data) {
- collecty_daemon* self = data;
- int r;
-
- // Call again soon...
- r = sd_event_source_set_time(source, usec + FLUSH_QUEUE_INTERVAL);
- if (r < 0)
- return r;
-
- // Enable the timer
- r = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT);
- if (r < 0)
- return r;
-
- // Flush the queue
- return collecty_daemon_flush(self);
-}
-
-static int collecty_daemon_setup_queue(collecty_daemon* self) {
- int r;
-
- // Initialize the queue
- STAILQ_INIT(&self->queue);
-
- // Create a timer which regularly flushes the queue
- r = sd_event_add_time_relative(self->loop, &self->events.flush,
- CLOCK_MONOTONIC, FLUSH_QUEUE_INTERVAL, 0, collecty_daemon_flush_timer, self);
- if (r < 0) {
- ERROR(self->ctx, "Failed to set up the flush timer: %s\n", strerror(-r));
- return r;
- }
-
- return 0;
-}
-
static void collecty_daemon_free(collecty_daemon* self) {
- struct collecty_queue_object* o = NULL;
-
- // Cleanup the queue
- for (;;) {
- o = STAILQ_FIRST(&self->queue);
- if (!o)
- break;
-
- // Remove the object from the queue
- STAILQ_REMOVE_HEAD(&self->queue, nodes);
-
- // Free the object
- collecty_daemon_free_queue_object(o);
- }
-
- if (self->events.flush)
- sd_event_source_unref(self->events.flush);
if (self->events.modules_init)
sd_event_source_unref(self->events.modules_init);
if (self->events.sigterm)
sd_event_source_unref(self->events.sigint);
if (self->events.sighup)
sd_event_source_unref(self->events.sighup);
+ if (self->queue)
+ collecty_queue_unref(self->queue);
if (self->ctx)
collecty_ctx_unref(self->ctx);
if (self->loop)
goto ERROR;
// Setup the write queue
- r = collecty_daemon_setup_queue(self);
+ r = collecty_queue_create(&self->queue, self->ctx, self);
if (r < 0)
goto ERROR;
return 1;
}
-/*
- Submits a new reading into the queue
-*/
int collecty_daemon_submit(collecty_daemon* self,
collecty_module* module, const char* object, const char* value) {
- struct collecty_queue_object* o = NULL;
- int r;
-
- // Check inputs
- if (!value)
- return -EINVAL;
-
- // Allocate some memory
- o = calloc(1, sizeof(*self));
- if (!o)
- return -errno;
-
- // Reference the module
- o->module = collecty_module_ref(module);
-
- // Fetch the current timestamp
- r = gettimeofday(&o->t, NULL);
- if (r < 0) {
- r = -errno;
- goto ERROR;
- }
-
- // Store the object
- if (o->object) {
- o->object = strdup(object);
- if (!o->object) {
- r = -errno;
- goto ERROR;
- }
- }
-
- // Store the value
- o->value = strdup(value);
- if (!o->value) {
- r = -errno;
- goto ERROR;
- }
-
- // Append the object to the queue
- STAILQ_INSERT_TAIL(&self->queue, o, nodes);
-
- // Log action
- DEBUG(self->ctx, "%s(%s) submitted: %s\n",
- collecty_module_name(module), (o->object) ? o->object : "", o->value);
-
- return 0;
-
-ERROR:
- if (o)
- collecty_daemon_free_queue_object(o);
-
- return r;
+ return collecty_queue_submit(self->queue, module, object, value);
}
--- /dev/null
+/*#############################################################################
+# #
+# collecty - A system statistics collection daemon for IPFire #
+# Copyright (C) 2025 IPFire Development Team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+#############################################################################*/
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <sys/time.h>
+
+#include "ctx.h"
+#include "daemon.h"
+#include "queue.h"
+
+#define HEARTBEAT 60000000 // 60s (for testing)
+
+struct collecty_queue_object {
+ STAILQ_ENTRY(collecty_queue_object) nodes;
+
+ // Module
+ collecty_module* module;
+
+ // Object
+ char* object;
+
+ // Timestamp
+ struct timeval t;
+
+ // Value
+ char* value;
+};
+
+struct collecty_queue {
+ collecty_ctx* ctx;
+ int nrefs;
+
+ // Event Loop
+ sd_event* loop;
+
+ // Events
+ struct {
+ sd_event_source* flush;
+ } events;
+
+ // Queue
+ STAILQ_HEAD(queue, collecty_queue_object) queue;
+};
+
+static int collecty_queue_heartbeat(sd_event_source* source, uint64_t usec, void* data) {
+ collecty_queue* self = data;
+ int r;
+
+ // Call again soon...
+ r = sd_event_source_set_time(source, usec + HEARTBEAT);
+ if (r < 0)
+ return r;
+
+ // Enable the timer
+ r = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT);
+ if (r < 0)
+ return r;
+
+ // Flush the queue
+ return collecty_queue_flush(self);
+}
+
+static void collecty_queue_free_object(struct collecty_queue_object* o) {
+ if (o->module)
+ collecty_module_unref(o->module);
+ if (o->object)
+ free(o->object);
+ if (o->value)
+ free(o->value);
+ free(o);
+}
+
+static void collecty_queue_free(collecty_queue* self) {
+ struct collecty_queue_object* o = NULL;
+
+ // Cleanup the queue
+ for (;;) {
+ o = STAILQ_FIRST(&self->queue);
+ if (!o)
+ break;
+
+ // Remove the object from the queue
+ STAILQ_REMOVE_HEAD(&self->queue, nodes);
+
+ // Free the object
+ collecty_queue_free_object(o);
+ }
+
+ if (self->events.flush)
+ sd_event_source_unref(self->events.flush);
+ if (self->ctx)
+ collecty_ctx_unref(self->ctx);
+ if (self->loop)
+ sd_event_unref(self->loop);
+ free(self);
+}
+
+int collecty_queue_create(collecty_queue** queue,
+ collecty_ctx* ctx, collecty_daemon* daemon) {
+ collecty_queue* self = NULL;
+ int r;
+
+ // Allocate some memory
+ self = calloc(1, sizeof(*self));
+ if (!self)
+ return -errno;
+
+ // Initialize the reference counter
+ self->nrefs = 1;
+
+ // Store a reference to the context
+ self->ctx = collecty_ctx_ref(ctx);
+
+ // Fetch a reference to the event loop
+ self->loop = collecty_daemon_loop(daemon);
+
+ // Initialize the queue
+ STAILQ_INIT(&self->queue);
+
+ // Create a timer which regularly flushes the queue
+ r = sd_event_add_time_relative(self->loop, &self->events.flush,
+ CLOCK_MONOTONIC, HEARTBEAT, 0, collecty_queue_heartbeat, self);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to set up the heartbeat timer: %s\n", strerror(-r));
+ goto ERROR;
+ }
+
+ // Return the pointer
+ *queue = self;
+ return 0;
+
+ERROR:
+ if (self)
+ collecty_queue_unref(self);
+
+ return r;
+}
+
+collecty_queue* collecty_queue_ref(collecty_queue* self) {
+ ++self->nrefs;
+ return self;
+}
+
+collecty_queue* collecty_queue_unref(collecty_queue* self) {
+ if (--self->nrefs > 0)
+ return self;
+
+ collecty_queue_free(self);
+ return NULL;
+}
+
+/*
+ Submits a new reading into the queue
+*/
+int collecty_queue_submit(collecty_queue* self,
+ collecty_module* module, const char* object, const char* value) {
+ struct collecty_queue_object* o = NULL;
+ int r;
+
+ // Check inputs
+ if (!value)
+ return -EINVAL;
+
+ // Allocate some memory
+ o = calloc(1, sizeof(*self));
+ if (!o)
+ return -errno;
+
+ // Reference the module
+ o->module = collecty_module_ref(module);
+
+ // Fetch the current timestamp
+ r = gettimeofday(&o->t, NULL);
+ if (r < 0) {
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Store the object
+ if (o->object) {
+ o->object = strdup(object);
+ if (!o->object) {
+ r = -errno;
+ goto ERROR;
+ }
+ }
+
+ // Store the value
+ o->value = strdup(value);
+ if (!o->value) {
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Append the object to the queue
+ STAILQ_INSERT_TAIL(&self->queue, o, nodes);
+
+ // Log action
+ DEBUG(self->ctx, "%s(%s) submitted: %s\n",
+ collecty_module_name(module), (o->object) ? o->object : "", o->value);
+
+ return 0;
+
+ERROR:
+ if (o)
+ collecty_queue_free_object(o);
+
+ return r;
+}
+
+int collecty_queue_flush(collecty_queue* self) {
+ DEBUG(self->ctx, "Flushing the queue...\n");
+
+ // XXX TODO
+
+ return 0;
+}
--- /dev/null
+/*#############################################################################
+# #
+# collecty - A system statistics collection daemon for IPFire #
+# Copyright (C) 2025 IPFire Development Team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+#############################################################################*/
+
+#ifndef COLLECTY_QUEUE_H
+#define COLLECTY_QUEUE_H
+
+typedef struct collecty_queue collecty_queue;
+
+#include "ctx.h"
+#include "daemon.h"
+#include "module.h"
+
+int collecty_queue_create(collecty_queue** queue,
+ collecty_ctx* ctx, collecty_daemon* daemon);
+
+collecty_queue* collecty_queue_ref(collecty_queue* self);
+collecty_queue* collecty_queue_unref(collecty_queue* self);
+
+int collecty_queue_submit(collecty_queue* self,
+ collecty_module* module, const char* object, const char* value);
+
+int collecty_queue_flush(collecty_queue* self);
+
+#endif /* COLLECTY_QUEUE_H */