From: carlospeon Date: Wed, 30 Sep 2020 14:11:18 +0000 (+0200) Subject: * family loop write rework X-Git-Tag: 6.0.0-rc0~131^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d7e5648e756b58f56de0e9c8e3adeedc7e34de6f;p=thirdparty%2Fcollectd.git * family loop write rework * fix gauge printf format * fix METRIC_TYPE_UNTYPED --- diff --git a/src/write_influxdb_udp.c b/src/write_influxdb_udp.c index 3ac0da9a0..1019de084 100644 --- a/src/write_influxdb_udp.c +++ b/src/write_influxdb_udp.c @@ -326,22 +326,32 @@ static void flush_buffer(void) { write_influxdb_udp_init_buffer(); } +static void fill_send_buffer(char const *buffer, size_t len) { + assert(len <= wifxudp_config_packet_size); + + pthread_mutex_lock(&send_buffer_lock); + if (wifxudp_config_packet_size - send_buffer_fill < len) + flush_buffer(); + memcpy(send_buffer_ptr, buffer, len); + send_buffer_fill += len; + send_buffer_ptr += len; + send_buffer_last_update = cdtime(); + pthread_mutex_unlock(&send_buffer_lock); +} + static int wifxudp_escape_string(char *buffer, size_t buffer_size, const char *string) { if ((buffer == NULL) || (string == NULL)) return -EINVAL; - if (buffer_size < 3) - return -ENOMEM; - int dst_pos = 0; #define BUFFER_ADD(c) \ do { \ if (dst_pos >= (buffer_size - 1)) { \ - buffer[buffer_size - 1] = '\0'; \ - return -ENOMEM; \ + buffer[buffer_size - 1] = 0; \ + return dst_pos++; \ } \ buffer[dst_pos] = (c); \ dst_pos++; \ @@ -364,93 +374,84 @@ static int wifxudp_escape_string(char *buffer, size_t buffer_size, return dst_pos; } /* int wifxudp_escape_string */ -static int write_influxdb_point(char *buffer, int buffer_len, - metric_family_t const *fam, size_t *idx) { - int status; - int metric_offset = 0; +static int write_influxdb_point(char *buffer, int buffer_len, metric_t metric) { + int offset = 0; bool have_values = false; #define BUFFER_ADD_ESCAPE(...) \ do { \ - status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \ - __VA_ARGS__); \ - if (status < 0) { \ - return metric_offset; \ - } \ + int status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \ + __VA_ARGS__); \ + if (status < 0) \ + return status; \ offset += status; \ + if (status >= (buffer_len - offset)) \ + return offset; \ } while (0) #define BUFFER_ADD(...) \ do { \ - status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \ - if ((status < 0) || (status >= (buffer_len - offset))) { \ - return metric_offset; \ - } \ + int status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \ + if (status < 0) \ + return status; \ offset += status; \ + if (status >= (buffer_len - offset)) \ + return offset; \ } while (0) - for (; *idx < fam->metric.num; (*idx)++) { - int offset = metric_offset; - have_values = false; - BUFFER_ADD_ESCAPE(fam->name); - metric_t metric = fam->metric.ptr[*idx]; - - for (size_t j = 0; j < metric.label.num; j++) { - label_pair_t label = metric.label.ptr[j]; - if (strcmp(label.name, "") != 0 && strcmp(label.value, "") != 0) { - BUFFER_ADD(","); - BUFFER_ADD_ESCAPE(label.name); - BUFFER_ADD("="); - BUFFER_ADD_ESCAPE(label.value); - } - } - BUFFER_ADD(" "); + have_values = false; + BUFFER_ADD_ESCAPE(metric.family->name); + for (size_t j = 0; j < metric.label.num; j++) { + label_pair_t label = metric.label.ptr[j]; + BUFFER_ADD(","); + BUFFER_ADD_ESCAPE(label.name); + BUFFER_ADD("="); + BUFFER_ADD_ESCAPE(label.value); + } + BUFFER_ADD(" "); - if (fam->type == METRIC_TYPE_GAUGE) { + if (wifxudp_config_store_rates && + (metric.family->type == METRIC_TYPE_COUNTER)) { + gauge_t rate; + if (uc_get_rate(&metric, &rate) != 0) { + WARNING("write_influxdb_udp plugin: " + "uc_get_rate failed."); + return -1; + } + if (!isnan(rate)) { + BUFFER_ADD("value=" GAUGE_FORMAT, rate); + have_values = true; + } + } else { + switch (metric.family->type) { + case METRIC_TYPE_GAUGE: + case METRIC_TYPE_UNTYPED: if (!isnan(metric.value.gauge)) { - BUFFER_ADD("value=%lf", metric.value.gauge); - have_values = true; - } - } else if (wifxudp_config_store_rates) { - gauge_t rate; - int ret = uc_get_rate(&metric, &rate); - if (ret != 0) { - WARNING("write_influxdb_udp plugin: " - "uc_get_rate failed."); - continue; - } - if (!isnan(rate)) { - BUFFER_ADD("value=%lf", rate); - have_values = true; - } - } else { - switch (fam->type) { - case METRIC_TYPE_COUNTER: - BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter); - have_values = true; - break; - case METRIC_TYPE_UNTYPED: /* derive ¿? */ - BUFFER_ADD("value=%" PRIi64 "i", metric.value.derive); + BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge); have_values = true; - break; - default: - WARNING("write_influxdb_udp plugin: " - "unknown family type."); - return -1; - break; } + break; + case METRIC_TYPE_COUNTER: + BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter); + have_values = true; + break; + default: + WARNING("write_influxdb_udp plugin: " + "unknown family type."); + return -1; + break; } + } - if (have_values) { - BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time)); - metric_offset = offset; - } - } /* for (; *idx < fam->metric.num; (*idx)++) */ + if (!have_values) + return 0; + + BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time)); #undef BUFFER_ADD_ESCAPE #undef BUFFER_ADD - return metric_offset; + return offset; } /* int write_influxdb_point */ static int write_influxdb_udp_write(metric_family_t const *fam, @@ -460,36 +461,28 @@ static int write_influxdb_udp_write(metric_family_t const *fam, return EINVAL; char buffer[NET_DEFAULT_PACKET_SIZE]; - size_t idx = 0; + int buffer_len = NET_DEFAULT_PACKET_SIZE; + int offset = 0; + + if (wifxudp_config_packet_size < buffer_len) + buffer_len = wifxudp_config_packet_size; - while (idx < fam->metric.num) { + for (size_t i = 0; i < fam->metric.num; i++) { + metric_t metric = fam->metric.ptr[i]; int status = - write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, fam, &idx); - if (status < 0) { + write_influxdb_point(buffer + offset, buffer_len - offset, metric); + if (status < 0) { // error ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed."); return -1; + } else if (status >= buffer_len - offset) { // full + fill_send_buffer(buffer, offset); + offset = 0; + buffer[0] = 0; + } else { + offset += status; } - - if (status == 0) /* no real values to send (nan) */ - continue; - - pthread_mutex_lock(&send_buffer_lock); - if (wifxudp_config_packet_size - send_buffer_fill < status) - flush_buffer(); - memcpy(send_buffer_ptr, buffer, status); - - send_buffer_fill += status; - send_buffer_ptr += status; - send_buffer_last_update = cdtime(); - - if (wifxudp_config_packet_size - send_buffer_fill < 120) - /* No room for a new point of average size in buffer, - the probability of fail for the new point is bigger than - the probability of success */ - flush_buffer(); - - pthread_mutex_unlock(&send_buffer_lock); } + fill_send_buffer(buffer, offset); return 0; } /* int write_influxdb_udp_write */