src/daemon/logging.c \
src/daemon/logging.h \
src/daemon/main.c \
+ src/daemon/metrics.c \
+ src/daemon/metrics.h \
src/daemon/parse.c \
src/daemon/parse.h \
src/daemon/proc.c \
#include "daemon.h"
#include "graphs.h"
#include "graph-bus.h"
+#include "metrics.h"
#include "queue.h"
#include "source.h"
#include "sources.h"
return td_queue_submit(self->queue, source, object, value);
}
+int td_daemon_submit_metrics(td_daemon* self, td_metrics* metrics) {
+ return td_queue_submit_metrics(self->queue, metrics);
+}
+
int td_daemon_flush_source(
td_daemon* self, td_source* source, const char* object) {
return td_queue_flush_source(self->queue, source, object);
#include "bus.h"
#include "ctx.h"
#include "graphs.h"
+#include "metrics.h"
#include "source.h"
#include "sources.h"
int td_daemon_submit(td_daemon* self,
td_source* source, const char* object, const char* value);
+int td_daemon_submit_metrics(td_daemon* self, td_metrics* metrics);
int td_daemon_flush_source(
td_daemon* self, td_source* source, const char* object);
--- /dev/null
+/*#############################################################################
+# #
+# telemetryd - The IPFire Telemetry Collection Service #
+# Copyright (C) 2025 IPFire Development Team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+#############################################################################*/
+
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/time.h>
+
+#include "ctx.h"
+#include "metrics.h"
+#include "source.h"
+#include "string.h"
+
+// The maximum length a formatted int64_t, uint64_t or double could have when
+// being represented as a string is 24 bytes. This should give us some extra space.
+#define VALUE_MAX 32
+
+typedef enum td_metric_type {
+ TD_METRIC_UNKNOWN = 0,
+ TD_METRIC_INT64,
+ TD_METRIC_UINT64,
+ TD_METRIC_FLOAT,
+} td_metric_type;
+
+typedef struct td_metric {
+ // Type
+ td_metric_type type;
+
+ // Value
+ union {
+ // TD_METRIC_INT64
+ int64_t _int64;
+
+ // TD_METRIC_UINT64
+ uint64_t _uint64;
+
+ // TD_METRIC_FLOAT
+ double _float;
+ } value;
+} td_metric;
+
+struct td_metrics {
+ td_ctx* ctx;
+ int nrefs;
+
+ // Source
+ td_source* source;
+
+ // Object
+ char* object;
+
+ // Time
+ struct timeval t;
+
+ // Metrics
+ td_metric* metrics;
+ unsigned int num_metrics;
+};
+
+static int td_metrics_valid_object(td_metrics* self, const char* object) {
+ // Check for any invalid characters
+ for (const char* p = object; *p; p++) {
+ switch (*p) {
+ // Whitespace is not allowed
+ case ' ':
+ case '\t':
+ case '\n':
+ break;
+
+ // Slashes are not allowed
+ case '/':
+ case '\\':
+ break;
+
+ // Quotes are not allowed
+ case '"':
+ case '\'':
+ break;
+
+ // Colons are now allowed
+ case ':':
+ break;
+
+ // The rest is allowed
+ default:
+ continue;
+ }
+
+ // Log error
+ ERROR(self->ctx, "%s submitted an invalid object: %s\n",
+ td_source_name(self->source), object);
+
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static void td_metrics_free(td_metrics* self) {
+ if (self->source)
+ td_source_unref(self->source);
+ if (self->ctx)
+ td_ctx_unref(self->ctx);
+ if (self->metrics)
+ free(self->metrics);
+ free(self);
+}
+
+int td_metrics_create(td_metrics** metrics, td_ctx* ctx, td_source* source, const char* object) {
+ td_metrics* self = NULL;
+ int r;
+
+ // Allocate some memory
+ self = calloc(1, sizeof(*self));
+ if (!self)
+ return -errno;
+
+ // Initialize the reference counter
+ self->nrefs = 1;
+
+ // Store a reference to the context
+ self->ctx = td_ctx_ref(ctx);
+
+ // Store a reference to the source
+ self->source = td_source_ref(source);
+
+ // Store the object
+ if (object) {
+ // Check the object
+ r = td_metrics_valid_object(self, object);
+ if (r < 0)
+ goto ERROR;
+
+ self->object = strdup(object);
+ if (!self->object) {
+ r = -errno;
+ goto ERROR;
+ }
+ }
+
+ // Fetch the current time
+ r = gettimeofday(&self->t, NULL);
+ if (r < 0) {
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Fetch the number of data sources
+ self->num_metrics = td_source_num_data_sources(self->source);
+
+ // Fail if there are no data sources
+ if (!self->num_metrics) {
+ ERROR(self->ctx, "Source %s has no data sources\n", td_source_name(self->source));
+ r = -EINVAL;
+ goto ERROR;
+ }
+
+ // Allocate space for the metrics
+ self->metrics = calloc(self->num_metrics, sizeof(*self->metrics));
+ if (!self->metrics) {
+ ERROR(self->ctx, "Failed to allocate %u metric(s): %m\n", self->num_metrics);
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Return the pointer
+ *metrics = self;
+ return 0;
+
+ERROR:
+ if (self)
+ td_metrics_unref(self);
+
+ return r;
+}
+
+td_metrics* td_metrics_ref(td_metrics* self) {
+ ++self->nrefs;
+ return self;
+}
+
+td_metrics* td_metrics_unref(td_metrics* self) {
+ if (--self->nrefs > 0)
+ return self;
+
+ td_metrics_free(self);
+ return NULL;
+}
+
+td_source* td_metrics_get_source(td_metrics* self) {
+ return td_source_ref(self->source);
+}
+
+const char* td_metrics_get_object(td_metrics* self) {
+ return self->object;
+}
+
+const struct timeval* td_metrics_get_time(td_metrics* self) {
+ return &self->t;
+}
+
+#define td_metrics_format_time(self, buffer) \
+ __td_metrics_format_time(self, buffer, sizeof(buffer))
+
+static int __td_metrics_format_time(td_metrics* self, char* buffer, size_t length) {
+ struct tm* p = NULL;
+ struct tm tm = {};
+ int r;
+
+ // Convert time
+ p = gmtime_r(&self->t.tv_sec, &tm);
+ if (!p)
+ return -errno;
+
+ // Format to string
+ r = strftime(buffer, length, "%Y-%m-%dT%H:%M:%S", &tm);
+ if (r < 0)
+ return -errno;
+
+ return 0;
+}
+
+#define td_metrics_format_value(self, metric, value) \
+ __td_metrics_format_value(self, metric, value, sizeof(value))
+
+static int __td_metrics_format_value(td_metrics* self,
+ const td_metric* metric, char* value, size_t length) {
+ switch (metric->type) {
+ case TD_METRIC_UNKNOWN:
+ return __td_string_set(value, length, "U");
+
+ case TD_METRIC_INT64:
+ return __td_string_format(value, length, "%ld", metric->value._int64);
+
+ case TD_METRIC_UINT64:
+ return __td_string_format(value, length, "%lu", metric->value._uint64);
+
+ case TD_METRIC_FLOAT:
+ return __td_string_format(value, length, "%lf", metric->value._float);
+ }
+
+ // We should never get here
+ return -ENOTSUP;
+}
+
+int td_metrics_dump(td_metrics* self) {
+ const td_rrd_ds* sources = NULL;
+ char value[VALUE_MAX];
+ char time[32];
+ int r;
+
+ // Fetch all sources
+ sources = td_source_get_data_sources(self->source);
+ if (!sources)
+ return -ENOTSUP;
+
+ // Format the time
+ r = td_metrics_format_time(self, time);
+ if (r < 0)
+ return r;
+
+ DEBUG(self->ctx, "Metrics for %s (%s) at %s\n",
+ td_source_name(self->source), (self->object) ? self->object : "N/A", time);
+
+ // Fetch metrics
+ td_metric* metric = self->metrics;
+
+ // Dump all fields
+ for (const td_rrd_ds* ds = sources; ds->field; ds++, metric++) {
+ // Format the value
+ r = td_metrics_format_value(self, metric, value);
+ if (r < 0)
+ return r;
+
+ // Log the value
+ DEBUG(self->ctx, " %-20s : %s\n", ds->field, value);
+ }
+
+ return 0;
+}
+
+static int td_metrics_find_index(td_metrics* self, const char* field) {
+ const td_rrd_ds* sources = NULL;
+ int index = 0;
+
+ // Check input
+ if (!field)
+ return -EINVAL;
+
+ // Fetch all sources
+ sources = td_source_get_data_sources(self->source);
+ if (!sources)
+ return -ENOTSUP;
+
+ // Search for the right field
+ for (const td_rrd_ds* ds = sources; ds->field; ds++, index++) {
+ if (td_string_equals(ds->field, field))
+ return index;
+ }
+
+ // Show error
+ ERROR(self->ctx, "Failed to find index of field '%s' in data sources for %s\n",
+ field, td_source_name(self->source));
+
+ // Not found
+ return -ENOENT;
+}
+
+static int td_metrics_push_value(td_metrics* self,
+ const char* field, td_metric_type type, void* value) {
+ td_metric* metric = NULL;
+ int index;
+
+ // Fetch the index of the field
+ index = td_metrics_find_index(self, field);
+ if (index < 0)
+ return index;
+
+ // Fetch the metric
+ metric = &self->metrics[index];
+
+ // Store the type
+ metric->type = type;
+
+ // Store the value
+ switch (type) {
+ case TD_METRIC_UNKNOWN:
+ break;
+
+ case TD_METRIC_INT64:
+ metric->value._int64 = *(int64_t*)value;
+ break;
+
+ case TD_METRIC_UINT64:
+ metric->value._uint64 = *(uint64_t*)value;
+ break;
+
+ case TD_METRIC_FLOAT:
+ metric->value._float = *(double*)value;
+ break;
+ }
+
+ return 0;
+}
+
+int td_metrics_push_int64(td_metrics* self, const char* field, int64_t value) {
+ return td_metrics_push_value(self, field, TD_METRIC_INT64, &value);
+}
+
+int td_metrics_push_uint64(td_metrics* self, const char* field, uint64_t value) {
+ return td_metrics_push_value(self, field, TD_METRIC_UINT64, &value);
+}
+
+int td_metrics_push_float(td_metrics* self, const char* field, double value) {
+ return td_metrics_push_value(self, field, TD_METRIC_FLOAT, &value);
+}
+
+int __td_metrics_serialize(td_metrics* self, char* buffer, size_t length) {
+ td_metric* metric = NULL;
+ ssize_t bytes_written = 0;
+ ssize_t bytes_left = length;
+ char* p = buffer;
+
+ // Begin with the timestamp
+ bytes_written = snprintf(p, bytes_left, "%ld", self->t.tv_sec);
+ if (bytes_written < 0)
+ return -errno;
+
+ // Abort if we have run out of space
+ else if (bytes_written >= bytes_left)
+ return -ENOBUFS;
+
+ // Advance p
+ p += bytes_written;
+ bytes_left -= bytes_written;
+
+ // Append all metrics
+ for (unsigned int i = 0; i < self->num_metrics; i++) {
+ metric = &self->metrics[i];
+
+ // Write the value
+ switch (metric->type) {
+ case TD_METRIC_UNKNOWN:
+ bytes_written = snprintf(p, bytes_left, ":U");
+ break;
+
+ case TD_METRIC_INT64:
+ bytes_written = snprintf(p, bytes_left, ":%ld", metric->value._int64);
+ break;
+
+ case TD_METRIC_UINT64:
+ bytes_written = snprintf(p, bytes_left, ":%lu", metric->value._uint64);
+ break;
+
+ case TD_METRIC_FLOAT:
+ bytes_written = snprintf(p, bytes_left, ":%lf", metric->value._float);
+ break;
+ }
+
+ // Abort on error
+ if (bytes_written < 0)
+ return -errno;
+
+ // Abort if we have run out of buffer space
+ else if (bytes_written >= bytes_left)
+ return -ENOBUFS;
+
+ // Advance p
+ p += bytes_written;
+ bytes_left -= bytes_written;
+ }
+
+ return 0;
+}
--- /dev/null
+/*#############################################################################
+# #
+# telemetryd - The IPFire Telemetry Collection Service #
+# Copyright (C) 2025 IPFire Development Team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+#############################################################################*/
+
+#ifndef TELEMETRY_METRICS_H
+#define TELEMETRY_METRICS_H
+
+typedef struct td_metrics td_metrics;
+
+#include "ctx.h"
+#include "source.h"
+
+int td_metrics_create(td_metrics** metrics,
+ td_ctx* ctx, td_source* source, const char* object);
+
+td_metrics* td_metrics_ref(td_metrics* self);
+td_metrics* td_metrics_unref(td_metrics* self);
+
+td_source* td_metrics_get_source(td_metrics* self);
+const char* td_metrics_get_object(td_metrics* self);
+const struct timeval* td_metrics_get_time(td_metrics* self);
+
+int td_metrics_dump(td_metrics* self);
+
+int td_metrics_push_int64(td_metrics* self, const char* field, int64_t value);
+int td_metrics_push_uint64(td_metrics* self, const char* field, uint64_t value);
+int td_metrics_push_float(td_metrics* self, const char* field, double value);
+
+#define td_metrics_serialize(metrics, buffer) \
+ __td_metrics_serialize(metrics, buffer, sizeof(buffer))
+
+int __td_metrics_serialize(td_metrics* self, char* buffer, size_t length);
+
+#endif /* TELEMETRY_METRICS_H */
// Samples
char** samples;
unsigned int num_samples;
+
+ // Metrics
+ td_metrics** metrics;
+ unsigned int num_metrics;
};
struct td_queue {
}
static void td_queue_free_object(struct td_queue_object* o) {
+ if (o->metrics) {
+ for (unsigned int i = 0; i < o->num_metrics; i++)
+ td_metrics_unref(o->metrics[i]);
+ free(o->metrics);
+ }
+
if (o->samples) {
for (unsigned int i = 0; i < o->num_samples; i++)
free(o->samples[i]);
return NULL;
}
+static struct td_queue_object* td_queue_make_object(
+ td_queue* self, td_source* source, const char* object) {
+ struct td_queue_object* o = NULL;
+
+ // Allocate some memory
+ o = calloc(1, sizeof(*o));
+ if (!o)
+ goto ERROR;
+
+ // Reference the source
+ o->source = td_source_ref(source);
+
+ // Store the object
+ if (object) {
+ o->object = strdup(object);
+ if (!o->object)
+ goto ERROR;
+ }
+
+ // Append the object to the queue
+ STAILQ_INSERT_TAIL(&self->queue, o, nodes);
+
+ return o;
+
+ERROR:
+ if (o)
+ td_queue_free_object(o);
+
+ return NULL;
+}
+
static struct td_queue_object* td_queue_find_object(
td_queue* self, td_source* source, const char* object) {
struct td_queue_object* o = NULL;
return NULL;
}
+static struct td_queue_object* td_queue_find_object_by_metrics(
+ td_queue* self, td_metrics* metrics) {
+ struct td_queue_object* o = NULL;
+ const char* object = NULL;
+ td_source* source = NULL;
+
+ // Fetch the source
+ source = td_metrics_get_source(metrics);
+ if (!source)
+ return NULL;
+
+ // Fetch the object
+ object = td_metrics_get_object(metrics);
+
+ // Find the object
+ o = td_queue_find_object(self, source, object);
+
+ // Allocate a new object if we could not find one
+ if (!o)
+ o = td_queue_make_object(self, source, object);
+
+ // Cleanup
+ td_source_unref(source);
+
+ return o;
+}
+
static int td_queue_object_append_sample(td_queue* self, td_source* source,
const char* object, struct td_queue_object* o, const char* sample) {
struct timeval t = {};
return 0;
}
-static int td_queue_valid_object(td_queue* queue, const char* object) {
- // Check for any invalid characters
- for (const char* p = object; *p; p++) {
- switch (*p) {
- // Whitespace is not allowed
- case ' ':
- case '\t':
- case '\n':
- break;
-
- // Slashes are not allowed
- case '/':
- case '\\':
- break;
-
- // Quotes are not allowed
- case '"':
- case '\'':
- break;
-
- // Colons are now allowed
- case ':':
- break;
-
- // The rest is allowed
- default:
- continue;
- }
+static int td_queue_object_append_metrics(td_queue* self,
+ struct td_queue_object* o, td_metrics* m) {
+ td_metrics** metrics = NULL;
- return -EINVAL;
- }
+ // Increase the size of the array
+ metrics = reallocarray(o->metrics, o->num_metrics + 1, sizeof(*o->metrics));
+ if (!metrics)
+ return -errno;
+
+ // Assign the metrics
+ metrics[o->num_metrics++] = td_metrics_ref(m);
+
+ // Replace the array
+ o->metrics = metrics;
return 0;
}
if (!sample)
return -EINVAL;
- // Check if the object is valid
- if (object) {
- r = td_queue_valid_object(self, object);
- if (r < 0) {
- ERROR(self->ctx, "%s has submitted an invalid object: %s\n",
- td_source_name(source), object);
- goto ERROR;
- }
- }
-
// Check if we can append the sample
o = td_queue_find_object(self, source, object);
if (o)
return r;
}
+int td_queue_submit_metrics(td_queue* self, td_metrics* metrics) {
+ struct td_queue_object* o = NULL;
+ int r;
+
+ // Check inputs
+ if (!metrics)
+ return -EINVAL;
+
+ // Dump the metrics
+ r = td_metrics_dump(metrics);
+ if (r < 0)
+ return r;
+
+ // Check if we can append the metrics
+ o = td_queue_find_object_by_metrics(self, metrics);
+ if (!o)
+ return -errno;
+
+ // Append the metrics
+ return td_queue_object_append_metrics(self, o, metrics);
+}
+
static int td_queue_flush_object(td_queue* self, struct td_queue_object* o) {
int r;
// Call the source to write its data
- r = td_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
- if (r < 0) {
- ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
- td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
+ if (o->num_samples) {
+ r = td_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
+ td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
+ }
+ }
+
+ // Call the source to write its metrics
+ if (o->num_metrics) {
+ r = td_source_commit_metrics(o->source, o->object, o->num_metrics, o->metrics);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to write metrics for %s(%s): %s\n",
+ td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
+ }
}
// Remove the object from the queue
#include "ctx.h"
#include "daemon.h"
+#include "metrics.h"
#include "source.h"
int td_queue_create(td_queue** queue,
int td_queue_submit(td_queue* self,
td_source* source, const char* object, const char* sample);
+int td_queue_submit_metrics(td_queue* self, td_metrics* metrics);
int td_queue_flush(td_queue* self);
return td_daemon_get_udev(self->daemon);
}
+unsigned int td_source_num_data_sources(td_source* self) {
+ unsigned int num_sources = 0;
+
+ // Count all data sources
+ for (const td_rrd_ds* ds = self->impl->rrd_dss; ds->field; ds++)
+ num_sources++;
+
+ return num_sources;
+}
+
+const td_rrd_ds* td_source_get_data_sources(td_source* self) {
+ return self->impl->rrd_dss;
+}
+
+int td_source_create_metrics(td_source* self, td_metrics** metrics, const char* object) {
+ return td_metrics_create(metrics, self->ctx, self, object);
+}
+
int td_source_create_command(td_source* self, td_command** command) {
return td_command_create(command, self->ctx, self->daemon);
}
return td_daemon_submit(self->daemon, self, object, value);
}
+int td_source_submit_metrics(td_source* self, td_metrics* metrics) {
+ return td_daemon_submit_metrics(self->daemon, metrics);
+}
+
static int td_source_create_database(td_source* self, const char* path, const char* source) {
td_args* args = NULL;
char min[24];
return 0;
}
+int td_source_commit_metrics(td_source* self, const char* object,
+ unsigned int num_metrics, td_metrics** metrics) {
+ char** samples = NULL;
+ char sample[4096];
+ int r;
+
+ for (unsigned int i = 0; i < num_metrics; i++) {
+ // Serialize the metrics
+ r = td_metrics_serialize(metrics[i], sample);
+ if (r < 0)
+ goto ERROR;
+
+ // Append it to the array
+ r = td_strings_append(&samples, sample);
+ if (r < 0)
+ goto ERROR;
+ }
+
+ // Commit the samples
+ r = td_source_commit(self, object, num_metrics, (const char**)samples);
+
+ERROR:
+ if (samples)
+ td_strings_free(samples);
+
+ return r;
+}
+
static int td_source_render_add_DEF(td_source* self,
td_args* args, const char* path, const td_rrd_ds* ds, const char* object) {
int r;
#include "command.h"
#include "ctx.h"
#include "daemon.h"
+#include "metrics.h"
#define MAX_DS 64
#define MAX_RRA 8
struct udev* td_source_get_udev(td_source* self);
+unsigned int td_source_num_data_sources(td_source* self);
+const td_rrd_ds* td_source_get_data_sources(td_source* self);
+
+int td_source_create_metrics(td_source* self, td_metrics** metrics, const char* object);
int td_source_create_command(td_source* self, td_command** command);
int td_source_submit(td_source* self, const char* object,
const char* format, ...) __attribute__((format(printf, 3, 4)));
+int td_source_submit_metrics(td_source* self, td_metrics* metrics);
int td_source_commit(td_source* self,
const char* object, unsigned int num_samples, const char** samples);
+int td_source_commit_metrics(td_source* self, const char* object,
+ unsigned int num_metrics, td_metrics** metrics);
int td_source_render(td_source* self,
td_args* args, const char* object);
#include "uptime.h"
static int uptime_heartbeat(td_ctx* ctx, td_source* source) {
+ td_metrics* metrics = NULL;
struct sysinfo info = {};
int r;
if (r < 0)
return -errno;
- // Submit the uptime
- return td_source_submit(source, NULL, "%ld", info.uptime);
+ // Create metrics
+ r = td_source_create_metrics(source, &metrics, NULL);
+ if (r < 0)
+ goto ERROR;
+
+ // Push uptime
+ r = td_metrics_push_int64(metrics, "uptime", info.uptime);
+ if (r < 0)
+ goto ERROR;
+
+ // Submit the metrics
+ r = td_source_submit_metrics(source, metrics);
+ if (r < 0)
+ goto ERROR;
+
+ERROR:
+ if (metrics)
+ td_metrics_unref(metrics);
+
+ return r;
}
const td_source_impl uptime_source = {