The plugin writes one I<Set> per I<resource> (e.g. host). The set's key is the
JSON encoded resource attributes, prefixed by C<resource/>. The values are the
-metric IDs as outlined above.
+metric IDs as outlined above. The write is only performed when the metric is
+"new", i.e. it is observed for the first time.
Finally, the keys of all resource I<Sets> are written to a global I<Set>,
called C<resources>. A list of resources can be retrieved using the C<SMEMBERS>
-I<Redis> command.
+I<Redis> command. The write is only performed if all metrics in the metric
+family are "new", see above.
Putting it all together: for a metric family with a single metric, the
-following Redit operations are performced:
+following Redit operations are performced when the metric is encountered for
+the first time:
ZADD metric/<metric_id> <timestamp> <value>
SADD resource/<resource_id> metric/<metric_id>
SADD resources resource/<resource_id>
+On subsequent writes, only the C<ZADD> command is performed.
+
You can specify the database to use with the B<Database> parameter (default is
C<0>). See L<http://redis.io/commands#sorted_set> and
L<http://redis.io/commands#set> for details.
#include "collectd.h"
-#include "plugin.h"
+#include "daemon/plugin.h"
+#include "daemon/utils_cache.h"
#include "utils/common/common.h"
#include "utils/format_json/format_json.h"
}
static int write_metric(wr_node_t *node, char const *resource_id,
- metric_t const *m) {
+ metric_t const *m, bool is_new) {
strbuf_t id = STRBUF_CREATE;
if (node->prefix != NULL) {
strbuf_print(&id, node->prefix);
goto cleanup;
}
- err = add_metric_to_resource(node, resource_id, id.ptr);
- if (err != 0) {
- goto cleanup;
+ if (is_new) {
+ err = add_metric_to_resource(node, resource_id, id.ptr);
+ if (err != 0) {
+ goto cleanup;
+ }
}
err = apply_set_size(node, id.ptr);
return err;
}
+static bool metric_is_new(metric_t const *m) {
+ cdtime_t first_time = 0;
+ int err = uc_get_first_time(m, &first_time);
+ if (err != 0) {
+ ERROR("write_redis plugin: uc_get_first_time failed: %s", STRERROR(err));
+ return true;
+ }
+
+ return m->time == first_time;
+}
+
static int wr_write(metric_family_t const *fam, user_data_t *ud) {
wr_node_t *node = ud->data;
return err;
}
+ // determine whether metrics are new before grabbing the log.
+ bool all_new = true;
+ bool is_new[fam->metric.num];
+ memset(is_new, 0, sizeof(is_new));
+
+ for (size_t i = 0; i < fam->metric.num; i++) {
+ is_new[i] = metric_is_new(fam->metric.ptr + i);
+ all_new = all_new && is_new[i];
+ }
+
pthread_mutex_lock(&node->lock);
err = node->reconnect(node);
}
for (size_t i = 0; i < fam->metric.num; i++) {
- int err = write_metric(node, resource_id.ptr, fam->metric.ptr + i);
+ int err =
+ write_metric(node, resource_id.ptr, fam->metric.ptr + i, is_new[i]);
if (err != 0) {
goto cleanup;
}
}
- err = add_resource_to_global_set(node, resource_id.ptr);
- if (err != 0) {
- goto cleanup;
+ if (all_new) {
+ err = add_resource_to_global_set(node, resource_id.ptr);
+ if (err != 0) {
+ goto cleanup;
+ }
}
cleanup:
static int fake_reconnect(wr_node_t *node) { return 0; }
-DEF_TEST(usage_rate) {
+DEF_TEST(wr_write) {
+ uc_init();
+
metric_family_t fam = {
.name = "unit.test",
.type = METRIC_TYPE_GAUGE,
{
.ptr =
(label_pair_t[]){
- {"test", "usage_rate"},
+ {"test", "wr_write"},
},
.num = 1,
},
fam.metric.ptr[i].family = &fam;
}
+ CHECK_ZERO(uc_update(&fam));
+
wr_node_t node = {
.store_rates = false,
CHECK_ZERO(wr_write(&fam, &ud));
-#define RESOURCE_ID "{\"test\":\"usage_rate\"}"
+#define RESOURCE_ID "{\"test\":\"wr_write\"}"
#define METRIC_ONE_ID \
"{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":" \
"{\"metric.name\":\"m1\"}}"
#define METRIC_TWO_ID \
"{\"name\":\"unit.test\",\"resource\":" RESOURCE_ID ",\"labels\":" \
"{\"metric.name\":\"m2\"}}"
- char *want_commands[] = {
+ char *want_commands_new[] = {
"ZADD metric/" METRIC_ONE_ID " 100.000000000 100.000:42",
"SADD resource/" RESOURCE_ID " metric/" METRIC_ONE_ID,
"ZADD metric/" METRIC_TWO_ID " 100.123456780 100.123:23",
"SADD resource/" RESOURCE_ID " metric/" METRIC_TWO_ID,
"SADD resources resource/" RESOURCE_ID,
};
- size_t want_commands_num = STATIC_ARRAY_SIZE(want_commands);
+ size_t want_commands_new_num = STATIC_ARRAY_SIZE(want_commands_new);
+
+ for (size_t i = 0; i < want_commands_new_num && i < got_commands_num; i++) {
+ EXPECT_EQ_STR(want_commands_new[i], got_commands[i]);
+ }
+ EXPECT_EQ_INT(want_commands_new_num, got_commands_num);
+
+ // clear the global got_commands array
+ node.disconnect(&node);
+
+ // advance time
+ cdtime_t interval = TIME_T_TO_CDTIME_T(10);
+ for (size_t i = 0; i < fam.metric.num; i++) {
+ fam.metric.ptr[i].time += interval;
+ }
+
+ CHECK_ZERO(uc_update(&fam));
+
+ CHECK_ZERO(wr_write(&fam, &ud));
+
+ // for known metrics we expect only the ZADD commands
+ char *want_commands_known[] = {
+ "ZADD metric/" METRIC_ONE_ID " 110.000000000 110.000:42",
+ "ZADD metric/" METRIC_TWO_ID " 110.123456780 110.123:23",
+ };
+ size_t want_commands_known_num = STATIC_ARRAY_SIZE(want_commands_known);
- for (size_t i = 0; i < want_commands_num && i < got_commands_num; i++) {
- EXPECT_EQ_STR(want_commands[i], got_commands[i]);
+ for (size_t i = 0; i < want_commands_known_num && i < got_commands_num; i++) {
+ EXPECT_EQ_STR(want_commands_known[i], got_commands[i]);
}
- EXPECT_EQ_INT(want_commands_num, got_commands_num);
+ EXPECT_EQ_INT(want_commands_known_num, got_commands_num);
+
+ // clear the global got_commands array
+ node.disconnect(&node);
return 0;
}
int main(void) {
- RUN_TEST(usage_rate);
+ RUN_TEST(wr_write);
END_TEST;
}