]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_redis plugin: Register metrics and resources only when necessary.
authorFlorian Forster <octo@collectd.org>
Thu, 25 Jan 2024 14:28:55 +0000 (15:28 +0100)
committerFlorian Forster <octo@collectd.org>
Thu, 25 Jan 2024 19:53:29 +0000 (20:53 +0100)
src/collectd.conf.pod
src/write_redis.c
src/write_redis_test.c

index 6f4866bc979a9e4f491de5ea84d2ba3e84b7fc7f..b4afdc6cf8bd4f243bd3483da505f47d0810e9ca 100644 (file)
@@ -11623,19 +11623,24 @@ command.
 
 The plugin writes one I<Set> per I<resource> (e.g. host). The set's key is the
 JSON encoded resource attributes, prefixed by C<resource/>. The values are the
-metric IDs as outlined above.
+metric IDs as outlined above. The write is only performed when the metric is
+"new", i.e. it is observed for the first time.
 
 Finally, the keys of all resource I<Sets> are written to a global I<Set>,
 called C<resources>. A list of resources can be retrieved using the C<SMEMBERS>
-I<Redis> command.
+I<Redis> command. The write is only performed if all metrics in the metric
+family are "new", see above.
 
 Putting it all together: for a metric family with a single metric, the
-following Redit operations are performced:
+following Redit operations are performced when the metric is encountered for
+the first time:
 
   ZADD metric/<metric_id> <timestamp> <value>
   SADD resource/<resource_id> metric/<metric_id>
   SADD resources resource/<resource_id>
 
+On subsequent writes, only the C<ZADD> command is performed.
+
 You can specify the database to use with the B<Database> parameter (default is
 C<0>). See L<http://redis.io/commands#sorted_set> and
 L<http://redis.io/commands#set> for details.
index 0f9ad9ee7efa80c0082eb9677fd59d8aee6c0da5..6c511f5d521413210b00bbad20fd579845af324e 100644 (file)
@@ -26,7 +26,8 @@
 
 #include "collectd.h"
 
-#include "plugin.h"
+#include "daemon/plugin.h"
+#include "daemon/utils_cache.h"
 #include "utils/common/common.h"
 #include "utils/format_json/format_json.h"
 
@@ -199,7 +200,7 @@ static int write_metric_value(wr_node_t *node, metric_t const *m,
 }
 
 static int write_metric(wr_node_t *node, char const *resource_id,
-                        metric_t const *m) {
+                        metric_t const *m, bool is_new) {
   strbuf_t id = STRBUF_CREATE;
   if (node->prefix != NULL) {
     strbuf_print(&id, node->prefix);
@@ -217,9 +218,11 @@ static int write_metric(wr_node_t *node, char const *resource_id,
     goto cleanup;
   }
 
-  err = add_metric_to_resource(node, resource_id, id.ptr);
-  if (err != 0) {
-    goto cleanup;
+  if (is_new) {
+    err = add_metric_to_resource(node, resource_id, id.ptr);
+    if (err != 0) {
+      goto cleanup;
+    }
   }
 
   err = apply_set_size(node, id.ptr);
@@ -237,6 +240,17 @@ cleanup:
   return err;
 }
 
+static bool metric_is_new(metric_t const *m) {
+  cdtime_t first_time = 0;
+  int err = uc_get_first_time(m, &first_time);
+  if (err != 0) {
+    ERROR("write_redis plugin: uc_get_first_time failed: %s", STRERROR(err));
+    return true;
+  }
+
+  return m->time == first_time;
+}
+
 static int wr_write(metric_family_t const *fam, user_data_t *ud) {
   wr_node_t *node = ud->data;
 
@@ -251,6 +265,16 @@ static int wr_write(metric_family_t const *fam, user_data_t *ud) {
     return err;
   }
 
+  // determine whether metrics are new before grabbing the log.
+  bool all_new = true;
+  bool is_new[fam->metric.num];
+  memset(is_new, 0, sizeof(is_new));
+
+  for (size_t i = 0; i < fam->metric.num; i++) {
+    is_new[i] = metric_is_new(fam->metric.ptr + i);
+    all_new = all_new && is_new[i];
+  }
+
   pthread_mutex_lock(&node->lock);
 
   err = node->reconnect(node);
@@ -259,15 +283,18 @@ static int wr_write(metric_family_t const *fam, user_data_t *ud) {
   }
 
   for (size_t i = 0; i < fam->metric.num; i++) {
-    int err = write_metric(node, resource_id.ptr, fam->metric.ptr + i);
+    int err =
+        write_metric(node, resource_id.ptr, fam->metric.ptr + i, is_new[i]);
     if (err != 0) {
       goto cleanup;
     }
   }
 
-  err = add_resource_to_global_set(node, resource_id.ptr);
-  if (err != 0) {
-    goto cleanup;
+  if (all_new) {
+    err = add_resource_to_global_set(node, resource_id.ptr);
+    if (err != 0) {
+      goto cleanup;
+    }
   }
 
 cleanup:
index 1a027a79a8b1b0be2a1a2016ef5cdd92242ee978..85f8f916d8327e7c40585704f7c1254f66d20c9c 100644 (file)
@@ -60,7 +60,9 @@ static void fake_disconnect(wr_node_t *node) {
 
 static int fake_reconnect(wr_node_t *node) { return 0; }
 
-DEF_TEST(usage_rate) {
+DEF_TEST(wr_write) {
+  uc_init();
+
   metric_family_t fam = {
       .name = "unit.test",
       .type = METRIC_TYPE_GAUGE,
@@ -68,7 +70,7 @@ DEF_TEST(usage_rate) {
           {
               .ptr =
                   (label_pair_t[]){
-                      {"test", "usage_rate"},
+                      {"test", "wr_write"},
                   },
               .num = 1,
           },
@@ -109,6 +111,8 @@ DEF_TEST(usage_rate) {
     fam.metric.ptr[i].family = &fam;
   }
 
+  CHECK_ZERO(uc_update(&fam));
+
   wr_node_t node = {
       .store_rates = false,
 
@@ -125,32 +129,60 @@ DEF_TEST(usage_rate) {
 
   CHECK_ZERO(wr_write(&fam, &ud));
 
-#define RESOURCE_ID "{\"test\":\"usage_rate\"}"
+#define RESOURCE_ID "{\"test\":\"wr_write\"}"
 #define METRIC_ONE_ID                                                          \
   "{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":"           \
   "{\"metric.name\":\"m1\"}}"
 #define METRIC_TWO_ID                                                          \
   "{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":"           \
   "{\"metric.name\":\"m2\"}}"
-  char *want_commands[] = {
+  char *want_commands_new[] = {
       "ZADD metric/" METRIC_ONE_ID " 100.000000000 100.000:42",
       "SADD resource/" RESOURCE_ID " metric/" METRIC_ONE_ID,
       "ZADD metric/" METRIC_TWO_ID " 100.123456780 100.123:23",
       "SADD resource/" RESOURCE_ID " metric/" METRIC_TWO_ID,
       "SADD resources resource/" RESOURCE_ID,
   };
-  size_t want_commands_num = STATIC_ARRAY_SIZE(want_commands);
+  size_t want_commands_new_num = STATIC_ARRAY_SIZE(want_commands_new);
+
+  for (size_t i = 0; i < want_commands_new_num && i < got_commands_num; i++) {
+    EXPECT_EQ_STR(want_commands_new[i], got_commands[i]);
+  }
+  EXPECT_EQ_INT(want_commands_new_num, got_commands_num);
+
+  // clear the global got_commands array
+  node.disconnect(&node);
+
+  // advance time
+  cdtime_t interval = TIME_T_TO_CDTIME_T(10);
+  for (size_t i = 0; i < fam.metric.num; i++) {
+    fam.metric.ptr[i].time += interval;
+  }
+
+  CHECK_ZERO(uc_update(&fam));
+
+  CHECK_ZERO(wr_write(&fam, &ud));
+
+  // for known metrics we expect only the ZADD commands
+  char *want_commands_known[] = {
+      "ZADD metric/" METRIC_ONE_ID " 110.000000000 110.000:42",
+      "ZADD metric/" METRIC_TWO_ID " 110.123456780 110.123:23",
+  };
+  size_t want_commands_known_num = STATIC_ARRAY_SIZE(want_commands_known);
 
-  for (size_t i = 0; i < want_commands_num && i < got_commands_num; i++) {
-    EXPECT_EQ_STR(want_commands[i], got_commands[i]);
+  for (size_t i = 0; i < want_commands_known_num && i < got_commands_num; i++) {
+    EXPECT_EQ_STR(want_commands_known[i], got_commands[i]);
   }
-  EXPECT_EQ_INT(want_commands_num, got_commands_num);
+  EXPECT_EQ_INT(want_commands_known_num, got_commands_num);
+
+  // clear the global got_commands array
+  node.disconnect(&node);
 
   return 0;
 }
 
 int main(void) {
-  RUN_TEST(usage_rate);
+  RUN_TEST(wr_write);
 
   END_TEST;
 }