return 1;
}
-int td_daemon_submit(td_daemon* self,
- td_source* source, const char* object, const char* value) {
- // Log action
- DEBUG(self->ctx, "%s(%s) submitted: %s\n",
- td_source_name(source), (object) ? object : "", value);
-
- return td_queue_submit(self->queue, source, object, value);
-}
-
int td_daemon_submit_metrics(td_daemon* self, td_metrics* metrics) {
return td_queue_submit_metrics(self->queue, metrics);
}
int td_daemon_run(td_daemon* self);
-int td_daemon_submit(td_daemon* self,
- td_source* source, const char* object, const char* value);
int td_daemon_submit_metrics(td_daemon* self, td_metrics* metrics);
int td_daemon_flush_source(
// Object
char* object;
- // Samples
- char** samples;
- unsigned int num_samples;
-
// Metrics
td_metrics** metrics;
unsigned int num_metrics;
free(o->metrics);
}
- if (o->samples) {
- for (unsigned int i = 0; i < o->num_samples; i++)
- free(o->samples[i]);
- free(o->samples);
- }
-
if (o->source)
td_source_unref(o->source);
if (o->object)
return o;
}
-static int td_queue_object_append_sample(td_queue* self, td_source* source,
- const char* object, struct td_queue_object* o, const char* sample) {
- struct timeval t = {};
- char** samples = NULL;
- char* s = NULL;
- int r;
-
- // Fetch the current timestamp
- r = gettimeofday(&t, NULL);
- if (r < 0)
- return -errno;
-
- // Increase the size of the array
- samples = reallocarray(o->samples, o->num_samples + 1, sizeof(*o->samples));
- if (!samples)
- return -errno;
-
- // Prepend the timestamp to the sample
- r = asprintf(&s, "%ld:%s", t.tv_sec, sample);
- if (r < 0)
- return -errno;
-
- // Assign the sample
- samples[o->num_samples++] = s;
-
- // Replace the array
- o->samples = samples;
-
- return 0;
-}
-
static int td_queue_object_append_metrics(td_queue* self,
struct td_queue_object* o, td_metrics* m) {
td_metrics** metrics = NULL;
/*
Submits a new reading into the queue
*/
-int td_queue_submit(td_queue* self,
- td_source* source, const char* object, const char* sample) {
- struct td_queue_object* o = NULL;
- int r;
-
- // Check inputs
- if (!sample)
- return -EINVAL;
-
- // Check if we can append the sample
- o = td_queue_find_object(self, source, object);
- if (o)
- return td_queue_object_append_sample(self, source, object, o, sample);
-
- // Allocate some memory
- o = calloc(1, sizeof(*o));
- if (!o)
- return -errno;
-
- // Reference the source
- o->source = td_source_ref(source);
-
- // Store the object
- if (object) {
- o->object = strdup(object);
- if (!o->object) {
- r = -errno;
- goto ERROR;
- }
- }
-
- // Store the sample
- r = td_queue_object_append_sample(self, source, object, o, sample);
- if (r < 0)
- goto ERROR;
-
- // Append the object to the queue
- STAILQ_INSERT_TAIL(&self->queue, o, nodes);
-
- return 0;
-
-ERROR:
- if (o)
- td_queue_free_object(o);
-
- return r;
-}
-
int td_queue_submit_metrics(td_queue* self, td_metrics* metrics) {
struct td_queue_object* o = NULL;
int r;
static int td_queue_flush_object(td_queue* self, struct td_queue_object* o) {
int r;
- // Call the source to write its data
- if (o->num_samples) {
- r = td_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
- if (r < 0) {
- ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
- td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
- }
- }
-
// Call the source to write its metrics
- if (o->num_metrics) {
- r = td_source_commit_metrics(o->source, o->object, o->num_metrics, o->metrics);
- if (r < 0) {
- ERROR(self->ctx, "Failed to write metrics for %s(%s): %s\n",
- td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
- }
+ r = td_source_commit_metrics(o->source, o->object, o->num_metrics, o->metrics);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to write metrics for %s(%s): %s\n",
+ td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
}
// Remove the object from the queue
td_queue* td_queue_ref(td_queue* self);
td_queue* td_queue_unref(td_queue* self);
-int td_queue_submit(td_queue* self,
- td_source* source, const char* object, const char* sample);
int td_queue_submit_metrics(td_queue* self, td_metrics* metrics);
int td_queue_flush(td_queue* self);
return __td_string_format(path, length, "%s/%s.rrd", DATABASE_PATH, name);
}
-/*
- Called when a source has some data to submit
-*/
-int td_source_submit(td_source* self,
- const char* object, const char* format, ...) {
- char value[2048];
- va_list args;
- int r;
-
- // Format the arguments
- va_start(args, format);
- r = td_string_vformat(value, format, args);
- va_end(args);
-
- // Handle errors
- if (r < 0)
- return r;
-
- // Submit the data to the daemon
- return td_daemon_submit(self->daemon, self, object, value);
-}
-
int td_source_submit_values(td_source* self,
const char* object, const td_metric_value* values) {
td_metrics* metrics = NULL;
/*
Called to write all collected samples to disk
*/
-int td_source_commit(td_source* self,
+static int td_source_commit_samples(td_source* self,
const char* object, unsigned int num_samples, const char** samples) {
struct stat st = {};
char path[PATH_MAX];
}
// Commit the samples
- r = td_source_commit(self, object, num_metrics, (const char**)samples);
+ r = td_source_commit_samples(self, object, num_metrics, (const char**)samples);
ERROR:
if (samples)
int td_source_create_metrics(td_source* self, td_metrics** metrics, const char* object);
int td_source_create_command(td_source* self, td_command** command);
-int td_source_submit(td_source* self, const char* object,
- const char* format, ...) __attribute__((format(printf, 3, 4)));
int td_source_submit_metrics(td_source* self, td_metrics* metrics);
int td_source_submit_values(td_source* self,
const char* object, const td_metric_value* values);
-int td_source_commit(td_source* self,
- const char* object, unsigned int num_samples, const char** samples);
int td_source_commit_metrics(td_source* self, const char* object,
unsigned int num_metrics, td_metrics** metrics);