]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_redis plugin: Add an intermediary set that maps resources to metrics.
authorFlorian Forster <octo@collectd.org>
Wed, 24 Jan 2024 16:29:53 +0000 (17:29 +0100)
committerFlorian Forster <octo@collectd.org>
Thu, 25 Jan 2024 19:53:29 +0000 (20:53 +0100)
src/collectd.conf.pod
src/utils/format_json/format_json.c
src/utils/format_json/format_json.h
src/write_redis.c
src/write_redis_test.c

index 68e00ad1797f4800b7cc40ea9c632f9e354f3d71..6f4866bc979a9e4f491de5ea84d2ba3e84b7fc7f 100644 (file)
@@ -11616,15 +11616,29 @@ Synopsis:
     </Node>
   </Plugin>
 
-Values are submitted to I<Sorted Sets>, using the metric name as the key, and
-the timestamp as the score. Retrieving a date range can then be done using the
-C<ZRANGEBYSCORE> I<Redis> command. Additionally, all the identifiers of these
-I<Sorted Sets> are kept in a I<Set> called C<collectd/values> (or
-C<${prefix}/values> if the B<Prefix> option was specified) and can be retrieved
-using the C<SMEMBERS> I<Redis> command. You can specify the database to use
-with the B<Database> parameter (default is C<0>). See
-L<http://redis.io/commands#sorted_set> and L<http://redis.io/commands#set> for
-details.
+Metric values are written to I<Sorted Sets>, using the JSON encoded metric
+identity prefixed by C<metric/> as the key, and the timestamp as the score.
+This allows retrieving a date range using the C<ZRANGEBYSCORE> I<Redis>
+command.
+
+The plugin writes one I<Set> per I<resource> (e.g. host). The set's key is the
+JSON encoded resource attributes, prefixed by C<resource/>. The values are the
+metric IDs as outlined above.
+
+Finally, the keys of all resource I<Sets> are written to a global I<Set>,
+called C<resources>. A list of resources can be retrieved using the C<SMEMBERS>
+I<Redis> command.
+
+Putting it all together: for a metric family with a single metric, the
+following Redit operations are performced:
+
+  ZADD metric/<metric_id> <timestamp> <value>
+  SADD resource/<resource_id> metric/<metric_id>
+  SADD resources resource/<resource_id>
+
+You can specify the database to use with the B<Database> parameter (default is
+C<0>). See L<http://redis.io/commands#sorted_set> and
+L<http://redis.io/commands#set> for details.
 
 The information shown in the synopsis above is the I<default configuration>
 which is used by the plugin if no configuration is present.
index b4a99343b9573ad05edc6b83cbc07ca628df63d4..47c66e06e5afd128e61b8d3fa7b0ddb6f78ed5f8 100644 (file)
@@ -454,6 +454,46 @@ static int format_metric_identity(yajl_gen g, metric_t const *m) {
   return 0;
 }
 
+int format_json_label_set(strbuf_t *buf, label_set_t labels) {
+  if (buf == NULL) {
+    return EINVAL;
+  }
+
+#if HAVE_YAJL_V2
+  yajl_gen g = yajl_gen_alloc(NULL);
+#else /* !HAVE_YAJL_V2 */
+  yajl_gen_config conf = {0};
+  yajl_gen g = yajl_gen_alloc(&conf, NULL);
+#endif
+  if (g == NULL) {
+    return -1;
+  }
+
+  if (format_label_set(g, labels) != 0) {
+    yajl_gen_clear(g);
+    yajl_gen_free(g);
+    return -1;
+  }
+
+  unsigned char const *out;
+#if HAVE_YAJL_V2
+  size_t out_len;
+#else
+  unsigned int out_len;
+#endif
+  /* copy to output buffer */
+  if (yajl_gen_get_buf(g, &out, &out_len) != yajl_gen_status_ok) {
+    yajl_gen_clear(g);
+    yajl_gen_free(g);
+    return -1;
+  }
+  strbuf_printn(buf, (char const *)out, (size_t)out_len);
+
+  yajl_gen_clear(g);
+  yajl_gen_free(g);
+  return 0;
+}
+
 int format_json_metric_identity(strbuf_t *buf, metric_t const *m) {
   if (buf == NULL || m == NULL) {
     return EINVAL;
index a09adb74a1ec4ac18e817fdd2238cebeb39ff1e9..9b2293532b7725f3bc325ba5d5568f796ca0f2c7 100644 (file)
@@ -52,6 +52,8 @@ int format_json_notification(char *buffer, size_t buffer_size,
 int format_json_open_telemetry(strbuf_t *buf,
                                resource_metrics_set_t const *set);
 
+int format_json_label_set(strbuf_t *buf, label_set_t labels);
+
 int format_json_metric_identity(strbuf_t *buf, metric_t const *m);
 
 #endif /* UTILS_FORMAT_JSON_H */
index 4142f0072e514c0dca93e43faa2d81a8656077c5..0f9ad9ee7efa80c0082eb9677fd59d8aee6c0da5 100644 (file)
@@ -159,12 +159,12 @@ static int apply_set_duration(wr_node_t *node, char const *id,
   return err;
 }
 
-static int register_metric(wr_node_t *node, char const *id) {
+static int add_resource_to_global_set(wr_node_t *node, char const *id) {
   strbuf_t key = STRBUF_CREATE;
   if (node->prefix != NULL) {
     strbuf_print(&key, node->prefix);
   }
-  strbuf_print(&key, "values");
+  strbuf_print(&key, "resources");
 
   char const *cmd[] = {"SADD", key.ptr, id};
   int err = node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
@@ -173,6 +173,12 @@ static int register_metric(wr_node_t *node, char const *id) {
   return err;
 }
 
+static int add_metric_to_resource(wr_node_t *node, char const *resource_id,
+                                  char const *metric_id) {
+  char const *cmd[] = {"SADD", resource_id, metric_id};
+  return node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
+}
+
 static int write_metric_value(wr_node_t *node, metric_t const *m,
                               char const *id) {
   strbuf_t value = STRBUF_CREATE;
@@ -192,11 +198,13 @@ static int write_metric_value(wr_node_t *node, metric_t const *m,
   return err;
 }
 
-static int write_metric(wr_node_t *node, metric_t const *m) {
+static int write_metric(wr_node_t *node, char const *resource_id,
+                        metric_t const *m) {
   strbuf_t id = STRBUF_CREATE;
   if (node->prefix != NULL) {
     strbuf_print(&id, node->prefix);
   }
+  strbuf_print(&id, "metric/");
   int err = format_json_metric_identity(&id, m);
   if (err != 0) {
     ERROR("write_redis plugin: Formatting metric identity failed: %s",
@@ -209,7 +217,7 @@ static int write_metric(wr_node_t *node, metric_t const *m) {
     goto cleanup;
   }
 
-  err = register_metric(node, id.ptr);
+  err = add_metric_to_resource(node, resource_id, id.ptr);
   if (err != 0) {
     goto cleanup;
   }
@@ -229,29 +237,44 @@ cleanup:
   return err;
 }
 
-static int wr_write(/* {{{ */
-                    metric_family_t const *fam, user_data_t *ud) {
+static int wr_write(metric_family_t const *fam, user_data_t *ud) {
   wr_node_t *node = ud->data;
 
+  strbuf_t resource_id = STRBUF_CREATE;
+  if (node->prefix != NULL) {
+    strbuf_print(&resource_id, node->prefix);
+  }
+  strbuf_print(&resource_id, "resource/");
+  int err = format_json_label_set(&resource_id, fam->resource);
+  if (err != 0) {
+    STRBUF_DESTROY(resource_id);
+    return err;
+  }
+
   pthread_mutex_lock(&node->lock);
 
-  int err = node->reconnect(node);
+  err = node->reconnect(node);
   if (err != 0) {
-    pthread_mutex_unlock(&node->lock);
-    return err;
+    goto cleanup;
   }
 
   for (size_t i = 0; i < fam->metric.num; i++) {
-    int err = write_metric(node, fam->metric.ptr + i);
+    int err = write_metric(node, resource_id.ptr, fam->metric.ptr + i);
     if (err != 0) {
-      pthread_mutex_unlock(&node->lock);
-      return err;
+      goto cleanup;
     }
   }
 
+  err = add_resource_to_global_set(node, resource_id.ptr);
+  if (err != 0) {
+    goto cleanup;
+  }
+
+cleanup:
   pthread_mutex_unlock(&node->lock);
-  return 0;
-} /* }}} int wr_write */
+  STRBUF_DESTROY(resource_id);
+  return err;
+}
 
 static void wr_config_free(void *ptr) /* {{{ */
 {
index 86682bada428820ed418d373cce208e7900ebaec..1a027a79a8b1b0be2a1a2016ef5cdd92242ee978 100644 (file)
@@ -39,7 +39,8 @@ static int fake_execute(wr_node_t *node, int argc, char const **argv) {
     strbuf_print(&cmd, argv[i]);
   }
 
-  got_commands = realloc(got_commands, sizeof(*got_commands) * (got_commands_num + 1));
+  got_commands =
+      realloc(got_commands, sizeof(*got_commands) * (got_commands_num + 1));
   got_commands[got_commands_num] = strdup(cmd.ptr);
   got_commands_num++;
 
@@ -80,15 +81,27 @@ DEF_TEST(usage_rate) {
                               {
                                   .ptr =
                                       (label_pair_t[]){
-                                          {"metric.type", "gauge"},
+                                          {"metric.name", "m1"},
                                       },
                                   .num = 1,
                               },
                           .value = (value_t){.gauge = 42},
                           .time = TIME_T_TO_CDTIME_T(100),
                       },
+                      {
+                          .label =
+                              {
+                                  .ptr =
+                                      (label_pair_t[]){
+                                          {"metric.name", "m2"},
+                                      },
+                                  .num = 1,
+                              },
+                          .value = (value_t){.gauge = 23},
+                          .time = DOUBLE_TO_CDTIME_T(100.123456780),
+                      },
                   },
-              .num = 1,
+              .num = 2,
           },
   };
 
@@ -112,12 +125,19 @@ DEF_TEST(usage_rate) {
 
   CHECK_ZERO(wr_write(&fam, &ud));
 
-#define METRIC_ID                                                              \
-  "{\"name\":\"unit.test\",\"resource\":{\"test\":\"usage_rate\"},\"labels\":" \
-  "{\"metric.type\":\"gauge\"}}"
+#define RESOURCE_ID "{\"test\":\"usage_rate\"}"
+#define METRIC_ONE_ID                                                          \
+  "{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":"           \
+  "{\"metric.name\":\"m1\"}}"
+#define METRIC_TWO_ID                                                          \
+  "{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":"           \
+  "{\"metric.name\":\"m2\"}}"
   char *want_commands[] = {
-      "ZADD " METRIC_ID " 100.000000000 100.000:42",
-      "SADD values " METRIC_ID,
+      "ZADD metric/" METRIC_ONE_ID " 100.000000000 100.000:42",
+      "SADD resource/" RESOURCE_ID " metric/" METRIC_ONE_ID,
+      "ZADD metric/" METRIC_TWO_ID " 100.123456780 100.123:23",
+      "SADD resource/" RESOURCE_ID " metric/" METRIC_TWO_ID,
+      "SADD resources resource/" RESOURCE_ID,
   };
   size_t want_commands_num = STATIC_ARRAY_SIZE(want_commands);