#############################################################################*/
#include <errno.h>
+#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <rrd.h>
+
#include <systemd/sd-event.h>
#include "ctx.h"
return self->methods->name;
}
+#define collecty_module_path(module, object, path) \
+ __collecty_module_path(module, object, path, sizeof(path))
+
+static int __collecty_module_path(collecty_module* self,
+ const char* object, char* path, size_t length) {
+ int r;
+
+ // Fetch the module name
+ const char* name = collecty_module_name(self);
+
+ if (object)
+ r = snprintf(path, length, "%s/%s-%s.rrd", DATABASE_PATH, name, object);
+ else
+ r = snprintf(path, length, "%s/%s.rrd", DATABASE_PATH, name);
+
+ // Handle errors
+ if (r < 0)
+ return -errno;
+
+ return 0;
+}
+
/*
Called when a module has some data to submit
*/
// Submit the data to the daemon
return collecty_daemon_submit(self->daemon, self, object, value);
}
+
+/*
+ Called to write all collected samples to disk
+*/
+int collecty_module_commit(collecty_module* self,
+ const char* object, unsigned int num_samples, const char** samples) {
+ char path[PATH_MAX];
+ int r;
+
+ // Make the path
+ r = collecty_module_path(self, object, path);
+ if (r < 0)
+ return r;
+
+ // Write the samples
+ r = rrd_update_r(path, NULL, num_samples, samples);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to write to %s: %s\n", path, rrd_get_error());
+ return -EFAULT;
+ }
+
+ return 0;
+}
int collecty_module_submit(collecty_module* self, const char* object,
const char* format, ...) __attribute__((format(printf, 3, 4)));
+int collecty_module_commit(collecty_module* self,
+ const char* object, unsigned int num_samples, const char** samples);
+
#endif /* COLLECTY_MODULE_H */
}
int collecty_queue_flush(collecty_queue* self) {
+ struct collecty_queue_object* o = NULL;
+ int r;
+
DEBUG(self->ctx, "Flushing the queue...\n");
- // XXX TODO
+ for (;;) {
+ o = STAILQ_FIRST(&self->queue);
+ if (!o)
+ break;
+
+ // Call the module to write its data
+ r = collecty_module_commit(o->module, o->object, o->num_samples, (const char**)o->samples);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
+ collecty_module_name(o->module), (o->object) ? o->object : NULL, strerror(-r));
+ }
+
+ // Remove the object from the queue
+ STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes);
+
+ // Free the object
+ collecty_queue_free_object(o);
+ }
return 0;
}