]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
amqp1: Add option to limit internal send queue length 3432/head
authorRyan McCabe <rmccabe@redhat.com>
Mon, 23 Mar 2020 19:49:35 +0000 (15:49 -0400)
committerRyan McCabe <rmccabe@redhat.com>
Tue, 24 Mar 2020 20:18:45 +0000 (16:18 -0400)
ChangeLog: amqp1 plugin: Add option to limit internal send queue length.

Add a config  option to control the internal plugin send queue
length: SendQueueLimit

If SendQueueLimit is set to an integer greater than 0, the send queue
in the amqp1 plugin will be constrained to that number of entries to
avoid unbounded memory use in the case that there is no amqp1
connection. If a new message is queued when the queue has reached
maximum length, the oldest message in the queue will be discarded.

Signed-off-by: Ryan McCabe <rmccabe@redhat.com>
src/amqp1.c
src/collectd.conf.pod

index bdabcc2172e44b9ae8eafd3c6c4a4da76fc3bbc3..c4f14d12c7066b917a18423872fd2171b7984f34 100644 (file)
@@ -63,6 +63,7 @@ typedef struct amqp1_config_transport_s {
   char *password;
   char *address;
   int retry_delay;
+  int sendq_limit;
 } amqp1_config_transport_t;
 
 typedef struct amqp1_config_instance_s {
@@ -341,6 +342,15 @@ static int encqueue(cd_message_t *cdm,
   }
 
   pthread_mutex_lock(&send_lock);
+  if (transport->sendq_limit > 0 &&
+      DEQ_SIZE(out_messages) >= transport->sendq_limit) {
+    cd_message_t *evict;
+
+    DEBUG("amqp1 plugin: dropping oldest message because sendq is full");
+    evict = DEQ_HEAD(out_messages);
+    DEQ_REMOVE_HEAD(out_messages);
+    cd_message_free(evict);
+  }
   DEQ_INSERT_TAIL(out_messages, cdm);
   pthread_mutex_unlock(&send_lock);
 
@@ -697,6 +707,8 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_int(child, &transport->retry_delay);
     else if (strcasecmp("Instance", child->key) == 0)
       amqp1_config_instance(child);
+    else if (strcasecmp("SendQueueLimit", child->key) == 0)
+      status = cf_util_get_int(child, &transport->sendq_limit);
     else
       WARNING("amqp1 plugin: Ignoring unknown "
               "transport configuration option "
index cb9e40f76d13b1866138a8335a62939e61dc6738..c283885d9761d19ba5cd9086a096ae14b7fda340 100644 (file)
@@ -860,6 +860,12 @@ When the AMQP1 connection is lost, defines the time in seconds to wait
 before attempting to reconnect. Defaults to 1, which implies attempt
 to reconnect at 1 second intervals.
 
+=item B<SendQueueLimit> I<SendQueueLimit>
+If there is no AMQP1 connection, the plugin will continue to queue
+messages to send, which could result in unbounded memory consumption. This
+parameter is used to limit the number of messages in the outbound queue to
+the specified value. The default value is 0, which disables this feature.
+
 =back
 
 The following options are accepted within each I<Instance> block: