From 55efb56a8d56f5b37bfdad430801e731210ecd1f Mon Sep 17 00:00:00 2001
From: =?utf8?q?Leonard=20G=C3=B6hrs?=
Date: Tue, 19 Jul 2022 11:20:09 +0200
Subject: [PATCH] [collectd 6] src/daemon/plugin.c: Use one thread per write
 plugin
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

ChangeLog: collectd: Use one write thread per write plugin

The previous write thread design used a single queue with a single read head
from which one of the write threads would de-queue an element and would then
sequentially call each registered write callback.

This meant that all write plugins had to cooperate in order not to drop
values. If, for example, all write threads are stalled by the same write
plugin's callback function not returning, the queue will start to fill up
until elements start to be dropped, even though there are other plugins that
could still make progress. In addition to that, all write callbacks currently
have to be designed to be reentrant, which increases complexity.

This new design uses a single linked-list write queue with one read head per
output plugin. Each output plugin is serviced in a dedicated write thread.
Elements are freed based on a reference count, as shown in the ASCII art
below:

      +- Thread #1 Head       +- Thread #2 Head       +- Tail
      v                       v                       v
+--+  +--+  +--+  +--+  +--+  +--+  +--+  +--+  +--+  +--+
| 0|->| 1|->| 1|->| 1|->| 1|->| 2|->| 2|->| 2|->| 2|->| 2|->X
+--+  +--+  +--+  +--+  +--+  +--+  +--+  +--+  +--+  +--+
 ^
 +- to be free()d

The changes introduced by this commit have some side-effects:

- The WriteThreads config option no longer exists, as a strict 1:1 ratio of
  write plugins and write threads is used.

- The data flow has changed. The previous data flow was:

    (From one of the ReadThreads)
    plugin_dispatch_{values,multivalue}()
     plugin_dispatch_metric_family()
      enqueue_metric_family()
       write_queue_enqueue() -----{Queue}----+
                                             |
    (In one of the WriteThreads threads)     |
    plugin_write_thread()                    |
     ^- plugin_write_dequeue() <-------------+
    plugin_dispatch_metric_internal()
     ^- fc_process_chain(pre_cache_chain)
    fc_process_chain(post_cache_chain)
     fc_bit_write_invoke()
      plugin_write(NULL) / plugin_write(plugin_name)
       plugin callback()

  The data flow now is:

    (From one of the ReadThreads)
    plugin_dispatch_{values,multivalue}()
     plugin_dispatch_metric_family()
      plugin_dispatch_metric_internal()
       ^- fc_process_chain(pre_cache_chain)
       fc_process_chain(post_cache_chain)
        fc_bit_write_invoke()
         plugin_write(NULL) / plugin_write(plugin_name)
          write_queue_enqueue() -----{Queue}----+
                                                |
    (In one of the WriteThreads threads)        |
    plugin_write_thread() <---------------------+
     plugin callback()

  One result of this change is that the behaviour of plugin_write has changed
  from running the plugin callback immediately and in the same thread to
  always enqueueing the value and de-queueing it in the dedicated thread.

- The behaviour of the WriteQueueLimitHigh and WriteQueueLimitLow options has
  changed. The queue will be capped to a length of LimitHigh by dropping
  random queue elements between the queue end and LimitLow. Setting LimitLow
  to a reasonably large value ensures that fast write plugins do not lose
  values, even in the vicinity of a slow plugin. The diagram below shows the
  random element selected for removal (###) in Step 1 and the queue with the
  element removed in Step 2.
Step 1:

 +- Thread #1 Head    |  +- Thread #2 Head       +- Tail
 v  |                 |  v                       v
+--+| +--+  ####  +--+| +--+  +--+  +--+  +--+  +--+
| 1|->| 1|-># 1#->| 1|->| 2|->| 2|->| 2|->| 2|->| 2|->X
+--+| +--+  ####  +--+| +--+  +--+  +--+  +--+  +--+
    |                 |
    | LimitHigh       | LimitLow

Step 2:

|  +- Thread #1 Head +- Thread #2 Head       +- Tail
|  v              |  v                       v
| +--+  +--+  +--+| +--+  +--+  +--+  +--+  +--+
| | 1|->| 1|->| 1|->| 2|->| 2|->| 2|->| 2|->| 2|->X
| +--+  +--+  +--+| +--+  +--+  +--+  +--+  +--+
|                 |
| LimitHigh       | LimitLow

Signed-off-by: Leonard Göhrs
---
 src/collectd.conf.in      |   1 -
 src/collectd.conf.pod     |  25 +-
 src/daemon/configfile.c   |   1 -
 src/daemon/filter_chain.c |  14 +-
 src/daemon/filter_chain.h |   6 +-
 src/daemon/plugin.c       | 739 +++++++++++++++++++------------------
 src/daemon/plugin.h       |  16 +-
 7 files changed, 402 insertions(+), 400 deletions(-)

diff --git a/src/collectd.conf.in b/src/collectd.conf.in
index 2f7468cc6..f99955402 100644
--- a/src/collectd.conf.in
+++ b/src/collectd.conf.in
@@ -43,7 +43,6 @@
 #MaxReadInterval 86400
 #Timeout 2
 #ReadThreads 5
-#WriteThreads 5
 
 # Limit the size of the write queue. Default is no limit. Setting up a limit is
 # recommended for servers handling a high volume of traffic.
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index f7d0183ae..9e0c37b50 100644
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
@@ -166,7 +166,7 @@ The following metrics are reported:
 =item C
 
 The number of metrics currently in the write queue. You can limit the queue
-length with the B<WriteQueueLimitHigh> and B<WriteQueueLimitLow> options.
+length with the B<WriteQueueLimitHigh> option.
 
 =item C
 
@@ -282,12 +282,6 @@ you may want to increase this if you have more than
 five plugins that take a long time to read. Mostly those are plugins that do
 network-IO. Setting this to a value higher than the number of registered read
 callbacks is not recommended.
-=item B<WriteThreads> I<Num>
-
-Number of threads to start for dispatching value lists to write plugins. The
-default value is B<5>, but you may want to increase this if you have more than
-five plugins that may take relatively long to write to.
-
 =item B<WriteQueueLimitHigh> I<HighNum>
 
 =item B<WriteQueueLimitLow> I<LowNum>
@@ -304,19 +298,18 @@ metrics. For servers it is recommended to set this to a non-zero value, though.
 
 You can set the limits using B<WriteQueueLimitHigh> and B<WriteQueueLimitLow>.
 Each of them takes a numerical argument which is the number of metrics in the
-queue. If there are I<HighNum> metrics in the queue, any new metrics I<will> be
-dropped. If there are less than I<LowNum> metrics in the queue, all new metrics
-I<will> be enqueued. If the number of metrics currently in the queue is between
-I<LowNum> and I<HighNum>, the metric is dropped with a probability that is
-proportional to the number of metrics in the queue (i.e. it increases linearly
-until it reaches 100%.)
+queue. If there are I<HighNum> metrics in the queue a random element from the
+queue will be dropped. This results in the data becoming more and more sparse
+without dropping a lot of contiguous values. An exception from the random
+selection of elements to drop are the I<LowNum> most recent elements which will
+never be dropped. This can be used to ensure that one misbehaving write
+plugin does not result in dropped values for other well-behaved write plugins.
 
 If B<WriteQueueLimitHigh> is set to non-zero and B<WriteQueueLimitLow> is
 unset, the latter will default to half of B<WriteQueueLimitHigh>.
 
-If you do not want to randomly drop values when the queue size is between
-I<LowNum> and I<HighNum>, set B<WriteQueueLimitHigh> and B<WriteQueueLimitLow>
-to the same value.
+If you want to deterministically drop the oldest elements instead of random
+ones, set B<WriteQueueLimitHigh> and B<WriteQueueLimitLow> to the same value.
 
 Enabling the B<CollectInternalStats> option is of great help to figure out the
 values to set B<WriteQueueLimitHigh> and B<WriteQueueLimitLow> to.
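The reference counting described in the commit message can be illustrated
with a minimal, self-contained C sketch. This is not part of the patch:
elem_t loosely mirrors write_queue_elem_t, and the two sequential consumer
loops stand in for the per-plugin write threads (which in the patch run
concurrently under write_queue.lock):

    /* Sketch of per-element reference counting: one shared list, one read
     * head per consumer, elements freed when the last consumer de-references
     * them. Everything beyond the elem_t layout is illustrative. */
    #include <assert.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct elem_s {
      int value;
      long ref_count;
      struct elem_s *next;
    } elem_t;

    /* Drop one reference; free the element once no consumer needs it. */
    static void elem_deref(elem_t *elem) {
      assert(elem->ref_count > 0);
      if (--elem->ref_count == 0)
        free(elem);
    }

    int main(void) {
      /* Build a three-element queue; every element starts with one
       * reference per consumer (two consumers here). */
      elem_t *head = NULL;
      elem_t **tail = &head;
      for (int i = 0; i < 3; i++) {
        elem_t *e = calloc(1, sizeof(*e));
        e->value = i;
        e->ref_count = 2;
        *tail = e;
        tail = &e->next;
      }

      /* Consumer #1 drains the queue; nothing is freed yet because
       * consumer #2 still holds a reference to every element. */
      for (elem_t *e = head, *next; e != NULL; e = next) {
        next = e->next;
        printf("consumer #1 writes %d\n", e->value);
        elem_deref(e);
      }

      /* Consumer #2 drains the queue; each element is freed here. */
      for (elem_t *e = head, *next; e != NULL; e = next) {
        next = e->next;
        printf("consumer #2 writes %d\n", e->value);
        elem_deref(e);
      }

      return 0;
    }

An element's reference count is simply determined by how many consumer heads
have not yet passed it, which is the 0/1/2 pattern in the diagram above.
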
diff --git a/src/daemon/configfile.c b/src/daemon/configfile.c
index 4d40a7784..b36c69eff 100644
--- a/src/daemon/configfile.c
+++ b/src/daemon/configfile.c
@@ -107,7 +107,6 @@ static cf_global_option_t cf_global_options[] = {
     {"FQDNLookup", NULL, 0, "true"},
     {"Interval", NULL, 0, NULL},
     {"ReadThreads", NULL, 0, "5"},
-    {"WriteThreads", NULL, 0, "5"},
    {"WriteQueueLimitHigh", NULL, 0, NULL},
     {"WriteQueueLimitLow", NULL, 0, NULL},
     {"Timeout", NULL, 0, "2"},
diff --git a/src/daemon/filter_chain.c b/src/daemon/filter_chain.c
index 89892f779..b4e1bbe1b 100644
--- a/src/daemon/filter_chain.c
+++ b/src/daemon/filter_chain.c
@@ -568,7 +568,7 @@ static int fc_bit_jump_destroy(void **user_data) /* {{{ */
   return 0;
 } /* }}} int fc_bit_jump_destroy */
 
-static int fc_bit_jump_invoke(metric_family_t *fam,
+static int fc_bit_jump_invoke(metric_family_t const *fam,
                               notification_meta_t __attribute__((unused)) *
                                   *meta,
                               void **user_data) {
@@ -598,14 +598,16 @@ static int fc_bit_jump_invoke(metric_family_t *fam,
   return FC_TARGET_CONTINUE;
 } /* }}} int fc_bit_jump_invoke */
 
-static int fc_bit_stop_invoke(__attribute__((unused)) metric_family_t *fam,
+static int fc_bit_stop_invoke(__attribute__((unused))
+                              metric_family_t const *fam,
                               __attribute__((unused)) notification_meta_t **meta,
                               __attribute__((unused)) void **user_data) {
   return FC_TARGET_STOP;
 } /* }}} int fc_bit_stop_invoke */
 
-static int fc_bit_return_invoke(__attribute__((unused)) metric_family_t *fam,
+static int fc_bit_return_invoke(__attribute__((unused))
+                                metric_family_t const *fam,
                                 __attribute__((unused)) notification_meta_t **meta,
                                 __attribute__((unused)) void **user_data) {
@@ -678,7 +680,7 @@ static int fc_bit_write_destroy(void **user_data) /* {{{ */
   return 0;
 } /* }}} int fc_bit_write_destroy */
 
-static int fc_bit_write_invoke(metric_family_t *fam,
+static int fc_bit_write_invoke(metric_family_t const *fam,
                                __attribute__((unused)) notification_meta_t **meta,
                                void **user_data) {
@@ -849,7 +851,7 @@ fc_chain_t *fc_chain_get_by_name(const char *chain_name) /* {{{ */
   return NULL;
 } /* }}} int fc_chain_get_by_name */
 
-int fc_process_chain(metric_family_t *fam, /* {{{ */
+int fc_process_chain(metric_family_t const *fam, /* {{{ */
                      fc_chain_t *chain) {
   fc_target_t *target;
   int status = FC_TARGET_CONTINUE;
@@ -969,7 +971,7 @@ int fc_process_chain(metric_family_t *fam, /* {{{ */
 
 /* Iterate over all rules in the chain and execute all targets for which all
  * matches match. */
-int fc_default_action(metric_family_t *fam) /* {{{ */
+int fc_default_action(metric_family_t const *fam) /* {{{ */
 {
   /* FIXME: Pass the meta-data to match targets here (when implemented). */
   return fc_bit_write_invoke(fam, NULL, NULL);
diff --git a/src/daemon/filter_chain.h b/src/daemon/filter_chain.h
index 41fb44570..5eb171cc8 100644
--- a/src/daemon/filter_chain.h
+++ b/src/daemon/filter_chain.h
@@ -57,7 +57,7 @@ int fc_register_match(const char *name, match_proc_t proc);
 struct target_proc_s {
   int (*create)(const oconfig_item_t *ci, void **user_data);
   int (*destroy)(void **user_data);
-  int (*invoke)(metric_family_t *fam, notification_meta_t **meta,
+  int (*invoke)(metric_family_t const *fam, notification_meta_t **meta,
                 void **user_data);
 };
 typedef struct target_proc_s target_proc_t;
@@ -91,9 +91,9 @@ int fc_rule_delete (const char *chain_name, int position);
 */
 fc_chain_t *fc_chain_get_by_name(const char *chain_name);
 
-int fc_process_chain(metric_family_t *fam, fc_chain_t *chain);
+int fc_process_chain(metric_family_t const *fam, fc_chain_t *chain);
 
-int fc_default_action(metric_family_t *fam);
+int fc_default_action(metric_family_t const *fam);
 
 /*
  * Shortcut for global configuration
diff --git a/src/daemon/plugin.c b/src/daemon/plugin.c
index 243637e90..111dd88c4 100644
--- a/src/daemon/plugin.c
+++ b/src/daemon/plugin.c
@@ -94,12 +94,35 @@ struct cache_event_func_s {
 };
 typedef struct cache_event_func_s cache_event_func_t;
 
-struct write_queue_s;
-typedef struct write_queue_s write_queue_t;
-struct write_queue_s {
+typedef struct write_queue_elem_s {
   metric_family_t *family;
   plugin_ctx_t ctx;
-  write_queue_t *next;
+  const char *plugin;
+  long ref_count;
+  struct write_queue_elem_s *next;
+} write_queue_elem_t;
+
+typedef struct write_queue_thread_s {
+  bool loop;
+  long queue_length;
+  const char *name;
+  plugin_write_cb callback;
+  user_data_t *ud;
+  pthread_t thread;
+  write_queue_elem_t *head;
+  struct write_queue_thread_s *next;
+} write_queue_thread_t;
+
+struct {
+  pthread_mutex_t lock;
+  pthread_cond_t cond;
+  write_queue_elem_t *tail;
+  write_queue_thread_t *threads;
+} write_queue = {
+    .lock = PTHREAD_MUTEX_INITIALIZER,
+    .cond = PTHREAD_COND_INITIALIZER,
+    .tail = NULL,
+    .threads = NULL,
 };
 
 struct flush_callback_s {
@@ -114,7 +137,6 @@ typedef struct flush_callback_s flush_callback_t;
 
 static c_avl_tree_t *plugins_loaded;
 static llist_t *list_init;
-static llist_t *list_write;
 static llist_t *list_flush;
 static llist_t *list_missing;
 static llist_t *list_shutdown;
@@ -143,15 +165,6 @@ static pthread_t *read_threads;
 static size_t read_threads_num;
 static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
 
-static write_queue_t *write_queue_head;
-static write_queue_t *write_queue_tail;
-static long write_queue_length;
-static bool write_loop = true;
-static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
-static pthread_t *write_threads;
-static size_t write_threads_num;
-
 static pthread_key_t plugin_ctx_key;
 static bool plugin_ctx_key_initialized;
@@ -165,7 +178,7 @@ static bool record_statistics;
 
 /*
  * Static functions
 */
-static int plugin_dispatch_metric_internal(metric_family_t *fam);
+static int plugin_dispatch_metric_internal(metric_family_t const *fam);
 
 static const char *plugin_get_dir(void) {
   if (plugindir == NULL)
@@ -175,6 +188,17 @@ static const char *plugin_get_dir(void) {
 }
 
 static int plugin_update_internal_statistics(void) { /* {{{ */
+  long write_queue_length = 0;
+
+  pthread_mutex_lock(&write_queue.lock);
+  for (write_queue_thread_t *thread = write_queue.threads; thread != NULL;
+       thread = thread->next) {
+    if (thread->queue_length > write_queue_length) {
+      write_queue_length = thread->queue_length;
+    }
+  }
+  pthread_mutex_unlock(&write_queue.lock);
+
   gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
 
   /* Initialize `vl' */
@@ -193,7 +217,9 @@ static int plugin_update_internal_statistics(void) { /* {{{ */
   plugin_dispatch_values(&vl);
 
   /* Write queue : Values dropped (queue length > low limit) */
+  pthread_mutex_lock(&statistics_lock);
   vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
+  pthread_mutex_unlock(&statistics_lock);
   vl.values_len = 1;
   sstrncpy(vl.type, "derive", sizeof(vl.type));
   sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
@@ -321,42 +347,6 @@ static int register_callback(llist_t **list, /* {{{ */
   return 0;
 } /* }}} int register_callback */
 
-static void log_list_callbacks(llist_t **list, /* {{{ */
-                               const char *comment) {
-  char *str;
-  int len;
-  int i;
-  llentry_t *le;
-  int n;
-
-  n = llist_size(*list);
-  if (n == 0) {
-    INFO("%s: [none]", comment);
-    return;
-  }
-
-  char **keys = calloc(n, sizeof(*keys));
-  if (keys == NULL) {
-    ERROR("%s: failed to allocate memory for list of callbacks", comment);
-    return;
-  }
-
-  for (le = llist_head(*list), i = 0, len = 0; le != NULL; le = le->next, i++) {
-    keys[i] = le->key;
-    len += strlen(le->key) + 6;
-  }
-  str = malloc(len + 10);
-  if (str == NULL) {
-    ERROR("%s: failed to allocate memory for list of callbacks", comment);
-  } else {
-    *str = '\0';
-    strjoin(str, len, keys, n, "', '");
-    INFO("%s ['%s']", comment, str);
-    sfree(str);
-  }
-  sfree(keys);
-} /* }}} void log_list_callbacks */
-
 static int create_register_callback(llist_t **list, /* {{{ */
                                     const char *name, void *callback,
                                     user_data_t const *ud) {
@@ -732,42 +722,164 @@ plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */
   return vl;
 } /* }}} value_list_t *plugin_value_list_clone */
 
-static void write_queue_enqueue(write_queue_t *head) {
-  write_queue_t *tail = NULL;
-  long num = 0;
+static void write_queue_ref_single(write_queue_elem_t *elem, long dir) {
+  elem->ref_count += dir;
+
+  assert(elem->ref_count >= 0);
+
+  if (elem->ref_count == 0) {
+    if (write_queue.tail == elem) {
+      write_queue.tail = NULL;
+      assert(elem->next == NULL);
+    }
 
-  for (write_queue_t *elem = head; elem != NULL; elem = elem->next) {
-    tail = elem;
-    num++;
+    metric_family_free(elem->family);
+    sfree(elem);
   }
+}
 
-  if (num == 0) {
-    return;
+static void write_queue_ref_all(write_queue_elem_t *start, long dir) {
+  while (start != NULL) {
+    write_queue_elem_t *elem = start;
+    start = elem->next;
+
+    write_queue_ref_single(elem, dir);
   }
+}
 
-  pthread_mutex_lock(&write_lock);
+static int write_queue_enqueue(write_queue_elem_t *ins_head) {
+  static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
 
-  if (write_queue_tail == NULL) {
-    write_queue_head = head;
-    write_queue_tail = tail;
-    write_queue_length = num;
-  } else {
-    write_queue_tail->next = head;
-    write_queue_tail = tail;
-    write_queue_length += num;
+  if (ins_head == NULL) {
+    return EINVAL;
   }
 
-  pthread_cond_signal(&write_cond);
-  pthread_mutex_unlock(&write_lock);
+  pthread_mutex_lock(&write_queue.lock);
+
+  if (write_queue.threads == NULL) {
+    c_complain_once(LOG_WARNING, &no_write_complaint,
                    "write_queue_enqueue: No write callback has been "
                    "registered. Please load at least one output plugin, "
                    "if you want the collected data to be stored.");
+
+    /* Elements in the ins_head queue already have a zero reference count,
+     * but without any write threads there is no one to free them.
+     * Make sure they are freed by de-refing them 0 times. */
+    write_queue_ref_all(ins_head, 0);
+    pthread_mutex_unlock(&write_queue.lock);
+
+    return ENOENT;
+  }
+
+  write_queue_elem_t *ins_tail = NULL;
+  long num_elems = 0;
+
+  /* More than one element may be enqueued at once. Count elements and find
+   * local tail. */
+  for (write_queue_elem_t *elem = ins_head; elem != NULL; elem = elem->next) {
+    ins_tail = elem;
+    num_elems++;
+  }
+
+  /* Append the new elements to the existing queue (if there is one)
+   * and update the tail. */
+  if (write_queue.tail != NULL) {
+    write_queue.tail->next = ins_head;
+  }
+  write_queue.tail = ins_tail;
+
+  /* Iterate through all registered write plugins/threads to:
+   * a) update their head pointer if their queue is currently empty.
+   * b) find the thread with the longest queue to apply limits later. */
+  write_queue_thread_t *slowest_thread = write_queue.threads;
+
+  for (write_queue_thread_t *thread = write_queue.threads; thread != NULL;
+       thread = thread->next) {
+    if (thread->head == NULL) {
+      thread->head = ins_head;
+    }
+
+    /* Mark the new elements as to be consumed by this thread */
+    write_queue_ref_all(ins_head, 1);
+    thread->queue_length += num_elems;
+
+    if (thread->queue_length > slowest_thread->queue_length) {
+      slowest_thread = thread;
+    }
+  }
+
+  /* Enforce write_limit_high (unless it is infinite (i.e. == 0)) */
+  while (write_limit_high != 0 &&
+         slowest_thread->queue_length > write_limit_high) {
+    /* Select a random element to drop between the last position in the
+     * slowest thread's queue and queue position "write_limit_low". This
+     * makes sure that write plugins that do not let the queue get longer
+     * than "write_limit_low" will never drop values, regardless of what
+     * other plugins do. */
+    long drop_pos =
+        cdrand_u() % (slowest_thread->queue_length - write_limit_low) +
+        write_limit_low;
+
+    /* Walk the queue and count elements until element number drop_pos is
+     * found. to_drop will point to the element in question and to_spare
+     * will point to one element before to_drop (if it exists). */
+    write_queue_elem_t *to_spare = NULL;
+    write_queue_elem_t *to_drop = slowest_thread->head;
+
+    for (long queue_pos = slowest_thread->queue_length - 1;
+         queue_pos > drop_pos; queue_pos--) {
+      to_spare = to_drop;
+      to_drop = to_drop->next;
+    }
+
+    /* Unlink to_drop from linked list if it is not the head (in which case
+     * to_spare is NULL and it will be unlinked below) */
+    if (to_spare != NULL) {
+      to_spare->next = to_drop->next;
+    }
+
+    /* Iterate through all registered write plugins/threads to:
+     * a) update the head if it references to_drop.
+     * b) update the thread's queue length if to_drop was still in its
+     *    queue. */
+    for (write_queue_thread_t *thread = write_queue.threads; thread != NULL;
+         thread = thread->next) {
+
+      if (thread->head == to_drop) {
+        thread->head = to_drop->next;
+      }
+
+      /* Reduce reference count and queue length for every affected queue.
+       * There may still be references held by the write threads if the
+       * element was just de-queued in one of them and is currently used in
+       * the callback, so freeing may be delayed until it is dropped there. */
+      if (drop_pos < thread->queue_length) {
+        thread->queue_length--;
+        write_queue_ref_single(to_drop, -1);
+      }
+    }
+
+    if (record_statistics) {
+      pthread_mutex_lock(&statistics_lock);
+      stats_values_dropped++;
+      pthread_mutex_unlock(&statistics_lock);
+    }
+  }
+
+  pthread_cond_broadcast(&write_queue.cond);
+  pthread_mutex_unlock(&write_queue.lock);
+
+  return 0;
 }
 
-/* enqueue_metric_family enqueues the metric family to write_queue. */
-static int enqueue_metric_family(metric_family_t const *fam) { /* {{{ */
+EXPORT int plugin_write(const char *plugin, metric_family_t const *fam) {
+  if (fam == NULL) {
+    return EINVAL;
+  }
+
   metric_family_t *fam_copy = metric_family_clone(fam);
   if (fam_copy == NULL) {
     int status = errno;
-    ERROR("enqueue_metric_family: metric_family_clone failed: %s",
-          STRERROR(status));
+    ERROR("plugin_write: metric_family_clone failed: %s", STRERROR(status));
     return status;
   }
@@ -785,141 +897,81 @@ static int enqueue_metric_family(metric_family_t const *fam) { /* {{{ */
     /* TODO(octo): set target labels here. */
   }
 
-  write_queue_t *q = calloc(1, sizeof(*q));
-  if (q == NULL) {
+  write_queue_elem_t *elem = calloc(1, sizeof(*elem));
+  if (elem == NULL) {
     return ENOMEM;
   }
 
-  (*q) = (write_queue_t){
-      .family = fam_copy,
-      .ctx = plugin_get_ctx(),
-  };
-  write_queue_enqueue(q);
-  return 0;
-} /* }}} int enqueue_metric_family */
-
-static metric_family_t *plugin_write_dequeue(void) /* {{{ */
-{
-  write_queue_t *q;
-
-  pthread_mutex_lock(&write_lock);
-  while (write_loop && (write_queue_head == NULL))
-    pthread_cond_wait(&write_cond, &write_lock);
+  elem->family = fam_copy;
+  elem->ctx = plugin_get_ctx();
+  elem->plugin = plugin;
+  elem->ref_count = 0;
+  elem->next = NULL;
 
-  if (write_queue_head == NULL) {
-    pthread_mutex_unlock(&write_lock);
-    return NULL;
-  }
+  return write_queue_enqueue(elem);
+} /* }}} int plugin_write */
 
-  q = write_queue_head;
-  write_queue_head = q->next;
-  write_queue_length -= 1;
-  if (write_queue_head == NULL) {
-    write_queue_tail = NULL;
-    assert(0 == write_queue_length);
-  }
+static void *plugin_write_thread(void *args) /* {{{ */
+{
+  write_queue_thread_t *this_thread = args;
 
-  pthread_mutex_unlock(&write_lock);
+  DEBUG("plugin_write_thread(%s): start", this_thread->name);
 
-  (void)plugin_set_ctx(q->ctx);
+  pthread_mutex_lock(&write_queue.lock);
 
-  metric_family_t *fam = q->family;
-  sfree(q);
-  return fam;
-} /* }}} metric_family_t *plugin_write_dequeue */
+  while (this_thread->loop) {
+    write_queue_elem_t *elem = this_thread->head;
 
-static void *plugin_write_thread(void __attribute__((unused)) * args) /* {{{ */
-{
-  while (write_loop) {
-    metric_family_t *fam = plugin_write_dequeue();
-    if (fam == NULL)
+    if (elem == NULL) {
+      pthread_cond_wait(&write_queue.cond, &write_queue.lock);
       continue;
+    }
 
-    (void)plugin_dispatch_metric_internal(fam);
-    metric_family_free(fam);
-  }
-
-  pthread_exit(NULL);
-  return (void *)0;
-} /* }}} void *plugin_write_thread */
-
-static void start_write_threads(size_t num) /* {{{ */
-{
-  if (write_threads != NULL)
-    return;
+    /* Unlink early so that write_queue_enqueue can freely manipulate the
+     * head while the lock is not held in the callback. */
+    this_thread->head = elem->next;
+    this_thread->queue_length--;
 
-  write_threads = calloc(num, sizeof(*write_threads));
-  if (write_threads == NULL) {
-    ERROR("plugin: start_write_threads: calloc failed.");
-    return;
-  }
+    DEBUG("plugin_write_thread(%s): de-queue %p (remaining queue length: %ld)",
+          this_thread->name, elem, this_thread->queue_length);
 
-  write_threads_num = 0;
-  for (size_t i = 0; i < num; i++) {
-    int status = pthread_create(write_threads + write_threads_num,
-                                /* attr = */ NULL, plugin_write_thread,
-                                /* arg = */ NULL);
-    if (status != 0) {
-      ERROR("plugin: start_write_threads: pthread_create failed with status %i "
-            "(%s).",
-            status, STRERROR(status));
-      return;
-    }
+    /* Should elem be written to all plugins or this plugin in particular? */
+    if (elem->plugin == NULL ||
+        strcasecmp(elem->plugin, this_thread->name) == 0) {
+      pthread_mutex_unlock(&write_queue.lock);
 
-    char name[THREAD_NAME_MAX];
-    ssnprintf(name, sizeof(name), "writer#%" PRIu64,
-              (uint64_t)write_threads_num);
-    set_thread_name(write_threads[write_threads_num], name);
+      plugin_ctx_t ctx = elem->ctx;
+      ctx.name = (char *)this_thread->name;
+      plugin_set_ctx(ctx);
 
-    write_threads_num++;
-  } /* for (i) */
-} /* }}} void start_write_threads */
+      /* TODO(lgo): do something with the return value? */
+      this_thread->callback(elem->family, this_thread->ud);
 
-static void stop_write_threads(void) /* {{{ */
-{
-  write_queue_t *q;
-  size_t i;
+      pthread_mutex_lock(&write_queue.lock);
+    }
 
-  if (write_threads == NULL)
-    return;
+    /* Free the element if it is not referenced by another queue or thread. */
+    write_queue_ref_single(elem, -1);
+  }
 
-  INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num);
+  DEBUG("plugin_write_thread(%s): teardown", this_thread->name);
 
-  pthread_mutex_lock(&write_lock);
-  write_loop = false;
-  DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
-  pthread_cond_broadcast(&write_cond);
-  pthread_mutex_unlock(&write_lock);
+  /* Cleanup before leaving */
+  free_userdata(this_thread->ud);
+  this_thread->ud = NULL;
 
-  for (i = 0; i < write_threads_num; i++) {
-    if (pthread_join(write_threads[i], NULL) != 0) {
-      ERROR("plugin: stop_write_threads: pthread_join failed.");
-    }
-    write_threads[i] = (pthread_t)0;
+  /* Drop references to all remaining queue elements */
+  if (this_thread->head != NULL) {
+    write_queue_ref_all(this_thread->head, -1);
+    this_thread->head = NULL;
+    this_thread->queue_length = 0;
   }
-  sfree(write_threads);
-  write_threads_num = 0;
 
-  pthread_mutex_lock(&write_lock);
-  i = 0;
-  for (q = write_queue_head; q != NULL;) {
-    write_queue_t *q1 = q;
-    metric_family_free(q->family);
-    q = q->next;
-    sfree(q1);
-    i++;
-  }
-  write_queue_head = NULL;
-  write_queue_tail = NULL;
-  write_queue_length = 0;
-  pthread_mutex_unlock(&write_lock);
+  pthread_mutex_unlock(&write_queue.lock);
+  pthread_exit(NULL);
 
-  if (i > 0) {
-    WARNING("plugin: %" PRIsz " metric%s left after shutting down "
-            "the write threads.",
-            i, (i == 1) ? " was" : "s were");
" was" : "s were"); - } -} /* }}} void stop_write_threads */ + return NULL; +} /* }}} void *plugin_write_thread */ /* * Public functions @@ -1245,8 +1297,46 @@ EXPORT int plugin_register_complex_read(const char *group, const char *name, } /* int plugin_register_complex_read */ EXPORT int plugin_register_write(const char *name, plugin_write_cb callback, - user_data_t const *ud) { - return create_register_callback(&list_write, name, (void *)callback, ud); + user_data_t const *user_data) { + write_queue_thread_t *this_thread = calloc(1, sizeof(*this_thread)); + + if (this_thread == NULL) { + free_userdata(user_data); + ERROR("plugin_register_write: calloc failed."); + return ENOMEM; + } + + this_thread->loop = true; + this_thread->queue_length = 0; + this_thread->name = name; + this_thread->callback = callback; + this_thread->ud = (user_data_t *)user_data; + this_thread->head = NULL; + + pthread_mutex_lock(&write_queue.lock); + + int status = pthread_create(&this_thread->thread, NULL, plugin_write_thread, + (void *)this_thread); + + if (status == 0) { + char thread_name[THREAD_NAME_MAX]; + ssnprintf(thread_name, sizeof(thread_name), "writer_%s", name); + set_thread_name(this_thread->thread, thread_name); + + this_thread->next = write_queue.threads; + write_queue.threads = this_thread; + } else { + ERROR("plugin: plugin_register_write: pthread_create failed with status %i " + "(%s).", + status, STRERROR(status)); + + free_userdata(user_data); + sfree(this_thread); + } + + pthread_mutex_unlock(&write_queue.lock); + + return status; } /* int plugin_register_write */ static int plugin_flush_timeout_callback(user_data_t *ud) { @@ -1512,7 +1602,44 @@ EXPORT int plugin_unregister_read(const char *name) /* {{{ */ } /* }}} int plugin_unregister_read */ EXPORT void plugin_log_available_writers(void) { - log_list_callbacks(&list_write, "Available write targets:"); + pthread_mutex_lock(&write_queue.lock); + + if (write_queue.threads == NULL) { + INFO("Available write targets: [none]"); + return; + } + + size_t total_len = 0; + + for (write_queue_thread_t *piv = write_queue.threads; piv != NULL; + piv = piv->next) { + total_len += strlen(piv->name); + if (piv->next != NULL) { + total_len += 5; + } + } + + char *str = malloc(total_len + 1); + if (str == NULL) { + ERROR("Available write targets: failed to allocate memory for list of " + "writers"); + return; + } + + char *cursor = str; + + for (write_queue_thread_t *piv = write_queue.threads; piv != NULL; + piv = piv->next) { + cursor = stpcpy(cursor, piv->name); + if (piv->next != NULL) { + cursor = stpcpy(cursor, "' , '"); + } + } + + pthread_mutex_unlock(&write_queue.lock); + + INFO("Available write targets: ['%s']", str); + sfree(str); } static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */ @@ -1574,7 +1701,58 @@ EXPORT int plugin_unregister_read_group(const char *group) /* {{{ */ } /* }}} int plugin_unregister_read_group */ EXPORT int plugin_unregister_write(const char *name) { - return plugin_unregister(list_write, name); + pthread_mutex_lock(&write_queue.lock); + + /* Build to completely new thread lists. One with threads to_stop and another + * with threads to_keep. If name is NULL to_keep will be empty and to_stop + * will contain all threads. If name is NULL to_stop will contain the + * relevant thread and to_keep will contain all remaining threads. 
*/ + write_queue_thread_t *to_stop = NULL; + write_queue_thread_t *to_keep = NULL; + + for (write_queue_thread_t *piv = write_queue.threads; piv != NULL;) { + write_queue_thread_t *next = piv->next; + + if (name == NULL || strcasecmp(name, piv->name) == 0) { + piv->loop = false; + piv->next = to_stop; + to_stop = piv; + } else { + piv->next = to_keep; + to_keep = piv; + } + + piv = next; + } + + write_queue.threads = to_keep; + + pthread_cond_broadcast(&write_queue.cond); + pthread_mutex_unlock(&write_queue.lock); + + /* Return error if the requested thread was not found */ + if (to_stop == NULL && name != NULL) { + return ENOENT; + } + + int status = 0; + + while (to_stop != NULL) { + write_queue_thread_t *next = to_stop->next; + + int ret = pthread_join(to_stop->thread, NULL); + + if (ret != 0) { + ERROR("plugin_unregister_write: pthread_join failed for %s.", + to_stop->name); + status = ret; + } + + sfree(to_stop); + to_stop = next; + } + + return status; } EXPORT int plugin_unregister_flush(const char *name) { @@ -1689,13 +1867,6 @@ EXPORT int plugin_init_all(void) { write_limit_low = write_limit_high; } - write_threads_num = global_option_get_long("WriteThreads", - /* default = */ 5); - if (write_threads_num < 1) { - ERROR("WriteThreads must be positive."); - write_threads_num = 5; - } - if ((list_init == NULL) && (read_heap == NULL)) return ret; @@ -1726,8 +1897,6 @@ EXPORT int plugin_init_all(void) { le = le->next; } - start_write_threads((size_t)write_threads_num); - max_read_interval = global_option_get_time("MaxReadInterval", DEFAULT_MAX_READ_INTERVAL); @@ -1793,74 +1962,6 @@ EXPORT int plugin_read_all_once(void) { return return_status; } /* int plugin_read_all_once */ -EXPORT int plugin_write(const char *plugin, /* {{{ */ - metric_family_t const *fam) { - llentry_t *le; - int status; - - if (fam == NULL) - return EINVAL; - - if (list_write == NULL) - return ENOENT; - - if (plugin == NULL) { - int success = 0; - int failure = 0; - - le = llist_head(list_write); - while (le != NULL) { - callback_func_t *cf = le->value; - - /* Keep the read plugin's interval and flush information but update the - * plugin name. */ - plugin_ctx_t old_ctx = plugin_get_ctx(); - plugin_ctx_t ctx = old_ctx; - ctx.name = cf->cf_ctx.name; - plugin_set_ctx(ctx); - - DEBUG("plugin: plugin_write: Writing values via %s.", le->key); - plugin_write_cb callback = (void *)cf->cf_callback; - status = (*callback)(fam, &cf->cf_udata); - if (status != 0) - failure++; - else - success++; - - plugin_set_ctx(old_ctx); - le = le->next; - } - - if ((success == 0) && (failure != 0)) - status = -1; - else - status = 0; - } else /* plugin != NULL */ - { - le = llist_head(list_write); - while (le != NULL) { - if (strcasecmp(plugin, le->key) == 0) - break; - - le = le->next; - } - - if (le == NULL) - return ENOENT; - - callback_func_t *cf = le->value; - - /* do not switch plugin context; rather keep the context (interval) - * information of the calling read plugin */ - - DEBUG("plugin: plugin_write: Writing values via %s.", le->key); - plugin_write_cb callback = (void *)cf->cf_callback; - status = (*callback)(fam, &cf->cf_udata); - } - - return status; -} /* }}} int plugin_write */ - EXPORT int plugin_flush(const char *plugin, cdtime_t timeout, const char *identifier) { llentry_t *le; @@ -1904,7 +2005,7 @@ EXPORT int plugin_shutdown_all(void) { destroy_read_heap(); /* blocks until all write threads have shut down. */ - stop_write_threads(); + plugin_unregister_write(NULL); /* ask all plugins to write out the state they kept. 
   plugin_flush(/* plugin = */ NULL,
@@ -1940,7 +2041,6 @@ EXPORT int plugin_shutdown_all(void) {
   destroy_all_callbacks(&list_flush);
   destroy_all_callbacks(&list_missing);
   destroy_cache_event_callbacks();
-  destroy_all_callbacks(&list_write);
 
   destroy_all_callbacks(&list_notification);
   destroy_all_callbacks(&list_shutdown);
@@ -2060,18 +2160,7 @@ void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
   return;
 }
 
-static int plugin_dispatch_metric_internal(metric_family_t *fam) {
-  static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
-  if (fam == NULL) {
-    return EINVAL;
-  }
-
-  if (list_write == NULL)
-    c_complain_once(LOG_WARNING, &no_write_complaint,
-                    "plugin_dispatch_values: No write callback has been "
-                    "registered. Please load at least one output plugin, "
-                    "if you want the collected data to be stored.");
-
+static int plugin_dispatch_metric_internal(metric_family_t const *fam) {
   /**** Handle caching here !! ****/
   int status = 0;
   if (pre_cache_chain != NULL) {
@@ -2102,86 +2191,17 @@ static int plugin_dispatch_metric_internal(metric_family_t *fam) {
   return 0;
 } /* int plugin_dispatch_values_internal */
 
-static double get_drop_probability(void) /* {{{ */
-{
-  long pos;
-  long size;
-  long wql;
-
-  pthread_mutex_lock(&write_lock);
-  wql = write_queue_length;
-  pthread_mutex_unlock(&write_lock);
-
-  if (wql < write_limit_low)
-    return 0.0;
-  if (wql >= write_limit_high)
-    return 1.0;
-
-  pos = 1 + wql - write_limit_low;
-  size = 1 + write_limit_high - write_limit_low;
-
-  return (double)pos / (double)size;
-} /* }}} double get_drop_probability */
-
-static bool check_drop_value(void) /* {{{ */
-{
-  static cdtime_t last_message_time;
-  static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
-
-  double p;
-  double q;
-  int status;
-
-  if (write_limit_high == 0)
-    return false;
-
-  p = get_drop_probability();
-  if (p == 0.0)
-    return false;
-
-  status = pthread_mutex_trylock(&last_message_lock);
-  if (status == 0) {
-    cdtime_t now;
-
-    now = cdtime();
-    if ((now - last_message_time) > TIME_T_TO_CDTIME_T(1)) {
-      last_message_time = now;
-      ERROR("plugin_dispatch_values: Low water mark "
-            "reached. Dropping %.0f%% of metrics.",
-            100.0 * p);
-    }
-    pthread_mutex_unlock(&last_message_lock);
-  }
-
-  if (p == 1.0)
-    return true;
-
-  q = cdrand_d();
-  if (q > p)
-    return true;
-  else
-    return false;
-} /* }}} bool check_drop_value */
-
 EXPORT int plugin_dispatch_metric_family(metric_family_t const *fam) {
   if ((fam == NULL) || (fam->metric.num == 0)) {
     return EINVAL;
   }
 
-  if (check_drop_value()) {
-    if (record_statistics) {
-      pthread_mutex_lock(&statistics_lock);
-      stats_values_dropped++;
-      pthread_mutex_unlock(&statistics_lock);
-    }
-    return 0;
-  }
-
-  int status = enqueue_metric_family(fam);
+  int status = plugin_dispatch_metric_internal(fam);
   if (status != 0) {
-    ERROR("plugin_dispatch_values: plugin_write_enqueue_metric_list failed "
-          "with status %i (%s).",
-          status, STRERROR(status));
+    ERROR(
+        "plugin_dispatch_metric_family: plugin_dispatch_metric_internal failed "
+        "with status %i (%s).",
+        status, STRERROR(status));
   }
   return status;
 }
@@ -2220,15 +2240,6 @@ plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */
   gauge_t sum = 0.0;
   va_list ap;
 
-  if (check_drop_value()) {
-    if (record_statistics) {
-      pthread_mutex_lock(&statistics_lock);
-      stats_values_dropped++;
-      pthread_mutex_unlock(&statistics_lock);
-    }
-    return 0;
-  }
-
   assert(template->values_len == 1);
 
   /* Calculate sum for Gauge to calculate percent if needed */
diff --git a/src/daemon/plugin.h b/src/daemon/plugin.h
index faec39705..099d13b76 100644
--- a/src/daemon/plugin.h
+++ b/src/daemon/plugin.h
@@ -243,21 +243,19 @@ int plugin_shutdown_all(void);
  * plugin_write
  *
  * DESCRIPTION
- *  Calls the write function of the given plugin with the provided data set and
- *  value list. It differs from `plugin_dispatch_values' in that it does not
- *  update the cache, does not do threshold checking, call the chain subsystem
- *  and so on. It looks up the requested plugin and invokes the function, end
- *  of story.
+ *  Directly enqueues the given metric family without updating the cache or
+ *  calling the chain subsystem. The metric family may still be dropped from
+ *  the queue if collectd is stopped or the queue length surpasses the
+ *  configured limit.
  *
  * ARGUMENTS
  *  plugin     Name of the plugin. If NULL, the value is sent to all registered
  *             write functions.
- *  m          The actual value to be processed. Must not be NULL.
+ *  fam        The actual metric family to be processed.
  *
  * RETURN VALUE
- *  Returns zero upon success or non-zero if an error occurred. If `plugin' is
- *  NULL and more than one plugin is called, an error is only returned if *all*
- *  plugins fail.
+ *  Returns zero if the metric family was successfully enqueued or non-zero if
+ *  an error occurred.
 *
 * NOTES
 *  This is the function used by the `write' built-in target. May be used by
-- 
2.47.2
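
The WriteQueueLimitHigh/WriteQueueLimitLow drop policy implemented in
write_queue_enqueue() above can be tried in isolation. The following
stand-alone sketch is not part of the patch: rand() stands in for collectd's
cdrand_u(), node_t is a simplified queue element, and the orientation follows
the patch (head = oldest element, position 0 = newest):

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct node_s {
      int value;
      struct node_s *next;
    } node_t;

    /* Remove one random element between the oldest entry (the head) and
     * position limit_low (counted from the newest element), mirroring the
     * drop_pos selection in write_queue_enqueue(). The caller must ensure
     * queue_length > limit_low, as the while loop in the patch does. */
    static void drop_random(node_t **head, long queue_length, long limit_low) {
      long drop_pos = rand() % (queue_length - limit_low) + limit_low;

      node_t *to_spare = NULL;
      node_t *to_drop = *head;
      for (long queue_pos = queue_length - 1; queue_pos > drop_pos; queue_pos--) {
        to_spare = to_drop;
        to_drop = to_drop->next;
      }

      if (to_spare != NULL)
        to_spare->next = to_drop->next;
      else
        *head = to_drop->next; /* the head itself was selected */

      printf("dropped element %d\n", to_drop->value);
      free(to_drop);
    }

    int main(void) {
      /* Ten elements; value 0 is the oldest (head), value 9 the newest. */
      node_t *head = NULL;
      for (int i = 9; i >= 0; i--) {
        node_t *n = malloc(sizeof(*n));
        n->value = i;
        n->next = head;
        head = n;
      }

      /* With limit_low = 5, only one of the five oldest elements (0..4)
       * can be selected; the five newest are always spared. */
      drop_random(&head, 10, 5);

      for (node_t *n = head; n != NULL; n = n->next)
        printf("%d ", n->value);
      printf("\n");

      return 0;
    }

Setting limit_low equal to limit_high, as the updated collectd.conf.pod
describes, makes the modulo operand 1 whenever the cap is exceeded, so
drop_pos always lands on the oldest element: deterministic head-dropping
instead of random selection.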