static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
- struct ao2_container *sessions;
+ struct ao2_container *sessions = data;
struct ast_manager_event_blob *ev;
- if (!stasis_message_can_be_ami(message)) {
- /* Not an AMI message; disregard */
- return;
- }
-
- sessions = ao2_global_obj_ref(mgr_sessions);
+ /*
+ * This callback only receives messages that can be turned into AMI events, so
+ * no need to check that the message can be turned into an event before checking for listeners.
+ */
if (!any_manager_listeners(sessions)) {
/* Nobody is listening */
- ao2_cleanup(sessions);
return;
}
ev = stasis_message_to_ami(message);
if (!ev) {
/* Conversion failure */
- ao2_cleanup(sessions);
return;
}
manager_event_sessions(sessions, ev->event_flags, ev->manager_event,
"%s", ev->extra_fields);
ao2_ref(ev, -1);
- ao2_cleanup(sessions);
}
static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
const char *type;
struct ast_json *event;
struct ast_str *event_buffer;
- struct ao2_container *sessions;
+ struct ao2_container *sessions = data;
- sessions = ao2_global_obj_ref(mgr_sessions);
if (!any_manager_listeners(sessions)) {
/* Nobody is listening */
- ao2_cleanup(sessions);
return;
}
event_buffer = ast_manager_str_from_json_object(event, NULL);
if (!event_buffer) {
ast_log(AST_LOG_WARNING, "Error while creating payload for event %s\n", type);
- ao2_cleanup(sessions);
return;
}
manager_event_sessions(sessions, class_type, type,
"%s", ast_str_buffer(event_buffer));
ast_free(event_buffer);
- ao2_cleanup(sessions);
+}
+
+/*!
+ * \brief Callback for subscription change messages
+ * \param userdata The subscription user data (in our case a pointer to the sessions container)
+ * \param sub The subscription
+ * \param message The message
+ */
+static void manager_subscription_change_msg_cb(void *userdata, struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ /*
+ * When the subscription unsubscribes a final message is sent to the subscription
+ * to indicate it. We use this to manage the lifetime of the sessions container
+ * pointer stored with the subscription. When the subscription is done we drop
+ * the reference to the sessions container (userdata) so it can be cleaned up
+ * if needed.
+ */
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(userdata);
+ }
}
void ast_manager_publish_event(const char *type, int class_type, struct ast_json *obj)
*/
static int manager_subscriptions_init(void)
{
+ struct ao2_container *sessions;
int res = 0;
rtp_topic_forwarder = stasis_forward_all(ast_rtp_topic(), manager_topic);
stasis_message_router_set_congestion_limits(stasis_router, -1,
6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
+ /*
+ * The reference to sessions passes to the stasis router subscription so
+ * no need to unref here at all. This is also invoked after creating the
+ * sessions container so it has to exist.
+ */
+ sessions = ao2_global_obj_ref(mgr_sessions);
+
stasis_message_router_set_formatters_default(stasis_router,
- manager_default_msg_cb, NULL, STASIS_SUBSCRIPTION_FORMATTER_AMI);
+ manager_default_msg_cb, sessions, STASIS_SUBSCRIPTION_FORMATTER_AMI);
res |= stasis_message_router_add(stasis_router,
- ast_manager_get_generic_type(), manager_generic_msg_cb, NULL);
+ ast_manager_get_generic_type(), manager_generic_msg_cb, sessions);
+
+ /*
+ * This specific callback is solely for lifetime management of the sessions
+ * reference. Once the subscription is finalized the reference is dropped in
+ * the callback.
+ */
+ res |= stasis_message_router_add(stasis_router,
+ stasis_subscription_change_type(), manager_subscription_change_msg_cb, sessions);
if (res != 0) {
return -1;