#include <errno.h>
#include <stdlib.h>
#include <string.h>
+#include <sys/queue.h>
+#include <sys/time.h>
#include <systemd/sd-daemon.h>
#include <systemd/sd-event.h>
#include "ctx.h"
#include "daemon.h"
+#include "module.h"
#include "modules.h"
+struct collecty_queue_object {
+ STAILQ_ENTRY(collecty_queue_object) nodes;
+
+ // Module
+ collecty_module* module;
+
+ // Object
+ char* object;
+
+ // Timestamp
+ struct timeval t;
+
+ // Value
+ char* value;
+};
+
struct collecty_daemon {
collecty_ctx* ctx;
int nrefs;
sd_event_source* sigint;
sd_event_source* modules_init;
} events;
+
+ // Write Queue
+ STAILQ_HEAD(queue, collecty_queue_object) queue;
};
static int collecty_daemon_modules_init(sd_event_source* source, void* data) {
return 0;
}
+static void collecty_daemon_free_queue_object(struct collecty_queue_object* o) {
+ if (o->module)
+ collecty_module_unref(o->module);
+ if (o->object)
+ free(o->object);
+ if (o->value)
+ free(o->value);
+}
+
+static int collecty_daemon_setup_queue(collecty_daemon* self) {
+ // Initialize the queue
+ STAILQ_INIT(&self->queue);
+
+ return 0;
+}
+
static void collecty_daemon_free(collecty_daemon* self) {
if (self->events.modules_init)
sd_event_source_unref(self->events.modules_init);
if (r < 0)
goto ERROR;
+ // Setup the write queue
+ r = collecty_daemon_setup_queue(self);
+ if (r < 0)
+ goto ERROR;
+
// Return the pointer
*daemon = self;
return 0;
return 1;
}
+
+/*
+ Submits a new reading into the queue
+*/
+int collecty_daemon_submit(collecty_daemon* self,
+ collecty_module* module, const char* object, const char* value) {
+ struct collecty_queue_object* o = NULL;
+ int r;
+
+ // Check inputs
+ if (!value)
+ return -EINVAL;
+
+ // Allocate some memory
+ o = calloc(1, sizeof(*self));
+ if (!o)
+ return -errno;
+
+ // Reference the module
+ o->module = collecty_module_ref(module);
+
+ // Fetch the current timestamp
+ r = gettimeofday(&o->t, NULL);
+ if (r < 0) {
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Store the object
+ if (o->object) {
+ o->object = strdup(object);
+ if (!o->object) {
+ r = -errno;
+ goto ERROR;
+ }
+ }
+
+ // Store the value
+ o->value = strdup(value);
+ if (!o->value) {
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Append the object to the queue
+ STAILQ_INSERT_TAIL(&self->queue, o, nodes);
+
+ // Log action
+ DEBUG(self->ctx, "%s(%s) submitted: %s\n",
+ collecty_module_name(module), (o->object) ? o->object : "", o->value);
+
+ return 0;
+
+ERROR:
+ if (o)
+ collecty_daemon_free_queue_object(o);
+
+ return r;
+}
typedef struct collecty_daemon collecty_daemon;
#include "ctx.h"
+#include "module.h"
int collecty_daemon_create(collecty_daemon** daemon, collecty_ctx* ctx);
int collecty_daemon_run(collecty_daemon* self);
+int collecty_daemon_submit(collecty_daemon* self,
+ collecty_module* module, const char* object, const char* value);
+
#endif /* COLLECTY_DAEMON_H */