} /* int wifxudp_escape_string */
static int write_influxdb_point(char *buffer, int buffer_len,
- const data_set_t *ds, const value_list_t *vl) {
+ metric_family_t const *fam, size_t *idx) {
int status;
- int offset = 0;
- gauge_t *rates = NULL;
+ int metric_offset = 0;
bool have_values = false;
- assert(0 == strcmp(ds->type, vl->type));
-
#define BUFFER_ADD_ESCAPE(...) \
do { \
status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \
__VA_ARGS__); \
- if (status < 0) \
- return -1; \
+ if (status < 0) { \
+ return metric_offset; \
+ } \
offset += status; \
} while (0)
do { \
status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \
if ((status < 0) || (status >= (buffer_len - offset))) { \
- sfree(rates); \
- return -1; \
+ return metric_offset; \
} \
offset += status; \
} while (0)
- BUFFER_ADD_ESCAPE(vl->plugin);
- BUFFER_ADD(",host=");
- BUFFER_ADD_ESCAPE(vl->host);
- if (strcmp(vl->plugin_instance, "") != 0) {
- BUFFER_ADD(",instance=");
- BUFFER_ADD_ESCAPE(vl->plugin_instance);
- }
- if (strcmp(vl->type, "") != 0) {
- BUFFER_ADD(",type=");
- BUFFER_ADD_ESCAPE(vl->type);
- }
- if (strcmp(vl->type_instance, "") != 0) {
- BUFFER_ADD(",type_instance=");
- BUFFER_ADD_ESCAPE(vl->type_instance);
- }
+ for (; *idx < fam->metric.num; (*idx)++) {
+ int offset = metric_offset;
+ have_values = false;
+ BUFFER_ADD_ESCAPE(fam->name);
+ metric_t metric = fam->metric.ptr[*idx];
- BUFFER_ADD(" ");
- for (size_t i = 0; i < ds->ds_num; i++) {
- if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
- (ds->ds[i].type != DS_TYPE_GAUGE) &&
- (ds->ds[i].type != DS_TYPE_DERIVE)) {
- sfree(rates);
- return -1;
+ for (size_t j = 0; j < metric.label.num; j++) {
+ label_pair_t label = metric.label.ptr[j];
+ if (strcmp(label.name, "") != 0 && strcmp(label.value, "") != 0) {
+ BUFFER_ADD(",");
+ BUFFER_ADD_ESCAPE(label.name);
+ BUFFER_ADD("=");
+ BUFFER_ADD_ESCAPE(label.value);
+ }
}
+ BUFFER_ADD(" ");
- if (ds->ds[i].type == DS_TYPE_GAUGE) {
- if (isnan(vl->values[i].gauge))
- continue;
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge);
- have_values = true;
+ if (fam->type == METRIC_TYPE_GAUGE) {
+ if (!isnan(metric.value.gauge)) {
+ BUFFER_ADD("value=%lf", metric.value.gauge);
+ have_values = true;
+ }
} else if (wifxudp_config_store_rates) {
- if (rates == NULL)
- rates = uc_get_rate_vl(ds, vl);
- if (rates == NULL) {
+ gauge_t rate;
+ int ret = uc_get_rate(&metric, &rate);
+ if (ret != 0) {
+ WARNING("write_influxdb_udp plugin: "
+ "uc_get_rate failed.");
+ continue;
+ }
+ if (!isnan(rate)) {
+ BUFFER_ADD("value=%lf", rate);
+ have_values = true;
+ }
+ } else {
+ switch (fam->type) {
+ case METRIC_TYPE_COUNTER:
+ BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
+ have_values = true;
+ break;
+ case METRIC_TYPE_UNTYPED: /* derive ¿? */
+ BUFFER_ADD("value=%" PRIi64 "i", metric.value.derive);
+ have_values = true;
+ break;
+ default:
WARNING("write_influxdb_udp plugin: "
- "uc_get_rate_vl failed.");
+ "unknown family type.");
return -1;
+ break;
}
- if (isnan(rates[i]))
- continue;
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]);
- have_values = true;
- } else if (ds->ds[i].type == DS_TYPE_COUNTER) {
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name,
- (uint64_t)vl->values[i].counter);
- have_values = true;
- } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive);
- have_values = true;
}
- } /* for ds->ds_num */
- sfree(rates);
-
- if (!have_values)
- return 0;
-
- BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(vl->time));
+ if (have_values) {
+ BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
+ metric_offset = offset;
+ }
+ } /* for (; *idx < fam->metric.num; (*idx)++) */
#undef BUFFER_ADD_ESCAPE
#undef BUFFER_ADD
- return offset;
+ return metric_offset;
} /* int write_influxdb_point */
-static int
-write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl,
- user_data_t __attribute__((unused)) * user_data) {
+static int write_influxdb_udp_write(metric_family_t const *fam,
+ user_data_t __attribute__((unused)) *
+ user_data) {
+ if (fam == NULL)
+ return EINVAL;
+
char buffer[NET_DEFAULT_PACKET_SIZE];
+ size_t idx = 0;
- int status = write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, ds, vl);
+ while (idx < fam->metric.num) {
+ int status =
+ write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, fam, &idx);
+ if (status < 0) {
+ ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
+ return -1;
+ }
- if (status < 0) {
- ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
- return -1;
- }
- if (status == 0) /* no real values to send (nan) */
- return 0;
+ if (status == 0) /* no real values to send (nan) */
+ continue;
- pthread_mutex_lock(&send_buffer_lock);
- if (wifxudp_config_packet_size - send_buffer_fill < status)
- flush_buffer();
- memcpy(send_buffer_ptr, buffer, status);
+ pthread_mutex_lock(&send_buffer_lock);
+ if (wifxudp_config_packet_size - send_buffer_fill < status)
+ flush_buffer();
+ memcpy(send_buffer_ptr, buffer, status);
- send_buffer_fill += status;
- send_buffer_ptr += status;
- send_buffer_last_update = cdtime();
+ send_buffer_fill += status;
+ send_buffer_ptr += status;
+ send_buffer_last_update = cdtime();
- if (wifxudp_config_packet_size - send_buffer_fill < 120)
- /* No room for a new point of average size in buffer,
- the probability of fail for the new point is bigger than
- the probability of success */
- flush_buffer();
+ if (wifxudp_config_packet_size - send_buffer_fill < 120)
+ /* No room for a new point of average size in buffer,
+ the probability of fail for the new point is bigger than
+ the probability of success */
+ flush_buffer();
- pthread_mutex_unlock(&send_buffer_lock);
+ pthread_mutex_unlock(&send_buffer_lock);
+ }
return 0;
} /* int write_influxdb_udp_write */