};
typedef struct cache_event_func_s cache_event_func_t;
-struct write_queue_s;
-typedef struct write_queue_s write_queue_t;
-struct write_queue_s {
+typedef struct write_queue_elem_s {
metric_family_t *family;
plugin_ctx_t ctx;
- write_queue_t *next;
+ const char *plugin;
+ long ref_count;
+ struct write_queue_elem_s *next;
+} write_queue_elem_t;
+
+typedef struct write_queue_thread_s {
+ bool loop;
+ long queue_length;
+ const char *name;
+ plugin_write_cb callback;
+ user_data_t *ud;
+ pthread_t thread;
+ write_queue_elem_t *head;
+ struct write_queue_thread_s *next;
+} write_queue_thread_t;
+
+struct {
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ write_queue_elem_t *tail;
+ write_queue_thread_t *threads;
+} write_queue = {
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .cond = PTHREAD_COND_INITIALIZER,
+ .tail = NULL,
+ .threads = NULL,
};
struct flush_callback_s {
static c_avl_tree_t *plugins_loaded;
static llist_t *list_init;
-static llist_t *list_write;
static llist_t *list_flush;
static llist_t *list_missing;
static llist_t *list_shutdown;
static size_t read_threads_num;
static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
-static write_queue_t *write_queue_head;
-static write_queue_t *write_queue_tail;
-static long write_queue_length;
-static bool write_loop = true;
-static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
-static pthread_t *write_threads;
-static size_t write_threads_num;
-
static pthread_key_t plugin_ctx_key;
static bool plugin_ctx_key_initialized;
/*
* Static functions
*/
-static int plugin_dispatch_metric_internal(metric_family_t *fam);
+static int plugin_dispatch_metric_internal(metric_family_t const *fam);
static const char *plugin_get_dir(void) {
if (plugindir == NULL)
}
static int plugin_update_internal_statistics(void) { /* {{{ */
+ long write_queue_length = 0;
+
+ pthread_mutex_lock(&write_queue.lock);
+ for (write_queue_thread_t *thread = write_queue.threads; thread != NULL;
+ thread = thread->next) {
+ if (thread->queue_length > write_queue_length) {
+ write_queue_length = thread->queue_length;
+ }
+ }
+ pthread_mutex_unlock(&write_queue.lock);
+
gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
/* Initialize `vl' */
plugin_dispatch_values(&vl);
/* Write queue : Values dropped (queue length > low limit) */
+ pthread_mutex_lock(&statistics_lock);
vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
+ pthread_mutex_unlock(&statistics_lock);
vl.values_len = 1;
sstrncpy(vl.type, "derive", sizeof(vl.type));
sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
return 0;
} /* }}} int register_callback */
-static void log_list_callbacks(llist_t **list, /* {{{ */
- const char *comment) {
- char *str;
- int len;
- int i;
- llentry_t *le;
- int n;
-
- n = llist_size(*list);
- if (n == 0) {
- INFO("%s: [none]", comment);
- return;
- }
-
- char **keys = calloc(n, sizeof(*keys));
- if (keys == NULL) {
- ERROR("%s: failed to allocate memory for list of callbacks", comment);
- return;
- }
-
- for (le = llist_head(*list), i = 0, len = 0; le != NULL; le = le->next, i++) {
- keys[i] = le->key;
- len += strlen(le->key) + 6;
- }
- str = malloc(len + 10);
- if (str == NULL) {
- ERROR("%s: failed to allocate memory for list of callbacks", comment);
- } else {
- *str = '\0';
- strjoin(str, len, keys, n, "', '");
- INFO("%s ['%s']", comment, str);
- sfree(str);
- }
- sfree(keys);
-} /* }}} void log_list_callbacks */
-
static int create_register_callback(llist_t **list, /* {{{ */
const char *name, void *callback,
user_data_t const *ud) {
return vl;
} /* }}} value_list_t *plugin_value_list_clone */
-static void write_queue_enqueue(write_queue_t *head) {
- write_queue_t *tail = NULL;
- long num = 0;
+static void write_queue_ref_single(write_queue_elem_t *elem, long dir) {
+ elem->ref_count += dir;
+
+ assert(elem->ref_count >= 0);
+
+ if (elem->ref_count == 0) {
+ if (write_queue.tail == elem) {
+ write_queue.tail = NULL;
+ assert(elem->next == NULL);
+ }
- for (write_queue_t *elem = head; elem != NULL; elem = elem->next) {
- tail = elem;
- num++;
+ metric_family_free(elem->family);
+ sfree(elem);
}
+}
- if (num == 0) {
- return;
+static void write_queue_ref_all(write_queue_elem_t *start, long dir) {
+ while (start != NULL) {
+ write_queue_elem_t *elem = start;
+ start = elem->next;
+
+ write_queue_ref_single(elem, dir);
}
+}
- pthread_mutex_lock(&write_lock);
+static int write_queue_enqueue(write_queue_elem_t *ins_head) {
+ static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
- if (write_queue_tail == NULL) {
- write_queue_head = head;
- write_queue_tail = tail;
- write_queue_length = num;
- } else {
- write_queue_tail->next = head;
- write_queue_tail = tail;
- write_queue_length += num;
+ if (ins_head == NULL) {
+ return EINVAL;
}
- pthread_cond_signal(&write_cond);
- pthread_mutex_unlock(&write_lock);
+ pthread_mutex_lock(&write_queue.lock);
+
+ if (write_queue.threads == NULL) {
+ c_complain_once(LOG_WARNING, &no_write_complaint,
+ "write_queue_enqueue: No write callback has been "
+ "registered. Please load at least one output plugin, "
+ "if you want the collected data to be stored.");
+
+ /* Element in the ins_head queue already have zero reference count
+ * but without any write threads there is noone to free them.
+ * make sure they are freed by de-refing them 0 times. */
+ write_queue_ref_all(ins_head, 0);
+ pthread_mutex_unlock(&write_queue.lock);
+
+ return ENOENT;
+ }
+
+ write_queue_elem_t *ins_tail = NULL;
+ long num_elems = 0;
+
+ /* More than one element may be enqueued at once. Count elements and find
+ * local tail. */
+ for (write_queue_elem_t *elem = ins_head; elem != NULL; elem = elem->next) {
+ ins_tail = elem;
+ num_elems++;
+ }
+
+ /* Add reference to new elements to existing queue (if there is one)
+ * and update the tail. */
+ if (write_queue.tail != NULL) {
+ write_queue.tail->next = ins_head;
+ }
+ write_queue.tail = ins_tail;
+
+ /* Iterate through all registered write plugins/threads to:
+ * a) update their head pointer if their queue is currently empty.
+ * b) find the thread with the longest queue to apply limits later. */
+ write_queue_thread_t *slowest_thread = write_queue.threads;
+
+ for (write_queue_thread_t *thread = write_queue.threads; thread != NULL;
+ thread = thread->next) {
+ if (thread->head == NULL) {
+ thread->head = ins_head;
+ }
+
+ /* Mark the new elements as to be consumed by this thread */
+ write_queue_ref_all(ins_head, 1);
+ thread->queue_length += num_elems;
+
+ if (thread->queue_length > slowest_thread->queue_length) {
+ slowest_thread = thread;
+ }
+ }
+
+ /* Enforce write_limit_high (unless it is infinite (e.g. == 0)) */
+ while (write_limit_high != 0 &&
+ slowest_thread->queue_length > write_limit_high) {
+ /* Select a random element to drop between the last position in the slowest
+ * thread's queue and queue positon "write_limit_low". This makes sure that
+ * write plugins that do not let the queue get longer than "write_limit_low"
+ * will never drop values, regardless of what other plugins do. */
+ long drop_pos =
+ cdrand_u() % (slowest_thread->queue_length - write_limit_low) +
+ write_limit_low;
+
+ /* Walk the queue and count elements until element number drop_pos is
+ * found. to_drop will point to the element in question and to_spare
+ * will point to one element before to_drop (it it exists). */
+ write_queue_elem_t *to_spare = NULL;
+ write_queue_elem_t *to_drop = slowest_thread->head;
+
+ for (long queue_pos = slowest_thread->queue_length - 1;
+ queue_pos > drop_pos; queue_pos--) {
+ to_spare = to_drop;
+ to_drop = to_drop->next;
+ }
+
+ /* Unlink to_drop from linked list if it is not the head (in which case
+ * to_spare is NULL and it will be unlinked below) */
+ if (to_spare != NULL) {
+ to_spare->next = to_drop->next;
+ }
+
+ /* Iterate through all registered write plugins/threads to:
+ * a) update the head if it references to_drop.
+ * b) update the thread's queue length if to_drop was still in it's
+ * queue. */
+ for (write_queue_thread_t *thread = write_queue.threads; thread != NULL;
+ thread = thread->next) {
+
+ if (thread->head == to_drop) {
+ thread->head = to_drop->next;
+ }
+
+ /* Reduce reference count and queue length for every affected queue.
+ * There may still be references held in write_threads if the element was
+ * just de-queued in any and is currently used in the callback,
+ * so freeing may be delayed until it is dropped there. */
+ if (drop_pos < thread->queue_length) {
+ thread->queue_length--;
+ write_queue_ref_single(to_drop, -1);
+ }
+ }
+
+ if (record_statistics) {
+ pthread_mutex_lock(&statistics_lock);
+ stats_values_dropped++;
+ pthread_mutex_unlock(&statistics_lock);
+ }
+ }
+
+ pthread_cond_broadcast(&write_queue.cond);
+ pthread_mutex_unlock(&write_queue.lock);
+
+ return 0;
}
-/* enqueue_metric_family enqueues the metric family to write_queue. */
-static int enqueue_metric_family(metric_family_t const *fam) { /* {{{ */
+EXPORT int plugin_write(const char *plugin, metric_family_t const *fam) {
+ if (fam == NULL) {
+ return EINVAL;
+ }
+
metric_family_t *fam_copy = metric_family_clone(fam);
if (fam_copy == NULL) {
int status = errno;
- ERROR("enqueue_metric_family: metric_family_clone failed: %s",
- STRERROR(status));
+ ERROR("plugin_write: metric_family_clone failed: %s", STRERROR(status));
return status;
}
/* TODO(octo): set target labels here. */
}
- write_queue_t *q = calloc(1, sizeof(*q));
- if (q == NULL) {
+ write_queue_elem_t *elem = calloc(1, sizeof(*elem));
+ if (elem == NULL) {
return ENOMEM;
}
- (*q) = (write_queue_t){
- .family = fam_copy,
- .ctx = plugin_get_ctx(),
- };
- write_queue_enqueue(q);
- return 0;
-} /* }}} int enqueue_metric_family */
-
-static metric_family_t *plugin_write_dequeue(void) /* {{{ */
-{
- write_queue_t *q;
-
- pthread_mutex_lock(&write_lock);
- while (write_loop && (write_queue_head == NULL))
- pthread_cond_wait(&write_cond, &write_lock);
+ elem->family = fam_copy;
+ elem->ctx = plugin_get_ctx();
+ elem->plugin = plugin;
+ elem->ref_count = 0;
+ elem->next = NULL;
- if (write_queue_head == NULL) {
- pthread_mutex_unlock(&write_lock);
- return NULL;
- }
+ return write_queue_enqueue(elem);
+} /* }}} int enqueue_metric_family */
- q = write_queue_head;
- write_queue_head = q->next;
- write_queue_length -= 1;
- if (write_queue_head == NULL) {
- write_queue_tail = NULL;
- assert(0 == write_queue_length);
- }
+static void *plugin_write_thread(void *args) /* {{{ */
+{
+ write_queue_thread_t *this_thread = args;
- pthread_mutex_unlock(&write_lock);
+ DEBUG("plugin_write_thread (%s): start", this_thread->name);
- (void)plugin_set_ctx(q->ctx);
+ pthread_mutex_lock(&write_queue.lock);
- metric_family_t *fam = q->family;
- sfree(q);
- return fam;
-} /* }}} metric_family_t *plugin_write_dequeue */
+ while (this_thread->loop) {
+ write_queue_elem_t *elem = this_thread->head;
-static void *plugin_write_thread(void __attribute__((unused)) * args) /* {{{ */
-{
- while (write_loop) {
- metric_family_t *fam = plugin_write_dequeue();
- if (fam == NULL)
+ if (elem == NULL) {
+ pthread_cond_wait(&write_queue.cond, &write_queue.lock);
continue;
+ }
- (void)plugin_dispatch_metric_internal(fam);
- metric_family_free(fam);
- }
-
- pthread_exit(NULL);
- return (void *)0;
-} /* }}} void *plugin_write_thread */
-
-static void start_write_threads(size_t num) /* {{{ */
-{
- if (write_threads != NULL)
- return;
+ /* Unlink early so that write_queue_enqueue can freely manipulate the
+ * head while the lock is not held in the callback. */
+ this_thread->head = elem->next;
+ this_thread->queue_length--;
- write_threads = calloc(num, sizeof(*write_threads));
- if (write_threads == NULL) {
- ERROR("plugin: start_write_threads: calloc failed.");
- return;
- }
+ DEBUG("plugin_write_thread(%s): de-queue %p (remaining queue length: %ld)",
+ this_thread->name, elem, this_thread->queue_length);
- write_threads_num = 0;
- for (size_t i = 0; i < num; i++) {
- int status = pthread_create(write_threads + write_threads_num,
- /* attr = */ NULL, plugin_write_thread,
- /* arg = */ NULL);
- if (status != 0) {
- ERROR("plugin: start_write_threads: pthread_create failed with status %i "
- "(%s).",
- status, STRERROR(status));
- return;
- }
+ /* Should elem be written to all plugins or this plugin in particular? */
+ if (elem->plugin == NULL ||
+ strcasecmp(elem->plugin, this_thread->name) == 0) {
+ pthread_mutex_unlock(&write_queue.lock);
- char name[THREAD_NAME_MAX];
- ssnprintf(name, sizeof(name), "writer#%" PRIu64,
- (uint64_t)write_threads_num);
- set_thread_name(write_threads[write_threads_num], name);
+ plugin_ctx_t ctx = elem->ctx;
+ ctx.name = (char *)this_thread->name;
+ plugin_set_ctx(ctx);
- write_threads_num++;
- } /* for (i) */
-} /* }}} void start_write_threads */
+ /* TODO(lgo): do something with the return value? */
+ this_thread->callback(elem->family, this_thread->ud);
-static void stop_write_threads(void) /* {{{ */
-{
- write_queue_t *q;
- size_t i;
+ pthread_mutex_lock(&write_queue.lock);
+ }
- if (write_threads == NULL)
- return;
+ /* Free the element if it is not referenced by another queue or thread. */
+ write_queue_ref_single(elem, -1);
+ }
- INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num);
+ DEBUG("plugin_write_thread(%s): teardown", this_thread->name);
- pthread_mutex_lock(&write_lock);
- write_loop = false;
- DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
- pthread_cond_broadcast(&write_cond);
- pthread_mutex_unlock(&write_lock);
+ /* Cleanup before leaving */
+ free_userdata(this_thread->ud);
+ this_thread->ud = NULL;
- for (i = 0; i < write_threads_num; i++) {
- if (pthread_join(write_threads[i], NULL) != 0) {
- ERROR("plugin: stop_write_threads: pthread_join failed.");
- }
- write_threads[i] = (pthread_t)0;
+ /* Drop references to all remaining queue elements */
+ if (this_thread->head != NULL) {
+ write_queue_ref_all(this_thread->head, -1);
+ this_thread->head = NULL;
+ this_thread->queue_length = 0;
}
- sfree(write_threads);
- write_threads_num = 0;
- pthread_mutex_lock(&write_lock);
- i = 0;
- for (q = write_queue_head; q != NULL;) {
- write_queue_t *q1 = q;
- metric_family_free(q->family);
- q = q->next;
- sfree(q1);
- i++;
- }
- write_queue_head = NULL;
- write_queue_tail = NULL;
- write_queue_length = 0;
- pthread_mutex_unlock(&write_lock);
+ pthread_mutex_unlock(&write_queue.lock);
+ pthread_exit(NULL);
- if (i > 0) {
- WARNING("plugin: %" PRIsz " metric%s left after shutting down "
- "the write threads.",
- i, (i == 1) ? " was" : "s were");
- }
-} /* }}} void stop_write_threads */
+ return NULL;
+} /* }}} void *plugin_write_thread */
/*
* Public functions
} /* int plugin_register_complex_read */
EXPORT int plugin_register_write(const char *name, plugin_write_cb callback,
- user_data_t const *ud) {
- return create_register_callback(&list_write, name, (void *)callback, ud);
+ user_data_t const *user_data) {
+ write_queue_thread_t *this_thread = calloc(1, sizeof(*this_thread));
+
+ if (this_thread == NULL) {
+ free_userdata(user_data);
+ ERROR("plugin_register_write: calloc failed.");
+ return ENOMEM;
+ }
+
+ this_thread->loop = true;
+ this_thread->queue_length = 0;
+ this_thread->name = name;
+ this_thread->callback = callback;
+ this_thread->ud = (user_data_t *)user_data;
+ this_thread->head = NULL;
+
+ pthread_mutex_lock(&write_queue.lock);
+
+ int status = pthread_create(&this_thread->thread, NULL, plugin_write_thread,
+ (void *)this_thread);
+
+ if (status == 0) {
+ char thread_name[THREAD_NAME_MAX];
+ ssnprintf(thread_name, sizeof(thread_name), "writer_%s", name);
+ set_thread_name(this_thread->thread, thread_name);
+
+ this_thread->next = write_queue.threads;
+ write_queue.threads = this_thread;
+ } else {
+ ERROR("plugin: plugin_register_write: pthread_create failed with status %i "
+ "(%s).",
+ status, STRERROR(status));
+
+ free_userdata(user_data);
+ sfree(this_thread);
+ }
+
+ pthread_mutex_unlock(&write_queue.lock);
+
+ return status;
} /* int plugin_register_write */
static int plugin_flush_timeout_callback(user_data_t *ud) {
} /* }}} int plugin_unregister_read */
EXPORT void plugin_log_available_writers(void) {
- log_list_callbacks(&list_write, "Available write targets:");
+ pthread_mutex_lock(&write_queue.lock);
+
+ if (write_queue.threads == NULL) {
+ INFO("Available write targets: [none]");
+ return;
+ }
+
+ size_t total_len = 0;
+
+ for (write_queue_thread_t *piv = write_queue.threads; piv != NULL;
+ piv = piv->next) {
+ total_len += strlen(piv->name);
+ if (piv->next != NULL) {
+ total_len += 5;
+ }
+ }
+
+ char *str = malloc(total_len + 1);
+ if (str == NULL) {
+ ERROR("Available write targets: failed to allocate memory for list of "
+ "writers");
+ return;
+ }
+
+ char *cursor = str;
+
+ for (write_queue_thread_t *piv = write_queue.threads; piv != NULL;
+ piv = piv->next) {
+ cursor = stpcpy(cursor, piv->name);
+ if (piv->next != NULL) {
+ cursor = stpcpy(cursor, "' , '");
+ }
+ }
+
+ pthread_mutex_unlock(&write_queue.lock);
+
+ INFO("Available write targets: ['%s']", str);
+ sfree(str);
}
static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */
} /* }}} int plugin_unregister_read_group */
EXPORT int plugin_unregister_write(const char *name) {
- return plugin_unregister(list_write, name);
+ pthread_mutex_lock(&write_queue.lock);
+
+ /* Build to completely new thread lists. One with threads to_stop and another
+ * with threads to_keep. If name is NULL to_keep will be empty and to_stop
+ * will contain all threads. If name is NULL to_stop will contain the
+ * relevant thread and to_keep will contain all remaining threads. */
+ write_queue_thread_t *to_stop = NULL;
+ write_queue_thread_t *to_keep = NULL;
+
+ for (write_queue_thread_t *piv = write_queue.threads; piv != NULL;) {
+ write_queue_thread_t *next = piv->next;
+
+ if (name == NULL || strcasecmp(name, piv->name) == 0) {
+ piv->loop = false;
+ piv->next = to_stop;
+ to_stop = piv;
+ } else {
+ piv->next = to_keep;
+ to_keep = piv;
+ }
+
+ piv = next;
+ }
+
+ write_queue.threads = to_keep;
+
+ pthread_cond_broadcast(&write_queue.cond);
+ pthread_mutex_unlock(&write_queue.lock);
+
+ /* Return error if the requested thread was not found */
+ if (to_stop == NULL && name != NULL) {
+ return ENOENT;
+ }
+
+ int status = 0;
+
+ while (to_stop != NULL) {
+ write_queue_thread_t *next = to_stop->next;
+
+ int ret = pthread_join(to_stop->thread, NULL);
+
+ if (ret != 0) {
+ ERROR("plugin_unregister_write: pthread_join failed for %s.",
+ to_stop->name);
+ status = ret;
+ }
+
+ sfree(to_stop);
+ to_stop = next;
+ }
+
+ return status;
}
EXPORT int plugin_unregister_flush(const char *name) {
write_limit_low = write_limit_high;
}
- write_threads_num = global_option_get_long("WriteThreads",
- /* default = */ 5);
- if (write_threads_num < 1) {
- ERROR("WriteThreads must be positive.");
- write_threads_num = 5;
- }
-
if ((list_init == NULL) && (read_heap == NULL))
return ret;
le = le->next;
}
- start_write_threads((size_t)write_threads_num);
-
max_read_interval =
global_option_get_time("MaxReadInterval", DEFAULT_MAX_READ_INTERVAL);
return return_status;
} /* int plugin_read_all_once */
-EXPORT int plugin_write(const char *plugin, /* {{{ */
- metric_family_t const *fam) {
- llentry_t *le;
- int status;
-
- if (fam == NULL)
- return EINVAL;
-
- if (list_write == NULL)
- return ENOENT;
-
- if (plugin == NULL) {
- int success = 0;
- int failure = 0;
-
- le = llist_head(list_write);
- while (le != NULL) {
- callback_func_t *cf = le->value;
-
- /* Keep the read plugin's interval and flush information but update the
- * plugin name. */
- plugin_ctx_t old_ctx = plugin_get_ctx();
- plugin_ctx_t ctx = old_ctx;
- ctx.name = cf->cf_ctx.name;
- plugin_set_ctx(ctx);
-
- DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
- plugin_write_cb callback = (void *)cf->cf_callback;
- status = (*callback)(fam, &cf->cf_udata);
- if (status != 0)
- failure++;
- else
- success++;
-
- plugin_set_ctx(old_ctx);
- le = le->next;
- }
-
- if ((success == 0) && (failure != 0))
- status = -1;
- else
- status = 0;
- } else /* plugin != NULL */
- {
- le = llist_head(list_write);
- while (le != NULL) {
- if (strcasecmp(plugin, le->key) == 0)
- break;
-
- le = le->next;
- }
-
- if (le == NULL)
- return ENOENT;
-
- callback_func_t *cf = le->value;
-
- /* do not switch plugin context; rather keep the context (interval)
- * information of the calling read plugin */
-
- DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
- plugin_write_cb callback = (void *)cf->cf_callback;
- status = (*callback)(fam, &cf->cf_udata);
- }
-
- return status;
-} /* }}} int plugin_write */
-
EXPORT int plugin_flush(const char *plugin, cdtime_t timeout,
const char *identifier) {
llentry_t *le;
destroy_read_heap();
/* blocks until all write threads have shut down. */
- stop_write_threads();
+ plugin_unregister_write(NULL);
/* ask all plugins to write out the state they kept. */
plugin_flush(/* plugin = */ NULL,
destroy_all_callbacks(&list_flush);
destroy_all_callbacks(&list_missing);
destroy_cache_event_callbacks();
- destroy_all_callbacks(&list_write);
destroy_all_callbacks(&list_notification);
destroy_all_callbacks(&list_shutdown);
return;
}
-static int plugin_dispatch_metric_internal(metric_family_t *fam) {
- static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
- if (fam == NULL) {
- return EINVAL;
- }
-
- if (list_write == NULL)
- c_complain_once(LOG_WARNING, &no_write_complaint,
- "plugin_dispatch_values: No write callback has been "
- "registered. Please load at least one output plugin, "
- "if you want the collected data to be stored.");
-
+static int plugin_dispatch_metric_internal(metric_family_t const *fam) {
/**** Handle caching here !! ****/
int status = 0;
if (pre_cache_chain != NULL) {
return 0;
} /* int plugin_dispatch_values_internal */
-static double get_drop_probability(void) /* {{{ */
-{
- long pos;
- long size;
- long wql;
-
- pthread_mutex_lock(&write_lock);
- wql = write_queue_length;
- pthread_mutex_unlock(&write_lock);
-
- if (wql < write_limit_low)
- return 0.0;
- if (wql >= write_limit_high)
- return 1.0;
-
- pos = 1 + wql - write_limit_low;
- size = 1 + write_limit_high - write_limit_low;
-
- return (double)pos / (double)size;
-} /* }}} double get_drop_probability */
-
-static bool check_drop_value(void) /* {{{ */
-{
- static cdtime_t last_message_time;
- static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
-
- double p;
- double q;
- int status;
-
- if (write_limit_high == 0)
- return false;
-
- p = get_drop_probability();
- if (p == 0.0)
- return false;
-
- status = pthread_mutex_trylock(&last_message_lock);
- if (status == 0) {
- cdtime_t now;
-
- now = cdtime();
- if ((now - last_message_time) > TIME_T_TO_CDTIME_T(1)) {
- last_message_time = now;
- ERROR("plugin_dispatch_values: Low water mark "
- "reached. Dropping %.0f%% of metrics.",
- 100.0 * p);
- }
- pthread_mutex_unlock(&last_message_lock);
- }
-
- if (p == 1.0)
- return true;
-
- q = cdrand_d();
- if (q > p)
- return true;
- else
- return false;
-} /* }}} bool check_drop_value */
-
EXPORT int plugin_dispatch_metric_family(metric_family_t const *fam) {
if ((fam == NULL) || (fam->metric.num == 0)) {
return EINVAL;
}
- if (check_drop_value()) {
- if (record_statistics) {
- pthread_mutex_lock(&statistics_lock);
- stats_values_dropped++;
- pthread_mutex_unlock(&statistics_lock);
- }
- return 0;
- }
-
- int status = enqueue_metric_family(fam);
+ int status = plugin_dispatch_metric_internal(fam);
if (status != 0) {
- ERROR("plugin_dispatch_values: plugin_write_enqueue_metric_list failed "
- "with status %i (%s).",
- status, STRERROR(status));
+ ERROR(
+ "plugin_dispatch_metric_family: plugin_dispatch_metric_internal failed "
+ "with status %i (%s).",
+ status, STRERROR(status));
}
return status;
}
gauge_t sum = 0.0;
va_list ap;
- if (check_drop_value()) {
- if (record_statistics) {
- pthread_mutex_lock(&statistics_lock);
- stats_values_dropped++;
- pthread_mutex_unlock(&statistics_lock);
- }
- return 0;
- }
-
assert(template->values_len == 1);
/* Calculate sum for Gauge to calculate percent if needed */