]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
endpoints: Remove need for stasis subscription.
authorJoshua C. Colp <jcolp@sangoma.com>
Fri, 10 Oct 2025 16:01:23 +0000 (13:01 -0300)
committergithub-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Tue, 14 Oct 2025 20:02:04 +0000 (20:02 +0000)
When an endpoint is created in the core of Asterisk a subscription
was previously created alongside it to monitor any channels being
destroyed that were related to it. This was done by receiving all
channel snapshot updates for every channel and only reacting when
it was indicated that the channel was dead.

This change removes this logic and instead provides an API call
for directly removing a channel from an endpoint. This is called
when channels are destroyed. This operation is fast, so blocking
the calling thread for a short period of time doesn't have any
noticeable impact.

include/asterisk/channel.h
include/asterisk/endpoints.h
main/channel.c
main/channel_internal_api.c
main/channel_private.h
main/endpoints.c
tests/test_stasis_endpoints.c

index 030edca1f0bccaea181c570dcd1ee76f1a4005a4..eceb9aa9f28de3a9d645bcfd39bf9bf3ea1af076 100644 (file)
@@ -2847,6 +2847,16 @@ void ast_channel_internal_swap_endpoint_forward(struct ast_channel *a, struct as
  */
 void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b);
 
+/*!
+ * \brief Swap endpoints between two channels
+ * \param a First channel
+ * \param b Second channel
+ *
+ * \note
+ * This is used in masquerade to exchange endpoints
+ */
+void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b);
+
 /*!
  * \brief Set uniqueid and linkedid string value only (not time)
  * \param chan The channel to set the uniqueid to
@@ -4262,6 +4272,8 @@ ast_callid ast_channel_callid(const struct ast_channel *chan);
 struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan);
 void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot);
 struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan);
+struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan);
+void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint);
 
 /*!
  * \pre chan is locked
index 0c5edcec31dab3dd1ee026a0ee1b00ad149dca8e..0be9d352baf1912aa8c7dd295a92e922714934ae 100644 (file)
@@ -210,5 +210,18 @@ void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
 int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
        struct ast_channel *chan);
 
+/*!
+ * \brief Removes a channel from the given endpoint.
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param endpoint
+ * \param chan Channel.
+ * \retval 0 on success.
+ * \retval Non-zero on error.
+ */
+ int ast_endpoint_remove_channel(struct ast_endpoint *endpoint,
+       struct ast_channel *chan);
 
 #endif /* _ASTERISK_ENDPOINTS_H */
index 257051960d8b4a6ca1a0f346b6bedb00ea0038df..2de4bf85d535bacf9fb091e6b24a1c48b7164b23 100644 (file)
@@ -949,7 +949,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
 
 
        if (endpoint) {
-               ast_endpoint_add_channel(endpoint, tmp);
+               ast_channel_endpoint_set(tmp, endpoint);
        }
 
        /*
@@ -2192,6 +2192,8 @@ static void ast_channel_destructor(void *obj)
 
        ast_channel_lock(chan);
 
+       ast_channel_endpoint_set(chan, NULL);
+
        /* Get rid of each of the data stores on the channel */
        while ((datastore = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry)))
                /* Free the data store */
@@ -6955,6 +6957,9 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann
        /* The old snapshots need to follow the channels so the snapshot update is correct */
        ast_channel_internal_swap_snapshots(clonechan, original);
 
+       /* Now we swap the endpoints if present */
+       ast_channel_internal_swap_endpoints(clonechan, original);
+
        /* Swap channel names. This uses ast_channel_name_set directly, so we
         * don't get any spurious rename events.
         */
index 7e256c7404b4ed5de0b04ceb0504284f85af01c3..ce2f7d5d646c3fa86fb9747470689649a7fbb72b 100644 (file)
@@ -1400,6 +1400,15 @@ void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_chann
        b->snapshot = snapshot;
 }
 
+void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b)
+{
+       struct ast_endpoint *endpoint;
+
+       endpoint = a->endpoint;
+       a->endpoint = b->endpoint;
+       b->endpoint = endpoint;
+}
+
 void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid)
 {
        ast_copy_string(chan->uniqueid.unique_id, uniqueid, sizeof(chan->uniqueid.unique_id));
@@ -1598,3 +1607,22 @@ struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan)
 {
        return &chan->snapshot_segment_flags;
 }
+
+struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan)
+{
+       return chan->endpoint;
+}
+
+void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint)
+{
+       if (chan->endpoint) {
+               ast_endpoint_remove_channel(chan->endpoint, chan);
+               ao2_ref(chan->endpoint, -1);
+       }
+
+       chan->endpoint = ao2_bump(endpoint);
+
+       if (chan->endpoint) {
+               ast_endpoint_add_channel(chan->endpoint, chan);
+       }
+}
index 28b402b08f2fa838643c36fd17234a47c59d79ee..5842f43da724ece90f5f0b0ee2cd522fb35b3d44 100644 (file)
@@ -196,6 +196,7 @@ struct ast_channel {
        struct ast_channel_snapshot *snapshot; /*!< The current up to date snapshot of the channel */
        struct ast_flags snapshot_segment_flags; /*!< Flags regarding the segments of the snapshot */
        int linked_in_container; /*!< Whether this channel is linked in a storage container */
+       struct ast_endpoint *endpoint; /*!< The endpoint associated with this channel */
 };
 
 #if defined(__cplusplus) || defined(c_plusplus)
index c53e31d49b288bd5d4f093d93b8994d3b470941a..e27ae69c7225dd1d0638031f27a74166402fa064 100644 (file)
@@ -34,7 +34,6 @@
 #include "asterisk/stasis.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_endpoints.h"
-#include "asterisk/stasis_message_router.h"
 #include "asterisk/stringfields.h"
 #include "asterisk/_private.h"
 
@@ -68,8 +67,6 @@ struct ast_endpoint {
        int max_channels;
        /*! Topic for this endpoint's messages */
        struct stasis_cp_single *topics;
-       /*! Router for handling this endpoint's messages */
-       struct stasis_message_router *router;
        /*! ast_str_container of channels associated with this endpoint */
        struct ao2_container *channel_ids;
        /*! Forwarding subscription from an endpoint to its tech endpoint */
@@ -146,11 +143,6 @@ static void endpoint_dtor(void *obj)
 {
        struct ast_endpoint *endpoint = obj;
 
-       /* The router should be shut down already */
-       ast_assert(stasis_message_router_is_done(endpoint->router));
-       ao2_cleanup(endpoint->router);
-       endpoint->router = NULL;
-
        stasis_cp_single_unsubscribe(endpoint->topics);
        endpoint->topics = NULL;
 
@@ -179,43 +171,26 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
        return 0;
 }
 
-/*! \brief Handler for channel snapshot update */
-static void endpoint_cache_clear(void *data,
-       struct stasis_subscription *sub,
-       struct stasis_message *message)
+int ast_endpoint_remove_channel(struct ast_endpoint *endpoint,
+       struct ast_channel *chan)
 {
-       struct ast_endpoint *endpoint = data;
-       struct ast_channel_snapshot_update *update = stasis_message_data(message);
-
-       /* Only when the channel is dead do we remove it */
-       if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
-               return;
-       }
-
+       ast_assert(chan != NULL);
        ast_assert(endpoint != NULL);
+       ast_assert(!ast_strlen_zero(endpoint->resource));
 
        ao2_lock(endpoint);
-       ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->base->uniqueid);
+       ast_str_container_remove(endpoint->channel_ids, ast_channel_uniqueid(chan));
        ao2_unlock(endpoint);
-       endpoint_publish_snapshot(endpoint);
-}
 
-static void endpoint_subscription_change(void *data,
-       struct stasis_subscription *sub,
-       struct stasis_message *message)
-{
-       struct stasis_endpoint *endpoint = data;
+       endpoint_publish_snapshot(endpoint);
 
-       if (stasis_subscription_final_message(sub, message)) {
-               ao2_cleanup(endpoint);
-       }
+       return 0;
 }
 
 static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
 {
        RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
        RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
-       int r = 0;
 
        /* Get/create the technology endpoint */
        if (!ast_strlen_zero(resource)) {
@@ -272,20 +247,6 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
                stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
-               endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
-               if (!endpoint->router) {
-                       return NULL;
-               }
-               r |= stasis_message_router_add(endpoint->router,
-                       ast_channel_snapshot_type(), endpoint_cache_clear,
-                       endpoint);
-               r |= stasis_message_router_add(endpoint->router,
-                       stasis_subscription_change_type(), endpoint_subscription_change,
-                       endpoint);
-               if (r) {
-                       return NULL;
-               }
-
                endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
                        stasis_cp_single_topic(tech_endpoint->topics));
 
@@ -366,10 +327,6 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
                        stasis_publish(ast_endpoint_topic(endpoint), message);
                }
        }
-
-       /* Bump refcount to hold on to the router */
-       ao2_ref(endpoint->router, +1);
-       stasis_message_router_unsubscribe(endpoint->router);
 }
 
 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
index 4c70e21fc5a215b994041e00ecf52e8836bbfd28..cd45196f4151c27f6213d68fd8ba1c2677f7de47 100644 (file)
@@ -256,6 +256,8 @@ AST_TEST_DEFINE(channel_messages)
        actual_snapshot = stasis_message_data(msg);
        ast_test_validate(test, 1 == actual_snapshot->num_channels);
 
+       ast_endpoint_remove_channel(uut, chan);
+
        ast_hangup(chan);
        chan = NULL;