From 792e2cdbc2b3e73553606e5ad596e8b178560e12 Mon Sep 17 00:00:00 2001 From: "Joshua C. Colp" Date: Mon, 8 Dec 2025 12:16:03 -0400 Subject: [PATCH] Revert "endpoints: Remove need for stasis subscription." This reverts commit 27fbc166e1b2f1c23ce3194a862ce273d66218b6. --- include/asterisk/channel.h | 12 -------- include/asterisk/endpoints.h | 13 -------- main/channel.c | 7 +---- main/channel_internal_api.c | 28 ----------------- main/channel_private.h | 1 - main/endpoints.c | 57 ++++++++++++++++++++++++++++++----- tests/test_stasis_endpoints.c | 2 -- 7 files changed, 51 insertions(+), 69 deletions(-) diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h index 7e8b3e353e..6a5f7674e2 100644 --- a/include/asterisk/channel.h +++ b/include/asterisk/channel.h @@ -2847,16 +2847,6 @@ void ast_channel_internal_swap_endpoint_forward(struct ast_channel *a, struct as */ void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b); -/*! - * \brief Swap endpoints between two channels - * \param a First channel - * \param b Second channel - * - * \note - * This is used in masquerade to exchange endpoints - */ -void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b); - /*! * \brief Set uniqueid and linkedid string value only (not time) * \param chan The channel to set the uniqueid to @@ -4280,8 +4270,6 @@ ast_callid ast_channel_callid(const struct ast_channel *chan); struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan); void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot); struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan); -struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan); -void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint); /*! * \pre chan is locked diff --git a/include/asterisk/endpoints.h b/include/asterisk/endpoints.h index 0be9d352ba..0c5edcec31 100644 --- a/include/asterisk/endpoints.h +++ b/include/asterisk/endpoints.h @@ -210,18 +210,5 @@ void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, int ast_endpoint_add_channel(struct ast_endpoint *endpoint, struct ast_channel *chan); -/*! - * \brief Removes a channel from the given endpoint. - * \since 23.1.0 - * \since 22.7.0 - * \since 20.17.0 - * - * \param endpoint - * \param chan Channel. - * \retval 0 on success. - * \retval Non-zero on error. - */ - int ast_endpoint_remove_channel(struct ast_endpoint *endpoint, - struct ast_channel *chan); #endif /* _ASTERISK_ENDPOINTS_H */ diff --git a/main/channel.c b/main/channel.c index 8121b5c3d7..f3c4d0285a 100644 --- a/main/channel.c +++ b/main/channel.c @@ -949,7 +949,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char if (endpoint) { - ast_channel_endpoint_set(tmp, endpoint); + ast_endpoint_add_channel(endpoint, tmp); } /* @@ -2210,8 +2210,6 @@ static void ast_channel_destructor(void *obj) ast_channel_lock(chan); - ast_channel_endpoint_set(chan, NULL); - /* Get rid of each of the data stores on the channel */ while ((datastore = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry))) /* Free the data store */ @@ -6982,9 +6980,6 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann /* The old snapshots need to follow the channels so the snapshot update is correct */ ast_channel_internal_swap_snapshots(clonechan, original); - /* Now we swap the endpoints if present */ - ast_channel_internal_swap_endpoints(clonechan, original); - /* Swap channel names. This uses ast_channel_name_set directly, so we * don't get any spurious rename events. */ diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index 9b41811bd9..1ebd6e9413 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -1409,15 +1409,6 @@ void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_chann b->snapshot = snapshot; } -void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b) -{ - struct ast_endpoint *endpoint; - - endpoint = a->endpoint; - a->endpoint = b->endpoint; - b->endpoint = endpoint; -} - void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid) { ast_copy_string(chan->uniqueid.unique_id, uniqueid, sizeof(chan->uniqueid.unique_id)); @@ -1616,22 +1607,3 @@ struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan) { return &chan->snapshot_segment_flags; } - -struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan) -{ - return chan->endpoint; -} - -void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint) -{ - if (chan->endpoint) { - ast_endpoint_remove_channel(chan->endpoint, chan); - ao2_ref(chan->endpoint, -1); - } - - chan->endpoint = ao2_bump(endpoint); - - if (chan->endpoint) { - ast_endpoint_add_channel(chan->endpoint, chan); - } -} diff --git a/main/channel_private.h b/main/channel_private.h index 3ff56128fa..0655f11895 100644 --- a/main/channel_private.h +++ b/main/channel_private.h @@ -199,7 +199,6 @@ struct ast_channel { struct ast_channel_snapshot *snapshot; /*!< The current up to date snapshot of the channel */ struct ast_flags snapshot_segment_flags; /*!< Flags regarding the segments of the snapshot */ int linked_in_container; /*!< Whether this channel is linked in a storage container */ - struct ast_endpoint *endpoint; /*!< The endpoint associated with this channel */ }; #if defined(__cplusplus) || defined(c_plusplus) diff --git a/main/endpoints.c b/main/endpoints.c index e27ae69c72..c53e31d49b 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -34,6 +34,7 @@ #include "asterisk/stasis.h" #include "asterisk/stasis_channels.h" #include "asterisk/stasis_endpoints.h" +#include "asterisk/stasis_message_router.h" #include "asterisk/stringfields.h" #include "asterisk/_private.h" @@ -67,6 +68,8 @@ struct ast_endpoint { int max_channels; /*! Topic for this endpoint's messages */ struct stasis_cp_single *topics; + /*! Router for handling this endpoint's messages */ + struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; /*! Forwarding subscription from an endpoint to its tech endpoint */ @@ -143,6 +146,11 @@ static void endpoint_dtor(void *obj) { struct ast_endpoint *endpoint = obj; + /* The router should be shut down already */ + ast_assert(stasis_message_router_is_done(endpoint->router)); + ao2_cleanup(endpoint->router); + endpoint->router = NULL; + stasis_cp_single_unsubscribe(endpoint->topics); endpoint->topics = NULL; @@ -171,26 +179,43 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint, return 0; } -int ast_endpoint_remove_channel(struct ast_endpoint *endpoint, - struct ast_channel *chan) +/*! \brief Handler for channel snapshot update */ +static void endpoint_cache_clear(void *data, + struct stasis_subscription *sub, + struct stasis_message *message) { - ast_assert(chan != NULL); + struct ast_endpoint *endpoint = data; + struct ast_channel_snapshot_update *update = stasis_message_data(message); + + /* Only when the channel is dead do we remove it */ + if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) { + return; + } + ast_assert(endpoint != NULL); - ast_assert(!ast_strlen_zero(endpoint->resource)); ao2_lock(endpoint); - ast_str_container_remove(endpoint->channel_ids, ast_channel_uniqueid(chan)); + ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->base->uniqueid); ao2_unlock(endpoint); - endpoint_publish_snapshot(endpoint); +} - return 0; +static void endpoint_subscription_change(void *data, + struct stasis_subscription *sub, + struct stasis_message *message) +{ + struct stasis_endpoint *endpoint = data; + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(endpoint); + } } static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource) { RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup); + int r = 0; /* Get/create the technology endpoint */ if (!ast_strlen_zero(resource)) { @@ -247,6 +272,20 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type()); stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); + endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); + if (!endpoint->router) { + return NULL; + } + r |= stasis_message_router_add(endpoint->router, + ast_channel_snapshot_type(), endpoint_cache_clear, + endpoint); + r |= stasis_message_router_add(endpoint->router, + stasis_subscription_change_type(), endpoint_subscription_change, + endpoint); + if (r) { + return NULL; + } + endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics), stasis_cp_single_topic(tech_endpoint->topics)); @@ -327,6 +366,10 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) stasis_publish(ast_endpoint_topic(endpoint), message); } } + + /* Bump refcount to hold on to the router */ + ao2_ref(endpoint->router, +1); + stasis_message_router_unsubscribe(endpoint->router); } const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint) diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c index cd45196f41..4c70e21fc5 100644 --- a/tests/test_stasis_endpoints.c +++ b/tests/test_stasis_endpoints.c @@ -256,8 +256,6 @@ AST_TEST_DEFINE(channel_messages) actual_snapshot = stasis_message_data(msg); ast_test_validate(test, 1 == actual_snapshot->num_channels); - ast_endpoint_remove_channel(uut, chan); - ast_hangup(chan); chan = NULL; -- 2.47.3