res |= ast_data_unregister(NULL);
- if (device_state_sub) {
- device_state_sub = stasis_unsubscribe(device_state_sub);
- }
+ device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
ast_extension_state_del(0, extension_state_cb);
{
poll_thread_run = 0;
- if (mwi_sub_sub) {
- mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub);
- }
+ mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub);
ast_mutex_lock(&poll_lock);
ast_cond_signal(&poll_cond);
static void network_change_stasis_unsubscribe(void)
{
- if (network_change_sub) {
- network_change_sub = stasis_unsubscribe(network_change_sub);
- }
+ network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
}
static void acl_change_stasis_subscribe(void)
static void acl_change_stasis_unsubscribe(void)
{
- if (acl_change_sub) {
- acl_change_sub = stasis_unsubscribe(acl_change_sub);
- }
+ acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
static int network_change_sched_cb(const void *data)
if (peer->dnsmgr)
ast_dnsmgr_release(peer->dnsmgr);
- if (peer->mwi_event_sub) {
- peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
- }
+ peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
ast_string_field_free_memory(peer);
}
static void network_change_stasis_unsubscribe(void)
{
- if (network_change_sub) {
- network_change_sub = stasis_unsubscribe(network_change_sub);
- }
+ network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
}
static void acl_change_stasis_subscribe(void)
{
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
- acl_change_stasis_cb, NULL);
+ acl_change_stasis_cb, NULL);
}
}
static void acl_change_event_stasis_unsubscribe(void)
{
- if (acl_change_sub) {
- acl_change_sub = stasis_unsubscribe(acl_change_sub);
- }
+ acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
static int network_change_sched_cb(const void *data)
return AST_TEST_FAIL;
}
- test_sub = stasis_unsubscribe(test_sub);
+ test_sub = stasis_unsubscribe_and_join(test_sub);
ao2_cleanup(cb_data->presence_state);
ast_free((char *)cb_data);
* stasis_subscription. Due to cyclic references, the \ref
* stasis_subscription will not be freed until after it has been unsubscribed,
* and all other ao2_ref()'s have been cleaned up.
+ *
+ * \par Shutdown
+ *
+ * Subscriptions have two options for unsubscribing, depending upon the context
+ * in which you need to unsubscribe.
+ *
+ * If your subscription is owned by a module, and you must unsubscribe from the
+ * module_unload() function, then you'll want to use the
+ * stasis_unsubscribe_and_join() function. This will block until the final
+ * message has been received on the subscription. Otherwise, there's the danger
+ * of invoking the callback function after it has been unloaded.
+ *
+ * If your subscription is owned by an object, then your object should have an
+ * explicit shutdown() function, which calls stasis_unsubscribe(). In your
+ * subscription handler, when the stasis_subscription_final_message() has been
+ * received, decrement the refcount on your object. In your object's destructor,
+ * you may assert that stasis_subscription_is_done() to validate that the
+ * subscription's callback will no longer be invoked.
+ *
+ * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
+ * an object's destructor. While code that does this may work most of the time,
+ * it's got one big downside. There's a general assumption that object
+ * destruction is non-blocking. If you block the destruction waiting for the
+ * subscription to complete, there's the danger that the subscription may
+ * process a message which will bump the refcount up by one. Then it does
+ * whatever it does, decrements the refcount, which then proceeds to re-destroy
+ * the object. Now you've got hard to reproduce bugs that only show up under
+ * certain loads.
*/
#include "asterisk/utils.h"
* \since 12
*/
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
- stasis_subscription_cb callback,
- void *data);
+ stasis_subscription_cb callback, void *data);
/*!
* \brief Cancel a subscription.
* delivery of the final message.
*
* \param subscription Subscription to cancel.
- * \retval NULL for convenience
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_subscription *stasis_unsubscribe(
+ struct stasis_subscription *subscription);
+
+/*!
+ * \brief Block until the last message is processed on a subscription.
+ *
+ * This function will not return until the \a subscription's callback for the
+ * stasis_subscription_final_message() completes. This allows cleanup routines
+ * to run before unblocking the joining thread.
+ *
+ * \param subscription Subscription to block on.
+ * \since 12
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Returns whether \a subscription has received its final message.
+ *
+ * Note that a subscription is considered done even while the
+ * stasis_subscription_final_message() is being processed. This allows cleanup
+ * routines to check the status of the subscription.
+ *
+ * \param subscription Subscription.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ * received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_subscription_is_done(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Cancel a subscription, blocking until the last message is processed.
+ *
+ * While normally it's recommended to stasis_unsubscribe() and wait for
+ * stasis_subscription_final_message(), there are times (like during a module
+ * unload) where you have to wait for the final message (otherwise you'll call
+ * a function in a shared module that no longer exists).
+ *
+ * \param subscription Subscription to cancel.
+ * \return \c NULL for convenience
* \since 12
*/
-struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription);
+struct stasis_subscription *stasis_unsubscribe_and_join(
+ struct stasis_subscription *subscription);
/*!
* \brief Create a subscription which forwards all messages from one topic to
* \return \c NULL on error.
* \since 12
*/
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+ struct stasis_topic *to_topic);
/*!
* \brief Get the unique ID for the subscription.
/*!
* \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
* \param pooled_topic Topic to which messages will be routed
- * \retval the new stasis_topic_pool or NULL on failure
+ * \return the new stasis_topic_pool
+ * \return \c NULL on failure
*/
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
* \brief Find or create a topic in the pool
* \param pool Pool for which to get the topic
* \param topic_name Name of the topic to get
- * \retval The already stored or newly allocated topic
- * \retval NULL if the topic was not found and could not be allocated
+ * \return The already stored or newly allocated topic
+ * \return \c NULL if the topic was not found and could not be allocated
*/
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
/*!
- * Unsubscribes a caching topic from its upstream topic.
+ * \brief Unsubscribes a caching topic from its upstream topic.
+ *
+ * This function returns immediately, so be sure to cleanup when
+ * stasis_subscription_final_message() is received.
+ *
+ * \param caching_topic Caching topic to unsubscribe
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_unsubscribe(
+ struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Unsubscribes a caching topic from its upstream topic, blocking until
+ * all messages have been forwarded.
+ *
+ * See stasis_unsubscriben_and_join() for more info on when to use this as
+ * opposed to stasis_caching_unsubscribe().
+ *
* \param caching_topic Caching topic to unsubscribe
- * \retval NULL for convenience
+ * \return \c NULL for convenience
* \since 12
*/
-struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
+ struct stasis_caching_topic *caching_topic);
/*!
* \brief Returns the topic of cached events from a caching topics.
/*!
* \brief Dump cached items to a subscription
* \param caching_topic The topic returned from stasis_caching_topic_create().
- * \param type Type of message to dump (any type if NULL).
+ * \param type Type of message to dump (any type if \c NULL).
* \return ao2_container containing all matches (must be unreffed by caller)
- * \return NULL on allocation error
+ * \return \c NULL on allocation error
* \since 12
*/
struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
/*!
* \brief Unsubscribe the router from the upstream topic.
+ *
* \param router Router to unsubscribe.
* \since 12
*/
void stasis_message_router_unsubscribe(struct stasis_message_router *router);
+/*!
+ * \brief Unsubscribe the router from the upstream topic, blocking until the
+ * final message has been processed.
+ *
+ * See stasis_unsubscribe_and_join() for info on when to use this
+ * vs. stasis_message_router_unsubscribe().
+ *
+ * \param router Router to unsubscribe.
+ * \since 12
+ */
+void stasis_message_router_unsubscribe_and_join(
+ struct stasis_message_router *router);
+
+/*!
+ * \brief Returns whether \a router has received its final message.
+ *
+ * \param router Router.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ * received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_message_router_is_done(struct stasis_message_router *router);
+
/*!
* \brief Add a route to a message router.
* \param router Router to add the route to.
{
ao2_cleanup(mwi_topic_all);
mwi_topic_all = NULL;
- mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached);
+ mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type);
ao2_cleanup(mwi_topic_pool);
mwi_topic_pool = NULL;
{
ao2_cleanup(device_state_topic_all);
device_state_topic_all = NULL;
- device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached);
+ device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
ao2_cleanup(device_state_topic_pool);
device_state_topic_pool = NULL;
struct ast_endpoint *endpoint = obj;
/* The router should be shut down already */
- ast_assert(endpoint->router == NULL);
+ ast_assert(stasis_message_router_is_done(endpoint->router));
+ ao2_cleanup(endpoint->router);
+ endpoint->router = NULL;
stasis_unsubscribe(endpoint->forward);
endpoint->forward = NULL;
stasis_publish(endpoint->topic, message);
}
+ /* Bump refcount to hold on to the router */
+ ao2_ref(endpoint->router, +1);
stasis_message_router_unsubscribe(endpoint->router);
- endpoint->router = NULL;
}
const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
static void acl_change_stasis_unsubscribe(void)
{
- if (acl_change_sub) {
- acl_change_sub = stasis_unsubscribe(acl_change_sub);
- }
+ acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
/* In order to understand what the heck is going on with the
static void manager_channels_shutdown(void)
{
- stasis_message_router_unsubscribe(channel_state_router);
+ stasis_message_router_unsubscribe_and_join(channel_state_router);
channel_state_router = NULL;
}
{
int x;
- if (presence_state_sub) {
- presence_state_sub = stasis_unsubscribe(presence_state_sub);
- }
- if (device_state_sub) {
- device_state_sub = stasis_unsubscribe(device_state_sub);
- }
+ presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub);
+ device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
/* Unregister builtin applications */
for (x = 0; x < ARRAY_LEN(builtins); x++) {
stasis_subscription_cb callback;
/*! Data pointer to be handed to the callback. */
void *data;
+
+ /*! Lock for joining with subscription. */
+ ast_mutex_t join_lock;
+ /*! Condition for joining with subscription. */
+ ast_cond_t join_cond;
+ /*! Flag set when final message for sub has been received.
+ * Be sure join_lock is held before reading/setting. */
+ int final_message_rxed;
+ /*! Flag set when final message for sub has been processed.
+ * Be sure join_lock is held before reading/setting. */
+ int final_message_processed;
};
static void subscription_dtor(void *obj)
{
struct stasis_subscription *sub = obj;
ast_assert(!stasis_subscription_is_subscribed(sub));
+ ast_assert(stasis_subscription_is_done(sub));
ao2_cleanup(sub->topic);
sub->topic = NULL;
ast_taskprocessor_unreference(sub->mailbox);
sub->mailbox = NULL;
+ ast_mutex_destroy(&sub->join_lock);
+ ast_cond_destroy(&sub->join_cond);
}
/*!
struct stasis_topic *topic,
struct stasis_message *message)
{
- /* Since sub->topic doesn't change, no need to lock sub */
- sub->callback(sub->data,
- sub,
- topic,
- message);
+ /* Notify that the final message has been received */
+ if (stasis_subscription_final_message(sub, message)) {
+ SCOPED_MUTEX(lock, &sub->join_lock);
+ sub->final_message_rxed = 1;
+ ast_cond_signal(&sub->join_cond);
+ }
+
+ /* Since sub is mostly immutable, no need to lock sub */
+ sub->callback(sub->data, sub, topic, message);
+
+ /* Notify that the final message has been processed */
+ if (stasis_subscription_final_message(sub, message)) {
+ SCOPED_MUTEX(lock, &sub->join_lock);
+ sub->final_message_processed = 1;
+ ast_cond_signal(&sub->join_cond);
+ }
}
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
sub->topic = topic;
sub->callback = callback;
sub->data = data;
+ ast_mutex_init(&sub->join_lock);
+ ast_cond_init(&sub->join_cond, NULL);
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
return NULL;
}
+/*!
+ * \brief Block until the final message has been received on a subscription.
+ *
+ * \param subscription Subscription to wait on.
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription)
+{
+ if (subscription) {
+ SCOPED_MUTEX(lock, &subscription->join_lock);
+ /* Wait until the processed flag has been set */
+ while (!subscription->final_message_processed) {
+ ast_cond_wait(&subscription->join_cond,
+ &subscription->join_lock);
+ }
+ }
+}
+
+int stasis_subscription_is_done(struct stasis_subscription *subscription)
+{
+ if (subscription) {
+ SCOPED_MUTEX(lock, &subscription->join_lock);
+ return subscription->final_message_rxed;
+ }
+
+ /* Null subscription is about as done as you can get */
+ return 1;
+}
+
+struct stasis_subscription *stasis_unsubscribe_and_join(
+ struct stasis_subscription *subscription)
+{
+ if (!subscription) {
+ return NULL;
+ }
+
+ /* Bump refcount to hold it past the unsubscribe */
+ ao2_ref(subscription, +1);
+ stasis_unsubscribe(subscription);
+ stasis_subscription_join(subscription);
+ /* Now decrement the refcount back */
+ ao2_cleanup(subscription);
+ return NULL;
+}
+
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
{
if (sub) {
static void stasis_caching_topic_dtor(void *obj) {
struct stasis_caching_topic *caching_topic = obj;
ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+ ast_assert(stasis_subscription_is_done(caching_topic->sub));
+ ao2_cleanup(caching_topic->sub);
caching_topic->sub = NULL;
ao2_cleanup(caching_topic->cache);
caching_topic->cache = NULL;
{
if (caching_topic) {
if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+ /* Increment the reference to hold on to it past the
+ * unsubscribe */
+ ao2_ref(caching_topic->sub, +1);
stasis_unsubscribe(caching_topic->sub);
} else {
ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
return NULL;
}
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
+{
+ if (!caching_topic) {
+ return NULL;
+ }
+
+ /* Hold a ref past the unsubscribe */
+ ao2_ref(caching_topic, +1);
+ stasis_caching_unsubscribe(caching_topic);
+ stasis_subscription_join(caching_topic->sub);
+ ao2_cleanup(caching_topic);
+ return NULL;
+}
+
struct cache_entry {
struct stasis_message_type *type;
char *id;
void ast_stasis_channels_shutdown(void)
{
+ channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
+ ao2_cleanup(channel_topic_all);
+ channel_topic_all = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
- ao2_cleanup(channel_topic_all);
- channel_topic_all = NULL;
- channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached);
}
void ast_stasis_channels_init(void)
}
-static void endpoints_stasis_shutdown(void)
-{
- ao2_cleanup(endpoint_topic_all);
- endpoint_topic_all = NULL;
-
- stasis_caching_unsubscribe(endpoint_topic_all_cached);
- endpoint_topic_all_cached = NULL;
-}
-
struct ast_json *ast_endpoint_snapshot_to_json(
const struct ast_endpoint_snapshot *snapshot)
{
return ast_json_ref(json);
}
+static void endpoints_stasis_shutdown(void)
+{
+ stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached);
+ endpoint_topic_all_cached = NULL;
+
+ ao2_cleanup(endpoint_topic_all);
+ endpoint_topic_all = NULL;
+}
+
int ast_endpoint_stasis_init(void)
{
ast_register_atexit(endpoints_stasis_shutdown);
size_t i;
ast_assert(!stasis_subscription_is_subscribed(router->subscription));
+ ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
for (i = 0; i < router->num_routes_current; ++i) {
ao2_cleanup(router->routes[i]);
stasis_unsubscribe(router->subscription);
}
+void stasis_message_router_unsubscribe_and_join(
+ struct stasis_message_router *router)
+{
+ if (!router) {
+ return;
+ }
+ stasis_unsubscribe_and_join(router->subscription);
+}
+
+int stasis_message_router_is_done(struct stasis_message_router *router)
+{
+ if (!router) {
+ /* Null router is about as done as you can get */
+ return 1;
+ }
+
+ return stasis_subscription_is_done(router->subscription);
+}
+
+
static struct stasis_message_route *route_create(
struct stasis_message_type *message_type,
stasis_subscription_cb callback,
static int unload_module(void)
{
- stasis_unsubscribe(sub);
+ stasis_unsubscribe_and_join(sub);
sub = NULL;
- stasis_message_router_unsubscribe(router);
+ stasis_message_router_unsubscribe_and_join(router);
router = NULL;
return 0;
}
ast_unregister_application(app_ajileave);
ast_manager_unregister("JabberSend");
ast_custom_function_unregister(&jabberstatus_function);
- if (mwi_sub) {
- mwi_sub = stasis_unsubscribe(mwi_sub);
- }
- if (device_state_sub) {
- device_state_sub = stasis_unsubscribe(device_state_sub);
- }
+ mwi_sub = stasis_unsubscribe_and_join(mwi_sub);
+ device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
ast_custom_function_unregister(&jabberreceive_function);
ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {
{
int r = 0;
- stasis_message_router_unsubscribe(channel_router);
+ stasis_message_router_unsubscribe_and_join(channel_router);
channel_router = NULL;
ao2_cleanup(apps_registry);