char *password;
char *address;
int retry_delay;
+ int sendq_limit;
} amqp1_config_transport_t;
typedef struct amqp1_config_instance_s {
}
pthread_mutex_lock(&send_lock);
+ if (transport->sendq_limit > 0 &&
+ DEQ_SIZE(out_messages) >= transport->sendq_limit) {
+ cd_message_t *evict;
+
+ DEBUG("amqp1 plugin: dropping oldest message because sendq is full");
+ evict = DEQ_HEAD(out_messages);
+ DEQ_REMOVE_HEAD(out_messages);
+ cd_message_free(evict);
+ }
DEQ_INSERT_TAIL(out_messages, cdm);
pthread_mutex_unlock(&send_lock);
status = cf_util_get_int(child, &transport->retry_delay);
else if (strcasecmp("Instance", child->key) == 0)
amqp1_config_instance(child);
+ else if (strcasecmp("SendQueueLimit", child->key) == 0)
+ status = cf_util_get_int(child, &transport->sendq_limit);
else
WARNING("amqp1 plugin: Ignoring unknown "
"transport configuration option "
before attempting to reconnect. Defaults to 1, which implies attempt
to reconnect at 1 second intervals.
+=item B<SendQueueLimit> I<SendQueueLimit>
+If there is no AMQP1 connection, the plugin will continue to queue
+messages to send, which could result in unbounded memory consumption. This
+parameter is used to limit the number of messages in the outbound queue to
+the specified value. The default value is 0, which disables this feature.
+
=back
The following options are accepted within each I<Instance> block: