]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
stasis: Add setting subscription congestion levels. 00/3000/1
authorRichard Mudgett <rmudgett@digium.com>
Fri, 3 Jun 2016 16:35:49 +0000 (11:35 -0500)
committerRichard Mudgett <rmudgett@digium.com>
Wed, 8 Jun 2016 18:46:05 +0000 (13:46 -0500)
Stasis subscriptions and message routers create taskprocessors to process
the event messages.  API calls are needed to be able to set the congestion
levels of these taskprocessors for selected subscriptions and message
routers.

* Updated CDR, CEL, and manager's stasis subscription congestion levels
based upon stress testing.  Increased the congestion levels to reduce the
potential for bursty call setup/teardown activity from triggering the
taskprocessor overload alert.  CDRs in particular need an extra high
congestion level because they can take awhile to process the stasis
messages.

ASTERISK-26088
Reported by:  Richard Mudgett

Change-Id: Id0a716394b4eee746dd158acc63d703902450244

include/asterisk/stasis.h
include/asterisk/stasis_message_router.h
main/cdr.c
main/cel.c
main/manager.c
main/stasis.c
main/stasis_message_router.c

index 16b30ccb3845803f29591ce74bf16cfffe538f1f..b1f7823ef76d5576dee0697d00975f7f0a9d6821 100644 (file)
@@ -592,6 +592,20 @@ struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
 struct stasis_subscription *stasis_unsubscribe(
        struct stasis_subscription *subscription);
 
+/*!
+ * \brief Set the high and low alert water marks of the stasis subscription.
+ * \since 13.10.0
+ *
+ * \param subscription Pointer to a stasis subscription
+ * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
+ * \param high_water New queue high water mark.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error (water marks not changed).
+ */
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+       long low_water, long high_water);
+
 /*!
  * \brief Block until the last message is processed on a subscription.
  *
index 89657a5ee5eddd10dfe653cd8c72162af18e1093..50270a788b4c89537d5ce9ad3e26f863ff1408d5 100644 (file)
@@ -126,6 +126,20 @@ int stasis_message_router_is_done(struct stasis_message_router *router);
 void stasis_message_router_publish_sync(struct stasis_message_router *router,
        struct stasis_message *message);
 
+/*!
+ * \brief Set the high and low alert water marks of the stasis message router.
+ * \since 13.10.0
+ *
+ * \param router Pointer to a stasis message router
+ * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
+ * \param high_water New queue high water mark.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error (water marks not changed).
+ */
+int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
+       long low_water, long high_water);
+
 /*!
  * \brief Add a route to a message router.
  *
index 7795a65fda931bd8c0c70ad1bb79b2ff91eb4aee..ab6530ed3be1ed8145a01d323d8fbbba3b21e400 100644 (file)
@@ -71,6 +71,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_message_router.h"
 #include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
 
 /*** DOCUMENTATION
        <configInfo name="cdr" language="en_US">
@@ -4184,6 +4185,8 @@ int ast_cdr_engine_init(void)
        if (!stasis_router) {
                return -1;
        }
+       stasis_message_router_set_congestion_limits(stasis_router, -1,
+               10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
 
        if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
                return -1;
index d9fcc5f6b2aa65845a7fe7a0b1dac9f3de73d79a..a26a939180cde61471bc11cc7f903feee9d95485 100644 (file)
@@ -59,6 +59,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/parking.h"
 #include "asterisk/pickup.h"
 #include "asterisk/core_local.h"
+#include "asterisk/taskprocessor.h"
 
 /*** DOCUMENTATION
        <configInfo name="cel" language="en_US">
@@ -1575,6 +1576,8 @@ static int create_routes(void)
        if (!cel_state_router) {
                return -1;
        }
+       stasis_message_router_set_congestion_limits(cel_state_router, -1,
+               6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
 
        ret |= stasis_message_router_add(cel_state_router,
                stasis_cache_update_type(),
index de003813c93457e61a5f23fdeec4b81317ec0a9f..2330ca8ef530509d095bc8c2faad2e4beb1a611b 100644 (file)
@@ -100,6 +100,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/rtp_engine.h"
 #include "asterisk/format_cache.h"
 #include "asterisk/translate.h"
+#include "asterisk/taskprocessor.h"
 
 /*** DOCUMENTATION
        <manager name="Ping" language="en_US">
@@ -8538,6 +8539,8 @@ static int manager_subscriptions_init(void)
        if (!stasis_router) {
                return -1;
        }
+       stasis_message_router_set_congestion_limits(stasis_router, -1,
+               6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
 
        res |= stasis_message_router_set_default(stasis_router,
                manager_default_msg_cb, NULL);
index 4fb69033f5a19d978f8b0b1070f1922ae7dd4fbd..bbafb69e16868f72d5f394a7194be7521f8fbd27 100644 (file)
@@ -564,6 +564,18 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
        return NULL;
 }
 
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+       long low_water, long high_water)
+{
+       int res = -1;
+
+       if (subscription) {
+               res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
+                       low_water, high_water);
+       }
+       return res;
+}
+
 void stasis_subscription_join(struct stasis_subscription *subscription)
 {
        if (subscription) {
index 26df76c53fc3935f7c4af957522ae0ba349043c3..cf0ac787ed6d4ad72df7004b281517b18e36ed07 100644 (file)
@@ -289,6 +289,18 @@ void stasis_message_router_publish_sync(struct stasis_message_router *router,
        ao2_cleanup(router);
 }
 
+int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
+       long low_water, long high_water)
+{
+       int res = -1;
+
+       if (router) {
+               res = stasis_subscription_set_congestion_limits(router->subscription,
+                       low_water, high_water);
+       }
+       return res;
+}
+
 int stasis_message_router_add(struct stasis_message_router *router,
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback, void *data)