From e7dd149f6f8279d844d172663023a841aa032a93 Mon Sep 17 00:00:00 2001 From: Ryan McCabe Date: Mon, 23 Mar 2020 15:49:35 -0400 Subject: [PATCH] amqp1: Add option to limit internal send queue length ChangeLog: amqp1 plugin: Add option to limit internal send queue length. Add a config option to control the internal plugin send queue length: SendQueueLimit If SendQueueLimit is set to an integer greater than 0, the send queue in the amqp1 plugin will be constrained to that number of entries to avoid unbounded memory use in the case that there is no amqp1 connection. If a new message is queued when the queue has reached maximum length, the oldest message in the queue will be discarded. Signed-off-by: Ryan McCabe --- src/amqp1.c | 12 ++++++++++++ src/collectd.conf.pod | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/src/amqp1.c b/src/amqp1.c index bdabcc217..c4f14d12c 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -63,6 +63,7 @@ typedef struct amqp1_config_transport_s { char *password; char *address; int retry_delay; + int sendq_limit; } amqp1_config_transport_t; typedef struct amqp1_config_instance_s { @@ -341,6 +342,15 @@ static int encqueue(cd_message_t *cdm, } pthread_mutex_lock(&send_lock); + if (transport->sendq_limit > 0 && + DEQ_SIZE(out_messages) >= transport->sendq_limit) { + cd_message_t *evict; + + DEBUG("amqp1 plugin: dropping oldest message because sendq is full"); + evict = DEQ_HEAD(out_messages); + DEQ_REMOVE_HEAD(out_messages); + cd_message_free(evict); + } DEQ_INSERT_TAIL(out_messages, cdm); pthread_mutex_unlock(&send_lock); @@ -697,6 +707,8 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_int(child, &transport->retry_delay); else if (strcasecmp("Instance", child->key) == 0) amqp1_config_instance(child); + else if (strcasecmp("SendQueueLimit", child->key) == 0) + status = cf_util_get_int(child, &transport->sendq_limit); else WARNING("amqp1 plugin: Ignoring unknown " "transport configuration option " diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index cb9e40f76..c283885d9 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -860,6 +860,12 @@ When the AMQP1 connection is lost, defines the time in seconds to wait before attempting to reconnect. Defaults to 1, which implies attempt to reconnect at 1 second intervals. +=item B I +If there is no AMQP1 connection, the plugin will continue to queue +messages to send, which could result in unbounded memory consumption. This +parameter is used to limit the number of messages in the outbound queue to +the specified value. The default value is 0, which disables this feature. + =back The following options are accepted within each I block: -- 2.47.2