write_influxdb_udp_init_buffer();
}
+static void fill_send_buffer(char const *buffer, size_t len) {
+ assert(len <= wifxudp_config_packet_size);
+
+ pthread_mutex_lock(&send_buffer_lock);
+ if (wifxudp_config_packet_size - send_buffer_fill < len)
+ flush_buffer();
+ memcpy(send_buffer_ptr, buffer, len);
+ send_buffer_fill += len;
+ send_buffer_ptr += len;
+ send_buffer_last_update = cdtime();
+ pthread_mutex_unlock(&send_buffer_lock);
+}
+
static int wifxudp_escape_string(char *buffer, size_t buffer_size,
const char *string) {
if ((buffer == NULL) || (string == NULL))
return -EINVAL;
- if (buffer_size < 3)
- return -ENOMEM;
-
int dst_pos = 0;
#define BUFFER_ADD(c) \
do { \
if (dst_pos >= (buffer_size - 1)) { \
- buffer[buffer_size - 1] = '\0'; \
- return -ENOMEM; \
+ buffer[buffer_size - 1] = 0; \
+ return dst_pos++; \
} \
buffer[dst_pos] = (c); \
dst_pos++; \
return dst_pos;
} /* int wifxudp_escape_string */
-static int write_influxdb_point(char *buffer, int buffer_len,
- metric_family_t const *fam, size_t *idx) {
- int status;
- int metric_offset = 0;
+static int write_influxdb_point(char *buffer, int buffer_len, metric_t metric) {
+ int offset = 0;
bool have_values = false;
#define BUFFER_ADD_ESCAPE(...) \
do { \
- status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \
- __VA_ARGS__); \
- if (status < 0) { \
- return metric_offset; \
- } \
+ int status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \
+ __VA_ARGS__); \
+ if (status < 0) \
+ return status; \
offset += status; \
+ if (status >= (buffer_len - offset)) \
+ return offset; \
} while (0)
#define BUFFER_ADD(...) \
do { \
- status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \
- if ((status < 0) || (status >= (buffer_len - offset))) { \
- return metric_offset; \
- } \
+ int status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \
+ if (status < 0) \
+ return status; \
offset += status; \
+ if (status >= (buffer_len - offset)) \
+ return offset; \
} while (0)
- for (; *idx < fam->metric.num; (*idx)++) {
- int offset = metric_offset;
- have_values = false;
- BUFFER_ADD_ESCAPE(fam->name);
- metric_t metric = fam->metric.ptr[*idx];
-
- for (size_t j = 0; j < metric.label.num; j++) {
- label_pair_t label = metric.label.ptr[j];
- if (strcmp(label.name, "") != 0 && strcmp(label.value, "") != 0) {
- BUFFER_ADD(",");
- BUFFER_ADD_ESCAPE(label.name);
- BUFFER_ADD("=");
- BUFFER_ADD_ESCAPE(label.value);
- }
- }
- BUFFER_ADD(" ");
+ have_values = false;
+ BUFFER_ADD_ESCAPE(metric.family->name);
+ for (size_t j = 0; j < metric.label.num; j++) {
+ label_pair_t label = metric.label.ptr[j];
+ BUFFER_ADD(",");
+ BUFFER_ADD_ESCAPE(label.name);
+ BUFFER_ADD("=");
+ BUFFER_ADD_ESCAPE(label.value);
+ }
+ BUFFER_ADD(" ");
- if (fam->type == METRIC_TYPE_GAUGE) {
+ if (wifxudp_config_store_rates &&
+ (metric.family->type == METRIC_TYPE_COUNTER)) {
+ gauge_t rate;
+ if (uc_get_rate(&metric, &rate) != 0) {
+ WARNING("write_influxdb_udp plugin: "
+ "uc_get_rate failed.");
+ return -1;
+ }
+ if (!isnan(rate)) {
+ BUFFER_ADD("value=" GAUGE_FORMAT, rate);
+ have_values = true;
+ }
+ } else {
+ switch (metric.family->type) {
+ case METRIC_TYPE_GAUGE:
+ case METRIC_TYPE_UNTYPED:
if (!isnan(metric.value.gauge)) {
- BUFFER_ADD("value=%lf", metric.value.gauge);
- have_values = true;
- }
- } else if (wifxudp_config_store_rates) {
- gauge_t rate;
- int ret = uc_get_rate(&metric, &rate);
- if (ret != 0) {
- WARNING("write_influxdb_udp plugin: "
- "uc_get_rate failed.");
- continue;
- }
- if (!isnan(rate)) {
- BUFFER_ADD("value=%lf", rate);
- have_values = true;
- }
- } else {
- switch (fam->type) {
- case METRIC_TYPE_COUNTER:
- BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
- have_values = true;
- break;
- case METRIC_TYPE_UNTYPED: /* derive ¿? */
- BUFFER_ADD("value=%" PRIi64 "i", metric.value.derive);
+ BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge);
have_values = true;
- break;
- default:
- WARNING("write_influxdb_udp plugin: "
- "unknown family type.");
- return -1;
- break;
}
+ break;
+ case METRIC_TYPE_COUNTER:
+ BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
+ have_values = true;
+ break;
+ default:
+ WARNING("write_influxdb_udp plugin: "
+ "unknown family type.");
+ return -1;
+ break;
}
+ }
- if (have_values) {
- BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
- metric_offset = offset;
- }
- } /* for (; *idx < fam->metric.num; (*idx)++) */
+ if (!have_values)
+ return 0;
+
+ BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
#undef BUFFER_ADD_ESCAPE
#undef BUFFER_ADD
- return metric_offset;
+ return offset;
} /* int write_influxdb_point */
static int write_influxdb_udp_write(metric_family_t const *fam,
return EINVAL;
char buffer[NET_DEFAULT_PACKET_SIZE];
- size_t idx = 0;
+ int buffer_len = NET_DEFAULT_PACKET_SIZE;
+ int offset = 0;
+
+ if (wifxudp_config_packet_size < buffer_len)
+ buffer_len = wifxudp_config_packet_size;
- while (idx < fam->metric.num) {
+ for (size_t i = 0; i < fam->metric.num; i++) {
+ metric_t metric = fam->metric.ptr[i];
int status =
- write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, fam, &idx);
- if (status < 0) {
+ write_influxdb_point(buffer + offset, buffer_len - offset, metric);
+ if (status < 0) { // error
ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
return -1;
+ } else if (status >= buffer_len - offset) { // full
+ fill_send_buffer(buffer, offset);
+ offset = 0;
+ buffer[0] = 0;
+ } else {
+ offset += status;
}
-
- if (status == 0) /* no real values to send (nan) */
- continue;
-
- pthread_mutex_lock(&send_buffer_lock);
- if (wifxudp_config_packet_size - send_buffer_fill < status)
- flush_buffer();
- memcpy(send_buffer_ptr, buffer, status);
-
- send_buffer_fill += status;
- send_buffer_ptr += status;
- send_buffer_last_update = cdtime();
-
- if (wifxudp_config_packet_size - send_buffer_fill < 120)
- /* No room for a new point of average size in buffer,
- the probability of fail for the new point is bigger than
- the probability of success */
- flush_buffer();
-
- pthread_mutex_unlock(&send_buffer_lock);
}
+ fill_send_buffer(buffer, offset);
return 0;
} /* int write_influxdb_udp_write */