]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_redis plugin: Migrate to RedisTimeSeries.
authorFlorian Forster <octo@collectd.org>
Fri, 26 Jan 2024 12:01:05 +0000 (13:01 +0100)
committerFlorian Forster <octo@collectd.org>
Fri, 26 Jan 2024 12:12:28 +0000 (13:12 +0100)
src/collectd.conf.in
src/collectd.conf.pod
src/write_redis.c
src/write_redis_test.c

index 18dd82ef6101128112905392ed1a4bba985d76b2..e59c7b3177664ddb86f2391598c5f54337125351 100644 (file)
 #<Plugin write_redis>
 #      <Node "example">
 #              Host "localhost"
-#              Port "6379"
-#              Timeout 1000
-#              Prefix "collectd/"
+#              Port 6379
+#              Database 0
+#              Timeout 0
+#              Retention 0 # infinite
+#              StoreRates true
+#              Prefix ""
 #      </Node>
 #</Plugin>
 
index b4afdc6cf8bd4f243bd3483da505f47d0810e9ca..c9dd932e80bf09949e14764d28c46f9635a8e0d4 100644 (file)
@@ -11600,26 +11600,27 @@ want to set B<metadata.broker.list> to your Kafka broker list.
 =head2 Plugin C<write_redis>
 
 The I<write_redis plugin> submits values to I<Redis>, a data structure server.
+It uses the I<Time series> type provided by the I<RedisTimeSeries> module.
 
 Synopsis:
 
   <Plugin "write_redis">
     <Node "example">
         Host "localhost"
-        Port "6379"
-        Timeout 1000
-        Prefix "collectd/"
-        Database 1
-        MaxSetSize -1
-        MaxSetDuration -1
+        Port 6379
+        Database 0
+        Timeout 0
+        Retention 0
         StoreRates true
+        Prefix ""
     </Node>
   </Plugin>
 
-Metric values are written to I<Sorted Sets>, using the JSON encoded metric
-identity prefixed by C<metric/> as the key, and the timestamp as the score.
-This allows retrieving a date range using the C<ZRANGEBYSCORE> I<Redis>
-command.
+Metric values are written to I<Time series> keys, using the JSON encoded metric
+identity prefixed by C<metric/> as the key. When a metric is encountered for
+the first time, collectd tries to create a new time series using C<TS.CREATE>.
+This fails if the time series already exists, therefore C<TS.CREATE> errors are
+logged but ignored.
 
 The plugin writes one I<Set> per I<resource> (e.g. host). The set's key is the
 JSON encoded resource attributes, prefixed by C<resource/>. The values are the
@@ -11635,15 +11636,19 @@ Putting it all together: for a metric family with a single metric, the
 following Redit operations are performced when the metric is encountered for
 the first time:
 
-  ZADD metric/<metric_id> <timestamp> <value>
+  TS.CREATE metric/<metric_id> RETENTION <Retention> LABELS <name0> <value0> ...
+  TS.ADD metric/<metric_id> <timestamp> <value>
   SADD resource/<resource_id> metric/<metric_id>
   SADD resources resource/<resource_id>
 
-On subsequent writes, only the C<ZADD> command is performed.
+On subsequent writes, only the C<TS.ADD> command is performed.
+
+More information about the Redis data types used here is available at
+L<https://redis.io/docs/data-types/timeseries/> and
+L<http://redis.io/commands#set>.
 
 You can specify the database to use with the B<Database> parameter (default is
-C<0>). See L<http://redis.io/commands#sorted_set> and
-L<http://redis.io/commands#set> for details.
+C<0>).
 
 The information shown in the synopsis above is the I<default configuration>
 which is used by the plugin if no configuration is present.
@@ -11669,40 +11674,34 @@ The name or IP-address of the host running the I<Redis> instance.
 The B<Port> option is the TCP port on which the Redis instance accepts
 connections. Either a service name of a port number may be given.
 
-=item B<Timeout> I<Seconds>
-
-The B<Timeout> option sets the socket connection timeout in seconds. Use
-fractional numbers to specify timeouts shorter than one second, e.g. C<0.5> for
-500 ms.
-
-=item B<Prefix> I<Prefix>
-
-Prefix used when constructing the name of the I<Sorted Sets> and the I<Set>
-containing all metrics. Defaults to no prefix. If a prefix is configured,
-metric names are prefixed with the configured string verbatim, without an
-additional separation character.
-
 =item B<Database> I<Index>
 
 This index selects the redis database to use for writing operations. Defaults
 to C<0>.
 
-=item B<MaxSetSize> I<Items>
+=item B<Timeout> I<Seconds>
 
-The B<MaxSetSize> option limits the number of items that the I<Sorted Sets> can
-hold. Set to zero to disable (i.e. no size limit), which is the default
-behavior.
+The B<Timeout> option sets the socket connection timeout in seconds. Use
+fractional numbers to specify timeouts shorter than one second, e.g. C<0.5> for
+500 ms.
 
-=item B<MaxSetDuration> I<Seconds>
+=item B<Retention> I<Seconds>
 
-The B<MaxSetDuration> option limits the maximum age of items in the I<Sorted
-Sets>. After reaching this age, metrics will get purged. Set to zero to disable
-(i.e. no age lmit), which is the default behavior.
+The B<Retention> option sets the retention period of new Time series keys. After
+reaching this age, data points will get purged. Set to zero to disable (i.e. no
+age lmit), which is the default behavior.
 
 =item B<StoreRates> B<true>|B<false>
 
 If set to B<true> (the default), convert counter values to rates. If set to
-B<false> counter values are stored as is, i.e. as an increasing integer number.
+B<false>, counter values are stored as is, i.e. as an increasing integer
+number.
+
+=item B<Prefix> I<Prefix>
+
+Prefix used when keys. Defaults to no prefix. If a prefix is configured, metric
+names are prefixed with the configured string verbatim, without an additional
+separation character.
 
 =back
 
index 6c511f5d521413210b00bbad20fd579845af324e..1b6606e7851d8fd05ce00f72f201805e99f3982f 100644 (file)
@@ -34,8 +34,8 @@
 #include <hiredis/hiredis.h>
 #include <sys/time.h>
 
-#ifndef REDIS_DEFAULT_PREFIX
-#define REDIS_DEFAULT_PREFIX "collectd/"
+#ifndef REDIS_DEFAULT_PORT
+#define REDIS_DEFAULT_PORT 6379
 #endif
 
 struct wr_node_s;
@@ -48,8 +48,7 @@ struct wr_node_s {
   cdtime_t timeout;
   char *prefix;
   int database;
-  int max_set_size;
-  cdtime_t max_set_duration;
+  cdtime_t retention;
   bool store_rates;
 
   int (*reconnect)(wr_node_t *node);
@@ -83,13 +82,14 @@ static int reconnect(wr_node_t *node) {
     ERROR("write_redis plugin: Connecting to host \"%s\" (port %i) failed: "
           "Unknown reason",
           (node->host != NULL) ? node->host : "localhost",
-          (node->port != 0) ? node->port : 6379);
+          (node->port != 0) ? node->port : REDIS_DEFAULT_PORT);
     return ENOTCONN;
   }
   if (node->conn->err) {
     ERROR("write_redis plugin: Connecting to host \"%s\" (port %i) failed: %s",
           (node->host != NULL) ? node->host : "localhost",
-          (node->port != 0) ? node->port : 6379, node->conn->errstr);
+          (node->port != 0) ? node->port : REDIS_DEFAULT_PORT,
+          node->conn->errstr);
     disconnect(node);
     return ENOTCONN;
   }
@@ -106,58 +106,34 @@ static int reconnect(wr_node_t *node) {
   return 0;
 }
 
-static int execute(wr_node_t *node, int argc, char const **argv) {
-  redisReply *rr = redisCommandArgv(node->conn, argc, argv, NULL);
-  if (rr == NULL) {
-    strbuf_t cmd = STRBUF_CREATE;
-    for (int i = 0; i < argc; i++) {
-      if (i != 0) {
-        strbuf_print(&cmd, " ");
-      }
-      strbuf_print(&cmd, argv[i]);
+static void print_execute_error(int argc, char const **argv, char const *msg) {
+  strbuf_t cmd = STRBUF_CREATE;
+  for (int i = 0; i < argc; i++) {
+    if (i != 0) {
+      strbuf_print(&cmd, " ");
     }
-
-    ERROR("write_redis plugin: Command \"%s\" failed: %s", cmd.ptr,
-          node->conn->errstr);
-    STRBUF_DESTROY(cmd);
-    return (node->conn->err != 0) ? node->conn->err : -1;
+    strbuf_print(&cmd, argv[i]);
   }
 
-  freeReplyObject(rr);
-  return 0;
-}
-
-static int apply_set_size(wr_node_t *node, char const *id) {
-  if (node->max_set_size <= 0) {
-    return 0;
-  }
+  ERROR("write_redis plugin: Command \"%s\" failed: %s", cmd.ptr, msg);
 
-  strbuf_t max_rank = STRBUF_CREATE;
-  strbuf_printf(&max_rank, "%d", -1 * node->max_set_size - 1);
-
-  char const *cmd[] = {"ZREMRANGEBYRANK", id, "0", max_rank.ptr};
-  int err = node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
-
-  STRBUF_DESTROY(max_rank);
-  return err;
+  STRBUF_DESTROY(cmd);
 }
 
-static int apply_set_duration(wr_node_t *node, char const *id,
-                              cdtime_t last_update) {
-  if (node->max_set_duration == 0 || last_update < node->max_set_duration) {
-    return 0;
+static int execute(wr_node_t *node, int argc, char const **argv) {
+  redisReply *rr = redisCommandArgv(node->conn, argc, argv, NULL);
+  if (rr == NULL) {
+    print_execute_error(argc, argv, node->conn->errstr);
+    return (node->conn->err != 0) ? node->conn->err : -1;
+  }
+  if (rr->type == REDIS_REPLY_ERROR) {
+    print_execute_error(argc, argv, rr->str);
+    freeReplyObject(rr);
+    return -1;
   }
 
-  strbuf_t min_time = STRBUF_CREATE;
-  // '(' indicates 'less than' in redis CLI.
-  strbuf_printf(&min_time, "(%.9f",
-                CDTIME_T_TO_DOUBLE(last_update - node->max_set_duration));
-
-  char const *cmd[] = {"ZREMRANGEBYSCORE", id, "-inf", min_time.ptr};
-  int err = node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
-
-  STRBUF_DESTROY(min_time);
-  return err;
+  freeReplyObject(rr);
+  return 0;
 }
 
 static int add_resource_to_global_set(wr_node_t *node, char const *id) {
@@ -180,22 +156,73 @@ static int add_metric_to_resource(wr_node_t *node, char const *resource_id,
   return node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
 }
 
-static int write_metric_value(wr_node_t *node, metric_t const *m,
-                              char const *id) {
+static int ts_add(wr_node_t *node, metric_t const *m, char const *id) {
+  strbuf_t m_time = STRBUF_CREATE;
+  strbuf_printf(&m_time, "%" PRIu64, CDTIME_T_TO_MS(m->time));
+
   strbuf_t value = STRBUF_CREATE;
   int err = format_values(&value, m, node->store_rates);
   if (err != 0) {
-    return err;
+    ERROR("write_redis plugin: format_values failed: %s", STRERROR(err));
+    goto cleanup;
   }
 
-  strbuf_t m_time = STRBUF_CREATE;
-  strbuf_printf(&m_time, "%.9f", CDTIME_T_TO_DOUBLE(m->time));
+  // format_values returns a string with "<timestamp>:<value>"; create a new
+  // pointer that points to "<value>" directly.
+  char const *value_ptr = strchr(value.ptr, ':');
+  if (value_ptr == NULL) {
+    ERROR("write_redis plugin: format_values returned \"%s\", want string "
+          "containing ':'",
+          value.ptr);
+    goto cleanup;
+  }
+  value_ptr++;
 
-  char const *cmd[] = {"ZADD", id, m_time.ptr, value.ptr};
+  // RedisTimeSeries doesn't handle NANs.
+  if (strcmp("nan", value_ptr) == 0) {
+    goto cleanup;
+  }
+
+  char const *cmd[] = {"TS.ADD", id, m_time.ptr, value_ptr};
   err = node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
 
-  STRBUF_DESTROY(m_time);
+cleanup:
   STRBUF_DESTROY(value);
+  STRBUF_DESTROY(m_time);
+  return err;
+}
+
+static int ts_create(wr_node_t *node, metric_t const *m,
+                     char const *metric_id) {
+  strbuf_t retention = STRBUF_CREATE;
+  strbuf_printf(&retention, "%" PRIu64, CDTIME_T_TO_MS(node->retention));
+
+  size_t cmd_cap = 11 + 2 * m->label.num;
+  char const *cmd[cmd_cap];
+  memset(cmd, 0, sizeof(cmd));
+  cmd[0] = "TS.CREATE";
+  cmd[1] = metric_id;
+  cmd[2] = "RETENTION";
+  cmd[3] = retention.ptr;
+  cmd[4] = "ENCODING";
+  cmd[5] = "COMPRESSED";
+  cmd[6] = "DUPLICATE_POLICY";
+  cmd[7] = "FIRST";
+  cmd[8] = "LABELS";
+  cmd[9] = "metric.name";
+  cmd[10] = m->family->name;
+
+  size_t cmd_len = 11;
+  for (size_t i = 0; i < m->label.num; i++) {
+    assert(cmd_len + 2 <= cmd_cap);
+    cmd[cmd_len] = m->label.ptr[i].name;
+    cmd[cmd_len + 1] = m->label.ptr[i].value;
+    cmd_len += 2;
+  }
+
+  int err = node->execute(node, (int)cmd_len, cmd);
+
+  STRBUF_DESTROY(retention);
   return err;
 }
 
@@ -213,7 +240,13 @@ static int write_metric(wr_node_t *node, char const *resource_id,
     return err;
   }
 
-  err = write_metric_value(node, m, id.ptr);
+  if (is_new) {
+    // An error is returned even if the existing and new keys are equal
+    // => ignore
+    (void)ts_create(node, m, id.ptr);
+  }
+
+  err = ts_add(node, m, id.ptr);
   if (err != 0) {
     goto cleanup;
   }
@@ -225,16 +258,6 @@ static int write_metric(wr_node_t *node, char const *resource_id,
     }
   }
 
-  err = apply_set_size(node, id.ptr);
-  if (err != 0) {
-    goto cleanup;
-  }
-
-  err = apply_set_duration(node, id.ptr, m->time);
-  if (err != 0) {
-    goto cleanup;
-  }
-
 cleanup:
   STRBUF_DESTROY(id);
   return err;
@@ -286,7 +309,10 @@ static int wr_write(metric_family_t const *fam, user_data_t *ud) {
     int err =
         write_metric(node, resource_id.ptr, fam->metric.ptr + i, is_new[i]);
     if (err != 0) {
-      goto cleanup;
+      WARNING("write_redis plugin: write_metric failed with %s, aborting at "
+              "index %zu",
+              STRERROR(err), i);
+      continue;
     }
   }
 
@@ -325,6 +351,7 @@ static int wr_config_node(oconfig_item_t *ci) /* {{{ */
 
   *node = (wr_node_t){
       .store_rates = true,
+      .port = REDIS_DEFAULT_PORT,
 
       .reconnect = reconnect,
       .disconnect = disconnect,
@@ -355,10 +382,8 @@ static int wr_config_node(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_string(child, &node->prefix);
     } else if (strcasecmp("Database", child->key) == 0) {
       status = cf_util_get_int(child, &node->database);
-    } else if (strcasecmp("MaxSetSize", child->key) == 0) {
-      status = cf_util_get_int(child, &node->max_set_size);
-    } else if (strcasecmp("MaxSetDuration", child->key) == 0) {
-      status = cf_util_get_cdtime(child, &node->max_set_duration);
+    } else if (strcasecmp("Retention", child->key) == 0) {
+      status = cf_util_get_cdtime(child, &node->retention);
     } else if (strcasecmp("StoreRates", child->key) == 0) {
       status = cf_util_get_boolean(child, &node->store_rates);
     } else
index 85f8f916d8327e7c40585704f7c1254f66d20c9c..8b4abf26d9ab69648e2a5b756520465324751776 100644 (file)
@@ -83,7 +83,7 @@ DEF_TEST(wr_write) {
                               {
                                   .ptr =
                                       (label_pair_t[]){
-                                          {"metric.name", "m1"},
+                                          {"lbl", "v1"},
                                       },
                                   .num = 1,
                               },
@@ -95,7 +95,7 @@ DEF_TEST(wr_write) {
                               {
                                   .ptr =
                                       (label_pair_t[]){
-                                          {"metric.name", "m2"},
+                                          {"lbl", "v2"},
                                       },
                                   .num = 1,
                               },
@@ -115,6 +115,7 @@ DEF_TEST(wr_write) {
 
   wr_node_t node = {
       .store_rates = false,
+      .retention = TIME_T_TO_CDTIME_T(86400),
 
       .reconnect = fake_reconnect,
       .disconnect = fake_disconnect,
@@ -132,16 +133,20 @@ DEF_TEST(wr_write) {
 #define RESOURCE_ID "{\"test\":\"wr_write\"}"
 #define METRIC_ONE_ID                                                          \
   "{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":"           \
-  "{\"metric.name\":\"m1\"}}"
+  "{\"lbl\":\"v1\"}}"
 #define METRIC_TWO_ID                                                          \
   "{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":"           \
-  "{\"metric.name\":\"m2\"}}"
+  "{\"lbl\":\"v2\"}}"
   char *want_commands_new[] = {
-      "ZADD metric/" METRIC_ONE_ID " 100.000000000 100.000:42",
+      // clang-format off
+      "TS.CREATE metric/" METRIC_ONE_ID " RETENTION 86400000 ENCODING COMPRESSED DUPLICATE_POLICY FIRST LABELS metric.name unit.test lbl v1",
+      "TS.ADD metric/" METRIC_ONE_ID " 100000 42",
       "SADD resource/" RESOURCE_ID " metric/" METRIC_ONE_ID,
-      "ZADD metric/" METRIC_TWO_ID " 100.123456780 100.123:23",
+      "TS.CREATE metric/" METRIC_TWO_ID " RETENTION 86400000 ENCODING COMPRESSED DUPLICATE_POLICY FIRST LABELS metric.name unit.test lbl v2",
+      "TS.ADD metric/" METRIC_TWO_ID " 100123 23",
       "SADD resource/" RESOURCE_ID " metric/" METRIC_TWO_ID,
       "SADD resources resource/" RESOURCE_ID,
+      // clang-format on
   };
   size_t want_commands_new_num = STATIC_ARRAY_SIZE(want_commands_new);
 
@@ -165,8 +170,8 @@ DEF_TEST(wr_write) {
 
   // for known metrics we expect only the ZADD commands
   char *want_commands_known[] = {
-      "ZADD metric/" METRIC_ONE_ID " 110.000000000 110.000:42",
-      "ZADD metric/" METRIC_TWO_ID " 110.123456780 110.123:23",
+      "TS.ADD metric/" METRIC_ONE_ID " 110000 42",
+      "TS.ADD metric/" METRIC_TWO_ID " 110123 23",
   };
   size_t want_commands_known_num = STATIC_ARRAY_SIZE(want_commands_known);