struct stasis_subscription *stasis_unsubscribe(
struct stasis_subscription *subscription);
+/*!
+ * \brief Set the high and low alert water marks of the stasis subscription.
+ * \since 13.10.0
+ *
+ * \param subscription Pointer to a stasis subscription
+ * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
+ * \param high_water New queue high water mark.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error (water marks not changed).
+ */
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+ long low_water, long high_water);
+
/*!
* \brief Block until the last message is processed on a subscription.
*
void stasis_message_router_publish_sync(struct stasis_message_router *router,
struct stasis_message *message);
+/*!
+ * \brief Set the high and low alert water marks of the stasis message router.
+ * \since 13.10.0
+ *
+ * \param router Pointer to a stasis message router
+ * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
+ * \param high_water New queue high water mark.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error (water marks not changed).
+ */
+int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
+ long low_water, long high_water);
+
/*!
* \brief Add a route to a message router.
*
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
/*** DOCUMENTATION
<configInfo name="cdr" language="en_US">
if (!stasis_router) {
return -1;
}
+ stasis_message_router_set_congestion_limits(stasis_router, -1,
+ 10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
return -1;
#include "asterisk/parking.h"
#include "asterisk/pickup.h"
#include "asterisk/core_local.h"
+#include "asterisk/taskprocessor.h"
/*** DOCUMENTATION
<configInfo name="cel" language="en_US">
if (!cel_state_router) {
return -1;
}
+ stasis_message_router_set_congestion_limits(cel_state_router, -1,
+ 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
ret |= stasis_message_router_add(cel_state_router,
stasis_cache_update_type(),
#include "asterisk/rtp_engine.h"
#include "asterisk/format_cache.h"
#include "asterisk/translate.h"
+#include "asterisk/taskprocessor.h"
/*** DOCUMENTATION
<manager name="Ping" language="en_US">
if (!stasis_router) {
return -1;
}
+ stasis_message_router_set_congestion_limits(stasis_router, -1,
+ 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
res |= stasis_message_router_set_default(stasis_router,
manager_default_msg_cb, NULL);
return NULL;
}
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+ long low_water, long high_water)
+{
+ int res = -1;
+
+ if (subscription) {
+ res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
+ low_water, high_water);
+ }
+ return res;
+}
+
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
ao2_cleanup(router);
}
+int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
+ long low_water, long high_water)
+{
+ int res = -1;
+
+ if (router) {
+ res = stasis_subscription_set_congestion_limits(router->subscription,
+ low_water, high_water);
+ }
+ return res;
+}
+
int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)