=head2 Plugin C<write_redis>
The I<write_redis plugin> submits values to I<Redis>, a data structure server.
+It uses the I<Time series> type provided by the I<RedisTimeSeries> module.
Synopsis:
<Plugin "write_redis">
<Node "example">
Host "localhost"
- Port "6379"
- Timeout 1000
- Prefix "collectd/"
- Database 1
- MaxSetSize -1
- MaxSetDuration -1
+ Port 6379
+ Database 0
+ Timeout 0
+ Retention 0
StoreRates true
+ Prefix ""
</Node>
</Plugin>
-Metric values are written to I<Sorted Sets>, using the JSON encoded metric
-identity prefixed by C<metric/> as the key, and the timestamp as the score.
-This allows retrieving a date range using the C<ZRANGEBYSCORE> I<Redis>
-command.
+Metric values are written to I<Time series> keys, using the JSON encoded metric
+identity prefixed by C<metric/> as the key. When a metric is encountered for
+the first time, collectd tries to create a new time series using C<TS.CREATE>.
+This fails if the time series already exists, therefore C<TS.CREATE> errors are
+logged but ignored.
The plugin writes one I<Set> per I<resource> (e.g. host). The set's key is the
JSON encoded resource attributes, prefixed by C<resource/>. The values are the
following Redit operations are performced when the metric is encountered for
the first time:
- ZADD metric/<metric_id> <timestamp> <value>
+ TS.CREATE metric/<metric_id> RETENTION <Retention> LABELS <name0> <value0> ...
+ TS.ADD metric/<metric_id> <timestamp> <value>
SADD resource/<resource_id> metric/<metric_id>
SADD resources resource/<resource_id>
-On subsequent writes, only the C<ZADD> command is performed.
+On subsequent writes, only the C<TS.ADD> command is performed.
+
+More information about the Redis data types used here is available at
+L<https://redis.io/docs/data-types/timeseries/> and
+L<http://redis.io/commands#set>.
You can specify the database to use with the B<Database> parameter (default is
-C<0>). See L<http://redis.io/commands#sorted_set> and
-L<http://redis.io/commands#set> for details.
+C<0>).
The information shown in the synopsis above is the I<default configuration>
which is used by the plugin if no configuration is present.
The B<Port> option is the TCP port on which the Redis instance accepts
connections. Either a service name of a port number may be given.
-=item B<Timeout> I<Seconds>
-
-The B<Timeout> option sets the socket connection timeout in seconds. Use
-fractional numbers to specify timeouts shorter than one second, e.g. C<0.5> for
-500 ms.
-
-=item B<Prefix> I<Prefix>
-
-Prefix used when constructing the name of the I<Sorted Sets> and the I<Set>
-containing all metrics. Defaults to no prefix. If a prefix is configured,
-metric names are prefixed with the configured string verbatim, without an
-additional separation character.
-
=item B<Database> I<Index>
This index selects the redis database to use for writing operations. Defaults
to C<0>.
-=item B<MaxSetSize> I<Items>
+=item B<Timeout> I<Seconds>
-The B<MaxSetSize> option limits the number of items that the I<Sorted Sets> can
-hold. Set to zero to disable (i.e. no size limit), which is the default
-behavior.
+The B<Timeout> option sets the socket connection timeout in seconds. Use
+fractional numbers to specify timeouts shorter than one second, e.g. C<0.5> for
+500 ms.
-=item B<MaxSetDuration> I<Seconds>
+=item B<Retention> I<Seconds>
-The B<MaxSetDuration> option limits the maximum age of items in the I<Sorted
-Sets>. After reaching this age, metrics will get purged. Set to zero to disable
-(i.e. no age lmit), which is the default behavior.
+The B<Retention> option sets the retention period of new Time series keys. After
+reaching this age, data points will get purged. Set to zero to disable (i.e. no
+age lmit), which is the default behavior.
=item B<StoreRates> B<true>|B<false>
If set to B<true> (the default), convert counter values to rates. If set to
-B<false> counter values are stored as is, i.e. as an increasing integer number.
+B<false>, counter values are stored as is, i.e. as an increasing integer
+number.
+
+=item B<Prefix> I<Prefix>
+
+Prefix used when keys. Defaults to no prefix. If a prefix is configured, metric
+names are prefixed with the configured string verbatim, without an additional
+separation character.
=back
#include <hiredis/hiredis.h>
#include <sys/time.h>
-#ifndef REDIS_DEFAULT_PREFIX
-#define REDIS_DEFAULT_PREFIX "collectd/"
+#ifndef REDIS_DEFAULT_PORT
+#define REDIS_DEFAULT_PORT 6379
#endif
struct wr_node_s;
cdtime_t timeout;
char *prefix;
int database;
- int max_set_size;
- cdtime_t max_set_duration;
+ cdtime_t retention;
bool store_rates;
int (*reconnect)(wr_node_t *node);
ERROR("write_redis plugin: Connecting to host \"%s\" (port %i) failed: "
"Unknown reason",
(node->host != NULL) ? node->host : "localhost",
- (node->port != 0) ? node->port : 6379);
+ (node->port != 0) ? node->port : REDIS_DEFAULT_PORT);
return ENOTCONN;
}
if (node->conn->err) {
ERROR("write_redis plugin: Connecting to host \"%s\" (port %i) failed: %s",
(node->host != NULL) ? node->host : "localhost",
- (node->port != 0) ? node->port : 6379, node->conn->errstr);
+ (node->port != 0) ? node->port : REDIS_DEFAULT_PORT,
+ node->conn->errstr);
disconnect(node);
return ENOTCONN;
}
return 0;
}
-static int execute(wr_node_t *node, int argc, char const **argv) {
- redisReply *rr = redisCommandArgv(node->conn, argc, argv, NULL);
- if (rr == NULL) {
- strbuf_t cmd = STRBUF_CREATE;
- for (int i = 0; i < argc; i++) {
- if (i != 0) {
- strbuf_print(&cmd, " ");
- }
- strbuf_print(&cmd, argv[i]);
+static void print_execute_error(int argc, char const **argv, char const *msg) {
+ strbuf_t cmd = STRBUF_CREATE;
+ for (int i = 0; i < argc; i++) {
+ if (i != 0) {
+ strbuf_print(&cmd, " ");
}
-
- ERROR("write_redis plugin: Command \"%s\" failed: %s", cmd.ptr,
- node->conn->errstr);
- STRBUF_DESTROY(cmd);
- return (node->conn->err != 0) ? node->conn->err : -1;
+ strbuf_print(&cmd, argv[i]);
}
- freeReplyObject(rr);
- return 0;
-}
-
-static int apply_set_size(wr_node_t *node, char const *id) {
- if (node->max_set_size <= 0) {
- return 0;
- }
+ ERROR("write_redis plugin: Command \"%s\" failed: %s", cmd.ptr, msg);
- strbuf_t max_rank = STRBUF_CREATE;
- strbuf_printf(&max_rank, "%d", -1 * node->max_set_size - 1);
-
- char const *cmd[] = {"ZREMRANGEBYRANK", id, "0", max_rank.ptr};
- int err = node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
-
- STRBUF_DESTROY(max_rank);
- return err;
+ STRBUF_DESTROY(cmd);
}
-static int apply_set_duration(wr_node_t *node, char const *id,
- cdtime_t last_update) {
- if (node->max_set_duration == 0 || last_update < node->max_set_duration) {
- return 0;
+static int execute(wr_node_t *node, int argc, char const **argv) {
+ redisReply *rr = redisCommandArgv(node->conn, argc, argv, NULL);
+ if (rr == NULL) {
+ print_execute_error(argc, argv, node->conn->errstr);
+ return (node->conn->err != 0) ? node->conn->err : -1;
+ }
+ if (rr->type == REDIS_REPLY_ERROR) {
+ print_execute_error(argc, argv, rr->str);
+ freeReplyObject(rr);
+ return -1;
}
- strbuf_t min_time = STRBUF_CREATE;
- // '(' indicates 'less than' in redis CLI.
- strbuf_printf(&min_time, "(%.9f",
- CDTIME_T_TO_DOUBLE(last_update - node->max_set_duration));
-
- char const *cmd[] = {"ZREMRANGEBYSCORE", id, "-inf", min_time.ptr};
- int err = node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
-
- STRBUF_DESTROY(min_time);
- return err;
+ freeReplyObject(rr);
+ return 0;
}
static int add_resource_to_global_set(wr_node_t *node, char const *id) {
return node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
}
-static int write_metric_value(wr_node_t *node, metric_t const *m,
- char const *id) {
+static int ts_add(wr_node_t *node, metric_t const *m, char const *id) {
+ strbuf_t m_time = STRBUF_CREATE;
+ strbuf_printf(&m_time, "%" PRIu64, CDTIME_T_TO_MS(m->time));
+
strbuf_t value = STRBUF_CREATE;
int err = format_values(&value, m, node->store_rates);
if (err != 0) {
- return err;
+ ERROR("write_redis plugin: format_values failed: %s", STRERROR(err));
+ goto cleanup;
}
- strbuf_t m_time = STRBUF_CREATE;
- strbuf_printf(&m_time, "%.9f", CDTIME_T_TO_DOUBLE(m->time));
+ // format_values returns a string with "<timestamp>:<value>"; create a new
+ // pointer that points to "<value>" directly.
+ char const *value_ptr = strchr(value.ptr, ':');
+ if (value_ptr == NULL) {
+ ERROR("write_redis plugin: format_values returned \"%s\", want string "
+ "containing ':'",
+ value.ptr);
+ goto cleanup;
+ }
+ value_ptr++;
- char const *cmd[] = {"ZADD", id, m_time.ptr, value.ptr};
+ // RedisTimeSeries doesn't handle NANs.
+ if (strcmp("nan", value_ptr) == 0) {
+ goto cleanup;
+ }
+
+ char const *cmd[] = {"TS.ADD", id, m_time.ptr, value_ptr};
err = node->execute(node, STATIC_ARRAY_SIZE(cmd), cmd);
- STRBUF_DESTROY(m_time);
+cleanup:
STRBUF_DESTROY(value);
+ STRBUF_DESTROY(m_time);
+ return err;
+}
+
+static int ts_create(wr_node_t *node, metric_t const *m,
+ char const *metric_id) {
+ strbuf_t retention = STRBUF_CREATE;
+ strbuf_printf(&retention, "%" PRIu64, CDTIME_T_TO_MS(node->retention));
+
+ size_t cmd_cap = 11 + 2 * m->label.num;
+ char const *cmd[cmd_cap];
+ memset(cmd, 0, sizeof(cmd));
+ cmd[0] = "TS.CREATE";
+ cmd[1] = metric_id;
+ cmd[2] = "RETENTION";
+ cmd[3] = retention.ptr;
+ cmd[4] = "ENCODING";
+ cmd[5] = "COMPRESSED";
+ cmd[6] = "DUPLICATE_POLICY";
+ cmd[7] = "FIRST";
+ cmd[8] = "LABELS";
+ cmd[9] = "metric.name";
+ cmd[10] = m->family->name;
+
+ size_t cmd_len = 11;
+ for (size_t i = 0; i < m->label.num; i++) {
+ assert(cmd_len + 2 <= cmd_cap);
+ cmd[cmd_len] = m->label.ptr[i].name;
+ cmd[cmd_len + 1] = m->label.ptr[i].value;
+ cmd_len += 2;
+ }
+
+ int err = node->execute(node, (int)cmd_len, cmd);
+
+ STRBUF_DESTROY(retention);
return err;
}
return err;
}
- err = write_metric_value(node, m, id.ptr);
+ if (is_new) {
+ // An error is returned even if the existing and new keys are equal
+ // => ignore
+ (void)ts_create(node, m, id.ptr);
+ }
+
+ err = ts_add(node, m, id.ptr);
if (err != 0) {
goto cleanup;
}
}
}
- err = apply_set_size(node, id.ptr);
- if (err != 0) {
- goto cleanup;
- }
-
- err = apply_set_duration(node, id.ptr, m->time);
- if (err != 0) {
- goto cleanup;
- }
-
cleanup:
STRBUF_DESTROY(id);
return err;
int err =
write_metric(node, resource_id.ptr, fam->metric.ptr + i, is_new[i]);
if (err != 0) {
- goto cleanup;
+ WARNING("write_redis plugin: write_metric failed with %s, aborting at "
+ "index %zu",
+ STRERROR(err), i);
+ continue;
}
}
*node = (wr_node_t){
.store_rates = true,
+ .port = REDIS_DEFAULT_PORT,
.reconnect = reconnect,
.disconnect = disconnect,
status = cf_util_get_string(child, &node->prefix);
} else if (strcasecmp("Database", child->key) == 0) {
status = cf_util_get_int(child, &node->database);
- } else if (strcasecmp("MaxSetSize", child->key) == 0) {
- status = cf_util_get_int(child, &node->max_set_size);
- } else if (strcasecmp("MaxSetDuration", child->key) == 0) {
- status = cf_util_get_cdtime(child, &node->max_set_duration);
+ } else if (strcasecmp("Retention", child->key) == 0) {
+ status = cf_util_get_cdtime(child, &node->retention);
} else if (strcasecmp("StoreRates", child->key) == 0) {
status = cf_util_get_boolean(child, &node->store_rates);
} else
{
.ptr =
(label_pair_t[]){
- {"metric.name", "m1"},
+ {"lbl", "v1"},
},
.num = 1,
},
{
.ptr =
(label_pair_t[]){
- {"metric.name", "m2"},
+ {"lbl", "v2"},
},
.num = 1,
},
wr_node_t node = {
.store_rates = false,
+ .retention = TIME_T_TO_CDTIME_T(86400),
.reconnect = fake_reconnect,
.disconnect = fake_disconnect,
#define RESOURCE_ID "{\"test\":\"wr_write\"}"
#define METRIC_ONE_ID \
"{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":" \
- "{\"metric.name\":\"m1\"}}"
+ "{\"lbl\":\"v1\"}}"
#define METRIC_TWO_ID \
"{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":" \
- "{\"metric.name\":\"m2\"}}"
+ "{\"lbl\":\"v2\"}}"
char *want_commands_new[] = {
- "ZADD metric/" METRIC_ONE_ID " 100.000000000 100.000:42",
+ // clang-format off
+ "TS.CREATE metric/" METRIC_ONE_ID " RETENTION 86400000 ENCODING COMPRESSED DUPLICATE_POLICY FIRST LABELS metric.name unit.test lbl v1",
+ "TS.ADD metric/" METRIC_ONE_ID " 100000 42",
"SADD resource/" RESOURCE_ID " metric/" METRIC_ONE_ID,
- "ZADD metric/" METRIC_TWO_ID " 100.123456780 100.123:23",
+ "TS.CREATE metric/" METRIC_TWO_ID " RETENTION 86400000 ENCODING COMPRESSED DUPLICATE_POLICY FIRST LABELS metric.name unit.test lbl v2",
+ "TS.ADD metric/" METRIC_TWO_ID " 100123 23",
"SADD resource/" RESOURCE_ID " metric/" METRIC_TWO_ID,
"SADD resources resource/" RESOURCE_ID,
+ // clang-format on
};
size_t want_commands_new_num = STATIC_ARRAY_SIZE(want_commands_new);
// for known metrics we expect only the ZADD commands
char *want_commands_known[] = {
- "ZADD metric/" METRIC_ONE_ID " 110.000000000 110.000:42",
- "ZADD metric/" METRIC_TWO_ID " 110.123456780 110.123:23",
+ "TS.ADD metric/" METRIC_ONE_ID " 110000 42",
+ "TS.ADD metric/" METRIC_TWO_ID " 110123 23",
};
size_t want_commands_known_num = STATIC_ARRAY_SIZE(want_commands_known);