]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Revert "endpoints: Remove need for stasis subscription." revert-1530-endpoints-remove-subscription-and-router 1634/head
authorJoshua C. Colp <smooth.egg8323@colp.dev>
Mon, 8 Dec 2025 16:16:03 +0000 (12:16 -0400)
committerGitHub <noreply@github.com>
Mon, 8 Dec 2025 16:16:03 +0000 (12:16 -0400)
This reverts commit 27fbc166e1b2f1c23ce3194a862ce273d66218b6.

include/asterisk/channel.h
include/asterisk/endpoints.h
main/channel.c
main/channel_internal_api.c
main/channel_private.h
main/endpoints.c
tests/test_stasis_endpoints.c

index 7e8b3e353ebc67110f5846791d5d57388f8b7ce5..6a5f7674e2b96f427ce4eda441b4adc9a8e1e1a5 100644 (file)
@@ -2847,16 +2847,6 @@ void ast_channel_internal_swap_endpoint_forward(struct ast_channel *a, struct as
  */
 void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b);
 
-/*!
- * \brief Swap endpoints between two channels
- * \param a First channel
- * \param b Second channel
- *
- * \note
- * This is used in masquerade to exchange endpoints
- */
-void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b);
-
 /*!
  * \brief Set uniqueid and linkedid string value only (not time)
  * \param chan The channel to set the uniqueid to
@@ -4280,8 +4270,6 @@ ast_callid ast_channel_callid(const struct ast_channel *chan);
 struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan);
 void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot);
 struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan);
-struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan);
-void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint);
 
 /*!
  * \pre chan is locked
index 0be9d352baf1912aa8c7dd295a92e922714934ae..0c5edcec31dab3dd1ee026a0ee1b00ad149dca8e 100644 (file)
@@ -210,18 +210,5 @@ void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
 int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
        struct ast_channel *chan);
 
-/*!
- * \brief Removes a channel from the given endpoint.
- * \since 23.1.0
- * \since 22.7.0
- * \since 20.17.0
- *
- * \param endpoint
- * \param chan Channel.
- * \retval 0 on success.
- * \retval Non-zero on error.
- */
- int ast_endpoint_remove_channel(struct ast_endpoint *endpoint,
-       struct ast_channel *chan);
 
 #endif /* _ASTERISK_ENDPOINTS_H */
index 8121b5c3d78bb19b59e3de2488e272fce969a0da..f3c4d0285ac239600579029809738d11df171a47 100644 (file)
@@ -949,7 +949,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
 
 
        if (endpoint) {
-               ast_channel_endpoint_set(tmp, endpoint);
+               ast_endpoint_add_channel(endpoint, tmp);
        }
 
        /*
@@ -2210,8 +2210,6 @@ static void ast_channel_destructor(void *obj)
 
        ast_channel_lock(chan);
 
-       ast_channel_endpoint_set(chan, NULL);
-
        /* Get rid of each of the data stores on the channel */
        while ((datastore = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry)))
                /* Free the data store */
@@ -6982,9 +6980,6 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann
        /* The old snapshots need to follow the channels so the snapshot update is correct */
        ast_channel_internal_swap_snapshots(clonechan, original);
 
-       /* Now we swap the endpoints if present */
-       ast_channel_internal_swap_endpoints(clonechan, original);
-
        /* Swap channel names. This uses ast_channel_name_set directly, so we
         * don't get any spurious rename events.
         */
index 9b41811bd982e13f590bd42245ae0e554653d886..1ebd6e9413f07a2e7c0292e8226469cec2b32acd 100644 (file)
@@ -1409,15 +1409,6 @@ void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_chann
        b->snapshot = snapshot;
 }
 
-void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b)
-{
-       struct ast_endpoint *endpoint;
-
-       endpoint = a->endpoint;
-       a->endpoint = b->endpoint;
-       b->endpoint = endpoint;
-}
-
 void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid)
 {
        ast_copy_string(chan->uniqueid.unique_id, uniqueid, sizeof(chan->uniqueid.unique_id));
@@ -1616,22 +1607,3 @@ struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan)
 {
        return &chan->snapshot_segment_flags;
 }
-
-struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan)
-{
-       return chan->endpoint;
-}
-
-void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint)
-{
-       if (chan->endpoint) {
-               ast_endpoint_remove_channel(chan->endpoint, chan);
-               ao2_ref(chan->endpoint, -1);
-       }
-
-       chan->endpoint = ao2_bump(endpoint);
-
-       if (chan->endpoint) {
-               ast_endpoint_add_channel(chan->endpoint, chan);
-       }
-}
index 3ff56128fa0507f36632f68a945b13847712666a..0655f11895729a7c732bed75aa69efbdf7673d08 100644 (file)
@@ -199,7 +199,6 @@ struct ast_channel {
        struct ast_channel_snapshot *snapshot; /*!< The current up to date snapshot of the channel */
        struct ast_flags snapshot_segment_flags; /*!< Flags regarding the segments of the snapshot */
        int linked_in_container; /*!< Whether this channel is linked in a storage container */
-       struct ast_endpoint *endpoint; /*!< The endpoint associated with this channel */
 };
 
 #if defined(__cplusplus) || defined(c_plusplus)
index e27ae69c7225dd1d0638031f27a74166402fa064..c53e31d49b288bd5d4f093d93b8994d3b470941a 100644 (file)
@@ -34,6 +34,7 @@
 #include "asterisk/stasis.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_endpoints.h"
+#include "asterisk/stasis_message_router.h"
 #include "asterisk/stringfields.h"
 #include "asterisk/_private.h"
 
@@ -67,6 +68,8 @@ struct ast_endpoint {
        int max_channels;
        /*! Topic for this endpoint's messages */
        struct stasis_cp_single *topics;
+       /*! Router for handling this endpoint's messages */
+       struct stasis_message_router *router;
        /*! ast_str_container of channels associated with this endpoint */
        struct ao2_container *channel_ids;
        /*! Forwarding subscription from an endpoint to its tech endpoint */
@@ -143,6 +146,11 @@ static void endpoint_dtor(void *obj)
 {
        struct ast_endpoint *endpoint = obj;
 
+       /* The router should be shut down already */
+       ast_assert(stasis_message_router_is_done(endpoint->router));
+       ao2_cleanup(endpoint->router);
+       endpoint->router = NULL;
+
        stasis_cp_single_unsubscribe(endpoint->topics);
        endpoint->topics = NULL;
 
@@ -171,26 +179,43 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
        return 0;
 }
 
-int ast_endpoint_remove_channel(struct ast_endpoint *endpoint,
-       struct ast_channel *chan)
+/*! \brief Handler for channel snapshot update */
+static void endpoint_cache_clear(void *data,
+       struct stasis_subscription *sub,
+       struct stasis_message *message)
 {
-       ast_assert(chan != NULL);
+       struct ast_endpoint *endpoint = data;
+       struct ast_channel_snapshot_update *update = stasis_message_data(message);
+
+       /* Only when the channel is dead do we remove it */
+       if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
+               return;
+       }
+
        ast_assert(endpoint != NULL);
-       ast_assert(!ast_strlen_zero(endpoint->resource));
 
        ao2_lock(endpoint);
-       ast_str_container_remove(endpoint->channel_ids, ast_channel_uniqueid(chan));
+       ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->base->uniqueid);
        ao2_unlock(endpoint);
-
        endpoint_publish_snapshot(endpoint);
+}
 
-       return 0;
+static void endpoint_subscription_change(void *data,
+       struct stasis_subscription *sub,
+       struct stasis_message *message)
+{
+       struct stasis_endpoint *endpoint = data;
+
+       if (stasis_subscription_final_message(sub, message)) {
+               ao2_cleanup(endpoint);
+       }
 }
 
 static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
 {
        RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
        RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
+       int r = 0;
 
        /* Get/create the technology endpoint */
        if (!ast_strlen_zero(resource)) {
@@ -247,6 +272,20 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
                stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
+               endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
+               if (!endpoint->router) {
+                       return NULL;
+               }
+               r |= stasis_message_router_add(endpoint->router,
+                       ast_channel_snapshot_type(), endpoint_cache_clear,
+                       endpoint);
+               r |= stasis_message_router_add(endpoint->router,
+                       stasis_subscription_change_type(), endpoint_subscription_change,
+                       endpoint);
+               if (r) {
+                       return NULL;
+               }
+
                endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
                        stasis_cp_single_topic(tech_endpoint->topics));
 
@@ -327,6 +366,10 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
                        stasis_publish(ast_endpoint_topic(endpoint), message);
                }
        }
+
+       /* Bump refcount to hold on to the router */
+       ao2_ref(endpoint->router, +1);
+       stasis_message_router_unsubscribe(endpoint->router);
 }
 
 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
index cd45196f4151c27f6213d68fd8ba1c2677f7de47..4c70e21fc5a215b994041e00ecf52e8836bbfd28 100644 (file)
@@ -256,8 +256,6 @@ AST_TEST_DEFINE(channel_messages)
        actual_snapshot = stasis_message_data(msg);
        ast_test_validate(test, 1 == actual_snapshot->num_channels);
 
-       ast_endpoint_remove_channel(uut, chan);
-
        ast_hangup(chan);
        chan = NULL;