return (success > 0) ? 0 : -1;
} /* }}} int agg_read */
-static int agg_write(data_set_t const *ds, value_list_t const *vl, /* {{{ */
+static int agg_write(metric_t const *metric_p, /* {{{ */
__attribute__((unused)) user_data_t *user_data) {
bool created_by_aggregation = false;
/* Ignore values that were created by the aggregation plugin to avoid weird
* effects. */
- (void)meta_data_get_boolean(vl->meta, "aggregation:created",
+ (void)meta_data_get_boolean(metric_p->meta->meta, "aggregation:created",
&created_by_aggregation);
if (created_by_aggregation)
return 0;
if (lookup == NULL)
status = ENOENT;
else {
- status = lookup_search(lookup, ds, vl);
+ status = lookup_search(lookup, metric_p);
if (status > 0)
status = 0;
}
#include "plugin.h"
#include "utils/cmds/putval.h"
#include "utils/common/common.h"
+#include "utils/avltree/avltree.h"
#include "utils/format_graphite/format_graphite.h"
#include "utils/format_json/format_json.h"
#include "utils_random.h"
return status;
} /* }}} int camqp_write_locked */
-static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+static int camqp_write(const metric_t *metric_p, /* {{{ */
user_data_t *user_data) {
camqp_config_t *conf = user_data->data;
char routing_key[6 * DATA_MAX_NAME_LEN];
char buffer[8192];
int status;
+ char *index_p = (char *)&buffer[0];
- if ((ds == NULL) || (vl == NULL) || (conf == NULL))
+ if ((metric_p == NULL) || (metric_p->ds == NULL) || (conf == NULL))
return EINVAL;
if (conf->routing_key != NULL) {
sstrncpy(routing_key, conf->routing_key, sizeof(routing_key));
} else {
- ssnprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s",
- vl->host, vl->plugin, vl->plugin_instance, vl->type,
- vl->type_instance);
+ int buffer_size = sizeof(routing_key);
+ int tmp_str_len = 0;
+ tmp_str_len = strlen(metric_p->identity->name);
+ if (tmp_str_len < buffer_size) {
+ snprintf(index_p, tmp_str_len + 1, "%s", metric_p->identity->name);
+ index_p += tmp_str_len; /* This is the location of the trailing nul */
+ buffer_size -= tmp_str_len;
+ }
+
+ if (metric_p->identity->root_p != NULL) {
+ c_avl_iterator_t *iter_p = c_avl_get_iterator(metric_p->identity->root_p);
+ if (iter_p != NULL) {
+ char *key_p = NULL;
+ char *value_p = NULL;
+ while ((c_avl_iterator_next(iter_p, (void **)&key_p,
+ (void **)&value_p)) == 0) {
+ if ((key_p != NULL) && (value_p != NULL)) {
+ tmp_str_len = strlen(key_p) + strlen(value_p) + 2;
+ if (tmp_str_len < buffer_size) {
+ snprintf(index_p, tmp_str_len + 1, ";%s=%s", key_p, value_p);
+ index_p += tmp_str_len;
+ buffer_size -= tmp_str_len;
+ } else {
+ snprintf(index_p, buffer_size, ";%s", value_p);
+ buffer_size = 0;
+ break;
+ }
+ }
+ }
+ c_avl_iterator_destroy(iter_p);
+ }
+ }
/* Switch slashes (the only character forbidden by collectd) and dots
* (the separation character used by AMQP). */
- for (size_t i = 0; routing_key[i] != 0; i++) {
+ for (size_t i = 0; index_p[i] != 0; i++) {
if (routing_key[i] == '.')
routing_key[i] = '/';
else if (routing_key[i] == '/')
}
if (conf->format == CAMQP_FORMAT_COMMAND) {
- status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
+ status = cmd_create_putval(buffer, sizeof(buffer), metric_p);
if (status != 0) {
ERROR("amqp plugin: cmd_create_putval failed with status %i.", status);
return status;
size_t bfill = 0;
format_json_initialize(buffer, &bfill, &bfree);
- format_json_value_list(buffer, &bfill, &bfree, ds, vl, conf->store_rates);
+ format_json_metric(buffer, &bfill, &bfree, metric_p, conf->store_rates);
format_json_finalize(buffer, &bfill, &bfree);
} else if (conf->format == CAMQP_FORMAT_GRAPHITE) {
status =
- format_graphite(buffer, sizeof(buffer), ds, vl, conf->prefix,
+ format_graphite(buffer, sizeof(buffer), metric_p, conf->prefix,
conf->postfix, conf->escape_char, conf->graphite_flags);
if (status != 0) {
ERROR("amqp plugin: format_graphite failed with status %i.", status);
} /* }}} int amqp1_notify */
-static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+static int amqp1_write(const metric_t *metric_p, /* {{{ */
user_data_t *user_data) {
int status = 0;
size_t bfree = BUFSIZE;
break;
case AMQP1_FORMAT_JSON:
format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
- format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
+ format_json_metric((char *)cdm->mbuf.start, &bfill, &bfree, metric_p,
instance->store_rates);
status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
if (status != 0) {
*
* Authors:
* Tomas Menzl
+ * Manoj Srivastava <srivasta at google.com>
**/
#include "collectd.h"
*/
typedef struct temperature_list_s {
char *sensor_name; /**< sensor name/reference */
- size_t num_values; /**< number of values (usually one) */
bool initialized; /**< sensor already provides data */
struct temperature_list_s *next; /**< next in the list */
} temperature_list_t;
new_temp->sensor_name = strdup(sensor);
new_temp->initialized = 0;
- new_temp->num_values = 0;
if (new_temp->sensor_name == NULL) {
free(new_temp);
return -1;
static int get_reference_temperature(double *result) {
temperature_list_t *list = temp_list;
- gauge_t *values = NULL; /**< rate values */
- size_t values_num = 0; /**< number of rate values */
+ gauge_t value; /**< rate values */
gauge_t values_history[REF_TEMP_AVG_NUM];
dynamic changing of number of temperarure values in runtime yet (are
there any such cases?). */
if (!list->initialized) {
- if (uc_get_rate_by_name(list->sensor_name, &values, &values_num)) {
+ if (uc_get_rate_by_name(list->sensor_name, &value)) {
DEBUG(
"barometer: get_reference_temperature - rate \"%s\" not found yet",
list->sensor_name);
continue;
}
- DEBUG("barometer: get_reference_temperature - initialize \"%s\", %" PRIsz
+ DEBUG("barometer: get_reference_temperature - initialize \"%s\"" PRIsz
" vals",
- list->sensor_name, values_num);
+ list->sensor_name);
list->initialized = 1;
- list->num_values = values_num;
-
- for (size_t i = 0; i < values_num; ++i) {
- DEBUG("barometer: get_reference_temperature - rate %" PRIsz ": %lf **",
- i, values[i]);
- if (!isnan(values[i])) {
- avg_sum += values[i];
- ++avg_num;
- }
+
+ DEBUG("barometer: get_reference_temperature - rate " PRIsz ": %lf **",
+ value);
+ if (!isnan(value)) {
+ avg_sum += value;
+ ++avg_num;
}
- free(values);
- values = NULL;
}
/* It is OK to get here the first time as well, in the worst case
the history will full of NANs. */
if (uc_get_history_by_name(list->sensor_name, values_history,
- REF_TEMP_AVG_NUM, list->num_values)) {
+ REF_TEMP_AVG_NUM)) {
ERROR("barometer: get_reference_temperature - history \"%s\" lost",
list->sensor_name);
list->initialized = 0;
- list->num_values = 0;
list = list->next;
continue;
}
- for (size_t i = 0; i < REF_TEMP_AVG_NUM * list->num_values; ++i) {
+ for (size_t i = 0; i < REF_TEMP_AVG_NUM; ++i) {
DEBUG("barometer: get_reference_temperature - history %" PRIsz ": %lf", i,
values_history[i]);
if (!isnan(values_history[i])) {
if (avg_num == 0) /* still no history? fallback to current */
{
- if (uc_get_rate_by_name(list->sensor_name, &values, &values_num)) {
+ if (uc_get_rate_by_name(list->sensor_name, &value)) {
ERROR("barometer: get_reference_temperature - rate \"%s\" lost",
list->sensor_name);
list->initialized = 0;
- list->num_values = 0;
list = list->next;
continue;
}
- for (size_t i = 0; i < values_num; ++i) {
- DEBUG("barometer: get_reference_temperature - rate last %" PRIsz
- ": %lf **",
- i, values[i]);
- if (!isnan(values[i])) {
- avg_sum += values[i];
- ++avg_num;
- }
+ DEBUG("barometer: get_reference_temperature - rate last %" PRIsz
+ ": %lf **",
+ value);
+ if (!isnan(value)) {
+ avg_sum += value;
+ ++avg_num;
}
- free(values);
- values = NULL;
}
if (avg_num == 0) {
ERROR("barometer: get_reference_temperature - could not read \"%s\"",
list->sensor_name);
list->initialized = 0;
- list->num_values = 0;
} else {
average = avg_sum / (double)avg_num;
if (isnan(*result))
double temperature = 0.0;
double norm_pressure = 0.0;
- value_list_t vl = VALUE_LIST_INIT;
- value_t values[1];
+ metrics_list_t *index_p = NULL;
+ metrics_list_t *ml = NULL;
+ ml = (metrics_list_t *)malloc(sizeof(*ml));
+ if (ml == NULL)
+ return -1;
- DEBUG("barometer: MPL115_collectd_barometer_read");
+ index_p = ml;
+
+ index_p->metric.ds = NULL;
+ index_p->metric.identity = NULL;
+ index_p->metric.meta = NULL;
+ index_p->metric.time = cdtime();
+ index_p->metric.interval = plugin_get_interval();
+ index_p->next_p = NULL;
+ DEBUG("barometer: MPL115_collectd_barometer_read");
if (!configured) {
+ destroy_metrics_list(ml);
return -1;
}
}
result = MPL115_read_averaged(&pressure, &temperature);
- if (result)
+ if (result) {
+ destroy_metrics_list(ml);
return result;
+ }
norm_pressure = abs_to_mean_sea_level_pressure(pressure);
- sstrncpy(vl.plugin, "barometer", sizeof(vl.plugin));
- sstrncpy(vl.plugin_instance, "mpl115", sizeof(vl.plugin_instance));
+ index_p->metric.identity =
+ create_identity("barometer", "pressure", "value", NULL);
+ if (index_p->metric.identity == NULL) {
+ destroy_metrics_list(ml);
+ return -1;
+ }
+ result =
+ identity_add_label(index_p->metric.identity, "plugin_instance", "mpl115");
+ if (result) {
+ destroy_metrics_list(ml);
+ return result;
+ }
- vl.values_len = 1;
- vl.values = values;
+ result = identity_add_label(index_p->metric.identity, "type_instance",
+ "normalized");
+ if (result) {
+ destroy_metrics_list(ml);
+ return result;
+ }
- /* dispatch normalized air pressure */
- sstrncpy(vl.type, "pressure", sizeof(vl.type));
- sstrncpy(vl.type_instance, "normalized", sizeof(vl.type_instance));
- values[0].gauge = norm_pressure;
- plugin_dispatch_values(&vl);
+ index_p->metric.value.gauge = norm_pressure;
+ index_p = index_p->next_p;
+ index_p->metric.ds = NULL;
+ index_p->metric.identity = NULL;
+ index_p->metric.meta = NULL;
+ index_p->metric.time = ml->metric.time;
+ index_p->metric.interval = ml->metric.interval;
+ index_p->next_p = NULL;
+
+ index_p->metric.identity =
+ create_identity("barometer", "pressure", "value", NULL);
+ if (index_p->metric.identity == NULL) {
+ destroy_metrics_list(ml);
+ return -1;
+ }
+ result =
+ identity_add_label(index_p->metric.identity, "plugin_instance", "mpl115");
+ if (result) {
+ destroy_metrics_list(ml);
+ return result;
+ }
- /* dispatch absolute air pressure */
- sstrncpy(vl.type, "pressure", sizeof(vl.type));
- sstrncpy(vl.type_instance, "absolute", sizeof(vl.type_instance));
- values[0].gauge = pressure;
- plugin_dispatch_values(&vl);
+ result =
+ identity_add_label(index_p->metric.identity, "type_instance", "absolute");
+ if (result) {
+ destroy_metrics_list(ml);
+ return result;
+ }
+
+ index_p->metric.value.gauge = pressure;
+
+ index_p = index_p->next_p;
+ index_p->metric.ds = NULL;
+ index_p->metric.identity = NULL;
+ index_p->metric.meta = NULL;
+ index_p->metric.time = ml->metric.time;
+ index_p->metric.interval = ml->metric.interval;
+ index_p->next_p = NULL;
/* dispatch sensor temperature */
- sstrncpy(vl.type, "temperature", sizeof(vl.type));
- sstrncpy(vl.type_instance, "", sizeof(vl.type_instance));
- values[0].gauge = temperature;
- plugin_dispatch_values(&vl);
+ index_p->metric.identity =
+ create_identity("barometer", "temperature", "value", NULL);
- return 0;
+ index_p->metric.value.gauge = temperature;
+
+ result = plugin_dispatch_metric_list(ml);
+ destroy_metrics_list(ml);
+ return result;
}
/**
return ret;
}
-static int cu_notify(enum cache_event_type_e event_type, const value_list_t *vl,
+static int cu_notify(enum cache_event_type_e event_type, const metric_t *metric_p,
gauge_t old_uptime, gauge_t new_uptime) {
notification_t n;
- NOTIFICATION_INIT_VL(&n, vl);
+ notification_init_metric(&n, NOTIF_FAILURE, NULL, metric_p);
int status;
char *buf = n.message;
size_t bufsize = sizeof(n.message);
- n.time = vl->time;
+ n.time = metric_p->time;
const char *service = "Service";
- if (strcmp(vl->plugin, "uptime") == 0)
+ if (strcmp(metric_p->plugin, "uptime") == 0)
service = "Host";
switch (event_type) {
/* For CE_VALUE_EXPIRED */
int ret;
- value_t *values;
- size_t values_num;
+ value_t value;
gauge_t old_uptime = NAN;
switch (event->type) {
case CE_VALUE_NEW:
DEBUG("check_uptime: CE_VALUE_NEW, %s", event->value_list_name);
- if (c_avl_get(types_tree, event->value_list->type, NULL) == 0) {
+ if (c_avl_get(types_tree, event->metric_p->type, NULL) == 0) {
event->ret = 1;
- assert(event->value_list->values_len > 0);
- cu_notify(CE_VALUE_NEW, event->value_list, NAN /* old */,
- event->value_list->values[0].gauge /* new */);
+ cu_notify(CE_VALUE_NEW, event->metric_p, NAN /* old */,
+ event->metric_p->value.gauge /* new */);
}
break;
case CE_VALUE_UPDATE:
DEBUG("check_uptime: CE_VALUE_UPDATE, %s", event->value_list_name);
- if (uc_get_history_by_name(event->value_list_name, values_history, 2, 1)) {
+ if (uc_get_history_by_name(event->value_list_name, values_history, 2)) {
ERROR("check_uptime plugin: Failed to get value history for %s.",
event->value_list_name);
} else {
if (!isnan(values_history[0]) && !isnan(values_history[1]) &&
values_history[0] < values_history[1]) {
- cu_notify(CE_VALUE_UPDATE, event->value_list,
+ cu_notify(CE_VALUE_UPDATE, event->metric_p,
values_history[1] /* old */, values_history[0] /* new */);
}
}
break;
case CE_VALUE_EXPIRED:
DEBUG("check_uptime: CE_VALUE_EXPIRED, %s", event->value_list_name);
- ret = uc_get_value_by_name(event->value_list_name, &values, &values_num);
+ ret = uc_get_value_by_name(event->value_list_name, &value);
if (ret == 0) {
- old_uptime = values[0].gauge;
- sfree(values);
+ old_uptime = value.gauge;
}
- cu_notify(CE_VALUE_EXPIRED, event->value_list, old_uptime, NAN /* new */);
+ cu_notify(CE_VALUE_EXPIRED, event->metric_p, old_uptime, NAN /* new */);
break;
}
return 0;
#include <fnmatch.h>
#include <stdbool.h>
-#include "collectd.h"
#include "plugin.h"
#include "utils/common/common.h"
+#include "collectd.h"
#include "daemon/utils_cache.h"
}
* helper functions
*/
-static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
+static bool ident_matches(const metric_t *vl, const metric_t *matcher) {
if (fnmatch(matcher->host, vl->host, 0))
return false;
return status;
}
- status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates);
+ status = format_values_vl(payload, sizeof(payload), ds, vl, conf->store_rates);
if (status != 0) {
ERROR("mqtt plugin: format_values failed with status %d.", status);
return status;
/* do not automatically get the thread specific Perl interpreter */
#define PERL_NO_GET_CONTEXT
-#include "collectd.h"
#include <stdbool.h>
+#include "collectd.h"
#include <EXTERN.h>
#include <perl.h>
static XS(Collectd_call_by_name);
static int perl_read(user_data_t *ud);
-static int perl_write(const data_set_t *ds, const value_list_t *vl,
- user_data_t *user_data);
+static int perl_write(const metric_t *metric_p, user_data_t *user_data);
static void perl_log(int level, const char *msg, user_data_t *user_data);
static int perl_notify(const notification_t *notif, user_data_t *user_data);
static int perl_flush(cdtime_t timeout, const char *identifier,
return ds->ds_num;
} /* static size_t av2value (char *, AV *, value_t *, size_t) */
+static int hv2metric_list(pTHX_ HV *hash, metric_t *metric_p) {
+ if ((NULL == hash) || (NULL == metric_p))
+ return -1;
+ SV **tmp = av_fetch(array, 0, 0);
+
+ if (NULL != tmp) {
+ if (DS_TYPE_COUNTER == metric_p->value_ds_type)
+ metric_p->value.counter = SvIV(*tmp);
+ else if (DS_TYPE_GAUGE == metric_p->value_ds_type)
+ metric_p->value.gauge = SvNV(*tmp);
+ else if (DS_TYPE_DERIVE == metric_p->value_ds_type)
+ metric_p->value.derive = SvIV(*tmp);
+ else if (DS_TYPE_ABSOLUTE == metric_p->value_ds_type)
+ metric_p->value.absolute = SvIV(*tmp);
+ } else {
+ return 0;
+ }
+ return 1;
+}
+
+
/*
* value list:
* {
if ((NULL != data_set) && (0 != av2data_set(aTHX_ data_set, vl.type, &ds)))
return -1;
- ret = plugin_write(plugin, NULL == data_set ? NULL : &ds, &vl);
+ ret = plugin_write(plugin, &metric);
if (0 != ret)
log_warn("Dispatching value to plugin \"%s\" failed with status %i.",
NULL == plugin ? "<any>" : plugin, ret);
return pplugin_call(aTHX_ PLUGIN_READ, user_data->data);
} /* static int perl_read (user_data_t *user_data) */
-static int perl_write(const data_set_t *ds, const value_list_t *vl,
+static int perl_write(const metric_t *metric_p,
user_data_t *user_data) {
int status;
dTHX;
log_debug("perl_write: c_ithread: interp = %p (active threads: %i)", aTHX,
perl_threads->number_of_threads);
- status = pplugin_call(aTHX_ PLUGIN_WRITE, user_data->data, ds, vl);
+ status = pplugin_call(aTHX_ PLUGIN_WRITE, user_data->data, metric_p);
if (aTHX == perl_threads->head->interp)
pthread_mutex_unlock(&perl_threads->mutex);
else if (store_rates) {
if (rates == NULL)
rates = uc_get_rate_vl(ds, vl);
-
if (rates == NULL) {
log_err("c_psql_write: Failed to determine rate");
return NULL;
int ret;
const data_set_t *ds;
size_t size;
- value_t *value;
- value_list_t value_list = VALUE_LIST_INIT;
+ metric_t metric = STRUCT_METRIC_INIT;
PyObject *values = self->values, *meta = self->meta;
double time = self->data.time, interval = self->interval;
- char *host = NULL, *plugin = NULL, *plugin_instance = NULL, *type = NULL,
- *type_instance = NULL, *dest = NULL;
+ char *host = NULL, *plugin = NULL, *type = NULL,
+ *data_source = NULL, *dest = NULL;
static char *kwlist[] = {
- "destination", "type", "values", "plugin_instance",
- "type_instance", "plugin", "host", "time",
+ "destination", "type", "values",
+ "dat_source", "plugin", "host", "time",
"interval", "meta", NULL};
if (!PyArg_ParseTupleAndKeywords(
args, kwds, "et|etOetetetetdiO", kwlist, NULL, &dest, NULL, &type,
NULL, &host, &time, &interval, &meta))
return NULL;
+ metric.identity = create_identity((plugin ? plugin : self->data.plugin),
+ (type ? type : self->data.type),
+ (host ? host : self->data.host));
+
sstrncpy(value_list.host, host ? host : self->data.host,
sizeof(value_list.host));
sstrncpy(value_list.plugin, plugin ? plugin : self->data.plugin,
PyErr_SetString(PyExc_RuntimeError, "type not set");
return NULL;
}
- ds = plugin_get_ds(value_list.type);
+ ds = plugin_get_ds((type ? type : self->data.type));
if (ds == NULL) {
PyErr_Format(PyExc_TypeError, "Dataset %s not found", value_list.type);
return NULL;
#include "collectd.h"
+#include <regex.h>
#include "utils/avltree/avltree.h"
#include "utils/common/common.h"
#include "utils_cache.h"
#include "utils_llist.h"
-#include <regex.h>
#include <net-snmp/net-snmp-config.h>
#define GROUP_UNUSED -1
#define OID_EXISTS 1
#define MAX_KEY_SOURCES 5
-#define MAX_INDEX_KEYS 5
#define MAX_MATCHES 5
/* Identifies index key source */
enum index_key_src_e {
INDEX_HOST = 0,
INDEX_PLUGIN,
- INDEX_PLUGIN_INSTANCE,
INDEX_TYPE,
- INDEX_TYPE_INSTANCE
+ INDEX_DATA_SOURCE,
+ MAX_INDEX_KEYS
};
typedef enum index_key_src_e index_key_src_t;
c_avl_tree_t *index_instance;
c_avl_tree_t *instance_oids; /* Tells us how many OIDs registered for every
instance; */
- index_key_t index_keys[MAX_INDEX_KEYS]; /* Stores information about what each
- index key represents */
+ index_key_t index_keys[MAX_INDEX_KEYS - 1]; /* Stores information about what
+ each index key represents */
int index_keys_len;
netsnmp_variable_list *index_list_cont; /* Index key container used for
generating as well as parsing
struct data_definition_s {
char *name;
char *plugin;
- char *plugin_instance;
char *type;
- char *type_instance;
+ char *data_source;
const table_definition_t *table;
bool is_index_key; /* indicates if table column is an index key */
int index_key_pos; /* position in indexes list */
static const char *index_opts[MAX_KEY_SOURCES] = {
"Hostname", "Plugin", "PluginInstance", "Type", "TypeInstance"};
-#define CHECK_DD_TYPE(_dd, _p, _pi, _t, _ti) \
+#define CHECK_DD_TYPE(_dd, _p, _t, _ds) \
(_dd->plugin ? !strcmp(_dd->plugin, _p) : 0) && \
- (_dd->plugin_instance ? !strcmp(_dd->plugin_instance, _pi) : 1) && \
(_dd->type ? !strcmp(_dd->type, _t) : 0) && \
- (_dd->type_instance ? !strcmp(_dd->type_instance, _ti) : 1)
+ (_dd->data_source ? !strcmp(_dd->data_source, _ds) : 1)
static int snmp_agent_shutdown(void);
static void *snmp_agent_thread_run(void *arg);
return -EINVAL;
}
- if (dd->plugin_instance) {
- ERROR(PLUGIN_NAME ": PluginInstance should not be defined for table "
- "data type '%s'.'%s'",
- td->name, dd->name);
- return -EINVAL;
- }
-
if (dd->oids_len == 0) {
ERROR(PLUGIN_NAME ": No OIDs defined for '%s'.'%s'", td->name,
dd->name);
}
if (dd->is_index_key) {
- if (dd->type || dd->type_instance) {
- ERROR(PLUGIN_NAME ": Type and TypeInstance are not valid for "
+ if (dd->type) {
+ ERROR(PLUGIN_NAME ": Type is not valid for "
"index data '%s'.'%s'",
td->name, dd->name);
return -EINVAL;
}
static int snmp_agent_fill_index_list(table_definition_t *td,
- value_list_t const *vl) {
+ metric_t const *metric_p) {
int ret;
int i;
netsnmp_variable_list *key = td->index_list_cont;
char const *ptr;
+ char *host_p = NULL;
for (i = 0; i < td->index_keys_len; i++) {
/* var should never be NULL */
/* Generating list filled with all data necessary to generate an OID */
switch (source) {
case INDEX_HOST:
- ptr = vl->host;
+ ret = c_avl_get(metric_p->identity->root_p, (void *)"__host__",
+ (void **)&host_p);
+ if (ret != 0) {
+ ERROR(PLUGIN_NAME ": Unknown index key resource host");
+ return -EINVAL;
+ }
+ ptr = host_p;
break;
case INDEX_PLUGIN:
- ptr = vl->plugin;
- break;
- case INDEX_PLUGIN_INSTANCE:
- ptr = vl->plugin_instance;
+ ptr = metric_p->plugin;
break;
case INDEX_TYPE:
- ptr = vl->type;
+ ptr = metric_p->type;
break;
- case INDEX_TYPE_INSTANCE:
- ptr = vl->type_instance;
+ case INDEX_DATA_SOURCE:
+ ptr = metric_p->ds->name;
break;
default:
ERROR(PLUGIN_NAME ": Unknown index key source provided");
switch (td->index_keys[i].source) {
case INDEX_HOST:
case INDEX_PLUGIN:
- case INDEX_PLUGIN_INSTANCE:
case INDEX_TYPE:
- case INDEX_TYPE_INSTANCE:
+ case INDEX_DATA_SOURCE:
snmp_varlist_add_variable(index_list, NULL, 0, td->index_keys[i].type,
NULL, 0);
break;
}
static int snmp_agent_generate_index(table_definition_t *td,
- value_list_t const *vl, oid_t *index_oid) {
+ metric_t const *metric_p,
+ oid_t *index_oid) {
/* According to given information by index_keys list
* index OID is going to be built
*/
- int ret = snmp_agent_fill_index_list(td, vl);
+ int ret = snmp_agent_fill_index_list(td, metric_p);
if (ret != 0)
return -EINVAL;
}
}
-static int snmp_agent_clear_missing(const value_list_t *vl,
+static int snmp_agent_clear_missing(const metric_t *metric_p,
__attribute__((unused)) user_data_t *ud) {
- if (vl == NULL)
+ if (metric_p == NULL)
return -EINVAL;
for (llentry_t *te = llist_head(g_agent->tables); te != NULL; te = te->next) {
data_definition_t *dd = de->value;
if (!dd->is_index_key) {
- if (CHECK_DD_TYPE(dd, vl->plugin, vl->plugin_instance, vl->type,
- vl->type_instance)) {
+ if (CHECK_DD_TYPE(dd, metric_p->plugin, metric_p->type,
+ metric_p->ds->name)) {
oid_t *index_oid = calloc(1, sizeof(*index_oid));
if (index_oid == NULL) {
return -ENOMEM;
}
- int ret = snmp_agent_generate_index(td, vl, index_oid);
+ int ret = snmp_agent_generate_index(td, metric_p, index_oid);
if (ret == 0)
snmp_agent_table_data_remove(dd, td, index_oid);
sfree((*dd)->name);
sfree((*dd)->plugin);
- sfree((*dd)->plugin_instance);
sfree((*dd)->type);
- sfree((*dd)->type_instance);
+ sfree((*dd)->data_source);
sfree((*dd)->oids);
sfree(*dd);
if (index_oid == NULL) {
/* It's a scalar */
- format_name(name, name_len, hostname_g, dd->plugin, dd->plugin_instance,
- dd->type, dd->type_instance);
+ snprintf(name, name_len, "%s/%s/%s/%s",
+ (hostname_g == NULL) ? "" : hostname_g,
+ (dd->plugin == NULL) ? "" : dd->plugin,
+ (dd->type == NULL) ? "" : dd->type,
+ (dd->data_source == NULL) ? "" : dd->data_source);
} else {
/* Need to parse string index OID */
const table_definition_t *td = dd->table;
netsnmp_variable_list *key = td->index_list_cont;
char str[DATA_MAX_NAME_LEN];
char *fields[MAX_KEY_SOURCES] = {hostname_g, dd->plugin,
- dd->plugin_instance, dd->type,
- dd->type_instance};
+ dd->type,
+ dd->data_source};
/* Looking for simple keys only */
while (key != NULL) {
if (!td->index_keys[i].regex) {
index_key_src_t source = td->index_keys[i].source;
- if (source < INDEX_HOST || source > INDEX_TYPE_INSTANCE) {
+ if (source < INDEX_HOST || source > INDEX_DATA_SOURCE) {
ERROR(PLUGIN_NAME ": Unkown index key source!");
return -EINVAL;
}
if (ret != 0)
return ret;
}
- format_name(name, name_len, fields[INDEX_HOST], fields[INDEX_PLUGIN],
- fields[INDEX_PLUGIN_INSTANCE], fields[INDEX_TYPE],
- fields[INDEX_TYPE_INSTANCE]);
+ snprintf(name, name_len, fields[INDEX_HOST], fields[INDEX_PLUGIN],
+ fields[INDEX_TYPE],fields[INDEX_DATA_SOURCE]);
for (i = 0; i < MAX_KEY_SOURCES; i++) {
if (td->tokens[i])
sfree(fields[i]);
DEBUG(PLUGIN_NAME ": Identifier '%s'", name);
- value_t *values;
- size_t values_num;
+ value_t value;
const data_set_t *ds = plugin_get_ds(dd->type);
if (ds == NULL) {
ERROR(PLUGIN_NAME ": Data set not found for '%s' type", dd->type);
return SNMP_NOSUCHINSTANCE;
}
- ret = uc_get_value_by_name(name, &values, &values_num);
+ ret = uc_get_value_by_name(name, &value);
if (ret != 0) {
ERROR(PLUGIN_NAME ": Failed to get value for '%s'", name);
return SNMP_NOSUCHINSTANCE;
}
- assert(ds->ds_num == values_num);
- assert(oid_index < (int)values_num);
-
char data[DATA_MAX_NAME_LEN];
size_t data_len = sizeof(data);
- ret = snmp_agent_set_vardata(
- data, &data_len, dd->oids[oid_index].type, dd->scale, dd->shift,
- &values[oid_index], sizeof(values[oid_index]), ds->ds[oid_index].type);
-
- sfree(values);
+ ret = snmp_agent_set_vardata(data, &data_len, dd->oids[oid_index].type,
+ dd->scale, dd->shift, &value, sizeof(value),
+ ds->ds[oid_index].type);
- if (ret != 0) {
- ERROR(PLUGIN_NAME ": Failed to convert '%s' value to snmp data", name);
- return SNMP_NOSUCHINSTANCE;
- }
+ ERROR(PLUGIN_NAME ": Failed to convert '%s' value to snmp data", name);
+ return SNMP_NOSUCHINSTANCE;
requests->requestvb->type = dd->oids[oid_index].type;
snmp_set_var_typed_value(requests->requestvb, requests->requestvb->type,
option_tmp = option;
} else if (strcasecmp("Plugin", option->key) == 0)
ret = cf_util_get_string(option, &dd->plugin);
- else if (strcasecmp("PluginInstance", option->key) == 0)
- ret = cf_util_get_string(option, &dd->plugin_instance);
else if (strcasecmp("Type", option->key) == 0)
ret = cf_util_get_string(option, &dd->type);
- else if (strcasecmp("TypeInstance", option->key) == 0)
- ret = cf_util_get_string(option, &dd->type_instance);
+ else if (strcasecmp("DatSource", option->key) == 0)
+ ret = cf_util_get_string(option, &dd->data_source);
else if (strcasecmp("Shift", option->key) == 0)
ret = cf_util_get_double(option, &dd->shift);
else if (strcasecmp("Scale", option->key) == 0)
return ret;
}
-static int snmp_agent_write(value_list_t const *vl) {
- if (vl == NULL)
+static int snmp_agent_write(metric_t const *metric_p) {
+ if (metric_p == NULL)
return -EINVAL;
for (llentry_t *te = llist_head(g_agent->tables); te != NULL; te = te->next) {
data_definition_t *dd = de->value;
if (!dd->is_index_key) {
- if (CHECK_DD_TYPE(dd, vl->plugin, vl->plugin_instance, vl->type,
- vl->type_instance)) {
+ if (CHECK_DD_TYPE(dd, metric_p->plugin, metric_p->type,
+ metric_p->ds->name)) {
oid_t *index_oid = calloc(1, sizeof(*index_oid));
bool free_index_oid = true;
return -ENOMEM;
}
- int ret = snmp_agent_generate_index(td, vl, index_oid);
+ int ret = snmp_agent_generate_index(td, metric_p, index_oid);
if (ret == 0)
ret = snmp_agent_update_index(dd, td, index_oid, &free_index_oid);
return 0;
}
-static int snmp_agent_collect(const data_set_t *ds, const value_list_t *vl,
+static int snmp_agent_collect(const metric_t *metric_p,
user_data_t __attribute__((unused)) * user_data) {
pthread_mutex_lock(&g_agent->lock);
- snmp_agent_write(vl);
+ snmp_agent_write(metric_p);
pthread_mutex_unlock(&g_agent->lock);
threshold_t *th_ptr;
int status = 0;
- if (format_name(name, sizeof(name), th->host, th->plugin, th->plugin_instance,
- th->type, th->type_instance) != 0) {
+ if (snprintf(
+ name, sizeof(name), "%s/%s/%s/%s", (th->host == NULL) ? "" : th->host,
+ (th->plugin == NULL) ? "" : th->plugin,
+ (th->type == NULL) ? "" : th->type,
+ (th->data_source == NULL) ? "" : th->data_source) > sizeof(name)) {
ERROR("ut_threshold_add: format_name failed.");
return -1;
}
pthread_mutex_lock(&threshold_lock);
- th_ptr = threshold_get(th->host, th->plugin, th->plugin_instance, th->type,
- th->type_instance);
+ th_ptr = threshold_get(th->host, th->plugin, th->type, th->data_source);
while ((th_ptr != NULL) && (th_ptr->next != NULL))
th_ptr = th_ptr->next;
for (int i = 0; i < ci->children_num; i++) {
oconfig_item_t *option = ci->children + i;
- if (strcasecmp("Instance", option->key) == 0)
- status = cf_util_get_string_buffer(option, th.type_instance,
- sizeof(th.type_instance));
- else if (strcasecmp("DataSource", option->key) == 0)
+ if (strcasecmp("DataSource", option->key) == 0)
status = cf_util_get_string_buffer(option, th.data_source,
sizeof(th.data_source));
else if (strcasecmp("WarningMax", option->key) == 0)
if (strcasecmp("Type", option->key) == 0)
status = ut_config_type(&th, option);
- else if (strcasecmp("Instance", option->key) == 0)
- status = cf_util_get_string_buffer(option, th.plugin_instance,
- sizeof(th.plugin_instance));
+ else if (strcasecmp("Source", option->key) == 0)
+ status = cf_util_get_string_buffer(option, th.data_source,
+ sizeof(th.data_source));
else {
WARNING("threshold values: Option `%s' not allowed inside a `Plugin' "
"block.",
* if appropriate.
* Does not fail.
*/
-static int ut_report_state(const data_set_t *ds, const value_list_t *vl,
- const threshold_t *th, const gauge_t *values,
- int ds_index, int state) { /* {{{ */
+static int ut_report_state(const metric_t *metric_p, const threshold_t *th,
+ const gauge_t value, int state) { /* {{{ */
int state_old;
notification_t n;
/* Check if hits matched */
if ((th->hits != 0)) {
- int hits = uc_get_hits(ds, vl);
+ int hits = uc_get_hits(metric_p);
/* STATE_OKAY resets hits unless PERSIST_OK flag is set. Hits resets if
* threshold is hit. */
if (((state == STATE_OKAY) && ((th->flags & UT_FLAG_PERSIST_OK) == 0)) ||
(hits > th->hits)) {
- DEBUG("ut_report_state: reset uc_get_hits = 0");
- uc_set_hits(ds, vl, 0); /* reset hit counter and notify */
+ DEBUG("ut_report_state: reset uc_get_hits_vl = 0");
+ uc_set_hits(metric_p, 0); /* reset hit counter and notify */
} else {
DEBUG("ut_report_state: th->hits = %d, uc_get_hits = %d", th->hits,
- uc_get_hits(ds, vl));
- (void)uc_inc_hits(ds, vl, 1); /* increase hit counter */
+ uc_get_hits(mstric_p));
+ (void)uc_inc_hits(metric_p, 1); /* increase hit counter */
return 0;
}
} /* end check hits */
- state_old = uc_get_state(ds, vl);
+ state_old = uc_get_state(metric_p);
/* If the state didn't change, report if `persistent' is specified. If the
* state is `okay', then only report if `persist_ok` flag is set. */
}
if (state != state_old)
- uc_set_state(ds, vl, state);
+ uc_set_state(metric_p, state);
- NOTIFICATION_INIT_VL(&n, vl);
+ notification_init_metric(&n, NOTIF_FAILURE, NULL, metric_p);
buf = n.message;
bufsize = sizeof(n.message);
else
n.severity = NOTIF_FAILURE;
- n.time = vl->time;
-
- status = ssnprintf(buf, bufsize, "Host %s, plugin %s", vl->host, vl->plugin);
- buf += status;
- bufsize -= status;
-
- if (vl->plugin_instance[0] != '\0') {
- status = ssnprintf(buf, bufsize, " (instance %s)", vl->plugin_instance);
- buf += status;
- bufsize -= status;
- }
+ n.time = metric_p->time;
- status = ssnprintf(buf, bufsize, " type %s", vl->type);
+ status = ssnprintf(buf, bufsize, "Name %s", metric_p->identity->name);
buf += status;
bufsize -= status;
- if (vl->type_instance[0] != '\0') {
- status = ssnprintf(buf, bufsize, " (instance %s)", vl->type_instance);
- buf += status;
- bufsize -= status;
+ if (metric_p->identity->root_p != NULL) {
+ c_avl_iterator_t *iter_p = c_avl_get_iterator(metric_p->identity->root_p);
+ if (iter_p != NULL) {
+ char *key_p = NULL;
+ char *value_p = NULL;
+ while ((c_avl_iterator_next(iter_p, (void **)&key_p,
+ (void **)&value_p)) == 0) {
+ if ((key_p != NULL) && (value_p != NULL)) {
+ int tmp_str_len = strlen(key_p) + strlen(value_p) + 2;
+ if (tmp_str_len < bufsize) {
+ status = ssnprintf(buf, bufsize, "%s %s", key_p, value_p);
+ buf += status;
+ bufsize -= status;
+ }
+ }
+ }
+ }
+ c_avl_iterator_destroy(iter_p);
}
- plugin_notification_meta_add_string(&n, "DataSource", ds->ds[ds_index].name);
- plugin_notification_meta_add_double(&n, "CurrentValue", values[ds_index]);
+ plugin_notification_meta_add_string(&n, "DataSource", metric_p->ds->name);
+ plugin_notification_meta_add_double(&n, "CurrentValue", value);
plugin_notification_meta_add_double(&n, "WarningMin", th->warning_min);
plugin_notification_meta_add_double(&n, "WarningMax", th->warning_max);
plugin_notification_meta_add_double(&n, "FailureMin", th->failure_min);
ssnprintf(buf, bufsize,
": All data sources are within range again. "
"Current value of \"%s\" is %f.",
- ds->ds[ds_index].name, values[ds_index]);
+ metric_p->ds->name, value);
} else if (state == STATE_UNKNOWN) {
ERROR("ut_report_state: metric transition to UNKNOWN from a different "
"state. This shouldn't happen.");
ssnprintf(buf, bufsize,
": Data source \"%s\" is currently "
"%f. That is within the %s region of %f%s and %f%s.",
- ds->ds[ds_index].name, values[ds_index],
+ metric_p->ds->name, value,
(state == STATE_ERROR) ? "failure" : "warning", min,
((th->flags & UT_FLAG_PERCENTAGE) != 0) ? "%" : "", max,
((th->flags & UT_FLAG_PERCENTAGE) != 0) ? "%" : "");
ssnprintf(buf, bufsize,
": Data source \"%s\" is currently "
"%f. That is %s the %s threshold of %f%s.",
- ds->ds[ds_index].name, values[ds_index],
- isnan(min) ? "below" : "above",
+ metric_p->ds->name, value, isnan(min) ? "below" : "above",
(state == STATE_ERROR) ? "failure" : "warning",
isnan(min) ? max : min,
((th->flags & UT_FLAG_PERCENTAGE) != 0) ? "%" : "");
}
} else if (th->flags & UT_FLAG_PERCENTAGE) {
- gauge_t value;
- gauge_t sum;
-
- sum = 0.0;
- for (size_t i = 0; i < vl->values_len; i++) {
- if (isnan(values[i]))
- continue;
-
- sum += values[i];
- }
-
- if (sum == 0.0)
- value = NAN;
- else
- value = 100.0 * values[ds_index] / sum;
-
ssnprintf(buf, bufsize,
": Data source \"%s\" is currently "
"%g (%.2f%%). That is %s the %s threshold of %.2f%%.",
- ds->ds[ds_index].name, values[ds_index], value,
+ metric_p->ds->name, value, value,
(value < min) ? "below" : "above",
(state == STATE_ERROR) ? "failure" : "warning",
(value < min) ? min : max);
ssnprintf(buf, bufsize,
": Data source \"%s\" is currently "
"%f. That is %s the %s threshold of %f.",
- ds->ds[ds_index].name, values[ds_index],
- (values[ds_index] < min) ? "below" : "above",
+ metric_p->ds->name, value, (value < min) ? "below" : "above",
(state == STATE_ERROR) ? "failure" : "warning",
- (values[ds_index] < min) ? min : max);
+ (value < min) ? min : max);
}
}
* appropriate.
* Does not fail.
*/
-static int ut_check_one_data_source(
- const data_set_t *ds, const value_list_t __attribute__((unused)) * vl,
- const threshold_t *th, const gauge_t *values, int ds_index) { /* {{{ */
- const char *ds_name;
+static int ut_check_one_data_source(const metric_t *metric_p,
+ const threshold_t *th,
+ const gauge_t value) { /* {{{ */
int is_warning = 0;
int is_failure = 0;
int prev_state = STATE_OKAY;
/* check if this threshold applies to this data source */
- if (ds != NULL) {
- ds_name = ds->ds[ds_index].name;
- if ((th->data_source[0] != 0) && (strcmp(ds_name, th->data_source) != 0))
- return STATE_UNKNOWN;
- }
+ if ((th->data_source[0] != 0) &&
+ (strcmp(metric_p->ds->name, th->data_source) != 0))
+ return STATE_UNKNOWN;
if ((th->flags & UT_FLAG_INVERT) != 0) {
is_warning--;
/* XXX: This is an experimental code, not optimized, not fast, not reliable,
* and probably, do not work as you expect. Enjoy! :D */
if (th->hysteresis > 0) {
- prev_state = uc_get_state(ds, vl);
+ prev_state = uc_get_state(metric_p);
/* The purpose of hysteresis is elliminating flapping state when the value
* oscilates around the thresholds. In other words, what is important is
* the previous state; if the new value would trigger a transition, make
}
if ((!isnan(th->failure_min) &&
- (th->failure_min + hysteresis_for_failure > values[ds_index])) ||
+ (th->failure_min + hysteresis_for_failure > value)) ||
(!isnan(th->failure_max) &&
- (th->failure_max - hysteresis_for_failure < values[ds_index])))
+ (th->failure_max - hysteresis_for_failure < value)))
is_failure++;
if ((!isnan(th->warning_min) &&
- (th->warning_min + hysteresis_for_warning > values[ds_index])) ||
+ (th->warning_min + hysteresis_for_warning > value)) ||
(!isnan(th->warning_max) &&
- (th->warning_max - hysteresis_for_warning < values[ds_index])))
+ (th->warning_max - hysteresis_for_warning < value)))
is_warning++;
} else { /* no hysteresis */
- if ((!isnan(th->failure_min) && (th->failure_min > values[ds_index])) ||
- (!isnan(th->failure_max) && (th->failure_max < values[ds_index])))
+ if ((!isnan(th->failure_min) && (th->failure_min > value)) ||
+ (!isnan(th->failure_max) && (th->failure_max < value)))
is_failure++;
- if ((!isnan(th->warning_min) && (th->warning_min > values[ds_index])) ||
- (!isnan(th->warning_max) && (th->warning_max < values[ds_index])))
+ if ((!isnan(th->warning_min) && (th->warning_min > value)) ||
+ (!isnan(th->warning_max) && (th->warning_max < value)))
is_warning++;
}
* defined.
* Returns less than zero if the data set doesn't have any data sources.
*/
-static int ut_check_one_threshold(const data_set_t *ds, const value_list_t *vl,
- const threshold_t *th, const gauge_t *values,
- int *ret_ds_index) { /* {{{ */
+static int ut_check_one_threshold(const metric_t *metric_p,
+ const threshold_t *th,
+ const gauge_t value) { /* {{{ */
int ret = -1;
- int ds_index = -1;
- gauge_t values_copy[ds->ds_num];
-
- memcpy(values_copy, values, sizeof(values_copy));
+ gauge_t values_copy = value;
if ((th->flags & UT_FLAG_PERCENTAGE) != 0) {
int num = 0;
gauge_t sum = 0.0;
- if (ds->ds_num == 1) {
- WARNING(
- "ut_check_one_threshold: The %s type has only one data "
- "source, but you have configured to check this as a percentage. "
- "That doesn't make much sense, because the percentage will always "
- "be 100%%!",
- ds->type);
- }
-
/* Prepare `sum' and `num'. */
- for (size_t i = 0; i < ds->ds_num; i++)
- if (!isnan(values[i])) {
- num++;
- sum += values[i];
- }
+ if (!isnan(value)) {
+ num++;
+ sum += value;
+ }
if ((num == 0) /* All data sources are undefined. */
|| (sum == 0.0)) /* Sum is zero, cannot calculate percentage. */
{
- for (size_t i = 0; i < ds->ds_num; i++)
- values_copy[i] = NAN;
+ values_copy = NAN;
} else /* We can actually calculate the percentage. */
{
- for (size_t i = 0; i < ds->ds_num; i++)
- values_copy[i] = 100.0 * values[i] / sum;
+ values_copy = 100.0;
}
} /* if (UT_FLAG_PERCENTAGE) */
- for (size_t i = 0; i < ds->ds_num; i++) {
- int status;
-
- status = ut_check_one_data_source(ds, vl, th, values_copy, i);
- if (ret < status) {
- ret = status;
- ds_index = i;
- }
- } /* for (ds->ds_num) */
-
- if (ret_ds_index != NULL)
- *ret_ds_index = ds_index;
+ int status;
+ status = ut_check_one_data_source(metric_p, th, values_copy);
+ if (ret < status) {
+ ret = status;
+ }
return ret;
} /* }}} int ut_check_one_threshold */
* Returns zero on success and if no threshold has been configured. Returns
* less than zero on failure.
*/
-static int ut_check_threshold(const data_set_t *ds, const value_list_t *vl,
+static int ut_check_threshold(const metric_t *metric_p,
__attribute__((unused))
user_data_t *ud) { /* {{{ */
threshold_t *th;
- gauge_t *values;
+ gauge_t value;
int status;
int worst_state = -1;
threshold_t *worst_th = NULL;
- int worst_ds_index = -1;
if (threshold_tree == NULL)
return 0;
/* Is this lock really necessary? So far, thresholds are only inserted at
* startup. -octo */
pthread_mutex_lock(&threshold_lock);
- th = threshold_search(vl);
+ th = threshold_search(metric_p);
pthread_mutex_unlock(&threshold_lock);
if (th == NULL)
return 0;
DEBUG("ut_check_threshold: Found matching threshold(s)");
- values = uc_get_rate_vl(ds, vl);
- if (values == NULL)
+ status = uc_get_rate(metric_p, &value);
+ if (status != 0)
return 0;
while (th != NULL) {
- int ds_index = -1;
- status = ut_check_one_threshold(ds, vl, th, values, &ds_index);
+ status = ut_check_one_threshold(metric_p, th, value);
if (status < 0) {
ERROR("ut_check_threshold: ut_check_one_threshold failed.");
- sfree(values);
return -1;
}
if (worst_state < status) {
worst_state = status;
worst_th = th;
- worst_ds_index = ds_index;
}
th = th->next;
} /* while (th) */
- status =
- ut_report_state(ds, vl, worst_th, values, worst_ds_index, worst_state);
+ status = ut_report_state(metric_p, worst_th, value, worst_state);
if (status != 0) {
ERROR("ut_check_threshold: ut_report_state failed.");
- sfree(values);
return -1;
}
- sfree(values);
-
return 0;
} /* }}} int ut_check_threshold */
*
* This function is called whenever a value goes "missing".
*/
-static int ut_missing(const value_list_t *vl,
+static int ut_missing(const metric_t *metric_p,
__attribute__((unused)) user_data_t *ud) { /* {{{ */
threshold_t *th;
cdtime_t missing_time;
- char identifier[6 * DATA_MAX_NAME_LEN];
+ char *identifier_p = NULL;
notification_t n;
cdtime_t now;
if (threshold_tree == NULL)
return 0;
- th = threshold_search(vl);
+ th = threshold_search(metric_p);
/* dispatch notifications for "interesting" values only */
if ((th == NULL) || ((th->flags & UT_FLAG_INTERESTING) == 0))
return 0;
now = cdtime();
- missing_time = now - vl->time;
- FORMAT_VL(identifier, sizeof(identifier), vl);
+ missing_time = now - metric_p->time;
+ if ((identifier_p = plugin_format_metric(metric_p)) != 0) {
+ ERROR("uc_update: plugin_format_metric failed.");
+ }
- NOTIFICATION_INIT_VL(&n, vl);
+ notification_init_metric(&n, NOTIF_FAILURE, NULL, metric_p);
ssnprintf(n.message, sizeof(n.message),
- "%s has not been updated for %.3f seconds.", identifier,
+ "%s has not been updated for %.3f seconds.", identifier_p,
CDTIME_T_TO_DOUBLE(missing_time));
n.time = now;
+ sfree(identifier_p);
plugin_dispatch_notification(&n);
return 0;
}
-static int wg_write_messages(const data_set_t *ds, const value_list_t *vl,
+static int wg_write_messages(const metric_t *metric_p,
struct wg_callback *cb) {
char buffer[WG_SEND_BUF_SIZE] = {0};
int status;
- if (0 != strcmp(ds->type, vl->type)) {
- ERROR("write_graphite plugin: DS type does not match "
- "value list type");
- return -1;
- }
-
- status = format_graphite(buffer, sizeof(buffer), ds, vl, cb->prefix,
+ status = format_graphite(buffer, sizeof(buffer), metric_p, cb->prefix,
cb->postfix, cb->escape_char, cb->format_flags);
if (status != 0) /* error message has been printed already. */
return status;
return 0;
} /* int wg_write_messages */
-static int wg_write(const data_set_t *ds, const value_list_t *vl,
+static int wg_write(const metric_t *metric_p,
user_data_t *user_data) {
struct wg_callback *cb;
int status;
cb = user_data->data;
- status = wg_write_messages(ds, vl, cb);
+ status = wg_write_messages(metric_p, cb);
return status;
}
sfree(cb);
} /* }}} void wh_callback_free */
-static int wh_write_command(const data_set_t *ds,
- const value_list_t *vl, /* {{{ */
+static int wh_write_command(const metric_t *metric_p, /* {{{ */
wh_callback_t *cb) {
- char key[10 * DATA_MAX_NAME_LEN];
char values[512];
char command[1024];
size_t command_len;
if ((cb == NULL) || (cb->send_buffer == NULL))
return -1;
- if (strcmp(ds->type, vl->type) != 0) {
- ERROR("write_http plugin: DS type does not match "
- "value list type");
+ /* Copy the identifier to `key' and escape it. */
+ char *metric_string_p = NULL;
+ if ((metric_string_p = plugin_format_metric(metric_p)) != 0) {
return -1;
}
- /* Copy the identifier to `key' and escape it. */
- status = FORMAT_VL(key, sizeof(key), vl);
- if (status != 0) {
- ERROR("write_http plugin: error with format_name");
- return status;
- }
- escape_string(key, sizeof(key));
+ escape_string(metric_string_p, sizeof(metric_string_p));
/* Convert the values to an ASCII representation and put that into
* `values'. */
- status = format_values(values, sizeof(values), ds, vl, cb->store_rates);
+ status = format_values(values, sizeof(values), metric_p, cb->store_rates);
if (status != 0) {
ERROR("write_http plugin: error with "
"wh_value_list_to_string");
+ sfree(metric_string_p);
return status;
}
- command_len = (size_t)snprintf(command, sizeof(command),
- "PUTVAL %s interval=%.3f %s\r\n", key,
- CDTIME_T_TO_DOUBLE(vl->interval), values);
+ command_len = (size_t)snprintf(
+ command, sizeof(command), "PUTVAL %s interval=%.3f %s\r\n",
+ metric_string_p, CDTIME_T_TO_DOUBLE(metric_p->interval), values);
if (command_len >= sizeof(command)) {
ERROR("write_http plugin: Command buffer too small: "
"Need %" PRIsz " bytes.",
command_len + 1);
+ sfree(metric_string_p);
return -1;
}
+ sfree(metric_string_p);
pthread_mutex_lock(&cb->send_lock);
if (wh_callback_init(cb) != 0) {
return 0;
} /* }}} int wh_write_command */
-static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+static int wh_write_json(const metric_t *metric_p, /* {{{ */
wh_callback_t *cb) {
int status;
}
status =
- format_json_value_list(cb->send_buffer, &cb->send_buffer_fill,
- &cb->send_buffer_free, ds, vl, cb->store_rates);
+ format_json_metric(cb->send_buffer, &cb->send_buffer_fill,
+ &cb->send_buffer_free, metric_p, cb->store_rates);
if (status == -ENOMEM) {
status = wh_flush_nolock(/* timeout = */ 0, cb);
if (status != 0) {
}
status =
- format_json_value_list(cb->send_buffer, &cb->send_buffer_fill,
- &cb->send_buffer_free, ds, vl, cb->store_rates);
+ format_json_metric(cb->send_buffer, &cb->send_buffer_fill,
+ &cb->send_buffer_free, metric_p, cb->store_rates);
}
if (status != 0) {
pthread_mutex_unlock(&cb->send_lock);
return 0;
} /* }}} int wh_write_json */
-static int wh_write_kairosdb(const data_set_t *ds,
- const value_list_t *vl, /* {{{ */
+static int wh_write_kairosdb(const metric_t *metric_p, /* {{{ */
wh_callback_t *cb) {
int status;
}
}
- status = format_kairosdb_value_list(
- cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl,
+ status = format_kairosdb_metric(
+ cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, metric_p,
cb->store_rates, (char const *const *)http_attrs, http_attrs_num,
cb->data_ttl, cb->metrics_prefix);
if (status == -ENOMEM) {
return status;
}
- status = format_kairosdb_value_list(
- cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl,
+ status = format_kairosdb_metric(
+ cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, metric_p,
cb->store_rates, (char const *const *)http_attrs, http_attrs_num,
cb->data_ttl, cb->metrics_prefix);
}
return 0;
} /* }}} int wh_write_kairosdb */
-static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+static int wh_write(const metric_t *metric_p, /* {{{ */
user_data_t *user_data) {
wh_callback_t *cb;
int status;
switch (cb->format) {
case WH_FORMAT_JSON:
- status = wh_write_json(ds, vl, cb);
+ status = wh_write_json(metric_p, cb);
break;
case WH_FORMAT_KAIROSDB:
- status = wh_write_kairosdb(ds, vl, cb);
+ status = wh_write_kairosdb(metric_p, cb);
break;
default:
- status = wh_write_command(ds, vl, cb);
+ status = wh_write_command(metric_p, cb);
break;
}
return status;
};
static int kafka_handle(struct kafka_topic_context *);
-static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
+static int kafka_write(const metric_t *, user_data_t *);
static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
int32_t, void *, void *);
} /* }}} int kafka_handle */
-static int kafka_write(const data_set_t *ds, /* {{{ */
- const value_list_t *vl, user_data_t *ud) {
+static int kafka_write(/* {{{ */
+ const metric_t *metric_p, user_data_t *ud) {
int status = 0;
void *key;
size_t keylen = 0;
size_t blen = 0;
struct kafka_topic_context *ctx = ud->data;
- if ((ds == NULL) || (vl == NULL) || (ctx == NULL))
+ if ((metric_p == NULL) || (metric_p->ds == NULL) ||
+ (metric_p->identity == NULL) || (ctx == NULL))
return EINVAL;
pthread_mutex_lock(&ctx->lock);
switch (ctx->format) {
case KAFKA_FORMAT_COMMAND:
- status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
+ status = cmd_create_putval(buffer, sizeof(buffer), metric_p);
if (status != 0) {
ERROR("write_kafka plugin: cmd_create_putval failed with status %i.",
status);
break;
case KAFKA_FORMAT_JSON:
format_json_initialize(buffer, &bfill, &bfree);
- format_json_value_list(buffer, &bfill, &bfree, ds, vl, ctx->store_rates);
+ format_json_metric(buffer, &bfill, &bfree, metric_p, ctx->store_rates);
format_json_finalize(buffer, &bfill, &bfree);
blen = strlen(buffer);
break;
case KAFKA_FORMAT_GRAPHITE:
status =
- format_graphite(buffer, sizeof(buffer), ds, vl, ctx->prefix,
+ format_graphite(buffer, sizeof(buffer), metric_p, ctx->prefix,
ctx->postfix, ctx->escape_char, ctx->graphite_flags);
if (status != 0) {
ERROR("write_kafka plugin: format_graphite failed with status %i.",
/* Plugin:WriteLog has to also operate without a config, so use a global. */
int wl_format = WL_FORMAT_GRAPHITE;
-static int wl_write_graphite(const data_set_t *ds, const value_list_t *vl) {
+static int wl_write_graphite(const metric_t *metric_p) {
char buffer[WL_BUF_SIZE] = {0};
int status;
- if (0 != strcmp(ds->type, vl->type)) {
- ERROR("write_log plugin: DS type does not match value list type");
- return -1;
- }
-
- status = format_graphite(buffer, sizeof(buffer), ds, vl, NULL, NULL, '_', 0);
+ status = format_graphite(buffer, sizeof(buffer), metric_p, NULL, NULL, '_', 0);
if (status != 0) /* error message has been printed already. */
return status;
return 0;
} /* int wl_write_graphite */
-static int wl_write_json(const data_set_t *ds, const value_list_t *vl) {
+static int wl_write_json(const metric_t *metric_p) {
char buffer[WL_BUF_SIZE] = {0};
size_t bfree = sizeof(buffer);
size_t bfill = 0;
- if (0 != strcmp(ds->type, vl->type)) {
- ERROR("write_log plugin: DS type does not match value list type");
- return -1;
- }
-
format_json_initialize(buffer, &bfill, &bfree);
- format_json_value_list(buffer, &bfill, &bfree, ds, vl,
+ format_json_metric(buffer, &bfill, &bfree, metric_p,
/* store rates = */ 0);
format_json_finalize(buffer, &bfill, &bfree);
return 0;
} /* int wl_write_json */
-static int wl_write(const data_set_t *ds, const value_list_t *vl,
+static int wl_write(const metric_t *metric_p,
__attribute__((unused)) user_data_t *user_data) {
int status = 0;
if (wl_format == WL_FORMAT_GRAPHITE) {
- status = wl_write_graphite(ds, vl);
+ status = wl_write_graphite(metric_p);
} else if (wl_format == WL_FORMAT_JSON) {
- status = wl_write_json(ds, vl);
+ status = wl_write_json(metric_p);
}
return status;
/*
* Functions
*/
-static int wr_write(const data_set_t *ds, /* {{{ */
- const value_list_t *vl, user_data_t *ud) {
+static int wr_write(/* {{{ */
+ const metric_t *metric_p, user_data_t *ud) {
wr_node_t *node = ud->data;
- char ident[512];
char key[512];
char value[512] = {0};
char time[24];
int status;
redisReply *rr;
- status = FORMAT_VL(ident, sizeof(ident), vl);
- if (status != 0)
- return status;
+ char *metric_string_p = NULL;
+ if ((metric_string_p = plugin_format_metric(metric_p)) != 0) {
+ return -1;
+ }
+
ssnprintf(key, sizeof(key), "%s%s",
(node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
- ident);
- ssnprintf(time, sizeof(time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
+ metric_string_p);
+ ssnprintf(time, sizeof(time), "%.9f", CDTIME_T_TO_DOUBLE(metric_p->time));
value_size = sizeof(value);
value_ptr = &value[0];
- status = format_values(value_ptr, value_size, ds, vl, node->store_rates);
- if (status != 0)
+ status = format_values(value_ptr, value_size, metric_p, node->store_rates);
+ if (status != 0) {
+ sfree(metric_string_p);
return status;
+ }
pthread_mutex_lock(&node->lock);
(node->host != NULL) ? node->host : "localhost",
(node->port != 0) ? node->port : 6379);
pthread_mutex_unlock(&node->lock);
+ sfree(metric_string_p);
return -1;
} else if (node->conn->err) {
ERROR(
(node->host != NULL) ? node->host : "localhost",
(node->port != 0) ? node->port : 6379, node->conn->errstr);
pthread_mutex_unlock(&node->lock);
+ sfree(metric_string_p);
return -1;
}
* remove element, scored less than 'current-max_set_duration'
* '(...' indicates 'less than' in redis CLI.
*/
- rr = redisCommand(node->conn, "ZREMRANGEBYSCORE %s -1 (%.9f", key,
- (CDTIME_T_TO_DOUBLE(vl->time) - node->max_set_duration));
+ rr = redisCommand(
+ node->conn, "ZREMRANGEBYSCORE %s -1 (%.9f", key,
+ (CDTIME_T_TO_DOUBLE(metric_p->time) - node->max_set_duration));
if (rr == NULL)
WARNING("ZREMRANGEBYSCORE command error. key:%s message:%s", key,
node->conn->errstr);
/* TODO(octo): This is more overhead than necessary. Use the cache and
* metadata to determine if it is a new metric and call SADD only once for
* each metric. */
- rr = redisCommand(
- node->conn, "SADD %svalues %s",
- (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX, ident);
+ rr =
+ redisCommand(node->conn, "SADD %svalues %s",
+ (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
+ metric_string_p);
if (rr == NULL)
- WARNING("SADD command error. ident:%s message:%s", ident,
+ WARNING("SADD command error. ident:%s message:%s", metric_string_p,
node->conn->errstr);
else
freeReplyObject(rr);
pthread_mutex_unlock(&node->lock);
-
+ sfree(metric_string_p);
return 0;
} /* }}} int wr_write */
/* XXX: This is an experimental code, not optimized, not fast, not reliable,
* and probably, do not work as you expect. Enjoy! :D */
- prev_state = uc_get_state(ds, vl);
+ prev_state = uc_get_state_vl(ds, vl);
if ((th->hysteresis > 0) && (prev_state != STATE_OKAY) &&
(prev_state != STATE_UNKNOWN)) {
switch (prev_state) {