]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
Various: Use metric_t internally
authorFlorian Forster <octo@google.com>
Fri, 17 Jul 2020 10:28:07 +0000 (12:28 +0200)
committerFlorian Forster <octo@google.com>
Wed, 29 Jul 2020 11:40:03 +0000 (13:40 +0200)
This is a WIP

This commit is where the major transform takes palce; the internal
representation is changed, and the read plugins use that indirectly
when they submit a value list transparently.

The baromerter read plugin has been converted to using metric_t
representation natively.

The write plugins have to be adapted to use these new single data
source metrics.

Signed-off-by: Manoj Srivastava <srivasta@google.com>
Change-Id: I9bdeaa59de5c58d11c0ae0e7b55d68d9e4d77fa1

# Conflicts:
# src/daemon/metrics_list_test.c

18 files changed:
src/aggregation.c
src/amqp.c
src/amqp1.c
src/barometer.c
src/check_uptime.c
src/grpc.cc
src/mqtt.c
src/perl.c
src/postgresql.c
src/pyvalues.c
src/snmp_agent.c
src/threshold.c
src/write_graphite.c
src/write_http.c
src/write_kafka.c
src/write_log.c
src/write_redis.c
src/write_riemann_threshold.c

index 308df7448c1cfb6b6be27b13eff08977b637b717..23d259caabaeda11777f8a78e386652aa9057477 100644 (file)
@@ -715,12 +715,12 @@ static int agg_read(void) /* {{{ */
   return (success > 0) ? 0 : -1;
 } /* }}} int agg_read */
 
-static int agg_write(data_set_t const *ds, value_list_t const *vl, /* {{{ */
+static int agg_write(metric_t const *metric_p, /* {{{ */
                      __attribute__((unused)) user_data_t *user_data) {
   bool created_by_aggregation = false;
   /* Ignore values that were created by the aggregation plugin to avoid weird
    * effects. */
-  (void)meta_data_get_boolean(vl->meta, "aggregation:created",
+  (void)meta_data_get_boolean(metric_p->meta->meta, "aggregation:created",
                               &created_by_aggregation);
   if (created_by_aggregation)
     return 0;
@@ -730,7 +730,7 @@ static int agg_write(data_set_t const *ds, value_list_t const *vl, /* {{{ */
   if (lookup == NULL)
     status = ENOENT;
   else {
-    status = lookup_search(lookup, ds, vl);
+    status = lookup_search(lookup, metric_p);
     if (status > 0)
       status = 0;
   }
index fceb40b7a9e2dafa24bf3cc0a8a10a9c7dc65044..8405bc02763cec994c0d6d42f2bca0cee8453b21 100644 (file)
@@ -31,6 +31,7 @@
 #include "plugin.h"
 #include "utils/cmds/putval.h"
 #include "utils/common/common.h"
+#include "utils/avltree/avltree.h"
 #include "utils/format_graphite/format_graphite.h"
 #include "utils/format_json/format_json.h"
 #include "utils_random.h"
@@ -787,26 +788,56 @@ static int camqp_write_locked(camqp_config_t *conf, /* {{{ */
   return status;
 } /* }}} int camqp_write_locked */
 
-static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+static int camqp_write(const metric_t *metric_p, /* {{{ */
                        user_data_t *user_data) {
   camqp_config_t *conf = user_data->data;
   char routing_key[6 * DATA_MAX_NAME_LEN];
   char buffer[8192];
   int status;
+  char *index_p = (char *)&buffer[0];
 
-  if ((ds == NULL) || (vl == NULL) || (conf == NULL))
+  if ((metric_p == NULL) || (metric_p->ds == NULL) || (conf == NULL))
     return EINVAL;
 
   if (conf->routing_key != NULL) {
     sstrncpy(routing_key, conf->routing_key, sizeof(routing_key));
   } else {
-    ssnprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s",
-              vl->host, vl->plugin, vl->plugin_instance, vl->type,
-              vl->type_instance);
+    int buffer_size = sizeof(routing_key);
+    int tmp_str_len = 0;
+    tmp_str_len = strlen(metric_p->identity->name);
+    if (tmp_str_len < buffer_size) {
+      snprintf(index_p, tmp_str_len + 1, "%s", metric_p->identity->name);
+      index_p += tmp_str_len; /* This is the location of the trailing nul */
+      buffer_size -= tmp_str_len;
+    }
+
+    if (metric_p->identity->root_p != NULL) {
+      c_avl_iterator_t *iter_p = c_avl_get_iterator(metric_p->identity->root_p);
+      if (iter_p != NULL) {
+        char *key_p = NULL;
+        char *value_p = NULL;
+        while ((c_avl_iterator_next(iter_p, (void **)&key_p,
+                                    (void **)&value_p)) == 0) {
+          if ((key_p != NULL) && (value_p != NULL)) {
+            tmp_str_len = strlen(key_p) + strlen(value_p) + 2;
+            if (tmp_str_len < buffer_size) {
+              snprintf(index_p, tmp_str_len + 1, ";%s=%s", key_p, value_p);
+              index_p += tmp_str_len;
+              buffer_size -= tmp_str_len;
+            } else {
+              snprintf(index_p, buffer_size, ";%s", value_p);
+             buffer_size = 0;
+             break;
+            }
+          }
+        }
+        c_avl_iterator_destroy(iter_p);
+      }
+    }
 
     /* Switch slashes (the only character forbidden by collectd) and dots
      * (the separation character used by AMQP). */
-    for (size_t i = 0; routing_key[i] != 0; i++) {
+    for (size_t i = 0; index_p[i] != 0; i++) {
       if (routing_key[i] == '.')
         routing_key[i] = '/';
       else if (routing_key[i] == '/')
@@ -815,7 +846,7 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
   }
 
   if (conf->format == CAMQP_FORMAT_COMMAND) {
-    status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
+    status = cmd_create_putval(buffer, sizeof(buffer), metric_p);
     if (status != 0) {
       ERROR("amqp plugin: cmd_create_putval failed with status %i.", status);
       return status;
@@ -825,11 +856,11 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     size_t bfill = 0;
 
     format_json_initialize(buffer, &bfill, &bfree);
-    format_json_value_list(buffer, &bfill, &bfree, ds, vl, conf->store_rates);
+    format_json_metric(buffer, &bfill, &bfree, metric_p, conf->store_rates);
     format_json_finalize(buffer, &bfill, &bfree);
   } else if (conf->format == CAMQP_FORMAT_GRAPHITE) {
     status =
-        format_graphite(buffer, sizeof(buffer), ds, vl, conf->prefix,
+        format_graphite(buffer, sizeof(buffer), metric_p, conf->prefix,
                         conf->postfix, conf->escape_char, conf->graphite_flags);
     if (status != 0) {
       ERROR("amqp plugin: format_graphite failed with status %i.", status);
index c4f14d12c7066b917a18423872fd2171b7984f34..20e7cc543bb7a1b9d06d1d3ed7d2d6ec5920b6cf 100644 (file)
@@ -430,7 +430,7 @@ static int amqp1_notify(notification_t const *n,
 
 } /* }}} int amqp1_notify */
 
-static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+static int amqp1_write(const metric_t *metric_p, /* {{{ */
                        user_data_t *user_data) {
   int status = 0;
   size_t bfree = BUFSIZE;
@@ -479,7 +479,7 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     break;
   case AMQP1_FORMAT_JSON:
     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
-    format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
+    format_json_metric((char *)cdm->mbuf.start, &bfill, &bfree, metric_p,
                            instance->store_rates);
     status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
     if (status != 0) {
index fd733b4959ef002c49766c9c58ba4bea444aaf27..121e8beac273d159421aaa6f4bb11874f846336f 100644 (file)
@@ -17,6 +17,7 @@
  *
  * Authors:
  *   Tomas Menzl
+ *   Manoj Srivastava <srivasta at google.com>
  **/
 
 #include "collectd.h"
@@ -315,7 +316,6 @@ static double averaging_add_sample(averaging_t *avg, long int sample) {
  */
 typedef struct temperature_list_s {
   char *sensor_name;               /**< sensor name/reference */
-  size_t num_values;               /**< number of values (usually one) */
   bool initialized;                /**< sensor already provides data */
   struct temperature_list_s *next; /**< next in the list */
 } temperature_list_t;
@@ -339,7 +339,6 @@ static int temp_list_add(temperature_list_t *list, const char *sensor) {
 
   new_temp->sensor_name = strdup(sensor);
   new_temp->initialized = 0;
-  new_temp->num_values = 0;
   if (new_temp->sensor_name == NULL) {
     free(new_temp);
     return -1;
@@ -387,8 +386,7 @@ static void temp_list_delete(temperature_list_t **list) {
 static int get_reference_temperature(double *result) {
   temperature_list_t *list = temp_list;
 
-  gauge_t *values = NULL; /**< rate values */
-  size_t values_num = 0;  /**< number of rate values */
+  gauge_t value; /**< rate values */
 
   gauge_t values_history[REF_TEMP_AVG_NUM];
 
@@ -407,7 +405,7 @@ static int get_reference_temperature(double *result) {
        dynamic changing of number of temperarure values in runtime yet (are
        there any such cases?). */
     if (!list->initialized) {
-      if (uc_get_rate_by_name(list->sensor_name, &values, &values_num)) {
+      if (uc_get_rate_by_name(list->sensor_name, &value)) {
         DEBUG(
             "barometer: get_reference_temperature - rate \"%s\" not found yet",
             list->sensor_name);
@@ -415,38 +413,32 @@ static int get_reference_temperature(double *result) {
         continue;
       }
 
-      DEBUG("barometer: get_reference_temperature - initialize \"%s\", %" PRIsz
+      DEBUG("barometer: get_reference_temperature - initialize \"%s\"" PRIsz
             " vals",
-            list->sensor_name, values_num);
+            list->sensor_name);
 
       list->initialized = 1;
-      list->num_values = values_num;
-
-      for (size_t i = 0; i < values_num; ++i) {
-        DEBUG("barometer: get_reference_temperature - rate %" PRIsz ": %lf **",
-              i, values[i]);
-        if (!isnan(values[i])) {
-          avg_sum += values[i];
-          ++avg_num;
-        }
+
+      DEBUG("barometer: get_reference_temperature - rate " PRIsz ": %lf **",
+            value);
+      if (!isnan(value)) {
+        avg_sum += value;
+        ++avg_num;
       }
-      free(values);
-      values = NULL;
     }
 
     /* It is OK to get here the first time as well, in the worst case
        the history will full of NANs. */
     if (uc_get_history_by_name(list->sensor_name, values_history,
-                               REF_TEMP_AVG_NUM, list->num_values)) {
+                               REF_TEMP_AVG_NUM)) {
       ERROR("barometer: get_reference_temperature - history \"%s\" lost",
             list->sensor_name);
       list->initialized = 0;
-      list->num_values = 0;
       list = list->next;
       continue;
     }
 
-    for (size_t i = 0; i < REF_TEMP_AVG_NUM * list->num_values; ++i) {
+    for (size_t i = 0; i < REF_TEMP_AVG_NUM; ++i) {
       DEBUG("barometer: get_reference_temperature - history %" PRIsz ": %lf", i,
             values_history[i]);
       if (!isnan(values_history[i])) {
@@ -457,33 +449,27 @@ static int get_reference_temperature(double *result) {
 
     if (avg_num == 0) /* still no history? fallback to current */
     {
-      if (uc_get_rate_by_name(list->sensor_name, &values, &values_num)) {
+      if (uc_get_rate_by_name(list->sensor_name, &value)) {
         ERROR("barometer: get_reference_temperature - rate \"%s\" lost",
               list->sensor_name);
         list->initialized = 0;
-        list->num_values = 0;
         list = list->next;
         continue;
       }
 
-      for (size_t i = 0; i < values_num; ++i) {
-        DEBUG("barometer: get_reference_temperature - rate last %" PRIsz
-              ": %lf **",
-              i, values[i]);
-        if (!isnan(values[i])) {
-          avg_sum += values[i];
-          ++avg_num;
-        }
+      DEBUG("barometer: get_reference_temperature - rate last %" PRIsz
+            ": %lf **",
+            value);
+      if (!isnan(value)) {
+        avg_sum += value;
+        ++avg_num;
       }
-      free(values);
-      values = NULL;
     }
 
     if (avg_num == 0) {
       ERROR("barometer: get_reference_temperature - could not read \"%s\"",
             list->sensor_name);
       list->initialized = 0;
-      list->num_values = 0;
     } else {
       average = avg_sum / (double)avg_num;
       if (isnan(*result))
@@ -1372,12 +1358,24 @@ static int MPL115_collectd_barometer_read(void) {
   double temperature = 0.0;
   double norm_pressure = 0.0;
 
-  value_list_t vl = VALUE_LIST_INIT;
-  value_t values[1];
+  metrics_list_t *index_p = NULL;
+  metrics_list_t *ml = NULL;
+  ml = (metrics_list_t *)malloc(sizeof(*ml));
+  if (ml == NULL)
+    return -1;
 
-  DEBUG("barometer: MPL115_collectd_barometer_read");
+  index_p = ml;
+
+  index_p->metric.ds = NULL;
+  index_p->metric.identity = NULL;
+  index_p->metric.meta = NULL;
+  index_p->metric.time = cdtime();
+  index_p->metric.interval = plugin_get_interval();
+  index_p->next_p = NULL;
 
+  DEBUG("barometer: MPL115_collectd_barometer_read");
   if (!configured) {
+    destroy_metrics_list(ml);
     return -1;
   }
 
@@ -1399,36 +1397,81 @@ static int MPL115_collectd_barometer_read(void) {
   }
 
   result = MPL115_read_averaged(&pressure, &temperature);
-  if (result)
+  if (result) {
+    destroy_metrics_list(ml);
     return result;
+  }
 
   norm_pressure = abs_to_mean_sea_level_pressure(pressure);
 
-  sstrncpy(vl.plugin, "barometer", sizeof(vl.plugin));
-  sstrncpy(vl.plugin_instance, "mpl115", sizeof(vl.plugin_instance));
+  index_p->metric.identity =
+      create_identity("barometer", "pressure", "value", NULL);
+  if (index_p->metric.identity == NULL) {
+    destroy_metrics_list(ml);
+    return -1;
+  }
+  result =
+      identity_add_label(index_p->metric.identity, "plugin_instance", "mpl115");
+  if (result) {
+    destroy_metrics_list(ml);
+    return result;
+  }
 
-  vl.values_len = 1;
-  vl.values = values;
+  result = identity_add_label(index_p->metric.identity, "type_instance",
+                              "normalized");
+  if (result) {
+    destroy_metrics_list(ml);
+    return result;
+  }
 
-  /* dispatch normalized air pressure */
-  sstrncpy(vl.type, "pressure", sizeof(vl.type));
-  sstrncpy(vl.type_instance, "normalized", sizeof(vl.type_instance));
-  values[0].gauge = norm_pressure;
-  plugin_dispatch_values(&vl);
+  index_p->metric.value.gauge = norm_pressure;
+  index_p = index_p->next_p;
+  index_p->metric.ds = NULL;
+  index_p->metric.identity = NULL;
+  index_p->metric.meta = NULL;
+  index_p->metric.time = ml->metric.time;
+  index_p->metric.interval = ml->metric.interval;
+  index_p->next_p = NULL;
+
+  index_p->metric.identity =
+      create_identity("barometer", "pressure", "value", NULL);
+  if (index_p->metric.identity == NULL) {
+    destroy_metrics_list(ml);
+    return -1;
+  }
+  result =
+      identity_add_label(index_p->metric.identity, "plugin_instance", "mpl115");
+  if (result) {
+    destroy_metrics_list(ml);
+    return result;
+  }
 
-  /* dispatch absolute air pressure */
-  sstrncpy(vl.type, "pressure", sizeof(vl.type));
-  sstrncpy(vl.type_instance, "absolute", sizeof(vl.type_instance));
-  values[0].gauge = pressure;
-  plugin_dispatch_values(&vl);
+  result =
+      identity_add_label(index_p->metric.identity, "type_instance", "absolute");
+  if (result) {
+    destroy_metrics_list(ml);
+    return result;
+  }
+
+  index_p->metric.value.gauge = pressure;
+
+  index_p = index_p->next_p;
+  index_p->metric.ds = NULL;
+  index_p->metric.identity = NULL;
+  index_p->metric.meta = NULL;
+  index_p->metric.time = ml->metric.time;
+  index_p->metric.interval = ml->metric.interval;
+  index_p->next_p = NULL;
 
   /* dispatch sensor temperature */
-  sstrncpy(vl.type, "temperature", sizeof(vl.type));
-  sstrncpy(vl.type_instance, "", sizeof(vl.type_instance));
-  values[0].gauge = temperature;
-  plugin_dispatch_values(&vl);
+  index_p->metric.identity =
+      create_identity("barometer", "temperature", "value", NULL);
 
-  return 0;
+  index_p->metric.value.gauge = temperature;
+
+  result = plugin_dispatch_metric_list(ml);
+  destroy_metrics_list(ml);
+  return result;
 }
 
 /**
index 33363b54f5b03248183db3ea0872e73c8968a103..e877505bcc4b7a90b3d9535b7fa85d8aca981290 100644 (file)
@@ -54,19 +54,19 @@ static int format_uptime(unsigned long uptime_sec, char *buf, size_t bufsize) {
   return ret;
 }
 
-static int cu_notify(enum cache_event_type_e event_type, const value_list_t *vl,
+static int cu_notify(enum cache_event_type_e event_type, const metric_t *metric_p,
                      gauge_t old_uptime, gauge_t new_uptime) {
   notification_t n;
-  NOTIFICATION_INIT_VL(&n, vl);
+  notification_init_metric(&n, NOTIF_FAILURE, NULL, metric_p);
 
   int status;
   char *buf = n.message;
   size_t bufsize = sizeof(n.message);
 
-  n.time = vl->time;
+  n.time = metric_p->time;
 
   const char *service = "Service";
-  if (strcmp(vl->plugin, "uptime") == 0)
+  if (strcmp(metric_p->plugin, "uptime") == 0)
     service = "Host";
 
   switch (event_type) {
@@ -126,42 +126,39 @@ static int cu_cache_event(cache_event_t *event,
 
   /* For CE_VALUE_EXPIRED */
   int ret;
-  value_t *values;
-  size_t values_num;
+  value_t value;
   gauge_t old_uptime = NAN;
 
   switch (event->type) {
   case CE_VALUE_NEW:
     DEBUG("check_uptime: CE_VALUE_NEW, %s", event->value_list_name);
-    if (c_avl_get(types_tree, event->value_list->type, NULL) == 0) {
+    if (c_avl_get(types_tree, event->metric_p->type, NULL) == 0) {
       event->ret = 1;
-      assert(event->value_list->values_len > 0);
-      cu_notify(CE_VALUE_NEW, event->value_list, NAN /* old */,
-                event->value_list->values[0].gauge /* new */);
+      cu_notify(CE_VALUE_NEW, event->metric_p, NAN /* old */,
+                event->metric_p->value.gauge /* new */);
     }
     break;
   case CE_VALUE_UPDATE:
     DEBUG("check_uptime: CE_VALUE_UPDATE, %s", event->value_list_name);
-    if (uc_get_history_by_name(event->value_list_name, values_history, 2, 1)) {
+    if (uc_get_history_by_name(event->value_list_name, values_history, 2)) {
       ERROR("check_uptime plugin: Failed to get value history for %s.",
             event->value_list_name);
     } else {
       if (!isnan(values_history[0]) && !isnan(values_history[1]) &&
           values_history[0] < values_history[1]) {
-        cu_notify(CE_VALUE_UPDATE, event->value_list,
+        cu_notify(CE_VALUE_UPDATE, event->metric_p,
                   values_history[1] /* old */, values_history[0] /* new */);
       }
     }
     break;
   case CE_VALUE_EXPIRED:
     DEBUG("check_uptime: CE_VALUE_EXPIRED, %s", event->value_list_name);
-    ret = uc_get_value_by_name(event->value_list_name, &values, &values_num);
+    ret = uc_get_value_by_name(event->value_list_name, &value);
     if (ret == 0) {
-      old_uptime = values[0].gauge;
-      sfree(values);
+      old_uptime = value.gauge;
     }
 
-    cu_notify(CE_VALUE_EXPIRED, event->value_list, old_uptime, NAN /* new */);
+    cu_notify(CE_VALUE_EXPIRED, event->metric_p, old_uptime, NAN /* new */);
     break;
   }
   return 0;
index 3799e48f3f7ed88028508a24a18c1b941df11d12..522fe97bd8a671376a0bd4f26c49e5a650860736 100644 (file)
@@ -40,9 +40,9 @@ extern "C" {
 #include <fnmatch.h>
 #include <stdbool.h>
 
-#include "collectd.h"
 #include "plugin.h"
 #include "utils/common/common.h"
+#include "collectd.h"
 
 #include "daemon/utils_cache.h"
 }
@@ -76,7 +76,7 @@ static grpc::string default_addr("0.0.0.0:50051");
  * helper functions
  */
 
-static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
+static bool ident_matches(const metric_t *vl, const metric_t *matcher) {
   if (fnmatch(matcher->host, vl->host, 0))
     return false;
 
index 01c2ea26340977b26e02182f6bb76c9cc52cad22..0f52f1cb8df1a74d4e0b1a751e949c844f978784 100644 (file)
@@ -526,7 +526,7 @@ static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
     return status;
   }
 
-  status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates);
+  status = format_values_vl(payload, sizeof(payload), ds, vl, conf->store_rates);
   if (status != 0) {
     ERROR("mqtt plugin: format_values failed with status %d.", status);
     return status;
index c570707debebff4c767084e2e109df60795021a1..d6f316bf0b4e6c9ef4098795b3263d31ffab45d4 100644 (file)
@@ -33,8 +33,8 @@
 /* do not automatically get the thread specific Perl interpreter */
 #define PERL_NO_GET_CONTEXT
 
-#include "collectd.h"
 #include <stdbool.h>
+#include "collectd.h"
 
 #include <EXTERN.h>
 #include <perl.h>
@@ -115,8 +115,7 @@ static XS(Collectd__fc_register);
 static XS(Collectd_call_by_name);
 
 static int perl_read(user_data_t *ud);
-static int perl_write(const data_set_t *ds, const value_list_t *vl,
-                      user_data_t *user_data);
+static int perl_write(const metric_t *metric_p, user_data_t *user_data);
 static void perl_log(int level, const char *msg, user_data_t *user_data);
 static int perl_notify(const notification_t *notif, user_data_t *user_data);
 static int perl_flush(cdtime_t timeout, const char *identifier,
@@ -348,6 +347,27 @@ static size_t av2value(pTHX_ char *name, AV *array, value_t *value,
   return ds->ds_num;
 } /* static size_t av2value (char *, AV *, value_t *, size_t) */
 
+static int hv2metric_list(pTHX_ HV *hash, metric_t *metric_p) {
+  if ((NULL == hash) || (NULL == metric_p))
+    return -1;
+  SV **tmp = av_fetch(array, 0, 0);
+
+  if (NULL != tmp) {
+    if (DS_TYPE_COUNTER == metric_p->value_ds_type)
+      metric_p->value.counter = SvIV(*tmp);
+    else if (DS_TYPE_GAUGE == metric_p->value_ds_type)
+      metric_p->value.gauge = SvNV(*tmp);
+    else if (DS_TYPE_DERIVE == metric_p->value_ds_type)
+      metric_p->value.derive = SvIV(*tmp);
+    else if (DS_TYPE_ABSOLUTE == metric_p->value_ds_type)
+      metric_p->value.absolute = SvIV(*tmp);
+  } else {
+    return 0;
+  }
+  return 1;
+}
+
+
 /*
  * value list:
  * {
@@ -933,7 +953,7 @@ static int pplugin_write(pTHX_ const char *plugin, AV *data_set, HV *values) {
   if ((NULL != data_set) && (0 != av2data_set(aTHX_ data_set, vl.type, &ds)))
     return -1;
 
-  ret = plugin_write(plugin, NULL == data_set ? NULL : &ds, &vl);
+  ret = plugin_write(plugin, &metric);
   if (0 != ret)
     log_warn("Dispatching value to plugin \"%s\" failed with status %i.",
              NULL == plugin ? "<any>" : plugin, ret);
@@ -2113,7 +2133,7 @@ static int perl_read(user_data_t *user_data) {
   return pplugin_call(aTHX_ PLUGIN_READ, user_data->data);
 } /* static int perl_read (user_data_t *user_data) */
 
-static int perl_write(const data_set_t *ds, const value_list_t *vl,
+static int perl_write(const metric_t *metric_p,
                       user_data_t *user_data) {
   int status;
   dTHX;
@@ -2139,7 +2159,7 @@ static int perl_write(const data_set_t *ds, const value_list_t *vl,
 
   log_debug("perl_write: c_ithread: interp = %p (active threads: %i)", aTHX,
             perl_threads->number_of_threads);
-  status = pplugin_call(aTHX_ PLUGIN_WRITE, user_data->data, ds, vl);
+  status = pplugin_call(aTHX_ PLUGIN_WRITE, user_data->data, metric_p);
 
   if (aTHX == perl_threads->head->interp)
     pthread_mutex_unlock(&perl_threads->mutex);
index 1495a99b889f7fab21b349db1fdc2d551399e870..bdc64982342395f7bfdd10fdcda39320e16399b7 100644 (file)
@@ -728,7 +728,6 @@ static char *values_to_sqlarray(const data_set_t *ds, const value_list_t *vl,
     else if (store_rates) {
       if (rates == NULL)
         rates = uc_get_rate_vl(ds, vl);
-
       if (rates == NULL) {
         log_err("c_psql_write: Failed to determine rate");
         return NULL;
index 0fc987a35d8f0963abf68cb1898370d7bb4c21f8..c733224b854c056c2aa6aaf66e9e61deb7e6f352 100644 (file)
@@ -710,16 +710,15 @@ static PyObject *Values_write(Values *self, PyObject *args, PyObject *kwds) {
   int ret;
   const data_set_t *ds;
   size_t size;
-  value_t *value;
-  value_list_t value_list = VALUE_LIST_INIT;
+  metric_t metric = STRUCT_METRIC_INIT;
   PyObject *values = self->values, *meta = self->meta;
   double time = self->data.time, interval = self->interval;
-  char *host = NULL, *plugin = NULL, *plugin_instance = NULL, *type = NULL,
-       *type_instance = NULL, *dest = NULL;
+  char *host = NULL, *plugin = NULL, *type = NULL,
+       *data_source = NULL, *dest = NULL;
 
   static char *kwlist[] = {
-      "destination",   "type",   "values", "plugin_instance",
-      "type_instance", "plugin", "host",   "time",
+    "destination",   "type",   "values",
+      "dat_source", "plugin", "host",   "time",
       "interval",      "meta",   NULL};
   if (!PyArg_ParseTupleAndKeywords(
           args, kwds, "et|etOetetetetdiO", kwlist, NULL, &dest, NULL, &type,
@@ -727,6 +726,10 @@ static PyObject *Values_write(Values *self, PyObject *args, PyObject *kwds) {
           NULL, &host, &time, &interval, &meta))
     return NULL;
 
+  metric.identity = create_identity((plugin ? plugin : self->data.plugin),
+                                    (type ? type : self->data.type),
+                                    (host ? host : self->data.host));
+
   sstrncpy(value_list.host, host ? host : self->data.host,
            sizeof(value_list.host));
   sstrncpy(value_list.plugin, plugin ? plugin : self->data.plugin,
@@ -744,7 +747,7 @@ static PyObject *Values_write(Values *self, PyObject *args, PyObject *kwds) {
     PyErr_SetString(PyExc_RuntimeError, "type not set");
     return NULL;
   }
-  ds = plugin_get_ds(value_list.type);
+  ds = plugin_get_ds((type ? type : self->data.type));
   if (ds == NULL) {
     PyErr_Format(PyExc_TypeError, "Dataset %s not found", value_list.type);
     return NULL;
index 342faf44bdfab3c20d7b37118b04eae0d15c1431..8339cb45126f1ee02c77e8201da7d872a250d6aa 100644 (file)
 
 #include "collectd.h"
 
+#include <regex.h>
 #include "utils/avltree/avltree.h"
 #include "utils/common/common.h"
 #include "utils_cache.h"
 #include "utils_llist.h"
-#include <regex.h>
 
 #include <net-snmp/net-snmp-config.h>
 
 #define GROUP_UNUSED -1
 #define OID_EXISTS 1
 #define MAX_KEY_SOURCES 5
-#define MAX_INDEX_KEYS 5
 #define MAX_MATCHES 5
 
 /* Identifies index key source */
 enum index_key_src_e {
   INDEX_HOST = 0,
   INDEX_PLUGIN,
-  INDEX_PLUGIN_INSTANCE,
   INDEX_TYPE,
-  INDEX_TYPE_INSTANCE
+  INDEX_DATA_SOURCE,
+  MAX_INDEX_KEYS
 };
 typedef enum index_key_src_e index_key_src_t;
 
@@ -91,8 +90,8 @@ struct table_definition_s {
   c_avl_tree_t *index_instance;
   c_avl_tree_t *instance_oids; /* Tells us how many OIDs registered for every
                                   instance; */
-  index_key_t index_keys[MAX_INDEX_KEYS]; /* Stores information about what each
-                                             index key represents */
+  index_key_t index_keys[MAX_INDEX_KEYS - 1]; /* Stores information about what
+                                             each index key represents */
   int index_keys_len;
   netsnmp_variable_list *index_list_cont; /* Index key container used for
                                              generating as well as parsing
@@ -108,9 +107,8 @@ typedef struct table_definition_s table_definition_t;
 struct data_definition_s {
   char *name;
   char *plugin;
-  char *plugin_instance;
   char *type;
-  char *type_instance;
+  char *data_source;
   const table_definition_t *table;
   bool is_index_key; /* indicates if table column is an index key */
   int index_key_pos; /* position in indexes list */
@@ -137,11 +135,10 @@ static snmp_agent_ctx_t *g_agent;
 static const char *index_opts[MAX_KEY_SOURCES] = {
     "Hostname", "Plugin", "PluginInstance", "Type", "TypeInstance"};
 
-#define CHECK_DD_TYPE(_dd, _p, _pi, _t, _ti)                                   \
+#define CHECK_DD_TYPE(_dd, _p, _t, _ds)                                        \
   (_dd->plugin ? !strcmp(_dd->plugin, _p) : 0) &&                              \
-      (_dd->plugin_instance ? !strcmp(_dd->plugin_instance, _pi) : 1) &&       \
       (_dd->type ? !strcmp(_dd->type, _t) : 0) &&                              \
-      (_dd->type_instance ? !strcmp(_dd->type_instance, _ti) : 1)
+      (_dd->data_source ? !strcmp(_dd->data_source, _ds) : 1)
 
 static int snmp_agent_shutdown(void);
 static void *snmp_agent_thread_run(void *arg);
@@ -273,13 +270,6 @@ static int snmp_agent_validate_config(void) {
         return -EINVAL;
       }
 
-      if (dd->plugin_instance) {
-        ERROR(PLUGIN_NAME ": PluginInstance should not be defined for table "
-                          "data type '%s'.'%s'",
-              td->name, dd->name);
-        return -EINVAL;
-      }
-
       if (dd->oids_len == 0) {
         ERROR(PLUGIN_NAME ": No OIDs defined for '%s'.'%s'", td->name,
               dd->name);
@@ -287,8 +277,8 @@ static int snmp_agent_validate_config(void) {
       }
 
       if (dd->is_index_key) {
-        if (dd->type || dd->type_instance) {
-          ERROR(PLUGIN_NAME ": Type and TypeInstance are not valid for "
+        if (dd->type) {
+          ERROR(PLUGIN_NAME ": Type is not valid for "
                             "index data '%s'.'%s'",
                 td->name, dd->name);
           return -EINVAL;
@@ -491,11 +481,12 @@ static int snmp_agent_tokenize(const char *input, c_avl_tree_t *tokens,
 }
 
 static int snmp_agent_fill_index_list(table_definition_t *td,
-                                      value_list_t const *vl) {
+                                      metric_t const *metric_p) {
   int ret;
   int i;
   netsnmp_variable_list *key = td->index_list_cont;
   char const *ptr;
+  char *host_p = NULL;
 
   for (i = 0; i < td->index_keys_len; i++) {
     /* var should never be NULL */
@@ -506,19 +497,22 @@ static int snmp_agent_fill_index_list(table_definition_t *td,
     /* Generating list filled with all data necessary to generate an OID */
     switch (source) {
     case INDEX_HOST:
-      ptr = vl->host;
+      ret = c_avl_get(metric_p->identity->root_p, (void *)"__host__",
+                      (void **)&host_p);
+      if (ret != 0) {
+        ERROR(PLUGIN_NAME ": Unknown index key resource host");
+        return -EINVAL;
+      }
+      ptr = host_p;
       break;
     case INDEX_PLUGIN:
-      ptr = vl->plugin;
-      break;
-    case INDEX_PLUGIN_INSTANCE:
-      ptr = vl->plugin_instance;
+      ptr = metric_p->plugin;
       break;
     case INDEX_TYPE:
-      ptr = vl->type;
+      ptr = metric_p->type;
       break;
-    case INDEX_TYPE_INSTANCE:
-      ptr = vl->type_instance;
+    case INDEX_DATA_SOURCE:
+      ptr = metric_p->ds->name;
       break;
     default:
       ERROR(PLUGIN_NAME ": Unknown index key source provided");
@@ -587,9 +581,8 @@ static int snmp_agent_prep_index_list(table_definition_t const *td,
     switch (td->index_keys[i].source) {
     case INDEX_HOST:
     case INDEX_PLUGIN:
-    case INDEX_PLUGIN_INSTANCE:
     case INDEX_TYPE:
-    case INDEX_TYPE_INSTANCE:
+    case INDEX_DATA_SOURCE:
       snmp_varlist_add_variable(index_list, NULL, 0, td->index_keys[i].type,
                                 NULL, 0);
       break;
@@ -602,12 +595,13 @@ static int snmp_agent_prep_index_list(table_definition_t const *td,
 }
 
 static int snmp_agent_generate_index(table_definition_t *td,
-                                     value_list_t const *vl, oid_t *index_oid) {
+                                     metric_t const *metric_p,
+                                     oid_t *index_oid) {
 
   /* According to given information by index_keys list
    * index OID is going to be built
    */
-  int ret = snmp_agent_fill_index_list(td, vl);
+  int ret = snmp_agent_fill_index_list(td, metric_p);
   if (ret != 0)
     return -EINVAL;
 
@@ -766,9 +760,9 @@ static void snmp_agent_table_data_remove(data_definition_t *dd,
   }
 }
 
-static int snmp_agent_clear_missing(const value_list_t *vl,
+static int snmp_agent_clear_missing(const metric_t *metric_p,
                                     __attribute__((unused)) user_data_t *ud) {
-  if (vl == NULL)
+  if (metric_p == NULL)
     return -EINVAL;
 
   for (llentry_t *te = llist_head(g_agent->tables); te != NULL; te = te->next) {
@@ -778,8 +772,8 @@ static int snmp_agent_clear_missing(const value_list_t *vl,
       data_definition_t *dd = de->value;
 
       if (!dd->is_index_key) {
-        if (CHECK_DD_TYPE(dd, vl->plugin, vl->plugin_instance, vl->type,
-                          vl->type_instance)) {
+        if (CHECK_DD_TYPE(dd, metric_p->plugin, metric_p->type,
+                          metric_p->ds->name)) {
           oid_t *index_oid = calloc(1, sizeof(*index_oid));
 
           if (index_oid == NULL) {
@@ -787,7 +781,7 @@ static int snmp_agent_clear_missing(const value_list_t *vl,
             return -ENOMEM;
           }
 
-          int ret = snmp_agent_generate_index(td, vl, index_oid);
+          int ret = snmp_agent_generate_index(td, metric_p, index_oid);
 
           if (ret == 0)
             snmp_agent_table_data_remove(dd, td, index_oid);
@@ -815,9 +809,8 @@ static void snmp_agent_free_data(data_definition_t **dd) {
 
   sfree((*dd)->name);
   sfree((*dd)->plugin);
-  sfree((*dd)->plugin_instance);
   sfree((*dd)->type);
-  sfree((*dd)->type_instance);
+  sfree((*dd)->data_source);
   sfree((*dd)->oids);
 
   sfree(*dd);
@@ -987,8 +980,11 @@ static int snmp_agent_format_name(char *name, int name_len,
 
   if (index_oid == NULL) {
     /* It's a scalar */
-    format_name(name, name_len, hostname_g, dd->plugin, dd->plugin_instance,
-                dd->type, dd->type_instance);
+    snprintf(name, name_len, "%s/%s/%s/%s",
+             (hostname_g == NULL) ? "" : hostname_g,
+             (dd->plugin == NULL) ? "" : dd->plugin,
+             (dd->type == NULL) ? "" : dd->type,
+             (dd->data_source == NULL) ? "" : dd->data_source);
   } else {
     /* Need to parse string index OID */
     const table_definition_t *td = dd->table;
@@ -1000,15 +996,15 @@ static int snmp_agent_format_name(char *name, int name_len,
     netsnmp_variable_list *key = td->index_list_cont;
     char str[DATA_MAX_NAME_LEN];
     char *fields[MAX_KEY_SOURCES] = {hostname_g, dd->plugin,
-                                     dd->plugin_instance, dd->type,
-                                     dd->type_instance};
+                                     dd->type,
+                                     dd->data_source};
 
     /* Looking for simple keys only */
     while (key != NULL) {
       if (!td->index_keys[i].regex) {
         index_key_src_t source = td->index_keys[i].source;
 
-        if (source < INDEX_HOST || source > INDEX_TYPE_INSTANCE) {
+        if (source < INDEX_HOST || source > INDEX_DATA_SOURCE) {
           ERROR(PLUGIN_NAME ": Unkown index key source!");
           return -EINVAL;
         }
@@ -1031,9 +1027,8 @@ static int snmp_agent_format_name(char *name, int name_len,
       if (ret != 0)
         return ret;
     }
-    format_name(name, name_len, fields[INDEX_HOST], fields[INDEX_PLUGIN],
-                fields[INDEX_PLUGIN_INSTANCE], fields[INDEX_TYPE],
-                fields[INDEX_TYPE_INSTANCE]);
+    snprintf(name, name_len, fields[INDEX_HOST], fields[INDEX_PLUGIN],
+            fields[INDEX_TYPE],fields[INDEX_DATA_SOURCE]);
     for (i = 0; i < MAX_KEY_SOURCES; i++) {
       if (td->tokens[i])
         sfree(fields[i]);
@@ -1095,36 +1090,28 @@ static int snmp_agent_form_reply(struct netsnmp_request_info_s *requests,
 
   DEBUG(PLUGIN_NAME ": Identifier '%s'", name);
 
-  value_t *values;
-  size_t values_num;
+  value_t value;
   const data_set_t *ds = plugin_get_ds(dd->type);
   if (ds == NULL) {
     ERROR(PLUGIN_NAME ": Data set not found for '%s' type", dd->type);
     return SNMP_NOSUCHINSTANCE;
   }
 
-  ret = uc_get_value_by_name(name, &values, &values_num);
+  ret = uc_get_value_by_name(name, &value);
 
   if (ret != 0) {
     ERROR(PLUGIN_NAME ": Failed to get value for '%s'", name);
     return SNMP_NOSUCHINSTANCE;
   }
 
-  assert(ds->ds_num == values_num);
-  assert(oid_index < (int)values_num);
-
   char data[DATA_MAX_NAME_LEN];
   size_t data_len = sizeof(data);
-  ret = snmp_agent_set_vardata(
-      data, &data_len, dd->oids[oid_index].type, dd->scale, dd->shift,
-      &values[oid_index], sizeof(values[oid_index]), ds->ds[oid_index].type);
-
-  sfree(values);
+  ret = snmp_agent_set_vardata(data, &data_len, dd->oids[oid_index].type,
+                               dd->scale, dd->shift, &value, sizeof(value),
+                               ds->ds[oid_index].type);
 
-  if (ret != 0) {
-    ERROR(PLUGIN_NAME ": Failed to convert '%s' value to snmp data", name);
-    return SNMP_NOSUCHINSTANCE;
-  }
+  ERROR(PLUGIN_NAME ": Failed to convert '%s' value to snmp data", name);
+  return SNMP_NOSUCHINSTANCE;
 
   requests->requestvb->type = dd->oids[oid_index].type;
   snmp_set_var_typed_value(requests->requestvb, requests->requestvb->type,
@@ -1620,12 +1607,10 @@ static int snmp_agent_config_table_column(table_definition_t *td,
       option_tmp = option;
     } else if (strcasecmp("Plugin", option->key) == 0)
       ret = cf_util_get_string(option, &dd->plugin);
-    else if (strcasecmp("PluginInstance", option->key) == 0)
-      ret = cf_util_get_string(option, &dd->plugin_instance);
     else if (strcasecmp("Type", option->key) == 0)
       ret = cf_util_get_string(option, &dd->type);
-    else if (strcasecmp("TypeInstance", option->key) == 0)
-      ret = cf_util_get_string(option, &dd->type_instance);
+    else if (strcasecmp("DatSource", option->key) == 0)
+      ret = cf_util_get_string(option, &dd->data_source);
     else if (strcasecmp("Shift", option->key) == 0)
       ret = cf_util_get_double(option, &dd->shift);
     else if (strcasecmp("Scale", option->key) == 0)
@@ -2026,8 +2011,8 @@ error:
   return ret;
 }
 
-static int snmp_agent_write(value_list_t const *vl) {
-  if (vl == NULL)
+static int snmp_agent_write(metric_t const *metric_p) {
+  if (metric_p == NULL)
     return -EINVAL;
 
   for (llentry_t *te = llist_head(g_agent->tables); te != NULL; te = te->next) {
@@ -2037,8 +2022,8 @@ static int snmp_agent_write(value_list_t const *vl) {
       data_definition_t *dd = de->value;
 
       if (!dd->is_index_key) {
-        if (CHECK_DD_TYPE(dd, vl->plugin, vl->plugin_instance, vl->type,
-                          vl->type_instance)) {
+        if (CHECK_DD_TYPE(dd, metric_p->plugin, metric_p->type,
+                          metric_p->ds->name)) {
           oid_t *index_oid = calloc(1, sizeof(*index_oid));
           bool free_index_oid = true;
 
@@ -2047,7 +2032,7 @@ static int snmp_agent_write(value_list_t const *vl) {
             return -ENOMEM;
           }
 
-          int ret = snmp_agent_generate_index(td, vl, index_oid);
+          int ret = snmp_agent_generate_index(td, metric_p, index_oid);
 
           if (ret == 0)
             ret = snmp_agent_update_index(dd, td, index_oid, &free_index_oid);
@@ -2065,12 +2050,12 @@ static int snmp_agent_write(value_list_t const *vl) {
   return 0;
 }
 
-static int snmp_agent_collect(const data_set_t *ds, const value_list_t *vl,
+static int snmp_agent_collect(const metric_t *metric_p,
                               user_data_t __attribute__((unused)) * user_data) {
 
   pthread_mutex_lock(&g_agent->lock);
 
-  snmp_agent_write(vl);
+  snmp_agent_write(metric_p);
 
   pthread_mutex_unlock(&g_agent->lock);
 
index cf40a882bf83383029035868a51970f8565f4800..72557d385b130da9c21a1a832145ae97ff2be9b9 100644 (file)
@@ -52,8 +52,11 @@ static int ut_threshold_add(const threshold_t *th) { /* {{{ */
   threshold_t *th_ptr;
   int status = 0;
 
-  if (format_name(name, sizeof(name), th->host, th->plugin, th->plugin_instance,
-                  th->type, th->type_instance) != 0) {
+  if (snprintf(
+          name, sizeof(name), "%s/%s/%s/%s", (th->host == NULL) ? "" : th->host,
+          (th->plugin == NULL) ? "" : th->plugin,
+          (th->type == NULL) ? "" : th->type,
+          (th->data_source == NULL) ? "" : th->data_source) > sizeof(name)) {
     ERROR("ut_threshold_add: format_name failed.");
     return -1;
   }
@@ -76,8 +79,7 @@ static int ut_threshold_add(const threshold_t *th) { /* {{{ */
 
   pthread_mutex_lock(&threshold_lock);
 
-  th_ptr = threshold_get(th->host, th->plugin, th->plugin_instance, th->type,
-                         th->type_instance);
+  th_ptr = threshold_get(th->host, th->plugin, th->type, th->data_source);
 
   while ((th_ptr != NULL) && (th_ptr->next != NULL))
     th_ptr = th_ptr->next;
@@ -138,10 +140,7 @@ static int ut_config_type(const threshold_t *th_orig, oconfig_item_t *ci) {
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *option = ci->children + i;
 
-    if (strcasecmp("Instance", option->key) == 0)
-      status = cf_util_get_string_buffer(option, th.type_instance,
-                                         sizeof(th.type_instance));
-    else if (strcasecmp("DataSource", option->key) == 0)
+    if (strcasecmp("DataSource", option->key) == 0)
       status = cf_util_get_string_buffer(option, th.data_source,
                                          sizeof(th.data_source));
     else if (strcasecmp("WarningMax", option->key) == 0)
@@ -208,9 +207,9 @@ static int ut_config_plugin(const threshold_t *th_orig, oconfig_item_t *ci) {
 
     if (strcasecmp("Type", option->key) == 0)
       status = ut_config_type(&th, option);
-    else if (strcasecmp("Instance", option->key) == 0)
-      status = cf_util_get_string_buffer(option, th.plugin_instance,
-                                         sizeof(th.plugin_instance));
+    else if (strcasecmp("Source", option->key) == 0)
+      status = cf_util_get_string_buffer(option, th.data_source,
+                                         sizeof(th.data_source));
     else {
       WARNING("threshold values: Option `%s' not allowed inside a `Plugin' "
               "block.",
@@ -276,9 +275,8 @@ static int ut_config_host(const threshold_t *th_orig, oconfig_item_t *ci) {
  * if appropriate.
  * Does not fail.
  */
-static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
-                           const threshold_t *th, const gauge_t *values,
-                           int ds_index, int state) { /* {{{ */
+static int ut_report_state(const metric_t *metric_p, const threshold_t *th,
+                           const gauge_t value, int state) { /* {{{ */
   int state_old;
   notification_t n;
 
@@ -289,22 +287,22 @@ static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
 
   /* Check if hits matched */
   if ((th->hits != 0)) {
-    int hits = uc_get_hits(ds, vl);
+    int hits = uc_get_hits(metric_p);
     /* STATE_OKAY resets hits unless PERSIST_OK flag is set. Hits resets if
      * threshold is hit. */
     if (((state == STATE_OKAY) && ((th->flags & UT_FLAG_PERSIST_OK) == 0)) ||
         (hits > th->hits)) {
-      DEBUG("ut_report_state: reset uc_get_hits = 0");
-      uc_set_hits(ds, vl, 0); /* reset hit counter and notify */
+      DEBUG("ut_report_state: reset uc_get_hits_vl = 0");
+      uc_set_hits(metric_p, 0); /* reset hit counter and notify */
     } else {
       DEBUG("ut_report_state: th->hits = %d, uc_get_hits = %d", th->hits,
-            uc_get_hits(ds, vl));
-      (void)uc_inc_hits(ds, vl, 1); /* increase hit counter */
+            uc_get_hits(mstric_p));
+      (void)uc_inc_hits(metric_p, 1); /* increase hit counter */
       return 0;
     }
   } /* end check hits */
 
-  state_old = uc_get_state(ds, vl);
+  state_old = uc_get_state(metric_p);
 
   /* If the state didn't change, report if `persistent' is specified. If the
    * state is `okay', then only report if `persist_ok` flag is set. */
@@ -319,9 +317,9 @@ static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
   }
 
   if (state != state_old)
-    uc_set_state(ds, vl, state);
+    uc_set_state(metric_p, state);
 
-  NOTIFICATION_INIT_VL(&n, vl);
+  notification_init_metric(&n, NOTIF_FAILURE, NULL, metric_p);
 
   buf = n.message;
   bufsize = sizeof(n.message);
@@ -333,30 +331,34 @@ static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
   else
     n.severity = NOTIF_FAILURE;
 
-  n.time = vl->time;
-
-  status = ssnprintf(buf, bufsize, "Host %s, plugin %s", vl->host, vl->plugin);
-  buf += status;
-  bufsize -= status;
-
-  if (vl->plugin_instance[0] != '\0') {
-    status = ssnprintf(buf, bufsize, " (instance %s)", vl->plugin_instance);
-    buf += status;
-    bufsize -= status;
-  }
+  n.time = metric_p->time;
 
-  status = ssnprintf(buf, bufsize, " type %s", vl->type);
+  status = ssnprintf(buf, bufsize, "Name %s", metric_p->identity->name);
   buf += status;
   bufsize -= status;
 
-  if (vl->type_instance[0] != '\0') {
-    status = ssnprintf(buf, bufsize, " (instance %s)", vl->type_instance);
-    buf += status;
-    bufsize -= status;
+  if (metric_p->identity->root_p != NULL) {
+    c_avl_iterator_t *iter_p = c_avl_get_iterator(metric_p->identity->root_p);
+    if (iter_p != NULL) {
+      char *key_p = NULL;
+      char *value_p = NULL;
+      while ((c_avl_iterator_next(iter_p, (void **)&key_p,
+                                  (void **)&value_p)) == 0) {
+        if ((key_p != NULL) && (value_p != NULL)) {
+          int tmp_str_len = strlen(key_p) + strlen(value_p) + 2;
+          if (tmp_str_len < bufsize) {
+            status = ssnprintf(buf, bufsize, "%s %s", key_p, value_p);
+            buf += status;
+            bufsize -= status;
+          }
+        }
+      }
+    }
+    c_avl_iterator_destroy(iter_p);
   }
 
-  plugin_notification_meta_add_string(&n, "DataSource", ds->ds[ds_index].name);
-  plugin_notification_meta_add_double(&n, "CurrentValue", values[ds_index]);
+  plugin_notification_meta_add_string(&n, "DataSource", metric_p->ds->name);
+  plugin_notification_meta_add_double(&n, "CurrentValue", value);
   plugin_notification_meta_add_double(&n, "WarningMin", th->warning_min);
   plugin_notification_meta_add_double(&n, "WarningMax", th->warning_max);
   plugin_notification_meta_add_double(&n, "FailureMin", th->failure_min);
@@ -370,7 +372,7 @@ static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
       ssnprintf(buf, bufsize,
                 ": All data sources are within range again. "
                 "Current value of \"%s\" is %f.",
-                ds->ds[ds_index].name, values[ds_index]);
+                metric_p->ds->name, value);
   } else if (state == STATE_UNKNOWN) {
     ERROR("ut_report_state: metric transition to UNKNOWN from a different "
           "state. This shouldn't happen.");
@@ -387,7 +389,7 @@ static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
         ssnprintf(buf, bufsize,
                   ": Data source \"%s\" is currently "
                   "%f. That is within the %s region of %f%s and %f%s.",
-                  ds->ds[ds_index].name, values[ds_index],
+                  metric_p->ds->name, value,
                   (state == STATE_ERROR) ? "failure" : "warning", min,
                   ((th->flags & UT_FLAG_PERCENTAGE) != 0) ? "%" : "", max,
                   ((th->flags & UT_FLAG_PERCENTAGE) != 0) ? "%" : "");
@@ -395,33 +397,16 @@ static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
         ssnprintf(buf, bufsize,
                   ": Data source \"%s\" is currently "
                   "%f. That is %s the %s threshold of %f%s.",
-                  ds->ds[ds_index].name, values[ds_index],
-                  isnan(min) ? "below" : "above",
+                  metric_p->ds->name, value, isnan(min) ? "below" : "above",
                   (state == STATE_ERROR) ? "failure" : "warning",
                   isnan(min) ? max : min,
                   ((th->flags & UT_FLAG_PERCENTAGE) != 0) ? "%" : "");
       }
     } else if (th->flags & UT_FLAG_PERCENTAGE) {
-      gauge_t value;
-      gauge_t sum;
-
-      sum = 0.0;
-      for (size_t i = 0; i < vl->values_len; i++) {
-        if (isnan(values[i]))
-          continue;
-
-        sum += values[i];
-      }
-
-      if (sum == 0.0)
-        value = NAN;
-      else
-        value = 100.0 * values[ds_index] / sum;
-
       ssnprintf(buf, bufsize,
                 ": Data source \"%s\" is currently "
                 "%g (%.2f%%). That is %s the %s threshold of %.2f%%.",
-                ds->ds[ds_index].name, values[ds_index], value,
+                metric_p->ds->name, value, value,
                 (value < min) ? "below" : "above",
                 (state == STATE_ERROR) ? "failure" : "warning",
                 (value < min) ? min : max);
@@ -430,10 +415,9 @@ static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
       ssnprintf(buf, bufsize,
                 ": Data source \"%s\" is currently "
                 "%f. That is %s the %s threshold of %f.",
-                ds->ds[ds_index].name, values[ds_index],
-                (values[ds_index] < min) ? "below" : "above",
+                metric_p->ds->name, value, (value < min) ? "below" : "above",
                 (state == STATE_ERROR) ? "failure" : "warning",
-                (values[ds_index] < min) ? min : max);
+                (value < min) ? min : max);
     }
   }
 
@@ -453,20 +437,17 @@ static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
  * appropriate.
  * Does not fail.
  */
-static int ut_check_one_data_source(
-    const data_set_t *ds, const value_list_t __attribute__((unused)) * vl,
-    const threshold_t *th, const gauge_t *values, int ds_index) { /* {{{ */
-  const char *ds_name;
+static int ut_check_one_data_source(const metric_t *metric_p,
+                                    const threshold_t *th,
+                                    const gauge_t value) { /* {{{ */
   int is_warning = 0;
   int is_failure = 0;
   int prev_state = STATE_OKAY;
 
   /* check if this threshold applies to this data source */
-  if (ds != NULL) {
-    ds_name = ds->ds[ds_index].name;
-    if ((th->data_source[0] != 0) && (strcmp(ds_name, th->data_source) != 0))
-      return STATE_UNKNOWN;
-  }
+  if ((th->data_source[0] != 0) &&
+      (strcmp(metric_p->ds->name, th->data_source) != 0))
+    return STATE_UNKNOWN;
 
   if ((th->flags & UT_FLAG_INVERT) != 0) {
     is_warning--;
@@ -476,7 +457,7 @@ static int ut_check_one_data_source(
   /* XXX: This is an experimental code, not optimized, not fast, not reliable,
    * and probably, do not work as you expect. Enjoy! :D */
   if (th->hysteresis > 0) {
-    prev_state = uc_get_state(ds, vl);
+    prev_state = uc_get_state(metric_p);
     /* The purpose of hysteresis is elliminating flapping state when the value
      * oscilates around the thresholds. In other words, what is important is
      * the previous state; if the new value would trigger a transition, make
@@ -501,24 +482,24 @@ static int ut_check_one_data_source(
     }
 
     if ((!isnan(th->failure_min) &&
-         (th->failure_min + hysteresis_for_failure > values[ds_index])) ||
+         (th->failure_min + hysteresis_for_failure > value)) ||
         (!isnan(th->failure_max) &&
-         (th->failure_max - hysteresis_for_failure < values[ds_index])))
+         (th->failure_max - hysteresis_for_failure < value)))
       is_failure++;
 
     if ((!isnan(th->warning_min) &&
-         (th->warning_min + hysteresis_for_warning > values[ds_index])) ||
+         (th->warning_min + hysteresis_for_warning > value)) ||
         (!isnan(th->warning_max) &&
-         (th->warning_max - hysteresis_for_warning < values[ds_index])))
+         (th->warning_max - hysteresis_for_warning < value)))
       is_warning++;
 
   } else { /* no hysteresis */
-    if ((!isnan(th->failure_min) && (th->failure_min > values[ds_index])) ||
-        (!isnan(th->failure_max) && (th->failure_max < values[ds_index])))
+    if ((!isnan(th->failure_min) && (th->failure_min > value)) ||
+        (!isnan(th->failure_max) && (th->failure_max < value)))
       is_failure++;
 
-    if ((!isnan(th->warning_min) && (th->warning_min > values[ds_index])) ||
-        (!isnan(th->warning_max) && (th->warning_max < values[ds_index])))
+    if ((!isnan(th->warning_min) && (th->warning_min > value)) ||
+        (!isnan(th->warning_max) && (th->warning_max < value)))
       is_warning++;
   }
 
@@ -540,60 +521,38 @@ static int ut_check_one_data_source(
  * defined.
  * Returns less than zero if the data set doesn't have any data sources.
  */
-static int ut_check_one_threshold(const data_set_t *ds, const value_list_t *vl,
-                                  const threshold_t *th, const gauge_t *values,
-                                  int *ret_ds_index) { /* {{{ */
+static int ut_check_one_threshold(const metric_t *metric_p,
+                                  const threshold_t *th,
+                                  const gauge_t value) { /* {{{ */
   int ret = -1;
-  int ds_index = -1;
-  gauge_t values_copy[ds->ds_num];
-
-  memcpy(values_copy, values, sizeof(values_copy));
+  gauge_t values_copy = value;
 
   if ((th->flags & UT_FLAG_PERCENTAGE) != 0) {
     int num = 0;
     gauge_t sum = 0.0;
 
-    if (ds->ds_num == 1) {
-      WARNING(
-          "ut_check_one_threshold: The %s type has only one data "
-          "source, but you have configured to check this as a percentage. "
-          "That doesn't make much sense, because the percentage will always "
-          "be 100%%!",
-          ds->type);
-    }
-
     /* Prepare `sum' and `num'. */
-    for (size_t i = 0; i < ds->ds_num; i++)
-      if (!isnan(values[i])) {
-        num++;
-        sum += values[i];
-      }
+    if (!isnan(value)) {
+      num++;
+      sum += value;
+    }
 
     if ((num == 0)       /* All data sources are undefined. */
         || (sum == 0.0)) /* Sum is zero, cannot calculate percentage. */
     {
-      for (size_t i = 0; i < ds->ds_num; i++)
-        values_copy[i] = NAN;
+      values_copy = NAN;
     } else /* We can actually calculate the percentage. */
     {
-      for (size_t i = 0; i < ds->ds_num; i++)
-        values_copy[i] = 100.0 * values[i] / sum;
+      values_copy = 100.0;
     }
   } /* if (UT_FLAG_PERCENTAGE) */
 
-  for (size_t i = 0; i < ds->ds_num; i++) {
-    int status;
-
-    status = ut_check_one_data_source(ds, vl, th, values_copy, i);
-    if (ret < status) {
-      ret = status;
-      ds_index = i;
-    }
-  } /* for (ds->ds_num) */
-
-  if (ret_ds_index != NULL)
-    *ret_ds_index = ds_index;
+  int status;
 
+  status = ut_check_one_data_source(metric_p, th, values_copy);
+  if (ret < status) {
+    ret = status;
+  }
   return ret;
 } /* }}} int ut_check_one_threshold */
 
@@ -606,16 +565,15 @@ static int ut_check_one_threshold(const data_set_t *ds, const value_list_t *vl,
  * Returns zero on success and if no threshold has been configured. Returns
  * less than zero on failure.
  */
-static int ut_check_threshold(const data_set_t *ds, const value_list_t *vl,
+static int ut_check_threshold(const metric_t *metric_p,
                               __attribute__((unused))
                               user_data_t *ud) { /* {{{ */
   threshold_t *th;
-  gauge_t *values;
+  gauge_t value;
   int status;
 
   int worst_state = -1;
   threshold_t *worst_th = NULL;
-  int worst_ds_index = -1;
 
   if (threshold_tree == NULL)
     return 0;
@@ -623,46 +581,39 @@ static int ut_check_threshold(const data_set_t *ds, const value_list_t *vl,
   /* Is this lock really necessary? So far, thresholds are only inserted at
    * startup. -octo */
   pthread_mutex_lock(&threshold_lock);
-  th = threshold_search(vl);
+  th = threshold_search(metric_p);
   pthread_mutex_unlock(&threshold_lock);
   if (th == NULL)
     return 0;
 
   DEBUG("ut_check_threshold: Found matching threshold(s)");
 
-  values = uc_get_rate_vl(ds, vl);
-  if (values == NULL)
+  status = uc_get_rate(metric_p, &value);
+  if (status != 0)
     return 0;
 
   while (th != NULL) {
-    int ds_index = -1;
 
-    status = ut_check_one_threshold(ds, vl, th, values, &ds_index);
+    status = ut_check_one_threshold(metric_p, th, value);
     if (status < 0) {
       ERROR("ut_check_threshold: ut_check_one_threshold failed.");
-      sfree(values);
       return -1;
     }
 
     if (worst_state < status) {
       worst_state = status;
       worst_th = th;
-      worst_ds_index = ds_index;
     }
 
     th = th->next;
   } /* while (th) */
 
-  status =
-      ut_report_state(ds, vl, worst_th, values, worst_ds_index, worst_state);
+  status = ut_report_state(metric_p, worst_th, value, worst_state);
   if (status != 0) {
     ERROR("ut_check_threshold: ut_report_state failed.");
-    sfree(values);
     return -1;
   }
 
-  sfree(values);
-
   return 0;
 } /* }}} int ut_check_threshold */
 
@@ -671,31 +622,34 @@ static int ut_check_threshold(const data_set_t *ds, const value_list_t *vl,
  *
  * This function is called whenever a value goes "missing".
  */
-static int ut_missing(const value_list_t *vl,
+static int ut_missing(const metric_t *metric_p,
                       __attribute__((unused)) user_data_t *ud) { /* {{{ */
   threshold_t *th;
   cdtime_t missing_time;
-  char identifier[6 * DATA_MAX_NAME_LEN];
+  char *identifier_p = NULL;
   notification_t n;
   cdtime_t now;
 
   if (threshold_tree == NULL)
     return 0;
 
-  th = threshold_search(vl);
+  th = threshold_search(metric_p);
   /* dispatch notifications for "interesting" values only */
   if ((th == NULL) || ((th->flags & UT_FLAG_INTERESTING) == 0))
     return 0;
 
   now = cdtime();
-  missing_time = now - vl->time;
-  FORMAT_VL(identifier, sizeof(identifier), vl);
+  missing_time = now - metric_p->time;
+  if ((identifier_p = plugin_format_metric(metric_p)) != 0) {
+    ERROR("uc_update: plugin_format_metric failed.");
+  }
 
-  NOTIFICATION_INIT_VL(&n, vl);
+  notification_init_metric(&n, NOTIF_FAILURE, NULL, metric_p);
   ssnprintf(n.message, sizeof(n.message),
-            "%s has not been updated for %.3f seconds.", identifier,
+            "%s has not been updated for %.3f seconds.", identifier_p,
             CDTIME_T_TO_DOUBLE(missing_time));
   n.time = now;
+  sfree(identifier_p);
 
   plugin_dispatch_notification(&n);
 
index dc3b2c74878568ade6220831c77891010e5d6421..354ea71cf0e415c5d377eef1b293f3a43e324b9b 100644 (file)
@@ -387,18 +387,12 @@ static int wg_send_message(char const *message, struct wg_callback *cb) {
   return 0;
 }
 
-static int wg_write_messages(const data_set_t *ds, const value_list_t *vl,
+static int wg_write_messages(const metric_t *metric_p,
                              struct wg_callback *cb) {
   char buffer[WG_SEND_BUF_SIZE] = {0};
   int status;
 
-  if (0 != strcmp(ds->type, vl->type)) {
-    ERROR("write_graphite plugin: DS type does not match "
-          "value list type");
-    return -1;
-  }
-
-  status = format_graphite(buffer, sizeof(buffer), ds, vl, cb->prefix,
+  status = format_graphite(buffer, sizeof(buffer), metric_p, cb->prefix,
                            cb->postfix, cb->escape_char, cb->format_flags);
   if (status != 0) /* error message has been printed already. */
     return status;
@@ -411,7 +405,7 @@ static int wg_write_messages(const data_set_t *ds, const value_list_t *vl,
   return 0;
 } /* int wg_write_messages */
 
-static int wg_write(const data_set_t *ds, const value_list_t *vl,
+static int wg_write(const metric_t *metric_p,
                     user_data_t *user_data) {
   struct wg_callback *cb;
   int status;
@@ -421,7 +415,7 @@ static int wg_write(const data_set_t *ds, const value_list_t *vl,
 
   cb = user_data->data;
 
-  status = wg_write_messages(ds, vl, cb);
+  status = wg_write_messages(metric_p, cb);
 
   return status;
 }
index 6da06b42b21fc05c500ab7ee3989eb002dd3c27f..f930760b83e62eb7b26c8eedb312128fcf803e4d 100644 (file)
@@ -399,10 +399,8 @@ static void wh_callback_free(void *data) /* {{{ */
   sfree(cb);
 } /* }}} void wh_callback_free */
 
-static int wh_write_command(const data_set_t *ds,
-                            const value_list_t *vl, /* {{{ */
+static int wh_write_command(const metric_t *metric_p, /* {{{ */
                             wh_callback_t *cb) {
-  char key[10 * DATA_MAX_NAME_LEN];
   char values[512];
   char command[1024];
   size_t command_len;
@@ -413,38 +411,35 @@ static int wh_write_command(const data_set_t *ds,
   if ((cb == NULL) || (cb->send_buffer == NULL))
     return -1;
 
-  if (strcmp(ds->type, vl->type) != 0) {
-    ERROR("write_http plugin: DS type does not match "
-          "value list type");
+  /* Copy the identifier to `key' and escape it. */
+  char *metric_string_p = NULL;
+  if ((metric_string_p = plugin_format_metric(metric_p)) != 0) {
     return -1;
   }
 
-  /* Copy the identifier to `key' and escape it. */
-  status = FORMAT_VL(key, sizeof(key), vl);
-  if (status != 0) {
-    ERROR("write_http plugin: error with format_name");
-    return status;
-  }
-  escape_string(key, sizeof(key));
+  escape_string(metric_string_p, sizeof(metric_string_p));
 
   /* Convert the values to an ASCII representation and put that into
    * `values'. */
-  status = format_values(values, sizeof(values), ds, vl, cb->store_rates);
+  status = format_values(values, sizeof(values), metric_p, cb->store_rates);
   if (status != 0) {
     ERROR("write_http plugin: error with "
           "wh_value_list_to_string");
+    sfree(metric_string_p);
     return status;
   }
 
-  command_len = (size_t)snprintf(command, sizeof(command),
-                                 "PUTVAL %s interval=%.3f %s\r\n", key,
-                                 CDTIME_T_TO_DOUBLE(vl->interval), values);
+  command_len = (size_t)snprintf(
+      command, sizeof(command), "PUTVAL %s interval=%.3f %s\r\n",
+      metric_string_p, CDTIME_T_TO_DOUBLE(metric_p->interval), values);
   if (command_len >= sizeof(command)) {
     ERROR("write_http plugin: Command buffer too small: "
           "Need %" PRIsz " bytes.",
           command_len + 1);
+    sfree(metric_string_p);
     return -1;
   }
+  sfree(metric_string_p);
 
   pthread_mutex_lock(&cb->send_lock);
   if (wh_callback_init(cb) != 0) {
@@ -482,7 +477,7 @@ static int wh_write_command(const data_set_t *ds,
   return 0;
 } /* }}} int wh_write_command */
 
-static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+static int wh_write_json(const metric_t *metric_p, /* {{{ */
                          wh_callback_t *cb) {
   int status;
 
@@ -494,8 +489,8 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */
   }
 
   status =
-      format_json_value_list(cb->send_buffer, &cb->send_buffer_fill,
-                             &cb->send_buffer_free, ds, vl, cb->store_rates);
+      format_json_metric(cb->send_buffer, &cb->send_buffer_fill,
+                        &cb->send_buffer_free, metric_p, cb->store_rates);
   if (status == -ENOMEM) {
     status = wh_flush_nolock(/* timeout = */ 0, cb);
     if (status != 0) {
@@ -505,8 +500,8 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     }
 
     status =
-        format_json_value_list(cb->send_buffer, &cb->send_buffer_fill,
-                               &cb->send_buffer_free, ds, vl, cb->store_rates);
+        format_json_metric(cb->send_buffer, &cb->send_buffer_fill,
+                          &cb->send_buffer_free, metric_p, cb->store_rates);
   }
   if (status != 0) {
     pthread_mutex_unlock(&cb->send_lock);
@@ -524,8 +519,7 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */
   return 0;
 } /* }}} int wh_write_json */
 
-static int wh_write_kairosdb(const data_set_t *ds,
-                             const value_list_t *vl, /* {{{ */
+static int wh_write_kairosdb(const metric_t *metric_p, /* {{{ */
                              wh_callback_t *cb) {
   int status;
 
@@ -540,8 +534,8 @@ static int wh_write_kairosdb(const data_set_t *ds,
     }
   }
 
-  status = format_kairosdb_value_list(
-      cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl,
+  status = format_kairosdb_metric(
+      cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, metric_p,
       cb->store_rates, (char const *const *)http_attrs, http_attrs_num,
       cb->data_ttl, cb->metrics_prefix);
   if (status == -ENOMEM) {
@@ -552,8 +546,8 @@ static int wh_write_kairosdb(const data_set_t *ds,
       return status;
     }
 
-    status = format_kairosdb_value_list(
-        cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl,
+    status = format_kairosdb_metric(
+        cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, metric_p,
         cb->store_rates, (char const *const *)http_attrs, http_attrs_num,
         cb->data_ttl, cb->metrics_prefix);
   }
@@ -573,7 +567,7 @@ static int wh_write_kairosdb(const data_set_t *ds,
   return 0;
 } /* }}} int wh_write_kairosdb */
 
-static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+static int wh_write(const metric_t *metric_p, /* {{{ */
                     user_data_t *user_data) {
   wh_callback_t *cb;
   int status;
@@ -586,13 +580,13 @@ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
 
   switch (cb->format) {
   case WH_FORMAT_JSON:
-    status = wh_write_json(ds, vl, cb);
+    status = wh_write_json(metric_p, cb);
     break;
   case WH_FORMAT_KAIROSDB:
-    status = wh_write_kairosdb(ds, vl, cb);
+    status = wh_write_kairosdb(metric_p, cb);
     break;
   default:
-    status = wh_write_command(ds, vl, cb);
+    status = wh_write_command(metric_p, cb);
     break;
   }
   return status;
index 09bb639ad30f350ecf042b80cf66431d1df0563b..5307bcd2a4d6a61cd4ab6dcd3b830c6981607d57 100644 (file)
@@ -57,7 +57,7 @@ struct kafka_topic_context {
 };
 
 static int kafka_handle(struct kafka_topic_context *);
-static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
+static int kafka_write(const metric_t *, user_data_t *);
 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
                                int32_t, void *, void *);
 
@@ -170,8 +170,8 @@ static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */
 
 } /* }}} int kafka_handle */
 
-static int kafka_write(const data_set_t *ds, /* {{{ */
-                       const value_list_t *vl, user_data_t *ud) {
+static int kafka_write(/* {{{ */
+                       const metric_t *metric_p, user_data_t *ud) {
   int status = 0;
   void *key;
   size_t keylen = 0;
@@ -181,7 +181,8 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
   size_t blen = 0;
   struct kafka_topic_context *ctx = ud->data;
 
-  if ((ds == NULL) || (vl == NULL) || (ctx == NULL))
+  if ((metric_p == NULL) || (metric_p->ds == NULL) ||
+      (metric_p->identity == NULL) || (ctx == NULL))
     return EINVAL;
 
   pthread_mutex_lock(&ctx->lock);
@@ -194,7 +195,7 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
 
   switch (ctx->format) {
   case KAFKA_FORMAT_COMMAND:
-    status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
+    status = cmd_create_putval(buffer, sizeof(buffer), metric_p);
     if (status != 0) {
       ERROR("write_kafka plugin: cmd_create_putval failed with status %i.",
             status);
@@ -204,13 +205,13 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
     break;
   case KAFKA_FORMAT_JSON:
     format_json_initialize(buffer, &bfill, &bfree);
-    format_json_value_list(buffer, &bfill, &bfree, ds, vl, ctx->store_rates);
+    format_json_metric(buffer, &bfill, &bfree, metric_p, ctx->store_rates);
     format_json_finalize(buffer, &bfill, &bfree);
     blen = strlen(buffer);
     break;
   case KAFKA_FORMAT_GRAPHITE:
     status =
-        format_graphite(buffer, sizeof(buffer), ds, vl, ctx->prefix,
+        format_graphite(buffer, sizeof(buffer), metric_p, ctx->prefix,
                         ctx->postfix, ctx->escape_char, ctx->graphite_flags);
     if (status != 0) {
       ERROR("write_kafka plugin: format_graphite failed with status %i.",
index 3e143168e8746b938a217bed7bb38d3a36b50cc4..9570bc6f92e59105c9946eb2341db14291bc4b7b 100644 (file)
 /* Plugin:WriteLog has to also operate without a config, so use a global. */
 int wl_format = WL_FORMAT_GRAPHITE;
 
-static int wl_write_graphite(const data_set_t *ds, const value_list_t *vl) {
+static int wl_write_graphite(const metric_t *metric_p) {
   char buffer[WL_BUF_SIZE] = {0};
   int status;
 
-  if (0 != strcmp(ds->type, vl->type)) {
-    ERROR("write_log plugin: DS type does not match value list type");
-    return -1;
-  }
-
-  status = format_graphite(buffer, sizeof(buffer), ds, vl, NULL, NULL, '_', 0);
+  status = format_graphite(buffer, sizeof(buffer), metric_p, NULL, NULL, '_', 0);
   if (status != 0) /* error message has been printed already. */
     return status;
 
@@ -61,18 +56,13 @@ static int wl_write_graphite(const data_set_t *ds, const value_list_t *vl) {
   return 0;
 } /* int wl_write_graphite */
 
-static int wl_write_json(const data_set_t *ds, const value_list_t *vl) {
+static int wl_write_json(const metric_t *metric_p) {
   char buffer[WL_BUF_SIZE] = {0};
   size_t bfree = sizeof(buffer);
   size_t bfill = 0;
 
-  if (0 != strcmp(ds->type, vl->type)) {
-    ERROR("write_log plugin: DS type does not match value list type");
-    return -1;
-  }
-
   format_json_initialize(buffer, &bfill, &bfree);
-  format_json_value_list(buffer, &bfill, &bfree, ds, vl,
+  format_json_metric(buffer, &bfill, &bfree, metric_p,
                          /* store rates = */ 0);
   format_json_finalize(buffer, &bfill, &bfree);
 
@@ -81,14 +71,14 @@ static int wl_write_json(const data_set_t *ds, const value_list_t *vl) {
   return 0;
 } /* int wl_write_json */
 
-static int wl_write(const data_set_t *ds, const value_list_t *vl,
+static int wl_write(const metric_t *metric_p,
                     __attribute__((unused)) user_data_t *user_data) {
   int status = 0;
 
   if (wl_format == WL_FORMAT_GRAPHITE) {
-    status = wl_write_graphite(ds, vl);
+    status = wl_write_graphite(metric_p);
   } else if (wl_format == WL_FORMAT_JSON) {
-    status = wl_write_json(ds, vl);
+    status = wl_write_json(metric_p);
   }
 
   return status;
index 32005cd67e7cb18f53d61f22ffbd1bf0a62f30e1..3a6003d50085e853409138191f79d4483014502c 100644 (file)
@@ -56,10 +56,9 @@ typedef struct wr_node_s wr_node_t;
 /*
  * Functions
  */
-static int wr_write(const data_set_t *ds, /* {{{ */
-                    const value_list_t *vl, user_data_t *ud) {
+static int wr_write(/* {{{ */
+                    const metric_t *metric_p, user_data_t *ud) {
   wr_node_t *node = ud->data;
-  char ident[512];
   char key[512];
   char value[512] = {0};
   char time[24];
@@ -68,19 +67,23 @@ static int wr_write(const data_set_t *ds, /* {{{ */
   int status;
   redisReply *rr;
 
-  status = FORMAT_VL(ident, sizeof(ident), vl);
-  if (status != 0)
-    return status;
+  char *metric_string_p = NULL;
+  if ((metric_string_p = plugin_format_metric(metric_p)) != 0) {
+    return -1;
+  }
+
   ssnprintf(key, sizeof(key), "%s%s",
             (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
-            ident);
-  ssnprintf(time, sizeof(time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
+            metric_string_p);
+  ssnprintf(time, sizeof(time), "%.9f", CDTIME_T_TO_DOUBLE(metric_p->time));
 
   value_size = sizeof(value);
   value_ptr = &value[0];
-  status = format_values(value_ptr, value_size, ds, vl, node->store_rates);
-  if (status != 0)
+  status = format_values(value_ptr, value_size, metric_p, node->store_rates);
+  if (status != 0) {
+    sfree(metric_string_p);
     return status;
+  }
 
   pthread_mutex_lock(&node->lock);
 
@@ -93,6 +96,7 @@ static int wr_write(const data_set_t *ds, /* {{{ */
             (node->host != NULL) ? node->host : "localhost",
             (node->port != 0) ? node->port : 6379);
       pthread_mutex_unlock(&node->lock);
+      sfree(metric_string_p);
       return -1;
     } else if (node->conn->err) {
       ERROR(
@@ -100,6 +104,7 @@ static int wr_write(const data_set_t *ds, /* {{{ */
           (node->host != NULL) ? node->host : "localhost",
           (node->port != 0) ? node->port : 6379, node->conn->errstr);
       pthread_mutex_unlock(&node->lock);
+      sfree(metric_string_p);
       return -1;
     }
 
@@ -132,8 +137,9 @@ static int wr_write(const data_set_t *ds, /* {{{ */
      * remove element, scored less than 'current-max_set_duration'
      * '(...' indicates 'less than' in redis CLI.
      */
-    rr = redisCommand(node->conn, "ZREMRANGEBYSCORE %s -1 (%.9f", key,
-                      (CDTIME_T_TO_DOUBLE(vl->time) - node->max_set_duration));
+    rr = redisCommand(
+        node->conn, "ZREMRANGEBYSCORE %s -1 (%.9f", key,
+        (CDTIME_T_TO_DOUBLE(metric_p->time) - node->max_set_duration));
     if (rr == NULL)
       WARNING("ZREMRANGEBYSCORE command error. key:%s message:%s", key,
               node->conn->errstr);
@@ -144,17 +150,18 @@ static int wr_write(const data_set_t *ds, /* {{{ */
   /* TODO(octo): This is more overhead than necessary. Use the cache and
    * metadata to determine if it is a new metric and call SADD only once for
    * each metric. */
-  rr = redisCommand(
-      node->conn, "SADD %svalues %s",
-      (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX, ident);
+  rr =
+      redisCommand(node->conn, "SADD %svalues %s",
+                   (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
+                   metric_string_p);
   if (rr == NULL)
-    WARNING("SADD command error. ident:%s message:%s", ident,
+    WARNING("SADD command error. ident:%s message:%s", metric_string_p,
             node->conn->errstr);
   else
     freeReplyObject(rr);
 
   pthread_mutex_unlock(&node->lock);
-
+  sfree(metric_string_p);
   return 0;
 } /* }}} int wr_write */
 
index 742379d81547e81b0e5535e900df810e40fd9ccd..d38f18e5c2e2b218b5ba23b55366f40087ca5701 100644 (file)
@@ -73,7 +73,7 @@ static int ut_check_one_data_source(
 
   /* XXX: This is an experimental code, not optimized, not fast, not reliable,
    * and probably, do not work as you expect. Enjoy! :D */
-  prev_state = uc_get_state(ds, vl);
+  prev_state = uc_get_state_vl(ds, vl);
   if ((th->hysteresis > 0) && (prev_state != STATE_OKAY) &&
       (prev_state != STATE_UNKNOWN)) {
     switch (prev_state) {