]> git.ipfire.org Git - collecty.git/commitdiff
Add a new metrics object
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 22 Oct 2025 16:25:27 +0000 (16:25 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 22 Oct 2025 16:25:27 +0000 (16:25 +0000)
When we have larger RRD files, it will become increasingly difficult to
serialize the format strings. This new object is supposed to store
metrics more efficiently and with more metadata.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Makefile.am
src/daemon/daemon.c
src/daemon/daemon.h
src/daemon/metrics.c [new file with mode: 0644]
src/daemon/metrics.h [new file with mode: 0644]
src/daemon/queue.c
src/daemon/queue.h
src/daemon/source.c
src/daemon/source.h
src/daemon/sources/uptime.c

index 93c1622f14afb620c55f1877d138f200e9599c07..3e9ae768236ff6ffeaa39ea8f9a7542fb044b673 100644 (file)
@@ -135,6 +135,8 @@ dist_telemetryd_SOURCES = \
        src/daemon/logging.c \
        src/daemon/logging.h \
        src/daemon/main.c \
+       src/daemon/metrics.c \
+       src/daemon/metrics.h \
        src/daemon/parse.c \
        src/daemon/parse.h \
        src/daemon/proc.c \
index 3ea05f33242dee578934bc85738c2e5de7af6a1e..5b8c5b9be1d03d61bdb50f75fb4101e523adb469 100644 (file)
@@ -31,6 +31,7 @@
 #include "daemon.h"
 #include "graphs.h"
 #include "graph-bus.h"
+#include "metrics.h"
 #include "queue.h"
 #include "source.h"
 #include "sources.h"
@@ -389,6 +390,10 @@ int td_daemon_submit(td_daemon* self,
        return td_queue_submit(self->queue, source, object, value);
 }
 
+int td_daemon_submit_metrics(td_daemon* self, td_metrics* metrics) {
+       return td_queue_submit_metrics(self->queue, metrics);
+}
+
 int td_daemon_flush_source(
                td_daemon* self, td_source* source, const char* object) {
        return td_queue_flush_source(self->queue, source, object);
index c36d388d62e53f5b0494ec91210690db2aea40c7..07e545e0d54a06f6758c3c00feac75078b91372b 100644 (file)
@@ -29,6 +29,7 @@ typedef struct td_daemon td_daemon;
 #include "bus.h"
 #include "ctx.h"
 #include "graphs.h"
+#include "metrics.h"
 #include "source.h"
 #include "sources.h"
 
@@ -46,6 +47,7 @@ int td_daemon_run(td_daemon* self);
 
 int td_daemon_submit(td_daemon* self,
                td_source* source, const char* object, const char* value);
+int td_daemon_submit_metrics(td_daemon* self, td_metrics* metrics);
 
 int td_daemon_flush_source(
        td_daemon* self, td_source* source, const char* object);
diff --git a/src/daemon/metrics.c b/src/daemon/metrics.c
new file mode 100644 (file)
index 0000000..8fc9722
--- /dev/null
@@ -0,0 +1,430 @@
+/*#############################################################################
+#                                                                             #
+# telemetryd - The IPFire Telemetry Collection Service                        #
+# Copyright (C) 2025 IPFire Development Team                                  #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+#############################################################################*/
+
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/time.h>
+
+#include "ctx.h"
+#include "metrics.h"
+#include "source.h"
+#include "string.h"
+
+// The maximum length a formatted int64_t, uint64_t or double could have when
+// being represented as a string is 24 bytes. This should give us some extra space.
+#define VALUE_MAX 32
+
+typedef enum td_metric_type {
+       TD_METRIC_UNKNOWN = 0,
+       TD_METRIC_INT64,
+       TD_METRIC_UINT64,
+       TD_METRIC_FLOAT,
+} td_metric_type;
+
+typedef struct td_metric {
+       // Type
+       td_metric_type type;
+
+       // Value
+       union {
+               // TD_METRIC_INT64
+               int64_t _int64;
+
+               // TD_METRIC_UINT64
+               uint64_t _uint64;
+
+               // TD_METRIC_FLOAT
+               double _float;
+       } value;
+} td_metric;
+
+struct td_metrics {
+       td_ctx* ctx;
+       int nrefs;
+
+       // Source
+       td_source* source;
+
+       // Object
+       char* object;
+
+       // Time
+       struct timeval t;
+
+       // Metrics
+       td_metric* metrics;
+       unsigned int num_metrics;
+};
+
+static int td_metrics_valid_object(td_metrics* self, const char* object) {
+       // Check for any invalid characters
+       for (const char* p = object; *p; p++) {
+               switch (*p) {
+                       // Whitespace is not allowed
+                       case ' ':
+                       case '\t':
+                       case '\n':
+                               break;
+
+                       // Slashes are not allowed
+                       case '/':
+                       case '\\':
+                               break;
+
+                       // Quotes are not allowed
+                       case '"':
+                       case '\'':
+                               break;
+
+                       // Colons are now allowed
+                       case ':':
+                               break;
+
+                       // The rest is allowed
+                       default:
+                               continue;
+               }
+
+               // Log error
+               ERROR(self->ctx, "%s submitted an invalid object: %s\n",
+                       td_source_name(self->source), object);
+
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+static void td_metrics_free(td_metrics* self) {
+       if (self->source)
+               td_source_unref(self->source);
+       if (self->ctx)
+               td_ctx_unref(self->ctx);
+       if (self->metrics)
+               free(self->metrics);
+       free(self);
+}
+
+int td_metrics_create(td_metrics** metrics, td_ctx* ctx, td_source* source, const char* object) {
+       td_metrics* self = NULL;
+       int r;
+
+       // Allocate some memory
+       self = calloc(1, sizeof(*self));
+       if (!self)
+               return -errno;
+
+       // Initialize the reference counter
+       self->nrefs = 1;
+
+       // Store a reference to the context
+       self->ctx = td_ctx_ref(ctx);
+
+       // Store a reference to the source
+       self->source = td_source_ref(source);
+
+       // Store the object
+       if (object) {
+               // Check the object
+               r = td_metrics_valid_object(self, object);
+               if (r < 0)
+                       goto ERROR;
+
+               self->object = strdup(object);
+               if (!self->object) {
+                       r = -errno;
+                       goto ERROR;
+               }
+       }
+
+       // Fetch the current time
+       r = gettimeofday(&self->t, NULL);
+       if (r < 0) {
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Fetch the number of data sources
+       self->num_metrics = td_source_num_data_sources(self->source);
+
+       // Fail if there are no data sources
+       if (!self->num_metrics) {
+               ERROR(self->ctx, "Source %s has no data sources\n", td_source_name(self->source));
+               r = -EINVAL;
+               goto ERROR;
+       }
+
+       // Allocate space for the metrics
+       self->metrics = calloc(self->num_metrics, sizeof(*self->metrics));
+       if (!self->metrics) {
+               ERROR(self->ctx, "Failed to allocate %u metric(s): %m\n", self->num_metrics);
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Return the pointer
+       *metrics = self;
+       return 0;
+
+ERROR:
+       if (self)
+               td_metrics_unref(self);
+
+       return r;
+}
+
+td_metrics* td_metrics_ref(td_metrics* self) {
+       ++self->nrefs;
+       return self;
+}
+
+td_metrics* td_metrics_unref(td_metrics* self) {
+       if (--self->nrefs > 0)
+               return self;
+
+       td_metrics_free(self);
+       return NULL;
+}
+
+td_source* td_metrics_get_source(td_metrics* self) {
+       return td_source_ref(self->source);
+}
+
+const char* td_metrics_get_object(td_metrics* self) {
+       return self->object;
+}
+
+const struct timeval* td_metrics_get_time(td_metrics* self) {
+       return &self->t;
+}
+
+#define td_metrics_format_time(self, buffer) \
+       __td_metrics_format_time(self, buffer, sizeof(buffer))
+
+static int __td_metrics_format_time(td_metrics* self, char* buffer, size_t length) {
+       struct tm* p = NULL;
+       struct tm tm = {};
+       int r;
+
+       // Convert time
+       p = gmtime_r(&self->t.tv_sec, &tm);
+       if (!p)
+               return -errno;
+
+       // Format to string
+       r = strftime(buffer, length, "%Y-%m-%dT%H:%M:%S", &tm);
+       if (r < 0)
+               return -errno;
+
+       return 0;
+}
+
+#define td_metrics_format_value(self, metric, value) \
+       __td_metrics_format_value(self, metric, value, sizeof(value))
+
+static int __td_metrics_format_value(td_metrics* self,
+               const td_metric* metric, char* value, size_t length) {
+       switch (metric->type) {
+               case TD_METRIC_UNKNOWN:
+                       return __td_string_set(value, length, "U");
+
+               case TD_METRIC_INT64:
+                       return __td_string_format(value, length, "%ld", metric->value._int64);
+
+               case TD_METRIC_UINT64:
+                       return __td_string_format(value, length, "%lu", metric->value._uint64);
+
+               case TD_METRIC_FLOAT:
+                       return __td_string_format(value, length, "%lf", metric->value._float);
+       }
+
+       // We should never get here
+       return -ENOTSUP;
+}
+
+int td_metrics_dump(td_metrics* self) {
+       const td_rrd_ds* sources = NULL;
+       char value[VALUE_MAX];
+       char time[32];
+       int r;
+
+       // Fetch all sources
+       sources = td_source_get_data_sources(self->source);
+       if (!sources)
+               return -ENOTSUP;
+
+       // Format the time
+       r = td_metrics_format_time(self, time);
+       if (r < 0)
+               return r;
+
+       DEBUG(self->ctx, "Metrics for %s (%s) at %s\n",
+               td_source_name(self->source), (self->object) ? self->object : "N/A", time);
+
+       // Fetch metrics
+       td_metric* metric = self->metrics;
+
+       // Dump all fields
+       for (const td_rrd_ds* ds = sources; ds->field; ds++, metric++) {
+               // Format the value
+               r = td_metrics_format_value(self, metric, value);
+               if (r < 0)
+                       return r;
+
+               // Log the value
+               DEBUG(self->ctx, "  %-20s : %s\n", ds->field, value);
+       }
+
+       return 0;
+}
+
+static int td_metrics_find_index(td_metrics* self, const char* field) {
+       const td_rrd_ds* sources = NULL;
+       int index = 0;
+
+       // Check input
+       if (!field)
+               return -EINVAL;
+
+       // Fetch all sources
+       sources = td_source_get_data_sources(self->source);
+       if (!sources)
+               return -ENOTSUP;
+
+       // Search for the right field
+       for (const td_rrd_ds* ds = sources; ds->field; ds++, index++) {
+               if (td_string_equals(ds->field, field))
+                       return index;
+       }
+
+       // Show error
+       ERROR(self->ctx, "Failed to find index of field '%s' in data sources for %s\n",
+               field, td_source_name(self->source));
+
+       // Not found
+       return -ENOENT;
+}
+
+static int td_metrics_push_value(td_metrics* self,
+               const char* field, td_metric_type type, void* value) {
+       td_metric* metric = NULL;
+       int index;
+
+       // Fetch the index of the field
+       index = td_metrics_find_index(self, field);
+       if (index < 0)
+               return index;
+
+       // Fetch the metric
+       metric = &self->metrics[index];
+
+       // Store the type
+       metric->type = type;
+
+       // Store the value
+       switch (type) {
+               case TD_METRIC_UNKNOWN:
+                       break;
+
+               case TD_METRIC_INT64:
+                       metric->value._int64 = *(int64_t*)value;
+                       break;
+
+               case TD_METRIC_UINT64:
+                       metric->value._uint64 = *(uint64_t*)value;
+                       break;
+
+               case TD_METRIC_FLOAT:
+                       metric->value._float = *(double*)value;
+                       break;
+       }
+
+       return 0;
+}
+
+int td_metrics_push_int64(td_metrics* self, const char* field, int64_t value) {
+       return td_metrics_push_value(self, field, TD_METRIC_INT64, &value);
+}
+
+int td_metrics_push_uint64(td_metrics* self, const char* field, uint64_t value) {
+       return td_metrics_push_value(self, field, TD_METRIC_UINT64, &value);
+}
+
+int td_metrics_push_float(td_metrics* self, const char* field, double value) {
+       return td_metrics_push_value(self, field, TD_METRIC_FLOAT, &value);
+}
+
+int __td_metrics_serialize(td_metrics* self, char* buffer, size_t length) {
+       td_metric* metric = NULL;
+       ssize_t bytes_written = 0;
+       ssize_t bytes_left = length;
+       char* p = buffer;
+
+       // Begin with the timestamp
+       bytes_written = snprintf(p, bytes_left, "%ld", self->t.tv_sec);
+       if (bytes_written < 0)
+               return -errno;
+
+       // Abort if we have run out of space
+       else if (bytes_written >= bytes_left)
+               return -ENOBUFS;
+
+       // Advance p
+       p += bytes_written;
+       bytes_left -= bytes_written;
+
+       // Append all metrics
+       for (unsigned int i = 0; i < self->num_metrics; i++) {
+               metric = &self->metrics[i];
+
+               // Write the value
+               switch (metric->type) {
+                       case TD_METRIC_UNKNOWN:
+                               bytes_written = snprintf(p, bytes_left, ":U");
+                               break;
+
+                       case TD_METRIC_INT64:
+                               bytes_written = snprintf(p, bytes_left, ":%ld", metric->value._int64);
+                               break;
+
+                       case TD_METRIC_UINT64:
+                               bytes_written = snprintf(p, bytes_left, ":%lu", metric->value._uint64);
+                               break;
+
+                       case TD_METRIC_FLOAT:
+                               bytes_written = snprintf(p, bytes_left, ":%lf", metric->value._float);
+                               break;
+               }
+
+               // Abort on error
+               if (bytes_written < 0)
+                       return -errno;
+
+               // Abort if we have run out of buffer space
+               else if (bytes_written >= bytes_left)
+                       return -ENOBUFS;
+
+               // Advance p
+               p += bytes_written;
+               bytes_left -= bytes_written;
+       }
+
+       return 0;
+}
diff --git a/src/daemon/metrics.h b/src/daemon/metrics.h
new file mode 100644 (file)
index 0000000..198d026
--- /dev/null
@@ -0,0 +1,50 @@
+/*#############################################################################
+#                                                                             #
+# telemetryd - The IPFire Telemetry Collection Service                        #
+# Copyright (C) 2025 IPFire Development Team                                  #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+#############################################################################*/
+
+#ifndef TELEMETRY_METRICS_H
+#define TELEMETRY_METRICS_H
+
+typedef struct td_metrics td_metrics;
+
+#include "ctx.h"
+#include "source.h"
+
+int td_metrics_create(td_metrics** metrics,
+       td_ctx* ctx, td_source* source, const char* object);
+
+td_metrics* td_metrics_ref(td_metrics* self);
+td_metrics* td_metrics_unref(td_metrics* self);
+
+td_source* td_metrics_get_source(td_metrics* self);
+const char* td_metrics_get_object(td_metrics* self);
+const struct timeval* td_metrics_get_time(td_metrics* self);
+
+int td_metrics_dump(td_metrics* self);
+
+int td_metrics_push_int64(td_metrics* self, const char* field, int64_t value);
+int td_metrics_push_uint64(td_metrics* self, const char* field, uint64_t value);
+int td_metrics_push_float(td_metrics* self, const char* field, double value);
+
+#define td_metrics_serialize(metrics, buffer) \
+       __td_metrics_serialize(metrics, buffer, sizeof(buffer))
+
+int __td_metrics_serialize(td_metrics* self, char* buffer, size_t length);
+
+#endif /* TELEMETRY_METRICS_H */
index 0459bd80551e7a30e7a9ceec165fbf238f41f5e6..cfa1b6e3863e68b2592fcd6ce10508c81e48ee6b 100644 (file)
@@ -45,6 +45,10 @@ struct td_queue_object {
        // Samples
        char** samples;
        unsigned int num_samples;
+
+       // Metrics
+       td_metrics** metrics;
+       unsigned int num_metrics;
 };
 
 struct td_queue {
@@ -104,6 +108,12 @@ static int td_queue_exit(sd_event_source* source, void* data) {
 }
 
 static void td_queue_free_object(struct td_queue_object* o) {
+       if (o->metrics) {
+               for (unsigned int i = 0; i < o->num_metrics; i++)
+                       td_metrics_unref(o->metrics[i]);
+               free(o->metrics);
+       }
+
        if (o->samples) {
                for (unsigned int i = 0; i < o->num_samples; i++)
                        free(o->samples[i]);
@@ -205,6 +215,37 @@ td_queue* td_queue_unref(td_queue* self) {
        return NULL;
 }
 
+static struct td_queue_object* td_queue_make_object(
+               td_queue* self, td_source* source, const char* object) {
+       struct td_queue_object* o = NULL;
+
+       // Allocate some memory
+       o = calloc(1, sizeof(*o));
+       if (!o)
+               goto ERROR;
+
+       // Reference the source
+       o->source = td_source_ref(source);
+
+       // Store the object
+       if (object) {
+               o->object = strdup(object);
+               if (!o->object)
+                       goto ERROR;
+       }
+
+       // Append the object to the queue
+       STAILQ_INSERT_TAIL(&self->queue, o, nodes);
+
+       return o;
+
+ERROR:
+       if (o)
+               td_queue_free_object(o);
+
+       return NULL;
+}
+
 static struct td_queue_object* td_queue_find_object(
                td_queue* self, td_source* source, const char* object) {
        struct td_queue_object* o = NULL;
@@ -223,6 +264,33 @@ static struct td_queue_object* td_queue_find_object(
        return NULL;
 }
 
+static struct td_queue_object* td_queue_find_object_by_metrics(
+               td_queue* self, td_metrics* metrics) {
+       struct td_queue_object* o = NULL;
+       const char* object = NULL;
+       td_source* source = NULL;
+
+       // Fetch the source
+       source = td_metrics_get_source(metrics);
+       if (!source)
+               return NULL;
+
+       // Fetch the object
+       object = td_metrics_get_object(metrics);
+
+       // Find the object
+       o = td_queue_find_object(self, source, object);
+
+       // Allocate a new object if we could not find one
+       if (!o)
+               o = td_queue_make_object(self, source, object);
+
+       // Cleanup
+       td_source_unref(source);
+
+       return o;
+}
+
 static int td_queue_object_append_sample(td_queue* self, td_source* source,
                const char* object, struct td_queue_object* o, const char* sample) {
        struct timeval t = {};
@@ -254,37 +322,20 @@ static int td_queue_object_append_sample(td_queue* self, td_source* source,
        return 0;
 }
 
-static int td_queue_valid_object(td_queue* queue, const char* object) {
-       // Check for any invalid characters
-       for (const char* p = object; *p; p++) {
-               switch (*p) {
-                       // Whitespace is not allowed
-                       case ' ':
-                       case '\t':
-                       case '\n':
-                               break;
-
-                       // Slashes are not allowed
-                       case '/':
-                       case '\\':
-                               break;
-
-                       // Quotes are not allowed
-                       case '"':
-                       case '\'':
-                               break;
-
-                       // Colons are now allowed
-                       case ':':
-                               break;
-
-                       // The rest is allowed
-                       default:
-                               continue;
-               }
+static int td_queue_object_append_metrics(td_queue* self,
+               struct td_queue_object* o, td_metrics* m) {
+       td_metrics** metrics = NULL;
 
-               return -EINVAL;
-       }
+       // Increase the size of the array
+       metrics = reallocarray(o->metrics, o->num_metrics + 1, sizeof(*o->metrics));
+       if (!metrics)
+               return -errno;
+
+       // Assign the metrics
+       metrics[o->num_metrics++] = td_metrics_ref(m);
+
+       // Replace the array
+       o->metrics = metrics;
 
        return 0;
 }
@@ -301,16 +352,6 @@ int td_queue_submit(td_queue* self,
        if (!sample)
                return -EINVAL;
 
-       // Check if the object is valid
-       if (object) {
-               r = td_queue_valid_object(self, object);
-               if (r < 0) {
-                       ERROR(self->ctx, "%s has submitted an invalid object: %s\n",
-                               td_source_name(source), object);
-                       goto ERROR;
-               }
-       }
-
        // Check if we can append the sample
        o = td_queue_find_object(self, source, object);
        if (o)
@@ -350,14 +391,47 @@ ERROR:
        return r;
 }
 
+int td_queue_submit_metrics(td_queue* self, td_metrics* metrics) {
+       struct td_queue_object* o = NULL;
+       int r;
+
+       // Check inputs
+       if (!metrics)
+               return -EINVAL;
+
+       // Dump the metrics
+       r = td_metrics_dump(metrics);
+       if (r < 0)
+               return r;
+
+       // Check if we can append the metrics
+       o = td_queue_find_object_by_metrics(self, metrics);
+       if (!o)
+               return -errno;
+
+       // Append the metrics
+       return td_queue_object_append_metrics(self, o, metrics);
+}
+
 static int td_queue_flush_object(td_queue* self, struct td_queue_object* o) {
        int r;
 
        // Call the source to write its data
-       r = td_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
-       if (r < 0) {
-               ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
-                       td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
+       if (o->num_samples) {
+               r = td_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
+               if (r < 0) {
+                       ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
+                               td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
+               }
+       }
+
+       // Call the source to write its metrics
+       if (o->num_metrics) {
+               r = td_source_commit_metrics(o->source, o->object, o->num_metrics, o->metrics);
+               if (r < 0) {
+                       ERROR(self->ctx, "Failed to write metrics for %s(%s): %s\n",
+                               td_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
+               }
        }
 
        // Remove the object from the queue
index 58274c0abf624f4c6eb587b70d27b713e7fae4f9..5123a64ee41c82871908de724baa787142937410 100644 (file)
@@ -25,6 +25,7 @@ typedef struct td_queue td_queue;
 
 #include "ctx.h"
 #include "daemon.h"
+#include "metrics.h"
 #include "source.h"
 
 int td_queue_create(td_queue** queue,
@@ -35,6 +36,7 @@ td_queue* td_queue_unref(td_queue* self);
 
 int td_queue_submit(td_queue* self,
        td_source* source, const char* object, const char* sample);
+int td_queue_submit_metrics(td_queue* self, td_metrics* metrics);
 
 int td_queue_flush(td_queue* self);
 
index aa940ce7e47d19d6e4aca51ccb277060a110a209..10af939e09fc2916b1eb81e5507f318bc0a92283 100644 (file)
@@ -466,6 +466,24 @@ struct udev* td_source_get_udev(td_source* self) {
        return td_daemon_get_udev(self->daemon);
 }
 
+unsigned int td_source_num_data_sources(td_source* self) {
+       unsigned int num_sources = 0;
+
+       // Count all data sources
+       for (const td_rrd_ds* ds = self->impl->rrd_dss; ds->field; ds++)
+               num_sources++;
+
+       return num_sources;
+}
+
+const td_rrd_ds* td_source_get_data_sources(td_source* self) {
+       return self->impl->rrd_dss;
+}
+
+int td_source_create_metrics(td_source* self, td_metrics** metrics, const char* object) {
+       return td_metrics_create(metrics, self->ctx, self, object);
+}
+
 int td_source_create_command(td_source* self, td_command** command) {
        return td_command_create(command, self->ctx, self->daemon);
 }
@@ -506,6 +524,10 @@ int td_source_submit(td_source* self,
        return td_daemon_submit(self->daemon, self, object, value);
 }
 
+int td_source_submit_metrics(td_source* self, td_metrics* metrics) {
+       return td_daemon_submit_metrics(self->daemon, metrics);
+}
+
 static int td_source_create_database(td_source* self, const char* path, const char* source) {
        td_args* args = NULL;
        char min[24];
@@ -924,6 +946,34 @@ int td_source_commit(td_source* self,
        return 0;
 }
 
+int td_source_commit_metrics(td_source* self, const char* object,
+               unsigned int num_metrics, td_metrics** metrics) {
+       char** samples = NULL;
+       char sample[4096];
+       int r;
+
+       for (unsigned int i = 0; i < num_metrics; i++) {
+               // Serialize the metrics
+               r = td_metrics_serialize(metrics[i], sample);
+               if (r < 0)
+                       goto ERROR;
+
+               // Append it to the array
+               r = td_strings_append(&samples, sample);
+               if (r < 0)
+                       goto ERROR;
+       }
+
+       // Commit the samples
+       r = td_source_commit(self, object, num_metrics, (const char**)samples);
+
+ERROR:
+       if (samples)
+               td_strings_free(samples);
+
+       return r;
+}
+
 static int td_source_render_add_DEF(td_source* self,
                td_args* args, const char* path, const td_rrd_ds* ds, const char* object) {
        int r;
index 146e18f3b1c6ac8d24231cdd7dbe7122638fbe68..d08691b888d2fd813312b4798bb72b10d7df157d 100644 (file)
@@ -29,6 +29,7 @@ typedef struct td_source td_source;
 #include "command.h"
 #include "ctx.h"
 #include "daemon.h"
+#include "metrics.h"
 
 #define MAX_DS 64
 #define MAX_RRA 8
@@ -83,13 +84,20 @@ const char* td_source_name(td_source* self);
 
 struct udev* td_source_get_udev(td_source* self);
 
+unsigned int td_source_num_data_sources(td_source* self);
+const td_rrd_ds* td_source_get_data_sources(td_source* self);
+
+int td_source_create_metrics(td_source* self, td_metrics** metrics, const char* object);
 int td_source_create_command(td_source* self, td_command** command);
 
 int td_source_submit(td_source* self, const char* object,
        const char* format, ...) __attribute__((format(printf, 3, 4)));
+int td_source_submit_metrics(td_source* self, td_metrics* metrics);
 
 int td_source_commit(td_source* self,
        const char* object, unsigned int num_samples, const char** samples);
+int td_source_commit_metrics(td_source* self, const char* object,
+       unsigned int num_metrics, td_metrics** metrics);
 
 int td_source_render(td_source* self,
        td_args* args, const char* object);
index 546b860f22bd6fec9bd32de2b43437ff53a62cd0..f37f2354d365d3d59efa53c47a8e1ddf7d72a980 100644 (file)
@@ -26,6 +26,7 @@
 #include "uptime.h"
 
 static int uptime_heartbeat(td_ctx* ctx, td_source* source) {
+       td_metrics* metrics = NULL;
        struct sysinfo info = {};
        int r;
 
@@ -34,8 +35,26 @@ static int uptime_heartbeat(td_ctx* ctx, td_source* source) {
        if (r < 0)
                return -errno;
 
-       // Submit the uptime
-       return td_source_submit(source, NULL, "%ld", info.uptime);
+       // Create metrics
+       r = td_source_create_metrics(source, &metrics, NULL);
+       if (r < 0)
+               goto ERROR;
+
+       // Push uptime
+       r = td_metrics_push_int64(metrics, "uptime", info.uptime);
+       if (r < 0)
+               goto ERROR;
+
+       // Submit the metrics
+       r = td_source_submit_metrics(source, metrics);
+       if (r < 0)
+               goto ERROR;
+
+ERROR:
+       if (metrics)
+               td_metrics_unref(metrics);
+
+       return r;
 }
 
 const td_source_impl uptime_source = {