From: Carlos Peón Costa Date: Tue, 29 Sep 2020 16:13:41 +0000 (+0200) Subject: write_influxdb_udp.c: migration to v6.0 X-Git-Tag: 6.0.0-rc0~131^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=26cee72da0bee0ac75b712ec1fa5b6de6758ea8d;p=thirdparty%2Fcollectd.git write_influxdb_udp.c: migration to v6.0 --- diff --git a/src/write_influxdb_udp.c b/src/write_influxdb_udp.c index b324a545c..3ac0da9a0 100644 --- a/src/write_influxdb_udp.c +++ b/src/write_influxdb_udp.c @@ -365,20 +365,18 @@ static int wifxudp_escape_string(char *buffer, size_t buffer_size, } /* int wifxudp_escape_string */ static int write_influxdb_point(char *buffer, int buffer_len, - const data_set_t *ds, const value_list_t *vl) { + metric_family_t const *fam, size_t *idx) { int status; - int offset = 0; - gauge_t *rates = NULL; + int metric_offset = 0; bool have_values = false; - assert(0 == strcmp(ds->type, vl->type)); - #define BUFFER_ADD_ESCAPE(...) \ do { \ status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \ __VA_ARGS__); \ - if (status < 0) \ - return -1; \ + if (status < 0) { \ + return metric_offset; \ + } \ offset += status; \ } while (0) @@ -386,115 +384,112 @@ static int write_influxdb_point(char *buffer, int buffer_len, do { \ status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \ if ((status < 0) || (status >= (buffer_len - offset))) { \ - sfree(rates); \ - return -1; \ + return metric_offset; \ } \ offset += status; \ } while (0) - BUFFER_ADD_ESCAPE(vl->plugin); - BUFFER_ADD(",host="); - BUFFER_ADD_ESCAPE(vl->host); - if (strcmp(vl->plugin_instance, "") != 0) { - BUFFER_ADD(",instance="); - BUFFER_ADD_ESCAPE(vl->plugin_instance); - } - if (strcmp(vl->type, "") != 0) { - BUFFER_ADD(",type="); - BUFFER_ADD_ESCAPE(vl->type); - } - if (strcmp(vl->type_instance, "") != 0) { - BUFFER_ADD(",type_instance="); - BUFFER_ADD_ESCAPE(vl->type_instance); - } + for (; *idx < fam->metric.num; (*idx)++) { + int offset = metric_offset; + have_values = false; + BUFFER_ADD_ESCAPE(fam->name); + metric_t metric = fam->metric.ptr[*idx]; - BUFFER_ADD(" "); - for (size_t i = 0; i < ds->ds_num; i++) { - if ((ds->ds[i].type != DS_TYPE_COUNTER) && - (ds->ds[i].type != DS_TYPE_GAUGE) && - (ds->ds[i].type != DS_TYPE_DERIVE)) { - sfree(rates); - return -1; + for (size_t j = 0; j < metric.label.num; j++) { + label_pair_t label = metric.label.ptr[j]; + if (strcmp(label.name, "") != 0 && strcmp(label.value, "") != 0) { + BUFFER_ADD(","); + BUFFER_ADD_ESCAPE(label.name); + BUFFER_ADD("="); + BUFFER_ADD_ESCAPE(label.value); + } } + BUFFER_ADD(" "); - if (ds->ds[i].type == DS_TYPE_GAUGE) { - if (isnan(vl->values[i].gauge)) - continue; - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge); - have_values = true; + if (fam->type == METRIC_TYPE_GAUGE) { + if (!isnan(metric.value.gauge)) { + BUFFER_ADD("value=%lf", metric.value.gauge); + have_values = true; + } } else if (wifxudp_config_store_rates) { - if (rates == NULL) - rates = uc_get_rate_vl(ds, vl); - if (rates == NULL) { + gauge_t rate; + int ret = uc_get_rate(&metric, &rate); + if (ret != 0) { + WARNING("write_influxdb_udp plugin: " + "uc_get_rate failed."); + continue; + } + if (!isnan(rate)) { + BUFFER_ADD("value=%lf", rate); + have_values = true; + } + } else { + switch (fam->type) { + case METRIC_TYPE_COUNTER: + BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter); + have_values = true; + break; + case METRIC_TYPE_UNTYPED: /* derive ¿? */ + BUFFER_ADD("value=%" PRIi64 "i", metric.value.derive); + have_values = true; + break; + default: WARNING("write_influxdb_udp plugin: " - "uc_get_rate_vl failed."); + "unknown family type."); return -1; + break; } - if (isnan(rates[i])) - continue; - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]); - have_values = true; - } else if (ds->ds[i].type == DS_TYPE_COUNTER) { - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, - (uint64_t)vl->values[i].counter); - have_values = true; - } else if (ds->ds[i].type == DS_TYPE_DERIVE) { - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive); - have_values = true; } - } /* for ds->ds_num */ - sfree(rates); - - if (!have_values) - return 0; - - BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(vl->time)); + if (have_values) { + BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time)); + metric_offset = offset; + } + } /* for (; *idx < fam->metric.num; (*idx)++) */ #undef BUFFER_ADD_ESCAPE #undef BUFFER_ADD - return offset; + return metric_offset; } /* int write_influxdb_point */ -static int -write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl, - user_data_t __attribute__((unused)) * user_data) { +static int write_influxdb_udp_write(metric_family_t const *fam, + user_data_t __attribute__((unused)) * + user_data) { + if (fam == NULL) + return EINVAL; + char buffer[NET_DEFAULT_PACKET_SIZE]; + size_t idx = 0; - int status = write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, ds, vl); + while (idx < fam->metric.num) { + int status = + write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, fam, &idx); + if (status < 0) { + ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed."); + return -1; + } - if (status < 0) { - ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed."); - return -1; - } - if (status == 0) /* no real values to send (nan) */ - return 0; + if (status == 0) /* no real values to send (nan) */ + continue; - pthread_mutex_lock(&send_buffer_lock); - if (wifxudp_config_packet_size - send_buffer_fill < status) - flush_buffer(); - memcpy(send_buffer_ptr, buffer, status); + pthread_mutex_lock(&send_buffer_lock); + if (wifxudp_config_packet_size - send_buffer_fill < status) + flush_buffer(); + memcpy(send_buffer_ptr, buffer, status); - send_buffer_fill += status; - send_buffer_ptr += status; - send_buffer_last_update = cdtime(); + send_buffer_fill += status; + send_buffer_ptr += status; + send_buffer_last_update = cdtime(); - if (wifxudp_config_packet_size - send_buffer_fill < 120) - /* No room for a new point of average size in buffer, - the probability of fail for the new point is bigger than - the probability of success */ - flush_buffer(); + if (wifxudp_config_packet_size - send_buffer_fill < 120) + /* No room for a new point of average size in buffer, + the probability of fail for the new point is bigger than + the probability of success */ + flush_buffer(); - pthread_mutex_unlock(&send_buffer_lock); + pthread_mutex_unlock(&send_buffer_lock); + } return 0; } /* int write_influxdb_udp_write */