]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_influxdb_udp.c: migration to v6.0
authorCarlos Peón Costa <carlospec@inditex.com>
Tue, 29 Sep 2020 16:13:41 +0000 (18:13 +0200)
committerCarlos Peón Costa <carlospec@inditex.com>
Tue, 29 Sep 2020 16:25:25 +0000 (18:25 +0200)
src/write_influxdb_udp.c

index b324a545ccdf6e0286fd5502862c178626068919..3ac0da9a0eb2907f519a937ff6650cc2d8c54d7a 100644 (file)
@@ -365,20 +365,18 @@ static int wifxudp_escape_string(char *buffer, size_t buffer_size,
 } /* int wifxudp_escape_string */
 
 static int write_influxdb_point(char *buffer, int buffer_len,
-                                const data_set_t *ds, const value_list_t *vl) {
+                                metric_family_t const *fam, size_t *idx) {
   int status;
-  int offset = 0;
-  gauge_t *rates = NULL;
+  int metric_offset = 0;
   bool have_values = false;
 
-  assert(0 == strcmp(ds->type, vl->type));
-
 #define BUFFER_ADD_ESCAPE(...)                                                 \
   do {                                                                         \
     status = wifxudp_escape_string(buffer + offset, buffer_len - offset,       \
                                    __VA_ARGS__);                               \
-    if (status < 0)                                                            \
-      return -1;                                                               \
+    if (status < 0) {                                                          \
+      return metric_offset;                                                    \
+    }                                                                          \
     offset += status;                                                          \
   } while (0)
 
@@ -386,115 +384,112 @@ static int write_influxdb_point(char *buffer, int buffer_len,
   do {                                                                         \
     status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__);      \
     if ((status < 0) || (status >= (buffer_len - offset))) {                   \
-      sfree(rates);                                                            \
-      return -1;                                                               \
+      return metric_offset;                                                    \
     }                                                                          \
     offset += status;                                                          \
   } while (0)
 
-  BUFFER_ADD_ESCAPE(vl->plugin);
-  BUFFER_ADD(",host=");
-  BUFFER_ADD_ESCAPE(vl->host);
-  if (strcmp(vl->plugin_instance, "") != 0) {
-    BUFFER_ADD(",instance=");
-    BUFFER_ADD_ESCAPE(vl->plugin_instance);
-  }
-  if (strcmp(vl->type, "") != 0) {
-    BUFFER_ADD(",type=");
-    BUFFER_ADD_ESCAPE(vl->type);
-  }
-  if (strcmp(vl->type_instance, "") != 0) {
-    BUFFER_ADD(",type_instance=");
-    BUFFER_ADD_ESCAPE(vl->type_instance);
-  }
+  for (; *idx < fam->metric.num; (*idx)++) {
+    int offset = metric_offset;
+    have_values = false;
+    BUFFER_ADD_ESCAPE(fam->name);
+    metric_t metric = fam->metric.ptr[*idx];
 
-  BUFFER_ADD(" ");
-  for (size_t i = 0; i < ds->ds_num; i++) {
-    if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
-        (ds->ds[i].type != DS_TYPE_GAUGE) &&
-        (ds->ds[i].type != DS_TYPE_DERIVE)) {
-      sfree(rates);
-      return -1;
+    for (size_t j = 0; j < metric.label.num; j++) {
+      label_pair_t label = metric.label.ptr[j];
+      if (strcmp(label.name, "") != 0 && strcmp(label.value, "") != 0) {
+        BUFFER_ADD(",");
+        BUFFER_ADD_ESCAPE(label.name);
+        BUFFER_ADD("=");
+        BUFFER_ADD_ESCAPE(label.value);
+      }
     }
+    BUFFER_ADD(" ");
 
-    if (ds->ds[i].type == DS_TYPE_GAUGE) {
-      if (isnan(vl->values[i].gauge))
-        continue;
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge);
-      have_values = true;
+    if (fam->type == METRIC_TYPE_GAUGE) {
+      if (!isnan(metric.value.gauge)) {
+        BUFFER_ADD("value=%lf", metric.value.gauge);
+        have_values = true;
+      }
     } else if (wifxudp_config_store_rates) {
-      if (rates == NULL)
-        rates = uc_get_rate_vl(ds, vl);
-      if (rates == NULL) {
+      gauge_t rate;
+      int ret = uc_get_rate(&metric, &rate);
+      if (ret != 0) {
+        WARNING("write_influxdb_udp plugin: "
+                "uc_get_rate failed.");
+        continue;
+      }
+      if (!isnan(rate)) {
+        BUFFER_ADD("value=%lf", rate);
+        have_values = true;
+      }
+    } else {
+      switch (fam->type) {
+      case METRIC_TYPE_COUNTER:
+        BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
+        have_values = true;
+        break;
+      case METRIC_TYPE_UNTYPED: /* derive ¿? */
+        BUFFER_ADD("value=%" PRIi64 "i", metric.value.derive);
+        have_values = true;
+        break;
+      default:
         WARNING("write_influxdb_udp plugin: "
-                "uc_get_rate_vl failed.");
+                "unknown family type.");
         return -1;
+        break;
       }
-      if (isnan(rates[i]))
-        continue;
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]);
-      have_values = true;
-    } else if (ds->ds[i].type == DS_TYPE_COUNTER) {
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name,
-                 (uint64_t)vl->values[i].counter);
-      have_values = true;
-    } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive);
-      have_values = true;
     }
 
-  } /* for ds->ds_num */
-  sfree(rates);
-
-  if (!have_values)
-    return 0;
-
-  BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(vl->time));
+    if (have_values) {
+      BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
+      metric_offset = offset;
+    }
+  } /* for (; *idx < fam->metric.num; (*idx)++) */
 
 #undef BUFFER_ADD_ESCAPE
 #undef BUFFER_ADD
 
-  return offset;
+  return metric_offset;
 } /* int write_influxdb_point */
 
-static int
-write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl,
-                         user_data_t __attribute__((unused)) * user_data) {
+static int write_influxdb_udp_write(metric_family_t const *fam,
+                                    user_data_t __attribute__((unused)) *
+                                        user_data) {
+  if (fam == NULL)
+    return EINVAL;
+
   char buffer[NET_DEFAULT_PACKET_SIZE];
+  size_t idx = 0;
 
-  int status = write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, ds, vl);
+  while (idx < fam->metric.num) {
+    int status =
+        write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, fam, &idx);
+    if (status < 0) {
+      ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
+      return -1;
+    }
 
-  if (status < 0) {
-    ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
-    return -1;
-  }
-  if (status == 0) /* no real values to send (nan) */
-    return 0;
+    if (status == 0) /* no real values to send (nan) */
+      continue;
 
-  pthread_mutex_lock(&send_buffer_lock);
-  if (wifxudp_config_packet_size - send_buffer_fill < status)
-    flush_buffer();
-  memcpy(send_buffer_ptr, buffer, status);
+    pthread_mutex_lock(&send_buffer_lock);
+    if (wifxudp_config_packet_size - send_buffer_fill < status)
+      flush_buffer();
+    memcpy(send_buffer_ptr, buffer, status);
 
-  send_buffer_fill += status;
-  send_buffer_ptr += status;
-  send_buffer_last_update = cdtime();
+    send_buffer_fill += status;
+    send_buffer_ptr += status;
+    send_buffer_last_update = cdtime();
 
-  if (wifxudp_config_packet_size - send_buffer_fill < 120)
-    /* No room for a new point of average size in buffer,
-       the probability of fail for the new point is bigger than
-       the probability of success */
-    flush_buffer();
+    if (wifxudp_config_packet_size - send_buffer_fill < 120)
+      /* No room for a new point of average size in buffer,
+         the probability of fail for the new point is bigger than
+         the probability of success */
+      flush_buffer();
 
-  pthread_mutex_unlock(&send_buffer_lock);
+    pthread_mutex_unlock(&send_buffer_lock);
+  }
   return 0;
 } /* int write_influxdb_udp_write */