]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_http plugin: Implement support for metric_family_t.
authorFlorian Forster <octo@collectd.org>
Wed, 8 Jul 2020 21:06:34 +0000 (23:06 +0200)
committerFlorian Forster <octo@google.com>
Wed, 29 Jul 2020 11:40:54 +0000 (13:40 +0200)
Makefile.am
src/write_http.c

index 01c5bb773595d93cd1b668d28b96139ba7bb6954..f392baa27f98c0d6336caae0b4cc4c558e37215b 100644 (file)
@@ -557,10 +557,7 @@ libcmds_la_SOURCES = \
        src/utils/cmds/putval.h \
        src/utils/cmds/parse_option.c \
        src/utils/cmds/parse_option.h
-libcmds_la_LIBADD = \
-       libcommon.la \
-       libmetadata.la \
-       -lm
+libcmds_la_LIBADD = -lm
 
 test_utils_cmds_SOURCES = \
        src/utils/cmds/cmds_test.c \
@@ -2262,7 +2259,7 @@ write_http_la_SOURCES = \
        src/utils/format_kairosdb/format_kairosdb.h
 write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS)
 write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-write_http_la_LIBADD = libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
+write_http_la_LIBADD = libcmds.la libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
 endif
 
 if BUILD_PLUGIN_WRITE_INFLUXDB_UDP
index c250747a2dc30dea0bcfcc7d576733644422a25c..b97672c86bad66163e2357575bd5e47d5799d855 100644 (file)
@@ -2,7 +2,7 @@
  * collectd - src/write_http.c
  * Copyright (C) 2009       Paul Sadauskas
  * Copyright (C) 2009       Doug MacEachern
- * Copyright (C) 2007-2014  Florian octo Forster
+ * Copyright (C) 2007-2020  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -26,6 +26,7 @@
 #include "collectd.h"
 
 #include "plugin.h"
+#include "utils/cmds/putval.h"
 #include "utils/common/common.h"
 #include "utils/curl_stats/curl_stats.h"
 #include "utils/format_json/format_json.h"
 
 #include <curl/curl.h>
 
-#ifndef WRITE_HTTP_DEFAULT_BUFFER_SIZE
-#define WRITE_HTTP_DEFAULT_BUFFER_SIZE 4096
-#endif
-
 #ifndef WRITE_HTTP_DEFAULT_PREFIX
 #define WRITE_HTTP_DEFAULT_PREFIX "collectd"
 #endif
@@ -76,19 +73,16 @@ struct wh_callback_s {
   bool send_metrics;
   bool send_notifications;
 
+  pthread_mutex_t curl_lock;
   CURL *curl;
   curl_stats_t *curl_stats;
   struct curl_slist *headers;
   char curl_errbuf[CURL_ERROR_SIZE];
 
-  char *send_buffer;
-  size_t send_buffer_size;
-  size_t send_buffer_free;
-  size_t send_buffer_fill;
+  pthread_mutex_t send_buffer_lock;
+  strbuf_t send_buffer;
   cdtime_t send_buffer_init_time;
 
-  pthread_mutex_t send_lock;
-
   char response_buffer[WRITE_HTTP_RESPONSE_BUFFER_SIZE];
   unsigned int response_buffer_pos;
 
@@ -97,9 +91,6 @@ struct wh_callback_s {
 };
 typedef struct wh_callback_s wh_callback_t;
 
-static char **http_attrs;
-static size_t http_attrs_num;
-
 /* libcurl may call this multiple times depending on how big the server's
  * http response is
  */
@@ -126,50 +117,30 @@ static size_t wh_curl_write_callback(char *ptr, size_t size, size_t nmemb,
    */
   return nmemb;
 
-} /* }}} wh_curl_write_callback */
+} /* wh_curl_write_callback */
 
 static void wh_log_http_error(wh_callback_t *cb) {
-  if (!cb->log_http_error)
+  if (!cb->log_http_error) {
     return;
+  }
 
   long http_code = 0;
-
   curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &http_code);
 
-  if (http_code != 200)
+  if (http_code != 200) {
     INFO("write_http plugin: HTTP Error code: %lu", http_code);
-}
-
-static void wh_reset_buffer(wh_callback_t *cb) /* {{{ */
-{
-  if ((cb == NULL) || (cb->send_buffer == NULL))
-    return;
-
-  memset(cb->send_buffer, 0, cb->send_buffer_size);
-  cb->send_buffer_free = cb->send_buffer_size;
-  cb->send_buffer_fill = 0;
-  cb->send_buffer_init_time = cdtime();
-
-  if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) {
-    format_json_initialize(cb->send_buffer, &cb->send_buffer_fill,
-                           &cb->send_buffer_free);
   }
+}
 
-  memset(cb->response_buffer, 0, sizeof(cb->response_buffer));
-  cb->response_buffer_pos = 0;
-
-} /* }}} wh_reset_buffer */
-
-/* must hold cb->send_lock when calling */
-static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */
-{
-  int status = 0;
+/* must hold cb->curl_lock when calling */
+static int wh_post(wh_callback_t *cb, char const *data) {
+  pthread_mutex_lock(&cb->curl_lock);
 
   curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location);
   curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data);
   curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, &wh_curl_write_callback);
   curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, (void *)cb);
-  status = curl_easy_perform(cb->curl);
+  int status = curl_easy_perform(cb->curl);
 
   wh_log_http_error(cb);
 
@@ -177,15 +148,12 @@ static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */
     int rc = curl_stats_dispatch(cb->curl_stats, cb->curl, NULL, "write_http",
                                  cb->name);
     if (rc != 0) {
-      ERROR("write_http plugin: curl_stats_dispatch failed with "
-            "status %i",
-            rc);
+      ERROR("write_http plugin: curl_stats_dispatch failed with status %d", rc);
     }
   }
 
   if (status != CURLE_OK) {
-    ERROR("write_http plugin: curl_easy_perform failed with "
-          "status %i: %s",
+    ERROR("write_http plugin: curl_easy_perform failed with status %d: %s",
           status, cb->curl_errbuf);
     if (strlen(cb->response_buffer) > 0) {
       ERROR("write_http plugin: curl_response=%s", cb->response_buffer);
@@ -193,13 +161,15 @@ static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */
   } else {
     DEBUG("write_http plugin: curl_response=%s", cb->response_buffer);
   }
+
+  pthread_mutex_unlock(&cb->curl_lock);
   return status;
-} /* }}} wh_post_nolock */
+} /* wh_post */
 
-static int wh_callback_init(wh_callback_t *cb) /* {{{ */
-{
-  if (cb->curl != NULL)
+static int wh_callback_init(wh_callback_t *cb) {
+  if (cb->curl != NULL) {
     return 0;
+  }
 
   cb->curl = curl_easy_init();
   if (cb->curl == NULL) {
@@ -276,99 +246,60 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */
       curl_easy_setopt(cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass);
   }
 
-  wh_reset_buffer(cb);
+  strbuf_reset(&cb->send_buffer);
 
   return 0;
-} /* }}} int wh_callback_init */
-
-static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */
-{
-  int status;
-
-  DEBUG("write_http plugin: wh_flush_nolock: timeout = %.3f; "
-        "send_buffer_fill = %" PRIsz ";",
-        CDTIME_T_TO_DOUBLE(timeout), cb->send_buffer_fill);
-
-  /* timeout == 0  => flush unconditionally */
-  if (timeout > 0) {
-    cdtime_t now;
-
-    now = cdtime();
-    if ((cb->send_buffer_init_time + timeout) > now)
-      return 0;
-  }
-
-  if (cb->format == WH_FORMAT_COMMAND) {
-    if (cb->send_buffer_fill == 0) {
-      cb->send_buffer_init_time = cdtime();
-      return 0;
-    }
-
-    status = wh_post_nolock(cb, cb->send_buffer);
-    wh_reset_buffer(cb);
-  } else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) {
-    if (cb->send_buffer_fill <= 2) {
-      cb->send_buffer_init_time = cdtime();
-      return 0;
-    }
+} /* int wh_callback_init */
 
-    status = format_json_finalize(cb->send_buffer, &cb->send_buffer_fill,
-                                  &cb->send_buffer_free);
-    if (status != 0) {
-      ERROR("write_http: wh_flush_nolock: "
-            "format_json_finalize failed.");
-      wh_reset_buffer(cb);
-      return status;
-    }
-
-    status = wh_post_nolock(cb, cb->send_buffer);
-    wh_reset_buffer(cb);
-  } else {
-    ERROR("write_http: wh_flush_nolock: "
-          "Unknown format: %i",
-          cb->format);
-    return -1;
-  }
-
-  return status;
-} /* }}} wh_flush_nolock */
-
-static int wh_flush(cdtime_t timeout, /* {{{ */
+static int wh_flush(cdtime_t timeout,
                     const char *identifier __attribute__((unused)),
                     user_data_t *user_data) {
-  wh_callback_t *cb;
-  int status;
-
   if (user_data == NULL)
     return -EINVAL;
 
-  cb = user_data->data;
+  wh_callback_t *cb = user_data->data;
 
-  pthread_mutex_lock(&cb->send_lock);
+  pthread_mutex_lock(&cb->send_buffer_lock);
 
   if (wh_callback_init(cb) != 0) {
     ERROR("write_http plugin: wh_callback_init failed.");
-    pthread_mutex_unlock(&cb->send_lock);
+    pthread_mutex_unlock(&cb->send_buffer_lock);
     return -1;
   }
 
-  status = wh_flush_nolock(timeout, cb);
-  pthread_mutex_unlock(&cb->send_lock);
+  /* timeout == 0  => flush unconditionally */
+  if (timeout > 0) {
+    if ((cb->send_buffer_init_time + timeout) > cdtime()) {
+      pthread_mutex_unlock(&cb->send_buffer_lock);
+      return 0;
+    }
+  }
+
+  if (cb->send_buffer.pos == 0) {
+    cb->send_buffer_init_time = cdtime();
+    pthread_mutex_unlock(&cb->send_buffer_lock);
+    return 0;
+  }
 
-  return status;
-} /* }}} int wh_flush */
+  char const *json = strdup(cb->send_buffer.ptr);
+  strbuf_reset(&cb->send_buffer);
+  cb->send_buffer_init_time = cdtime();
+  pthread_mutex_unlock(&cb->send_buffer_lock);
 
-static void wh_callback_free(void *data) /* {{{ */
-{
-  wh_callback_t *cb;
+  if (json == NULL) {
+    return ENOMEM;
+  }
 
+  return wh_post(cb, json);
+} /* int wh_flush */
+
+static void wh_callback_free(void *data) {
   if (data == NULL)
     return;
 
-  cb = data;
+  wh_callback_t *cb = data;
 
-  if (cb->send_buffer != NULL)
-    wh_flush_nolock(/* timeout = */ 0, cb);
+  wh_flush(/* timeout = */ 0, NULL, &(user_data_t){.data = cb});
 
   if (cb->curl != NULL) {
     curl_easy_cleanup(cb->curl);
@@ -393,205 +324,95 @@ static void wh_callback_free(void *data) /* {{{ */
   sfree(cb->clientkey);
   sfree(cb->clientcert);
   sfree(cb->clientkeypass);
-  sfree(cb->send_buffer);
   sfree(cb->metrics_prefix);
 
   sfree(cb);
-} /* }}} void wh_callback_free */
-
-static int wh_write_command(metric_single_t const *m, /* {{{ */
-                            wh_callback_t *cb) {
-  char values[512];
-  char command[1024];
-  size_t command_len;
-
-  int status;
-
-  /* sanity checks, primarily to make static analyzers happy. */
-  if ((cb == NULL) || (cb->send_buffer == NULL))
-    return -1;
-
-  /* Copy the identifier to `key' and escape it. */
-  char *metric_string_p = NULL;
-  if ((metric_string_p = metric_marshal_text(m)) != 0) {
-    return -1;
-  }
-
-  escape_string(metric_string_p, sizeof(metric_string_p));
+} /* void wh_callback_free */
 
-  /* Convert the values to an ASCII representation and put that into
-   * `values'. */
-  status = format_values(values, sizeof(values), m, cb->store_rates);
-  if (status != 0) {
-    ERROR("write_http plugin: error with "
-          "wh_value_list_to_string");
-    sfree(metric_string_p);
-    return status;
-  }
+static int wh_write_command(metric_family_t const *fam, wh_callback_t *cb) {
+  pthread_mutex_lock(&cb->send_buffer_lock);
 
-  command_len = (size_t)snprintf(
-      command, sizeof(command), "PUTVAL %s interval=%.3f %s\r\n",
-      metric_string_p, CDTIME_T_TO_DOUBLE(m->interval), values);
-  if (command_len >= sizeof(command)) {
-    ERROR("write_http plugin: Command buffer too small: "
-          "Need %" PRIsz " bytes.",
-          command_len + 1);
-    sfree(metric_string_p);
-    return -1;
-  }
-  sfree(metric_string_p);
+  int ret = 0;
+  for (size_t i = 0; i < fam->metric.num; i++) {
+    metric_t const *m = fam->metric.ptr;
 
-  pthread_mutex_lock(&cb->send_lock);
-  if (wh_callback_init(cb) != 0) {
-    ERROR("write_http plugin: wh_callback_init failed.");
-    pthread_mutex_unlock(&cb->send_lock);
-    return -1;
-  }
-
-  if (command_len >= cb->send_buffer_free) {
-    status = wh_flush_nolock(/* timeout = */ 0, cb);
+    int status = cmd_create_putval(&cb->send_buffer, m);
     if (status != 0) {
-      pthread_mutex_unlock(&cb->send_lock);
-      return status;
+      ERROR("write_http plugin: cmd_create_putval failed: %s",
+            STRERROR(status));
+      ret = ret ? ret : status;
+      continue;
     }
-  }
-  assert(command_len < cb->send_buffer_free);
-
-  /* Make scan-build happy. */
-  assert(cb->send_buffer != NULL);
-
-  /* `command_len + 1' because `command_len' does not include the
-   * trailing null byte. Neither does `send_buffer_fill'. */
-  memcpy(cb->send_buffer + cb->send_buffer_fill, command, command_len + 1);
-  cb->send_buffer_fill += command_len;
-  cb->send_buffer_free -= command_len;
-
-  DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%) \"%s\"",
-        cb->location, cb->send_buffer_fill, cb->send_buffer_size,
-        100.0 * ((double)cb->send_buffer_fill) / ((double)cb->send_buffer_size),
-        command);
-
-  /* Check if we have enough space for this command. */
-  pthread_mutex_unlock(&cb->send_lock);
-
-  return 0;
-} /* }}} int wh_write_command */
 
-static int wh_write_json(metric_single_t const *m, /* {{{ */
-                         wh_callback_t *cb) {
-  int status;
-
-  pthread_mutex_lock(&cb->send_lock);
-  if (wh_callback_init(cb) != 0) {
-    ERROR("write_http plugin: wh_callback_init failed.");
-    pthread_mutex_unlock(&cb->send_lock);
-    return -1;
+    strbuf_print(&cb->send_buffer, "\n");
   }
 
-  status = format_json_metric(cb->send_buffer, &cb->send_buffer_fill,
-                              &cb->send_buffer_free, m, cb->store_rates);
-  if (status == -ENOMEM) {
-    status = wh_flush_nolock(/* timeout = */ 0, cb);
-    if (status != 0) {
-      wh_reset_buffer(cb);
-      pthread_mutex_unlock(&cb->send_lock);
-      return status;
-    }
+  pthread_mutex_unlock(&cb->send_buffer_lock);
+  return ret;
+} /* int wh_write_command */
 
-    status = format_json_metric(cb->send_buffer, &cb->send_buffer_fill,
-                                &cb->send_buffer_free, m, cb->store_rates);
-  }
+static int wh_write_json(metric_family_t const *fam, wh_callback_t *cb) {
+  pthread_mutex_lock(&cb->send_buffer_lock);
+
+  int status =
+      format_json_metric_family(&cb->send_buffer, fam, cb->store_rates);
   if (status != 0) {
-    pthread_mutex_unlock(&cb->send_lock);
+    pthread_mutex_unlock(&cb->send_buffer_lock);
+    ERROR("write_http plugin: format_json_metric_family failed: %s",
+          STRERROR(status));
     return status;
   }
 
-  DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%)",
-        cb->location, cb->send_buffer_fill, cb->send_buffer_size,
-        100.0 * ((double)cb->send_buffer_fill) /
-            ((double)cb->send_buffer_size));
-
-  /* Check if we have enough space for this command. */
-  pthread_mutex_unlock(&cb->send_lock);
-
+  pthread_mutex_unlock(&cb->send_buffer_lock);
   return 0;
-} /* }}} int wh_write_json */
-
-static int wh_write_kairosdb(metric_single_t const *m, /* {{{ */
-                             wh_callback_t *cb) {
-  int status;
+} /* int wh_write_json */
 
-  pthread_mutex_lock(&cb->send_lock);
-
-  if (cb->curl == NULL) {
-    status = wh_callback_init(cb);
-    if (status != 0) {
-      ERROR("write_http plugin: wh_callback_init failed.");
-      pthread_mutex_unlock(&cb->send_lock);
-      return -1;
-    }
-  }
+static int wh_write_kairosdb(metric_family_t const *fam, wh_callback_t *cb) {
+  format_kairosdb_opts_t opts = {
+      .store_rates = cb->store_rates,
+      .ttl_secs = cb->data_ttl,
+      .metrics_prefix = cb->metrics_prefix,
+  };
 
-  status = format_kairosdb_metric(
-      cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, m,
-      cb->store_rates, (char const *const *)http_attrs, http_attrs_num,
-      cb->data_ttl, cb->metrics_prefix);
-  if (status == -ENOMEM) {
-    status = wh_flush_nolock(/* timeout = */ 0, cb);
-    if (status != 0) {
-      wh_reset_buffer(cb);
-      pthread_mutex_unlock(&cb->send_lock);
-      return status;
-    }
+  pthread_mutex_lock(&cb->send_buffer_lock);
 
-    status = format_kairosdb_metric(
-        cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, m,
-        cb->store_rates, (char const *const *)http_attrs, http_attrs_num,
-        cb->data_ttl, cb->metrics_prefix);
-  }
+  int status = format_kairosdb_metric_family(&cb->send_buffer, fam, &opts);
   if (status != 0) {
-    pthread_mutex_unlock(&cb->send_lock);
+    pthread_mutex_unlock(&cb->send_buffer_lock);
+    ERROR("write_http plugin: format_kairosdb_metric_family failed: %s",
+          STRERROR(status));
     return status;
   }
 
-  DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%)",
-        cb->location, cb->send_buffer_fill, cb->send_buffer_size,
-        100.0 * ((double)cb->send_buffer_fill) /
-            ((double)cb->send_buffer_size));
-
-  /* Check if we have enough space for this command. */
-  pthread_mutex_unlock(&cb->send_lock);
-
+  pthread_mutex_unlock(&cb->send_buffer_lock);
   return 0;
-} /* }}} int wh_write_kairosdb */
+} /* int wh_write_kairosdb */
 
-static int wh_write(metric_single_t const *m, /* {{{ */
-                    user_data_t *user_data) {
-  wh_callback_t *cb;
-  int status;
+static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
+  if ((fam == NULL) || (user_data == NULL)) {
+    return EINVAL;
+  }
 
-  if (user_data == NULL)
-    return -EINVAL;
+  wh_callback_t *cb = user_data->data;
 
-  cb = user_data->data;
   assert(cb->send_metrics);
 
+  int status = EINVAL;
   switch (cb->format) {
   case WH_FORMAT_JSON:
-    status = wh_write_json(m, cb);
+    status = wh_write_json(fam, cb);
     break;
   case WH_FORMAT_KAIROSDB:
-    status = wh_write_kairosdb(m, cb);
+    status = wh_write_kairosdb(fam, cb);
     break;
   default:
-    status = wh_write_command(m, cb);
+    status = wh_write_command(fam, cb);
     break;
   }
   return status;
-} /* }}} int wh_write */
+} /* int wh_write */
 
-static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */
-{
+static int wh_notify(notification_t const *n, user_data_t *ud) {
   wh_callback_t *cb;
   char alert[4096];
   int status;
@@ -608,21 +429,20 @@ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */
     return status;
   }
 
-  pthread_mutex_lock(&cb->send_lock);
+  pthread_mutex_lock(&cb->send_buffer_lock);
   if (wh_callback_init(cb) != 0) {
     ERROR("write_http plugin: wh_callback_init failed.");
-    pthread_mutex_unlock(&cb->send_lock);
+    pthread_mutex_unlock(&cb->send_buffer_lock);
     return -1;
   }
 
-  status = wh_post_nolock(cb, alert);
-  pthread_mutex_unlock(&cb->send_lock);
+  status = wh_post(cb, alert);
+  pthread_mutex_unlock(&cb->send_buffer_lock);
 
   return status;
-} /* }}} int wh_notify */
+} /* int wh_notify */
 
-static int config_set_format(wh_callback_t *cb, /* {{{ */
-                             oconfig_item_t *ci) {
+static int config_set_format(wh_callback_t *cb, oconfig_item_t *ci) {
   char *string;
 
   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
@@ -645,10 +465,9 @@ static int config_set_format(wh_callback_t *cb, /* {{{ */
   }
 
   return 0;
-} /* }}} int config_set_format */
+} /* int config_set_format */
 
-static int wh_config_append_string(const char *name,
-                                   struct curl_slist **dest, /* {{{ */
+static int wh_config_append_string(const char *name, struct curl_slist **dest,
                                    oconfig_item_t *ci) {
   struct curl_slist *temp = NULL;
   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
@@ -663,10 +482,9 @@ static int wh_config_append_string(const char *name,
   *dest = temp;
 
   return 0;
-} /* }}} int wh_config_append_string */
+} /* int wh_config_append_string */
 
-static int wh_config_node(oconfig_item_t *ci) /* {{{ */
-{
+static int wh_config_node(oconfig_item_t *ci) {
   wh_callback_t *cb;
   int buffer_size = 0;
   char callback_name[DATA_MAX_NAME_LEN];
@@ -697,7 +515,8 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */
     return -1;
   }
 
-  pthread_mutex_init(&cb->send_lock, /* attr = */ NULL);
+  pthread_mutex_init(&cb->curl_lock, /* attr = */ NULL);
+  pthread_mutex_init(&cb->send_buffer_lock, /* attr = */ NULL);
 
   cf_util_get_string(ci, &cb->name);
 
@@ -769,7 +588,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */
       if (cb->curl_stats == NULL)
         status = -1;
     } else if (strcasecmp("Notifications", child->key) == 0)
-      cf_util_get_boolean(child, &cb->send_notifications);
+      status = cf_util_get_boolean(child, &cb->send_notifications);
     else if (strcasecmp("StoreRates", child->key) == 0)
       status = cf_util_get_boolean(child, &cb->store_rates);
     else if (strcasecmp("BufferSize", child->key) == 0)
@@ -782,38 +601,11 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_boolean(child, &cb->log_http_error);
     else if (strcasecmp("Header", child->key) == 0)
       status = wh_config_append_string("Header", &cb->headers, child);
-    else if (strcasecmp("Attribute", child->key) == 0) {
-      char *key = NULL;
-      char *val = NULL;
-
-      if (child->values_num != 2) {
-        WARNING("write_http plugin: Attribute need both a key and a value.");
-        break;
-      }
-      if (child->values[0].type != OCONFIG_TYPE_STRING ||
-          child->values[1].type != OCONFIG_TYPE_STRING) {
-        WARNING("write_http plugin: Attribute needs string arguments.");
-        break;
-      }
-      if ((key = strdup(child->values[0].value.string)) == NULL) {
-        WARNING("cannot allocate memory for attribute key.");
-        break;
-      }
-      if ((val = strdup(child->values[1].value.string)) == NULL) {
-        WARNING("cannot allocate memory for attribute value.");
-        sfree(key);
-        break;
-      }
-      strarray_add(&http_attrs, &http_attrs_num, key);
-      strarray_add(&http_attrs, &http_attrs_num, val);
-      DEBUG("write_http plugin: got attribute: %s => %s", key, val);
-      sfree(key);
-      sfree(val);
-    } else if (strcasecmp("TTL", child->key) == 0) {
+    else if (strcasecmp("TTL", child->key) == 0)
       status = cf_util_get_int(child, &cb->data_ttl);
-    } else if (strcasecmp("Prefix", child->key) == 0) {
+    else if (strcasecmp("Prefix", child->key) == 0)
       status = cf_util_get_string(child, &cb->metrics_prefix);
-    else {
+    else {
       ERROR("write_http plugin: Invalid configuration "
             "option: %s.",
             child->key);
@@ -849,25 +641,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */
   if (cb->low_speed_limit > 0)
     cb->low_speed_time = CDTIME_T_TO_TIME_T(plugin_get_interval());
 
-  /* Determine send_buffer_size. */
-  cb->send_buffer_size = WRITE_HTTP_DEFAULT_BUFFER_SIZE;
-  if (buffer_size >= 1024)
-    cb->send_buffer_size = (size_t)buffer_size;
-  else if (buffer_size != 0)
-    ERROR("write_http plugin: Ignoring invalid BufferSize setting (%d).",
-          buffer_size);
-
-  /* Allocate the buffer. */
-  cb->send_buffer = malloc(cb->send_buffer_size);
-  if (cb->send_buffer == NULL) {
-    ERROR("write_http plugin: malloc(%" PRIsz ") failed.",
-          cb->send_buffer_size);
-    wh_callback_free(cb);
-    return -1;
-  }
-
-  /* Nulls the buffer and sets ..._free and ..._fill. */
-  wh_reset_buffer(cb);
+  cb->send_buffer = STRBUF_CREATE;
 
   snprintf(callback_name, sizeof(callback_name), "write_http/%s", cb->name);
   DEBUG("write_http: Registering write callback '%s' with URL '%s'",
@@ -879,6 +653,11 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */
   };
 
   if (cb->send_metrics) {
+    /* This causes wh_flush to be called periodically. */
+    plugin_ctx_t ctx = plugin_get_ctx();
+    ctx.flush_interval = plugin_get_interval();
+    plugin_set_ctx(ctx);
+
     plugin_register_write(callback_name, wh_write, &user_data);
     user_data.free_func = NULL;
 
@@ -891,40 +670,30 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */
   }
 
   return 0;
-} /* }}} int wh_config_node */
+} /* int wh_config_node */
 
-static int wh_config(oconfig_item_t *ci) /* {{{ */
-{
+static int wh_config(oconfig_item_t *ci) {
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
 
-    if (strcasecmp("Node", child->key) == 0)
-      wh_config_node(child);
-    /* FIXME: Remove this legacy mode in version 6. */
-    else if (strcasecmp("URL", child->key) == 0) {
-      WARNING("write_http plugin: Legacy <URL> block found. "
-              "Please use <Node> instead.");
+    if (strcasecmp("Node", child->key) == 0) {
       wh_config_node(child);
     } else {
-      ERROR("write_http plugin: Invalid configuration "
-            "option: %s.",
-            child->key);
+      ERROR("write_http plugin: Invalid configuration option: %s.", child->key);
     }
   }
 
   return 0;
-} /* }}} int wh_config */
+} /* int wh_config */
 
-static int wh_init(void) /* {{{ */
-{
+static int wh_init(void) {
   /* Call this while collectd is still single-threaded to avoid
    * initialization issues in libgcrypt. */
   curl_global_init(CURL_GLOBAL_SSL);
   return 0;
-} /* }}} int wh_init */
+} /* int wh_init */
 
-void module_register(void) /* {{{ */
-{
+void module_register(void) {
   plugin_register_complex_config("write_http", wh_config);
   plugin_register_init("write_http", wh_init);
-} /* }}} void module_register */
+} /* void module_register */