From: Florian Forster Date: Wed, 8 Jul 2020 21:06:34 +0000 (+0200) Subject: write_http plugin: Implement support for metric_family_t. X-Git-Tag: 6.0.0-rc0~144^2~43 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=65f935572bef512bca1ea904bd9dc5d6d594496f;p=thirdparty%2Fcollectd.git write_http plugin: Implement support for metric_family_t. --- diff --git a/Makefile.am b/Makefile.am index 01c5bb773..f392baa27 100644 --- a/Makefile.am +++ b/Makefile.am @@ -557,10 +557,7 @@ libcmds_la_SOURCES = \ src/utils/cmds/putval.h \ src/utils/cmds/parse_option.c \ src/utils/cmds/parse_option.h -libcmds_la_LIBADD = \ - libcommon.la \ - libmetadata.la \ - -lm +libcmds_la_LIBADD = -lm test_utils_cmds_SOURCES = \ src/utils/cmds/cmds_test.c \ @@ -2262,7 +2259,7 @@ write_http_la_SOURCES = \ src/utils/format_kairosdb/format_kairosdb.h write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS) write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS) -write_http_la_LIBADD = libformat_json.la $(BUILD_WITH_LIBCURL_LIBS) +write_http_la_LIBADD = libcmds.la libformat_json.la $(BUILD_WITH_LIBCURL_LIBS) endif if BUILD_PLUGIN_WRITE_INFLUXDB_UDP diff --git a/src/write_http.c b/src/write_http.c index c250747a2..b97672c86 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -2,7 +2,7 @@ * collectd - src/write_http.c * Copyright (C) 2009 Paul Sadauskas * Copyright (C) 2009 Doug MacEachern - * Copyright (C) 2007-2014 Florian octo Forster + * Copyright (C) 2007-2020 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -26,6 +26,7 @@ #include "collectd.h" #include "plugin.h" +#include "utils/cmds/putval.h" #include "utils/common/common.h" #include "utils/curl_stats/curl_stats.h" #include "utils/format_json/format_json.h" @@ -33,10 +34,6 @@ #include -#ifndef WRITE_HTTP_DEFAULT_BUFFER_SIZE -#define WRITE_HTTP_DEFAULT_BUFFER_SIZE 4096 -#endif - #ifndef WRITE_HTTP_DEFAULT_PREFIX #define WRITE_HTTP_DEFAULT_PREFIX "collectd" #endif @@ -76,19 +73,16 @@ struct wh_callback_s { bool send_metrics; bool send_notifications; + pthread_mutex_t curl_lock; CURL *curl; curl_stats_t *curl_stats; struct curl_slist *headers; char curl_errbuf[CURL_ERROR_SIZE]; - char *send_buffer; - size_t send_buffer_size; - size_t send_buffer_free; - size_t send_buffer_fill; + pthread_mutex_t send_buffer_lock; + strbuf_t send_buffer; cdtime_t send_buffer_init_time; - pthread_mutex_t send_lock; - char response_buffer[WRITE_HTTP_RESPONSE_BUFFER_SIZE]; unsigned int response_buffer_pos; @@ -97,9 +91,6 @@ struct wh_callback_s { }; typedef struct wh_callback_s wh_callback_t; -static char **http_attrs; -static size_t http_attrs_num; - /* libcurl may call this multiple times depending on how big the server's * http response is */ @@ -126,50 +117,30 @@ static size_t wh_curl_write_callback(char *ptr, size_t size, size_t nmemb, */ return nmemb; -} /* }}} wh_curl_write_callback */ +} /* wh_curl_write_callback */ static void wh_log_http_error(wh_callback_t *cb) { - if (!cb->log_http_error) + if (!cb->log_http_error) { return; + } long http_code = 0; - curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &http_code); - if (http_code != 200) + if (http_code != 200) { INFO("write_http plugin: HTTP Error code: %lu", http_code); -} - -static void wh_reset_buffer(wh_callback_t *cb) /* {{{ */ -{ - if ((cb == NULL) || (cb->send_buffer == NULL)) - return; - - memset(cb->send_buffer, 0, cb->send_buffer_size); - cb->send_buffer_free = cb->send_buffer_size; - cb->send_buffer_fill = 0; - cb->send_buffer_init_time = cdtime(); - - if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) { - format_json_initialize(cb->send_buffer, &cb->send_buffer_fill, - &cb->send_buffer_free); } +} - memset(cb->response_buffer, 0, sizeof(cb->response_buffer)); - cb->response_buffer_pos = 0; - -} /* }}} wh_reset_buffer */ - -/* must hold cb->send_lock when calling */ -static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */ -{ - int status = 0; +/* must hold cb->curl_lock when calling */ +static int wh_post(wh_callback_t *cb, char const *data) { + pthread_mutex_lock(&cb->curl_lock); curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location); curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data); curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, &wh_curl_write_callback); curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, (void *)cb); - status = curl_easy_perform(cb->curl); + int status = curl_easy_perform(cb->curl); wh_log_http_error(cb); @@ -177,15 +148,12 @@ static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */ int rc = curl_stats_dispatch(cb->curl_stats, cb->curl, NULL, "write_http", cb->name); if (rc != 0) { - ERROR("write_http plugin: curl_stats_dispatch failed with " - "status %i", - rc); + ERROR("write_http plugin: curl_stats_dispatch failed with status %d", rc); } } if (status != CURLE_OK) { - ERROR("write_http plugin: curl_easy_perform failed with " - "status %i: %s", + ERROR("write_http plugin: curl_easy_perform failed with status %d: %s", status, cb->curl_errbuf); if (strlen(cb->response_buffer) > 0) { ERROR("write_http plugin: curl_response=%s", cb->response_buffer); @@ -193,13 +161,15 @@ static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */ } else { DEBUG("write_http plugin: curl_response=%s", cb->response_buffer); } + + pthread_mutex_unlock(&cb->curl_lock); return status; -} /* }}} wh_post_nolock */ +} /* wh_post */ -static int wh_callback_init(wh_callback_t *cb) /* {{{ */ -{ - if (cb->curl != NULL) +static int wh_callback_init(wh_callback_t *cb) { + if (cb->curl != NULL) { return 0; + } cb->curl = curl_easy_init(); if (cb->curl == NULL) { @@ -276,99 +246,60 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ curl_easy_setopt(cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass); } - wh_reset_buffer(cb); + strbuf_reset(&cb->send_buffer); return 0; -} /* }}} int wh_callback_init */ - -static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ -{ - int status; - - DEBUG("write_http plugin: wh_flush_nolock: timeout = %.3f; " - "send_buffer_fill = %" PRIsz ";", - CDTIME_T_TO_DOUBLE(timeout), cb->send_buffer_fill); - - /* timeout == 0 => flush unconditionally */ - if (timeout > 0) { - cdtime_t now; - - now = cdtime(); - if ((cb->send_buffer_init_time + timeout) > now) - return 0; - } - - if (cb->format == WH_FORMAT_COMMAND) { - if (cb->send_buffer_fill == 0) { - cb->send_buffer_init_time = cdtime(); - return 0; - } - - status = wh_post_nolock(cb, cb->send_buffer); - wh_reset_buffer(cb); - } else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) { - if (cb->send_buffer_fill <= 2) { - cb->send_buffer_init_time = cdtime(); - return 0; - } +} /* int wh_callback_init */ - status = format_json_finalize(cb->send_buffer, &cb->send_buffer_fill, - &cb->send_buffer_free); - if (status != 0) { - ERROR("write_http: wh_flush_nolock: " - "format_json_finalize failed."); - wh_reset_buffer(cb); - return status; - } - - status = wh_post_nolock(cb, cb->send_buffer); - wh_reset_buffer(cb); - } else { - ERROR("write_http: wh_flush_nolock: " - "Unknown format: %i", - cb->format); - return -1; - } - - return status; -} /* }}} wh_flush_nolock */ - -static int wh_flush(cdtime_t timeout, /* {{{ */ +static int wh_flush(cdtime_t timeout, const char *identifier __attribute__((unused)), user_data_t *user_data) { - wh_callback_t *cb; - int status; - if (user_data == NULL) return -EINVAL; - cb = user_data->data; + wh_callback_t *cb = user_data->data; - pthread_mutex_lock(&cb->send_lock); + pthread_mutex_lock(&cb->send_buffer_lock); if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); - pthread_mutex_unlock(&cb->send_lock); + pthread_mutex_unlock(&cb->send_buffer_lock); return -1; } - status = wh_flush_nolock(timeout, cb); - pthread_mutex_unlock(&cb->send_lock); + /* timeout == 0 => flush unconditionally */ + if (timeout > 0) { + if ((cb->send_buffer_init_time + timeout) > cdtime()) { + pthread_mutex_unlock(&cb->send_buffer_lock); + return 0; + } + } + + if (cb->send_buffer.pos == 0) { + cb->send_buffer_init_time = cdtime(); + pthread_mutex_unlock(&cb->send_buffer_lock); + return 0; + } - return status; -} /* }}} int wh_flush */ + char const *json = strdup(cb->send_buffer.ptr); + strbuf_reset(&cb->send_buffer); + cb->send_buffer_init_time = cdtime(); + pthread_mutex_unlock(&cb->send_buffer_lock); -static void wh_callback_free(void *data) /* {{{ */ -{ - wh_callback_t *cb; + if (json == NULL) { + return ENOMEM; + } + return wh_post(cb, json); +} /* int wh_flush */ + +static void wh_callback_free(void *data) { if (data == NULL) return; - cb = data; + wh_callback_t *cb = data; - if (cb->send_buffer != NULL) - wh_flush_nolock(/* timeout = */ 0, cb); + wh_flush(/* timeout = */ 0, NULL, &(user_data_t){.data = cb}); if (cb->curl != NULL) { curl_easy_cleanup(cb->curl); @@ -393,205 +324,95 @@ static void wh_callback_free(void *data) /* {{{ */ sfree(cb->clientkey); sfree(cb->clientcert); sfree(cb->clientkeypass); - sfree(cb->send_buffer); sfree(cb->metrics_prefix); sfree(cb); -} /* }}} void wh_callback_free */ - -static int wh_write_command(metric_single_t const *m, /* {{{ */ - wh_callback_t *cb) { - char values[512]; - char command[1024]; - size_t command_len; - - int status; - - /* sanity checks, primarily to make static analyzers happy. */ - if ((cb == NULL) || (cb->send_buffer == NULL)) - return -1; - - /* Copy the identifier to `key' and escape it. */ - char *metric_string_p = NULL; - if ((metric_string_p = metric_marshal_text(m)) != 0) { - return -1; - } - - escape_string(metric_string_p, sizeof(metric_string_p)); +} /* void wh_callback_free */ - /* Convert the values to an ASCII representation and put that into - * `values'. */ - status = format_values(values, sizeof(values), m, cb->store_rates); - if (status != 0) { - ERROR("write_http plugin: error with " - "wh_value_list_to_string"); - sfree(metric_string_p); - return status; - } +static int wh_write_command(metric_family_t const *fam, wh_callback_t *cb) { + pthread_mutex_lock(&cb->send_buffer_lock); - command_len = (size_t)snprintf( - command, sizeof(command), "PUTVAL %s interval=%.3f %s\r\n", - metric_string_p, CDTIME_T_TO_DOUBLE(m->interval), values); - if (command_len >= sizeof(command)) { - ERROR("write_http plugin: Command buffer too small: " - "Need %" PRIsz " bytes.", - command_len + 1); - sfree(metric_string_p); - return -1; - } - sfree(metric_string_p); + int ret = 0; + for (size_t i = 0; i < fam->metric.num; i++) { + metric_t const *m = fam->metric.ptr; - pthread_mutex_lock(&cb->send_lock); - if (wh_callback_init(cb) != 0) { - ERROR("write_http plugin: wh_callback_init failed."); - pthread_mutex_unlock(&cb->send_lock); - return -1; - } - - if (command_len >= cb->send_buffer_free) { - status = wh_flush_nolock(/* timeout = */ 0, cb); + int status = cmd_create_putval(&cb->send_buffer, m); if (status != 0) { - pthread_mutex_unlock(&cb->send_lock); - return status; + ERROR("write_http plugin: cmd_create_putval failed: %s", + STRERROR(status)); + ret = ret ? ret : status; + continue; } - } - assert(command_len < cb->send_buffer_free); - - /* Make scan-build happy. */ - assert(cb->send_buffer != NULL); - - /* `command_len + 1' because `command_len' does not include the - * trailing null byte. Neither does `send_buffer_fill'. */ - memcpy(cb->send_buffer + cb->send_buffer_fill, command, command_len + 1); - cb->send_buffer_fill += command_len; - cb->send_buffer_free -= command_len; - - DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%) \"%s\"", - cb->location, cb->send_buffer_fill, cb->send_buffer_size, - 100.0 * ((double)cb->send_buffer_fill) / ((double)cb->send_buffer_size), - command); - - /* Check if we have enough space for this command. */ - pthread_mutex_unlock(&cb->send_lock); - - return 0; -} /* }}} int wh_write_command */ -static int wh_write_json(metric_single_t const *m, /* {{{ */ - wh_callback_t *cb) { - int status; - - pthread_mutex_lock(&cb->send_lock); - if (wh_callback_init(cb) != 0) { - ERROR("write_http plugin: wh_callback_init failed."); - pthread_mutex_unlock(&cb->send_lock); - return -1; + strbuf_print(&cb->send_buffer, "\n"); } - status = format_json_metric(cb->send_buffer, &cb->send_buffer_fill, - &cb->send_buffer_free, m, cb->store_rates); - if (status == -ENOMEM) { - status = wh_flush_nolock(/* timeout = */ 0, cb); - if (status != 0) { - wh_reset_buffer(cb); - pthread_mutex_unlock(&cb->send_lock); - return status; - } + pthread_mutex_unlock(&cb->send_buffer_lock); + return ret; +} /* int wh_write_command */ - status = format_json_metric(cb->send_buffer, &cb->send_buffer_fill, - &cb->send_buffer_free, m, cb->store_rates); - } +static int wh_write_json(metric_family_t const *fam, wh_callback_t *cb) { + pthread_mutex_lock(&cb->send_buffer_lock); + + int status = + format_json_metric_family(&cb->send_buffer, fam, cb->store_rates); if (status != 0) { - pthread_mutex_unlock(&cb->send_lock); + pthread_mutex_unlock(&cb->send_buffer_lock); + ERROR("write_http plugin: format_json_metric_family failed: %s", + STRERROR(status)); return status; } - DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%)", - cb->location, cb->send_buffer_fill, cb->send_buffer_size, - 100.0 * ((double)cb->send_buffer_fill) / - ((double)cb->send_buffer_size)); - - /* Check if we have enough space for this command. */ - pthread_mutex_unlock(&cb->send_lock); - + pthread_mutex_unlock(&cb->send_buffer_lock); return 0; -} /* }}} int wh_write_json */ - -static int wh_write_kairosdb(metric_single_t const *m, /* {{{ */ - wh_callback_t *cb) { - int status; +} /* int wh_write_json */ - pthread_mutex_lock(&cb->send_lock); - - if (cb->curl == NULL) { - status = wh_callback_init(cb); - if (status != 0) { - ERROR("write_http plugin: wh_callback_init failed."); - pthread_mutex_unlock(&cb->send_lock); - return -1; - } - } +static int wh_write_kairosdb(metric_family_t const *fam, wh_callback_t *cb) { + format_kairosdb_opts_t opts = { + .store_rates = cb->store_rates, + .ttl_secs = cb->data_ttl, + .metrics_prefix = cb->metrics_prefix, + }; - status = format_kairosdb_metric( - cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, m, - cb->store_rates, (char const *const *)http_attrs, http_attrs_num, - cb->data_ttl, cb->metrics_prefix); - if (status == -ENOMEM) { - status = wh_flush_nolock(/* timeout = */ 0, cb); - if (status != 0) { - wh_reset_buffer(cb); - pthread_mutex_unlock(&cb->send_lock); - return status; - } + pthread_mutex_lock(&cb->send_buffer_lock); - status = format_kairosdb_metric( - cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, m, - cb->store_rates, (char const *const *)http_attrs, http_attrs_num, - cb->data_ttl, cb->metrics_prefix); - } + int status = format_kairosdb_metric_family(&cb->send_buffer, fam, &opts); if (status != 0) { - pthread_mutex_unlock(&cb->send_lock); + pthread_mutex_unlock(&cb->send_buffer_lock); + ERROR("write_http plugin: format_kairosdb_metric_family failed: %s", + STRERROR(status)); return status; } - DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%)", - cb->location, cb->send_buffer_fill, cb->send_buffer_size, - 100.0 * ((double)cb->send_buffer_fill) / - ((double)cb->send_buffer_size)); - - /* Check if we have enough space for this command. */ - pthread_mutex_unlock(&cb->send_lock); - + pthread_mutex_unlock(&cb->send_buffer_lock); return 0; -} /* }}} int wh_write_kairosdb */ +} /* int wh_write_kairosdb */ -static int wh_write(metric_single_t const *m, /* {{{ */ - user_data_t *user_data) { - wh_callback_t *cb; - int status; +static int wh_write(metric_family_t const *fam, user_data_t *user_data) { + if ((fam == NULL) || (user_data == NULL)) { + return EINVAL; + } - if (user_data == NULL) - return -EINVAL; + wh_callback_t *cb = user_data->data; - cb = user_data->data; assert(cb->send_metrics); + int status = EINVAL; switch (cb->format) { case WH_FORMAT_JSON: - status = wh_write_json(m, cb); + status = wh_write_json(fam, cb); break; case WH_FORMAT_KAIROSDB: - status = wh_write_kairosdb(m, cb); + status = wh_write_kairosdb(fam, cb); break; default: - status = wh_write_command(m, cb); + status = wh_write_command(fam, cb); break; } return status; -} /* }}} int wh_write */ +} /* int wh_write */ -static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ -{ +static int wh_notify(notification_t const *n, user_data_t *ud) { wh_callback_t *cb; char alert[4096]; int status; @@ -608,21 +429,20 @@ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ return status; } - pthread_mutex_lock(&cb->send_lock); + pthread_mutex_lock(&cb->send_buffer_lock); if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); - pthread_mutex_unlock(&cb->send_lock); + pthread_mutex_unlock(&cb->send_buffer_lock); return -1; } - status = wh_post_nolock(cb, alert); - pthread_mutex_unlock(&cb->send_lock); + status = wh_post(cb, alert); + pthread_mutex_unlock(&cb->send_buffer_lock); return status; -} /* }}} int wh_notify */ +} /* int wh_notify */ -static int config_set_format(wh_callback_t *cb, /* {{{ */ - oconfig_item_t *ci) { +static int config_set_format(wh_callback_t *cb, oconfig_item_t *ci) { char *string; if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) { @@ -645,10 +465,9 @@ static int config_set_format(wh_callback_t *cb, /* {{{ */ } return 0; -} /* }}} int config_set_format */ +} /* int config_set_format */ -static int wh_config_append_string(const char *name, - struct curl_slist **dest, /* {{{ */ +static int wh_config_append_string(const char *name, struct curl_slist **dest, oconfig_item_t *ci) { struct curl_slist *temp = NULL; if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) { @@ -663,10 +482,9 @@ static int wh_config_append_string(const char *name, *dest = temp; return 0; -} /* }}} int wh_config_append_string */ +} /* int wh_config_append_string */ -static int wh_config_node(oconfig_item_t *ci) /* {{{ */ -{ +static int wh_config_node(oconfig_item_t *ci) { wh_callback_t *cb; int buffer_size = 0; char callback_name[DATA_MAX_NAME_LEN]; @@ -697,7 +515,8 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ return -1; } - pthread_mutex_init(&cb->send_lock, /* attr = */ NULL); + pthread_mutex_init(&cb->curl_lock, /* attr = */ NULL); + pthread_mutex_init(&cb->send_buffer_lock, /* attr = */ NULL); cf_util_get_string(ci, &cb->name); @@ -769,7 +588,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ if (cb->curl_stats == NULL) status = -1; } else if (strcasecmp("Notifications", child->key) == 0) - cf_util_get_boolean(child, &cb->send_notifications); + status = cf_util_get_boolean(child, &cb->send_notifications); else if (strcasecmp("StoreRates", child->key) == 0) status = cf_util_get_boolean(child, &cb->store_rates); else if (strcasecmp("BufferSize", child->key) == 0) @@ -782,38 +601,11 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_boolean(child, &cb->log_http_error); else if (strcasecmp("Header", child->key) == 0) status = wh_config_append_string("Header", &cb->headers, child); - else if (strcasecmp("Attribute", child->key) == 0) { - char *key = NULL; - char *val = NULL; - - if (child->values_num != 2) { - WARNING("write_http plugin: Attribute need both a key and a value."); - break; - } - if (child->values[0].type != OCONFIG_TYPE_STRING || - child->values[1].type != OCONFIG_TYPE_STRING) { - WARNING("write_http plugin: Attribute needs string arguments."); - break; - } - if ((key = strdup(child->values[0].value.string)) == NULL) { - WARNING("cannot allocate memory for attribute key."); - break; - } - if ((val = strdup(child->values[1].value.string)) == NULL) { - WARNING("cannot allocate memory for attribute value."); - sfree(key); - break; - } - strarray_add(&http_attrs, &http_attrs_num, key); - strarray_add(&http_attrs, &http_attrs_num, val); - DEBUG("write_http plugin: got attribute: %s => %s", key, val); - sfree(key); - sfree(val); - } else if (strcasecmp("TTL", child->key) == 0) { + else if (strcasecmp("TTL", child->key) == 0) status = cf_util_get_int(child, &cb->data_ttl); - } else if (strcasecmp("Prefix", child->key) == 0) { + else if (strcasecmp("Prefix", child->key) == 0) status = cf_util_get_string(child, &cb->metrics_prefix); - } else { + else { ERROR("write_http plugin: Invalid configuration " "option: %s.", child->key); @@ -849,25 +641,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ if (cb->low_speed_limit > 0) cb->low_speed_time = CDTIME_T_TO_TIME_T(plugin_get_interval()); - /* Determine send_buffer_size. */ - cb->send_buffer_size = WRITE_HTTP_DEFAULT_BUFFER_SIZE; - if (buffer_size >= 1024) - cb->send_buffer_size = (size_t)buffer_size; - else if (buffer_size != 0) - ERROR("write_http plugin: Ignoring invalid BufferSize setting (%d).", - buffer_size); - - /* Allocate the buffer. */ - cb->send_buffer = malloc(cb->send_buffer_size); - if (cb->send_buffer == NULL) { - ERROR("write_http plugin: malloc(%" PRIsz ") failed.", - cb->send_buffer_size); - wh_callback_free(cb); - return -1; - } - - /* Nulls the buffer and sets ..._free and ..._fill. */ - wh_reset_buffer(cb); + cb->send_buffer = STRBUF_CREATE; snprintf(callback_name, sizeof(callback_name), "write_http/%s", cb->name); DEBUG("write_http: Registering write callback '%s' with URL '%s'", @@ -879,6 +653,11 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ }; if (cb->send_metrics) { + /* This causes wh_flush to be called periodically. */ + plugin_ctx_t ctx = plugin_get_ctx(); + ctx.flush_interval = plugin_get_interval(); + plugin_set_ctx(ctx); + plugin_register_write(callback_name, wh_write, &user_data); user_data.free_func = NULL; @@ -891,40 +670,30 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ } return 0; -} /* }}} int wh_config_node */ +} /* int wh_config_node */ -static int wh_config(oconfig_item_t *ci) /* {{{ */ -{ +static int wh_config(oconfig_item_t *ci) { for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; - if (strcasecmp("Node", child->key) == 0) - wh_config_node(child); - /* FIXME: Remove this legacy mode in version 6. */ - else if (strcasecmp("URL", child->key) == 0) { - WARNING("write_http plugin: Legacy block found. " - "Please use instead."); + if (strcasecmp("Node", child->key) == 0) { wh_config_node(child); } else { - ERROR("write_http plugin: Invalid configuration " - "option: %s.", - child->key); + ERROR("write_http plugin: Invalid configuration option: %s.", child->key); } } return 0; -} /* }}} int wh_config */ +} /* int wh_config */ -static int wh_init(void) /* {{{ */ -{ +static int wh_init(void) { /* Call this while collectd is still single-threaded to avoid * initialization issues in libgcrypt. */ curl_global_init(CURL_GLOBAL_SSL); return 0; -} /* }}} int wh_init */ +} /* int wh_init */ -void module_register(void) /* {{{ */ -{ +void module_register(void) { plugin_register_complex_config("write_http", wh_config); plugin_register_init("write_http", wh_init); -} /* }}} void module_register */ +} /* void module_register */