]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
* family loop write rework
authorcarlospeon <carlospeon@gmail.com>
Wed, 30 Sep 2020 14:11:18 +0000 (16:11 +0200)
committerCarlos Peón Costa <carlospec@inditex.com>
Thu, 1 Oct 2020 10:47:33 +0000 (12:47 +0200)
* fix gauge printf format
* fix METRIC_TYPE_UNTYPED

src/write_influxdb_udp.c

index 3ac0da9a0eb2907f519a937ff6650cc2d8c54d7a..1019de08450a3ce86ea610d8bc61143d6e873b3f 100644 (file)
@@ -326,22 +326,32 @@ static void flush_buffer(void) {
   write_influxdb_udp_init_buffer();
 }
 
+static void fill_send_buffer(char const *buffer, size_t len) {
+  assert(len <= wifxudp_config_packet_size);
+
+  pthread_mutex_lock(&send_buffer_lock);
+  if (wifxudp_config_packet_size - send_buffer_fill < len)
+    flush_buffer();
+  memcpy(send_buffer_ptr, buffer, len);
+  send_buffer_fill += len;
+  send_buffer_ptr += len;
+  send_buffer_last_update = cdtime();
+  pthread_mutex_unlock(&send_buffer_lock);
+}
+
 static int wifxudp_escape_string(char *buffer, size_t buffer_size,
                                  const char *string) {
 
   if ((buffer == NULL) || (string == NULL))
     return -EINVAL;
 
-  if (buffer_size < 3)
-    return -ENOMEM;
-
   int dst_pos = 0;
 
 #define BUFFER_ADD(c)                                                          \
   do {                                                                         \
     if (dst_pos >= (buffer_size - 1)) {                                        \
-      buffer[buffer_size - 1] = '\0';                                          \
-      return -ENOMEM;                                                          \
+      buffer[buffer_size - 1] = 0;                                             \
+      return dst_pos++;                                                        \
     }                                                                          \
     buffer[dst_pos] = (c);                                                     \
     dst_pos++;                                                                 \
@@ -364,93 +374,84 @@ static int wifxudp_escape_string(char *buffer, size_t buffer_size,
   return dst_pos;
 } /* int wifxudp_escape_string */
 
-static int write_influxdb_point(char *buffer, int buffer_len,
-                                metric_family_t const *fam, size_t *idx) {
-  int status;
-  int metric_offset = 0;
+static int write_influxdb_point(char *buffer, int buffer_len, metric_t metric) {
+  int offset = 0;
   bool have_values = false;
 
 #define BUFFER_ADD_ESCAPE(...)                                                 \
   do {                                                                         \
-    status = wifxudp_escape_string(buffer + offset, buffer_len - offset,       \
-                                   __VA_ARGS__);                               \
-    if (status < 0) {                                                          \
-      return metric_offset;                                                    \
-    }                                                                          \
+    int status = wifxudp_escape_string(buffer + offset, buffer_len - offset,   \
+                                       __VA_ARGS__);                           \
+    if (status < 0)                                                            \
+      return status;                                                           \
     offset += status;                                                          \
+    if (status >= (buffer_len - offset))                                       \
+      return offset;                                                           \
   } while (0)
 
 #define BUFFER_ADD(...)                                                        \
   do {                                                                         \
-    status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__);      \
-    if ((status < 0) || (status >= (buffer_len - offset))) {                   \
-      return metric_offset;                                                    \
-    }                                                                          \
+    int status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__);  \
+    if (status < 0)                                                            \
+      return status;                                                           \
     offset += status;                                                          \
+    if (status >= (buffer_len - offset))                                       \
+      return offset;                                                           \
   } while (0)
 
-  for (; *idx < fam->metric.num; (*idx)++) {
-    int offset = metric_offset;
-    have_values = false;
-    BUFFER_ADD_ESCAPE(fam->name);
-    metric_t metric = fam->metric.ptr[*idx];
-
-    for (size_t j = 0; j < metric.label.num; j++) {
-      label_pair_t label = metric.label.ptr[j];
-      if (strcmp(label.name, "") != 0 && strcmp(label.value, "") != 0) {
-        BUFFER_ADD(",");
-        BUFFER_ADD_ESCAPE(label.name);
-        BUFFER_ADD("=");
-        BUFFER_ADD_ESCAPE(label.value);
-      }
-    }
-    BUFFER_ADD(" ");
+  have_values = false;
+  BUFFER_ADD_ESCAPE(metric.family->name);
+  for (size_t j = 0; j < metric.label.num; j++) {
+    label_pair_t label = metric.label.ptr[j];
+    BUFFER_ADD(",");
+    BUFFER_ADD_ESCAPE(label.name);
+    BUFFER_ADD("=");
+    BUFFER_ADD_ESCAPE(label.value);
+  }
+  BUFFER_ADD(" ");
 
-    if (fam->type == METRIC_TYPE_GAUGE) {
+  if (wifxudp_config_store_rates &&
+      (metric.family->type == METRIC_TYPE_COUNTER)) {
+    gauge_t rate;
+    if (uc_get_rate(&metric, &rate) != 0) {
+      WARNING("write_influxdb_udp plugin: "
+              "uc_get_rate failed.");
+      return -1;
+    }
+    if (!isnan(rate)) {
+      BUFFER_ADD("value=" GAUGE_FORMAT, rate);
+      have_values = true;
+    }
+  } else {
+    switch (metric.family->type) {
+    case METRIC_TYPE_GAUGE:
+    case METRIC_TYPE_UNTYPED:
       if (!isnan(metric.value.gauge)) {
-        BUFFER_ADD("value=%lf", metric.value.gauge);
-        have_values = true;
-      }
-    } else if (wifxudp_config_store_rates) {
-      gauge_t rate;
-      int ret = uc_get_rate(&metric, &rate);
-      if (ret != 0) {
-        WARNING("write_influxdb_udp plugin: "
-                "uc_get_rate failed.");
-        continue;
-      }
-      if (!isnan(rate)) {
-        BUFFER_ADD("value=%lf", rate);
-        have_values = true;
-      }
-    } else {
-      switch (fam->type) {
-      case METRIC_TYPE_COUNTER:
-        BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
-        have_values = true;
-        break;
-      case METRIC_TYPE_UNTYPED: /* derive ¿? */
-        BUFFER_ADD("value=%" PRIi64 "i", metric.value.derive);
+        BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge);
         have_values = true;
-        break;
-      default:
-        WARNING("write_influxdb_udp plugin: "
-                "unknown family type.");
-        return -1;
-        break;
       }
+      break;
+    case METRIC_TYPE_COUNTER:
+      BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
+      have_values = true;
+      break;
+    default:
+      WARNING("write_influxdb_udp plugin: "
+              "unknown family type.");
+      return -1;
+      break;
     }
+  }
 
-    if (have_values) {
-      BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
-      metric_offset = offset;
-    }
-  } /* for (; *idx < fam->metric.num; (*idx)++) */
+  if (!have_values)
+    return 0;
+
+  BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
 
 #undef BUFFER_ADD_ESCAPE
 #undef BUFFER_ADD
 
-  return metric_offset;
+  return offset;
 } /* int write_influxdb_point */
 
 static int write_influxdb_udp_write(metric_family_t const *fam,
@@ -460,36 +461,28 @@ static int write_influxdb_udp_write(metric_family_t const *fam,
     return EINVAL;
 
   char buffer[NET_DEFAULT_PACKET_SIZE];
-  size_t idx = 0;
+  int buffer_len = NET_DEFAULT_PACKET_SIZE;
+  int offset = 0;
+
+  if (wifxudp_config_packet_size < buffer_len)
+    buffer_len = wifxudp_config_packet_size;
 
-  while (idx < fam->metric.num) {
+  for (size_t i = 0; i < fam->metric.num; i++) {
+    metric_t metric = fam->metric.ptr[i];
     int status =
-        write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, fam, &idx);
-    if (status < 0) {
+        write_influxdb_point(buffer + offset, buffer_len - offset, metric);
+    if (status < 0) { // error
       ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
       return -1;
+    } else if (status >= buffer_len - offset) { // full
+      fill_send_buffer(buffer, offset);
+      offset = 0;
+      buffer[0] = 0;
+    } else {
+      offset += status;
     }
-
-    if (status == 0) /* no real values to send (nan) */
-      continue;
-
-    pthread_mutex_lock(&send_buffer_lock);
-    if (wifxudp_config_packet_size - send_buffer_fill < status)
-      flush_buffer();
-    memcpy(send_buffer_ptr, buffer, status);
-
-    send_buffer_fill += status;
-    send_buffer_ptr += status;
-    send_buffer_last_update = cdtime();
-
-    if (wifxudp_config_packet_size - send_buffer_fill < 120)
-      /* No room for a new point of average size in buffer,
-         the probability of fail for the new point is bigger than
-         the probability of success */
-      flush_buffer();
-
-    pthread_mutex_unlock(&send_buffer_lock);
   }
+  fill_send_buffer(buffer, offset);
   return 0;
 } /* int write_influxdb_udp_write */