]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
manager: Eliminate unnecessary code, simplify sessions in stasis callbacks
authorJoshua C. Colp <jcolp@sangoma.com>
Mon, 4 May 2026 21:02:38 +0000 (18:02 -0300)
committerJoshua C. Colp <jcolp@sangoma.com>
Mon, 11 May 2026 12:53:06 +0000 (12:53 +0000)
Due to stasis filtering the stasis callback for AMI type messages is
guaranteed to only receive messages that can be turned into AMI events,
so remove the check done in the callback.

The sessions container usage for the stasis callbacks has also been
simplified by having a reference on the message router subscription
instead of having to acquire the sessions from the global object each
time.

main/manager.c

index 473a2904ae22d4aa2e4bf0553af5a444be56ed0e..f5d2c1f500943e126bca3ae70c2dc0dae6a7c354 100644 (file)
@@ -572,32 +572,27 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl
 static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
                                    struct stasis_message *message)
 {
-       struct ao2_container *sessions;
+       struct ao2_container *sessions = data;
        struct ast_manager_event_blob *ev;
 
-       if (!stasis_message_can_be_ami(message)) {
-               /* Not an AMI message; disregard */
-               return;
-       }
-
-       sessions = ao2_global_obj_ref(mgr_sessions);
+       /*
+        * This callback only receives messages that can be turned into AMI events, so
+        * no need to check that the message can be turned into an event before checking for listeners.
+        */
        if (!any_manager_listeners(sessions)) {
                /* Nobody is listening */
-               ao2_cleanup(sessions);
                return;
        }
 
        ev = stasis_message_to_ami(message);
        if (!ev) {
                /* Conversion failure */
-               ao2_cleanup(sessions);
                return;
        }
 
        manager_event_sessions(sessions, ev->event_flags, ev->manager_event,
                "%s", ev->extra_fields);
        ao2_ref(ev, -1);
-       ao2_cleanup(sessions);
 }
 
 static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
@@ -608,12 +603,10 @@ static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
        const char *type;
        struct ast_json *event;
        struct ast_str *event_buffer;
-       struct ao2_container *sessions;
+       struct ao2_container *sessions = data;
 
-       sessions = ao2_global_obj_ref(mgr_sessions);
        if (!any_manager_listeners(sessions)) {
                /* Nobody is listening */
-               ao2_cleanup(sessions);
                return;
        }
 
@@ -625,14 +618,33 @@ static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
        event_buffer = ast_manager_str_from_json_object(event, NULL);
        if (!event_buffer) {
                ast_log(AST_LOG_WARNING, "Error while creating payload for event %s\n", type);
-               ao2_cleanup(sessions);
                return;
        }
 
        manager_event_sessions(sessions, class_type, type,
                "%s", ast_str_buffer(event_buffer));
        ast_free(event_buffer);
-       ao2_cleanup(sessions);
+}
+
+/*!
+ * \brief Callback for subscription change messages
+ * \param userdata The subscription user data (in our case a pointer to the sessions container)
+ * \param sub The subscription
+ * \param message The message
+ */
+static void manager_subscription_change_msg_cb(void *userdata, struct stasis_subscription *sub,
+               struct stasis_message *message)
+{
+       /*
+        * When the subscription unsubscribes a final message is sent to the subscription
+        * to indicate it. We use this to manage the lifetime of the sessions container
+        * pointer stored with the subscription. When the subscription is done we drop
+        * the reference to the sessions container (userdata) so it can be cleaned up
+        * if needed.
+        */
+       if (stasis_subscription_final_message(sub, message)) {
+               ao2_cleanup(userdata);
+       }
 }
 
 void ast_manager_publish_event(const char *type, int class_type, struct ast_json *obj)
@@ -9583,6 +9595,7 @@ static void manager_shutdown(void)
  */
 static int manager_subscriptions_init(void)
 {
+       struct ao2_container *sessions;
        int res = 0;
 
        rtp_topic_forwarder = stasis_forward_all(ast_rtp_topic(), manager_topic);
@@ -9602,11 +9615,26 @@ static int manager_subscriptions_init(void)
        stasis_message_router_set_congestion_limits(stasis_router, -1,
                6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
 
+       /*
+        * The reference to sessions passes to the stasis router subscription so
+        * no need to unref here at all. This is also invoked after creating the
+        * sessions container so it has to exist.
+        */
+       sessions = ao2_global_obj_ref(mgr_sessions);
+
        stasis_message_router_set_formatters_default(stasis_router,
-               manager_default_msg_cb, NULL, STASIS_SUBSCRIPTION_FORMATTER_AMI);
+               manager_default_msg_cb, sessions, STASIS_SUBSCRIPTION_FORMATTER_AMI);
 
        res |= stasis_message_router_add(stasis_router,
-               ast_manager_get_generic_type(), manager_generic_msg_cb, NULL);
+               ast_manager_get_generic_type(), manager_generic_msg_cb, sessions);
+
+       /*
+        * This specific callback is solely for lifetime management of the sessions
+        * reference. Once the subscription is finalized the reference is dropped in
+        * the callback.
+        */
+       res |= stasis_message_router_add(stasis_router,
+               stasis_subscription_change_type(), manager_subscription_change_msg_cb, sessions);
 
        if (res != 0) {
                return -1;