]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Fix shutdown assertions in stasis-core
authorDavid M. Lee <dlee@digium.com>
Fri, 17 May 2013 21:10:32 +0000 (21:10 +0000)
committerDavid M. Lee <dlee@digium.com>
Fri, 17 May 2013 21:10:32 +0000 (21:10 +0000)
In r388005, macros were introduced to consistently define message
types. This added an assert if a message type was used either before
it was initialized or after it had been cleaned up. It turns out that
this assertion fires during shutdown.

This actually exposed a hidden shutdown ordering problem. Since
unsubscribing is asynchronous, it's possible that the message types
used by the subscription could be freed before the final message of
the subscription was processed.

This patch adds stasis_subscription_join(), which blocks until the
last message has been processed by the subscription. Since joining was
most commonly done right after an unsubscribe, a
stasis_unsubscribe_and_join() convenience function was also added.

Similar functions were also added to the stasis_caching_topic and
stasis_message_router, since they wrap subscriptions and have similar
problems.

Other code in trunk was refactored to join() where appropriate, or at
least verify that the subscription was complete before being
destroyed.

Review: https://reviewboard.asterisk.org/r/2540

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@389011 65c4cc65-6c06-0410-ace0-fbb531ad65f3

21 files changed:
apps/app_queue.c
apps/app_voicemail.c
channels/chan_iax2.c
channels/chan_sip.c
funcs/func_presencestate.c
include/asterisk/stasis.h
include/asterisk/stasis_message_router.h
main/app.c
main/devicestate.c
main/endpoints.c
main/manager.c
main/manager_channels.c
main/pbx.c
main/stasis.c
main/stasis_cache.c
main/stasis_channels.c
main/stasis_endpoints.c
main/stasis_message_router.c
res/res_chan_stats.c
res/res_jabber.c
res/res_stasis.c

index 3455cc748b744ea42b3bc7c88d6bf8f73a63b927..c63cd071ed0191c55ed99d5d95aa19d4766438c9 100644 (file)
@@ -9866,9 +9866,7 @@ static int unload_module(void)
 
        res |= ast_data_unregister(NULL);
 
-       if (device_state_sub) {
-               device_state_sub = stasis_unsubscribe(device_state_sub);
-       }
+       device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 
        ast_extension_state_del(0, extension_state_cb);
 
index 13bcd3ea57dbe881e93e26bc66d63359fd0f224e..b3ceeebc9c9c74bd311a8dd044b85cb14772214e 100644 (file)
@@ -12689,9 +12689,7 @@ static void stop_poll_thread(void)
 {
        poll_thread_run = 0;
 
-       if (mwi_sub_sub) {
-               mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub);
-       }
+       mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub);
 
        ast_mutex_lock(&poll_lock);
        ast_cond_signal(&poll_cond);
index eeffb6696eb2a4e4bcf3571bf45b9eb76d464264..112a99375366b9e2bdf954437a610d3c0a5a46ce 100644 (file)
@@ -1334,9 +1334,7 @@ static void network_change_stasis_subscribe(void)
 
 static void network_change_stasis_unsubscribe(void)
 {
-       if (network_change_sub) {
-               network_change_sub = stasis_unsubscribe(network_change_sub);
-       }
+       network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
 }
 
 static void acl_change_stasis_subscribe(void)
@@ -1349,9 +1347,7 @@ static void acl_change_stasis_subscribe(void)
 
 static void acl_change_stasis_unsubscribe(void)
 {
-       if (acl_change_sub) {
-               acl_change_sub = stasis_unsubscribe(acl_change_sub);
-       }
+       acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 static int network_change_sched_cb(const void *data)
@@ -12424,9 +12420,7 @@ static void peer_destructor(void *obj)
        if (peer->dnsmgr)
                ast_dnsmgr_release(peer->dnsmgr);
 
-       if (peer->mwi_event_sub) {
-               peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
-       }
+       peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
 
        ast_string_field_free_memory(peer);
 }
index 937acb94ebcf55cbe3badc5635422222f74b5bd0..88965fc73a94f01de15f60c534d55ce3023973f4 100644 (file)
@@ -16742,25 +16742,21 @@ static void network_change_stasis_subscribe(void)
 
 static void network_change_stasis_unsubscribe(void)
 {
-       if (network_change_sub) {
-               network_change_sub = stasis_unsubscribe(network_change_sub);
-       }
+       network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
 }
 
 static void acl_change_stasis_subscribe(void)
 {
        if (!acl_change_sub) {
                acl_change_sub = stasis_subscribe(ast_security_topic(),
-               acl_change_stasis_cb, NULL);
+                       acl_change_stasis_cb, NULL);
        }
 
 }
 
 static void acl_change_event_stasis_unsubscribe(void)
 {
-       if (acl_change_sub) {
-               acl_change_sub = stasis_unsubscribe(acl_change_sub);
-       }
+       acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 static int network_change_sched_cb(const void *data)
index 01e6d09c2c03f06c59524178b6412b453f5c2798..3bf4a81b31370b38f845b3ef36443b5db24ca9f2 100644 (file)
@@ -706,7 +706,7 @@ AST_TEST_DEFINE(test_presence_state_change)
                return AST_TEST_FAIL;
        }
 
-       test_sub = stasis_unsubscribe(test_sub);
+       test_sub = stasis_unsubscribe_and_join(test_sub);
 
        ao2_cleanup(cb_data->presence_state);
        ast_free((char *)cb_data);
index 7e46dbf412436025ae793e34d81953092aaa5564..e6ea6fa1306ce87bc4c3dddb78ec561d62a63378 100644 (file)
  * stasis_subscription. Due to cyclic references, the \ref
  * stasis_subscription will not be freed until after it has been unsubscribed,
  * and all other ao2_ref()'s have been cleaned up.
+ *
+ * \par Shutdown
+ *
+ * Subscriptions have two options for unsubscribing, depending upon the context
+ * in which you need to unsubscribe.
+ *
+ * If your subscription is owned by a module, and you must unsubscribe from the
+ * module_unload() function, then you'll want to use the
+ * stasis_unsubscribe_and_join() function. This will block until the final
+ * message has been received on the subscription. Otherwise, there's the danger
+ * of invoking the callback function after it has been unloaded.
+ *
+ * If your subscription is owned by an object, then your object should have an
+ * explicit shutdown() function, which calls stasis_unsubscribe(). In your
+ * subscription handler, when the stasis_subscription_final_message() has been
+ * received, decrement the refcount on your object. In your object's destructor,
+ * you may assert that stasis_subscription_is_done() to validate that the
+ * subscription's callback will no longer be invoked.
+ *
+ * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
+ * an object's destructor. While code that does this may work most of the time,
+ * it's got one big downside. There's a general assumption that object
+ * destruction is non-blocking. If you block the destruction waiting for the
+ * subscription to complete, there's the danger that the subscription may
+ * process a message which will bump the refcount up by one. Then it does
+ * whatever it does, decrements the refcount, which then proceeds to re-destroy
+ * the object. Now you've got hard to reproduce bugs that only show up under
+ * certain loads.
  */
 
 #include "asterisk/utils.h"
@@ -292,8 +320,7 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s
  * \since 12
  */
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
-                                            stasis_subscription_cb callback,
-                                            void *data);
+       stasis_subscription_cb callback, void *data);
 
 /*!
  * \brief Cancel a subscription.
@@ -304,10 +331,52 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
  * delivery of the final message.
  *
  * \param subscription Subscription to cancel.
- * \retval NULL for convenience
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_subscription *stasis_unsubscribe(
+       struct stasis_subscription *subscription);
+
+/*!
+ * \brief Block until the last message is processed on a subscription.
+ *
+ * This function will not return until the \a subscription's callback for the
+ * stasis_subscription_final_message() completes. This allows cleanup routines
+ * to run before unblocking the joining thread.
+ *
+ * \param subscription Subscription to block on.
+ * \since 12
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Returns whether \a subscription has received its final message.
+ *
+ * Note that a subscription is considered done even while the
+ * stasis_subscription_final_message() is being processed. This allows cleanup
+ * routines to check the status of the subscription.
+ *
+ * \param subscription Subscription.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ *         received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_subscription_is_done(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Cancel a subscription, blocking until the last message is processed.
+ *
+ * While normally it's recommended to stasis_unsubscribe() and wait for
+ * stasis_subscription_final_message(), there are times (like during a module
+ * unload) where you have to wait for the final message (otherwise you'll call
+ * a function in a shared module that no longer exists).
+ *
+ * \param subscription Subscription to cancel.
+ * \return \c NULL for convenience
  * \since 12
  */
-struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription);
+struct stasis_subscription *stasis_unsubscribe_and_join(
+       struct stasis_subscription *subscription);
 
 /*!
  * \brief Create a subscription which forwards all messages from one topic to
@@ -322,7 +391,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subsc
  * \return \c NULL on error.
  * \since 12
  */
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+       struct stasis_topic *to_topic);
 
 /*!
  * \brief Get the unique ID for the subscription.
@@ -389,7 +459,8 @@ struct stasis_topic_pool;
 /*!
  * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
  * \param pooled_topic Topic to which messages will be routed
- * \retval the new stasis_topic_pool or NULL on failure
+ * \return the new stasis_topic_pool
+ * \return \c NULL on failure
  */
 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
 
@@ -397,8 +468,8 @@ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_t
  * \brief Find or create a topic in the pool
  * \param pool Pool for which to get the topic
  * \param topic_name Name of the topic to get
- * \retval The already stored or newly allocated topic
- * \retval NULL if the topic was not found and could not be allocated
+ * \return The already stored or newly allocated topic
+ * \return \c NULL if the topic was not found and could not be allocated
  */
 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
 
@@ -496,12 +567,31 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message);
 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
 
 /*!
- * Unsubscribes a caching topic from its upstream topic.
+ * \brief Unsubscribes a caching topic from its upstream topic.
+ *
+ * This function returns immediately, so be sure to cleanup when
+ * stasis_subscription_final_message() is received.
+ *
+ * \param caching_topic Caching topic to unsubscribe
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_unsubscribe(
+       struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Unsubscribes a caching topic from its upstream topic, blocking until
+ * all messages have been forwarded.
+ *
+ * See stasis_unsubscriben_and_join() for more info on when to use this as
+ * opposed to stasis_caching_unsubscribe().
+ *
  * \param caching_topic Caching topic to unsubscribe
- * \retval NULL for convenience
+ * \return \c NULL for convenience
  * \since 12
  */
-struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
+       struct stasis_caching_topic *caching_topic);
 
 /*!
  * \brief Returns the topic of cached events from a caching topics.
@@ -530,9 +620,9 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top
 /*!
  * \brief Dump cached items to a subscription
  * \param caching_topic The topic returned from stasis_caching_topic_create().
- * \param type Type of message to dump (any type if NULL).
+ * \param type Type of message to dump (any type if \c NULL).
  * \return ao2_container containing all matches (must be unreffed by caller)
- * \return NULL on allocation error
+ * \return \c NULL on allocation error
  * \since 12
  */
 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
index 42770d293eaf7dbf77221cbaf7e47a08017986d5..e7d5a4cc65f252a3cb5788d8e6fd7ff2f4a405e7 100644 (file)
@@ -57,11 +57,35 @@ struct stasis_message_router *stasis_message_router_create(
 
 /*!
  * \brief Unsubscribe the router from the upstream topic.
+ *
  * \param router Router to unsubscribe.
  * \since 12
  */
 void stasis_message_router_unsubscribe(struct stasis_message_router *router);
 
+/*!
+ * \brief Unsubscribe the router from the upstream topic, blocking until the
+ * final message has been processed.
+ *
+ * See stasis_unsubscribe_and_join() for info on when to use this
+ * vs. stasis_message_router_unsubscribe().
+ *
+ * \param router Router to unsubscribe.
+ * \since 12
+ */
+void stasis_message_router_unsubscribe_and_join(
+       struct stasis_message_router *router);
+
+/*!
+ * \brief Returns whether \a router has received its final message.
+ *
+ * \param router Router.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ *         received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_message_router_is_done(struct stasis_message_router *router);
+
 /*!
  * \brief Add a route to a message router.
  * \param router Router to add the route to.
index 0cdd9d31de8e2f79dac12146ae769a5be76bd502..0e8a68f7baa012e93376554c7d8e8f5104dd92e1 100644 (file)
@@ -2727,7 +2727,7 @@ static void app_exit(void)
 {
        ao2_cleanup(mwi_topic_all);
        mwi_topic_all = NULL;
-       mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached);
+       mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
        STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type);
        ao2_cleanup(mwi_topic_pool);
        mwi_topic_pool = NULL;
index aa31dbfd6f80ac260296a57b9f02d909431dd3a7..afa9621d31e8048882d4e8939f79e60c9b3b6266 100644 (file)
@@ -776,7 +776,7 @@ static void devstate_exit(void)
 {
        ao2_cleanup(device_state_topic_all);
        device_state_topic_all = NULL;
-       device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached);
+       device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
        ao2_cleanup(device_state_topic_pool);
        device_state_topic_pool = NULL;
index 95397f9602ae4b6ba9c4ed215c75dfb135bf52ba..c2d0577f977613bf21313cd64e561a5220b5c7ac 100644 (file)
@@ -106,7 +106,9 @@ static void endpoint_dtor(void *obj)
        struct ast_endpoint *endpoint = obj;
 
        /* The router should be shut down already */
-       ast_assert(endpoint->router == NULL);
+       ast_assert(stasis_message_router_is_done(endpoint->router));
+       ao2_cleanup(endpoint->router);
+       endpoint->router = NULL;
 
        stasis_unsubscribe(endpoint->forward);
        endpoint->forward = NULL;
@@ -258,8 +260,9 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
                stasis_publish(endpoint->topic, message);
        }
 
+       /* Bump refcount to hold on to the router */
+       ao2_ref(endpoint->router, +1);
        stasis_message_router_unsubscribe(endpoint->router);
-       endpoint->router = NULL;
 }
 
 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
index 4d2923eb5dc78a692c36126afafe8cea40e56a44..c9b2fbe1e1a74c9a2190f591d7e741586a5e9bf8 100644 (file)
@@ -1077,9 +1077,7 @@ static void acl_change_stasis_subscribe(void)
 
 static void acl_change_stasis_unsubscribe(void)
 {
-       if (acl_change_sub) {
-               acl_change_sub = stasis_unsubscribe(acl_change_sub);
-       }
+       acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 /* In order to understand what the heck is going on with the
index 63380a762c1a8e746240956d12d185d9db26423f..e1f918868913b4636994009228b520e04c91c361 100644 (file)
@@ -805,7 +805,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
 
 static void manager_channels_shutdown(void)
 {
-       stasis_message_router_unsubscribe(channel_state_router);
+       stasis_message_router_unsubscribe_and_join(channel_state_router);
        channel_state_router = NULL;
 }
 
index 74aeee928aaf05475b752bf250be66d9a61222f9..9492d955947c8b5713a0ed07d28457efad065c45 100644 (file)
@@ -11700,12 +11700,8 @@ static void unload_pbx(void)
 {
        int x;
 
-       if (presence_state_sub) {
-               presence_state_sub = stasis_unsubscribe(presence_state_sub);
-       }
-       if (device_state_sub) {
-               device_state_sub = stasis_unsubscribe(device_state_sub);
-       }
+       presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub);
+       device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 
        /* Unregister builtin applications */
        for (x = 0; x < ARRAY_LEN(builtins); x++) {
index 98dff95a6b35a42937e285adcd830f46a9a61ffb..d0ded401c6619755c3f8173218d5369f982e3926 100644 (file)
@@ -114,16 +114,30 @@ struct stasis_subscription {
        stasis_subscription_cb callback;
        /*! Data pointer to be handed to the callback. */
        void *data;
+
+       /*! Lock for joining with subscription. */
+       ast_mutex_t join_lock;
+       /*! Condition for joining with subscription. */
+       ast_cond_t join_cond;
+       /*! Flag set when final message for sub has been received.
+        *  Be sure join_lock is held before reading/setting. */
+       int final_message_rxed;
+       /*! Flag set when final message for sub has been processed.
+        *  Be sure join_lock is held before reading/setting. */
+       int final_message_processed;
 };
 
 static void subscription_dtor(void *obj)
 {
        struct stasis_subscription *sub = obj;
        ast_assert(!stasis_subscription_is_subscribed(sub));
+       ast_assert(stasis_subscription_is_done(sub));
        ao2_cleanup(sub->topic);
        sub->topic = NULL;
        ast_taskprocessor_unreference(sub->mailbox);
        sub->mailbox = NULL;
+       ast_mutex_destroy(&sub->join_lock);
+       ast_cond_destroy(&sub->join_cond);
 }
 
 /*!
@@ -136,11 +150,22 @@ static void subscription_invoke(struct stasis_subscription *sub,
                                  struct stasis_topic *topic,
                                  struct stasis_message *message)
 {
-       /* Since sub->topic doesn't change, no need to lock sub */
-       sub->callback(sub->data,
-                     sub,
-                     topic,
-                     message);
+       /* Notify that the final message has been received */
+       if (stasis_subscription_final_message(sub, message)) {
+               SCOPED_MUTEX(lock, &sub->join_lock);
+               sub->final_message_rxed = 1;
+               ast_cond_signal(&sub->join_cond);
+       }
+
+       /* Since sub is mostly immutable, no need to lock sub */
+       sub->callback(sub->data, sub, topic, message);
+
+       /* Notify that the final message has been processed */
+       if (stasis_subscription_final_message(sub, message)) {
+               SCOPED_MUTEX(lock, &sub->join_lock);
+               sub->final_message_processed = 1;
+               ast_cond_signal(&sub->join_cond);
+       }
 }
 
 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
@@ -171,6 +196,8 @@ static struct stasis_subscription *__stasis_subscribe(
        sub->topic = topic;
        sub->callback = callback;
        sub->data = data;
+       ast_mutex_init(&sub->join_lock);
+       ast_cond_init(&sub->join_cond, NULL);
 
        if (topic_add_subscription(topic, sub) != 0) {
                return NULL;
@@ -212,6 +239,50 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
        return NULL;
 }
 
+/*!
+ * \brief Block until the final message has been received on a subscription.
+ *
+ * \param subscription Subscription to wait on.
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription)
+{
+       if (subscription) {
+               SCOPED_MUTEX(lock, &subscription->join_lock);
+               /* Wait until the processed flag has been set */
+               while (!subscription->final_message_processed) {
+                       ast_cond_wait(&subscription->join_cond,
+                               &subscription->join_lock);
+               }
+       }
+}
+
+int stasis_subscription_is_done(struct stasis_subscription *subscription)
+{
+       if (subscription) {
+               SCOPED_MUTEX(lock, &subscription->join_lock);
+               return subscription->final_message_rxed;
+       }
+
+       /* Null subscription is about as done as you can get */
+       return 1;
+}
+
+struct stasis_subscription *stasis_unsubscribe_and_join(
+       struct stasis_subscription *subscription)
+{
+       if (!subscription) {
+               return NULL;
+       }
+
+       /* Bump refcount to hold it past the unsubscribe */
+       ao2_ref(subscription, +1);
+       stasis_unsubscribe(subscription);
+       stasis_subscription_join(subscription);
+       /* Now decrement the refcount back */
+       ao2_cleanup(subscription);
+       return NULL;
+}
+
 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
 {
        if (sub) {
index 75e6b5f956e53841ccf37c453f94885e7e19f6f0..154b4f020a93f04d556fd8f9fecdfa5fb7350492 100644 (file)
@@ -53,6 +53,8 @@ struct stasis_caching_topic {
 static void stasis_caching_topic_dtor(void *obj) {
        struct stasis_caching_topic *caching_topic = obj;
        ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+       ast_assert(stasis_subscription_is_done(caching_topic->sub));
+       ao2_cleanup(caching_topic->sub);
        caching_topic->sub = NULL;
        ao2_cleanup(caching_topic->cache);
        caching_topic->cache = NULL;
@@ -69,6 +71,9 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to
 {
        if (caching_topic) {
                if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+                       /* Increment the reference to hold on to it past the
+                        * unsubscribe */
+                       ao2_ref(caching_topic->sub, +1);
                        stasis_unsubscribe(caching_topic->sub);
                } else {
                        ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
@@ -77,6 +82,20 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to
        return NULL;
 }
 
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
+{
+       if (!caching_topic) {
+               return NULL;
+       }
+
+       /* Hold a ref past the unsubscribe */
+       ao2_ref(caching_topic, +1);
+       stasis_caching_unsubscribe(caching_topic);
+       stasis_subscription_join(caching_topic->sub);
+       ao2_cleanup(caching_topic);
+       return NULL;
+}
+
 struct cache_entry {
        struct stasis_message_type *type;
        char *id;
index 3df52d0b3702235a634939731bf434a0fe1d234b..95f5f9d0e95ad432bc8877b37c15c98437216741 100644 (file)
@@ -505,6 +505,9 @@ int ast_channel_snapshot_caller_id_equal(
 
 void ast_stasis_channels_shutdown(void)
 {
+       channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
+       ao2_cleanup(channel_topic_all);
+       channel_topic_all = NULL;
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
@@ -512,9 +515,6 @@ void ast_stasis_channels_shutdown(void)
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
-       ao2_cleanup(channel_topic_all);
-       channel_topic_all = NULL;
-       channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached);
 }
 
 void ast_stasis_channels_init(void)
index 7428e2cf1f01c9c9a9e9266e79fb02d9ddafa2a2..252614f629aadf7afb9d68d64202f8481c94f5bb 100644 (file)
@@ -101,15 +101,6 @@ static const char *endpoint_snapshot_get_id(struct stasis_message *message)
 }
 
 
-static void endpoints_stasis_shutdown(void)
-{
-       ao2_cleanup(endpoint_topic_all);
-       endpoint_topic_all = NULL;
-
-       stasis_caching_unsubscribe(endpoint_topic_all_cached);
-       endpoint_topic_all_cached = NULL;
-}
-
 struct ast_json *ast_endpoint_snapshot_to_json(
        const struct ast_endpoint_snapshot *snapshot)
 {
@@ -149,6 +140,15 @@ struct ast_json *ast_endpoint_snapshot_to_json(
        return ast_json_ref(json);
 }
 
+static void endpoints_stasis_shutdown(void)
+{
+       stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached);
+       endpoint_topic_all_cached = NULL;
+
+       ao2_cleanup(endpoint_topic_all);
+       endpoint_topic_all = NULL;
+}
+
 int ast_endpoint_stasis_init(void)
 {
        ast_register_atexit(endpoints_stasis_shutdown);
index 97ed7ad92c343bf37c27707958f7ca3b7f2b3621..c7acca1ffd795f0f58e7b85a3738ddea112c0321 100644 (file)
@@ -74,6 +74,7 @@ static void router_dtor(void *obj)
        size_t i;
 
        ast_assert(!stasis_subscription_is_subscribed(router->subscription));
+       ast_assert(stasis_subscription_is_done(router->subscription));
        router->subscription = NULL;
        for (i = 0; i < router->num_routes_current; ++i) {
                ao2_cleanup(router->routes[i]);
@@ -165,6 +166,26 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router)
        stasis_unsubscribe(router->subscription);
 }
 
+void stasis_message_router_unsubscribe_and_join(
+       struct stasis_message_router *router)
+{
+       if (!router) {
+               return;
+       }
+       stasis_unsubscribe_and_join(router->subscription);
+}
+
+int stasis_message_router_is_done(struct stasis_message_router *router)
+{
+       if (!router) {
+               /* Null router is about as done as you can get */
+               return 1;
+       }
+
+       return stasis_subscription_is_done(router->subscription);
+}
+
+
 static struct stasis_message_route *route_create(
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback,
index f5f2267aa52e38c3c1f610a8a5568fa3c67224ce..d54ba2f0b3bf002f771b8cd73c9127eab4d692ce 100644 (file)
@@ -172,9 +172,9 @@ static int load_module(void)
 
 static int unload_module(void)
 {
-       stasis_unsubscribe(sub);
+       stasis_unsubscribe_and_join(sub);
        sub = NULL;
-       stasis_message_router_unsubscribe(router);
+       stasis_message_router_unsubscribe_and_join(router);
        router = NULL;
        return 0;
 }
index 69f3665228418103df23519b57c88c1933fa2a33..2070c8024c8f2aafe6015e3120a560da914dc3e0 100644 (file)
@@ -4770,12 +4770,8 @@ static int unload_module(void)
        ast_unregister_application(app_ajileave);
        ast_manager_unregister("JabberSend");
        ast_custom_function_unregister(&jabberstatus_function);
-       if (mwi_sub) {
-               mwi_sub = stasis_unsubscribe(mwi_sub);
-       }
-       if (device_state_sub) {
-               device_state_sub = stasis_unsubscribe(device_state_sub);
-       }
+       mwi_sub = stasis_unsubscribe_and_join(mwi_sub);
+       device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
        ast_custom_function_unregister(&jabberreceive_function);
 
        ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {
index 5327ca4acf65c4e5228460c9b53acd3cc1d5c79f..e9931a09a71110bebae761b184387e5d739b08a8 100644 (file)
@@ -722,7 +722,7 @@ static int unload_module(void)
 {
        int r = 0;
 
-       stasis_message_router_unsubscribe(channel_router);
+       stasis_message_router_unsubscribe_and_join(channel_router);
        channel_router = NULL;
 
        ao2_cleanup(apps_registry);