#include "asterisk/utils.h"
#include "asterisk/uuid.h"
+/*!
+ * \page stasis-impl Stasis Implementation Notes
+ *
+ * \par Reference counting
+ *
+ * Stasis introduces a number of objects, which are tightly related to one
+ * another. Because we rely on ref-counting for memory management, understanding
+ * these relationships is important to understanding this code.
+ *
+ * \code{.txt}
+ *
+ * stasis_topic <----> stasis_subscription
+ * ^ ^
+ * \ /
+ * \ /
+ * dispatch
+ * |
+ * |
+ * v
+ * stasis_message
+ * |
+ * |
+ * v
+ * stasis_message_type
+ *
+ * \endcode
+ *
+ * The most troubling thing in this chart is the cyclic reference between
+ * stasis_topic and stasis_subscription. This is both unfortunate, and
+ * necessary. Topics need the subscription in order to dispatch messages;
+ * subscriptions need the topics to unsubscribe and check subscription status.
+ *
+ * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
+ * topic's reference to a subscription. When the subcription is destroyed, it
+ * will remove its reference to the topic.
+ *
+ * This means that until a subscription has be explicitly unsubscribed, it will
+ * not be destroyed. Neither will a topic be destroyed while it has subscribers.
+ * The destructors of both have assertions regarding this to catch ref-counting
+ * problems where a subscription or topic has had an extra ao2_cleanup().
+ *
+ * The \ref dispatch object is a transient object, which is posted to a
+ * subscription's taskprocessor to send a message to the subscriber. They have
+ * short life cycles, allocated on one thread, destroyed on another.
+ *
+ * During shutdown, or the deletion of a domain object, there are a flurry of
+ * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
+ * are processed. Any one of these cleanups could be the one to actually destroy
+ * a given object, so care must be taken to ensure that an object isn't
+ * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
+ * that might happen when a RAII_VAR() goes out of scope.
+ *
+ * \par Typical life cycles
+ *
+ * \li stasis_topic - There are several topics which live for the duration of
+ * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
+ * are actually fed by shorter-lived topics whose lifetime is associated
+ * with some domain object (like ast_channel_topic() for a given
+ * ast_channel).
+ *
+ * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
+ * topics, for similar reasons.
+ *
+ * \li dispatch - Very short lived; just long enough to post a message to a
+ * subscriber.
+ *
+ * \li stasis_message - Short to intermediate lifetimes, but that is mostly
+ * irrelevant. Messages are strictly data and have no behavior associated
+ * with them, so it doesn't really matter if/when they are destroyed. By
+ * design, a component could hold a ref to a message forever without any
+ * ill consequences (aside from consuming more memory).
+ *
+ * \li stasis_message_type - Long life cycles, typically only destroyed on
+ * module unloading or _clean_ process exit.
+ *
+ * \par Subscriber shutdown sequencing
+ *
+ * Subscribers are sensitive to shutdown sequencing, specifically in how the
+ * reference message types. This is fully detailed on the wiki at
+ * https://wiki.asterisk.org/wiki/x/K4BqAQ.
+ *
+ * In short, the lifetime of the \a data (and \a callback, if in a module) must
+ * be held until the stasis_subscription_final_message() has been received.
+ * Depending on the structure of the subscriber code, this can be handled by
+ * using stasis_subscription_final_message() to free resources on the final
+ * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
+ * block until the unsubscribe has completed.
+ */
+
/*! Initial size of the subscribers list. */
#define INITIAL_SUBSCRIBERS_MAX 4
/*! \internal */
struct stasis_topic {
char *name;
- /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
+ /*! Variable length array of the subscribers */
struct stasis_subscription **subscribers;
/*! Allocated length of the subscribers array */
size_t num_subscribers_max;
static void topic_dtor(void *obj)
{
struct stasis_topic *topic = obj;
+
+ /* Subscribers hold a reference to topics, so they should all be
+ * unsubscribed before we get here. */
+ ast_assert(topic->num_subscribers_current == 0);
ast_free(topic->name);
topic->name = NULL;
ast_free(topic->subscribers);
/*! Data pointer to be handed to the callback. */
void *data;
- /*! Lock for joining with subscription. */
+ /*! Lock for completion flags \c final_message_{rxed,processed}. */
ast_mutex_t join_lock;
/*! Condition for joining with subscription. */
ast_cond_t join_cond;
static void subscription_dtor(void *obj)
{
struct stasis_subscription *sub = obj;
+
+ /* Subscriptions need to be manually unsubscribed before destruction
+ * b/c there's a cyclic reference between topics and subscriptions */
ast_assert(!stasis_subscription_is_subscribed(sub));
+ /* If there are any messages in flight to this subscription; that would
+ * be bad. */
ast_assert(stasis_subscription_is_done(sub));
+
ao2_cleanup(sub->topic);
sub->topic = NULL;
ast_taskprocessor_unreference(sub->mailbox);
{
if (sub) {
size_t i;
- struct stasis_topic *topic = sub->topic;
+ /* The subscription may be the last ref to this topic. Hold
+ * the topic ref open until after the unlock. */
+ RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
+ ao2_cleanup);
SCOPED_AO2LOCK(lock_topic, topic);
for (i = 0; i < topic->num_subscribers_current; ++i) {
return NULL;
}
-/*!
- * \brief Block until the final message has been received on a subscription.
- *
- * \param subscription Subscription to wait on.
- */
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
topic->num_subscribers_max *= 2;
}
- /* Don't ref sub here or we'll cause a reference cycle. */
+ /* The reference from the topic to the subscription is shared with
+ * the owner of the subscription, which will explicitly unsubscribe
+ * to release it.
+ *
+ * If we bumped the refcount here, the owner would have to unsubscribe
+ * and cleanup, which is a bit awkward. */
topic->subscribers[topic->num_subscribers_current++] = sub;
return 0;
}
return 0;
}
-void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
{
size_t i;
+ /* The topic may be unref'ed by the subscription invocation.
+ * Make sure we hold onto a reference while dispatching. */
+ RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic),
+ ao2_cleanup);
SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
}
-/*! \brief Cleanup function */
+/*! \brief Shutdown function */
static void stasis_exit(void)
{
ast_threadpool_shutdown(pool);
return AST_TEST_PASS;
}
+static void noop(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ /* no-op */
+}
+
+AST_TEST_DEFINE(dtor_order)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test that destruction order doesn't bomb stuff";
+ info->description = "Test that destruction order doesn't bomb stuff";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("test-topic");
+ ast_test_validate(test, NULL != topic);
+
+ sub = stasis_subscribe(topic, noop, NULL);
+ ast_test_validate(test, NULL != sub);
+
+ /* With any luck, this won't completely blow everything up */
+ ao2_cleanup(topic);
+ stasis_unsubscribe(sub);
+
+ /* These refs were cleaned up manually */
+ topic = NULL;
+ sub = NULL;
+
+ return AST_TEST_PASS;
+}
+
+static const char *noop_get_id(struct stasis_message *message)
+{
+ return NULL;
+}
+
+AST_TEST_DEFINE(caching_dtor_order)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
+ stasis_caching_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test that destruction order doesn't bomb stuff";
+ info->description = "Test that destruction order doesn't bomb stuff";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ cache = stasis_cache_create(noop_get_id);
+ ast_test_validate(test, NULL != cache);
+
+ topic = stasis_topic_create("test-topic");
+ ast_test_validate(test, NULL != topic);
+
+ caching_topic = stasis_caching_topic_create(topic, cache);
+ ast_test_validate(test, NULL != caching_topic);
+
+ sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
+ NULL);
+ ast_test_validate(test, NULL != sub);
+
+ /* With any luck, this won't completely blow everything up */
+ ao2_cleanup(cache);
+ ao2_cleanup(topic);
+ stasis_caching_unsubscribe(caching_topic);
+ stasis_unsubscribe(sub);
+
+ /* These refs were cleaned up manually */
+ cache = NULL;
+ topic = NULL;
+ caching_topic = NULL;
+ sub = NULL;
+
+ return AST_TEST_PASS;
+}
+
static int unload_module(void)
{
AST_TEST_UNREGISTER(message_type);
AST_TEST_UNREGISTER(to_json);
AST_TEST_UNREGISTER(no_to_ami);
AST_TEST_UNREGISTER(to_ami);
+ AST_TEST_UNREGISTER(dtor_order);
+ AST_TEST_UNREGISTER(caching_dtor_order);
return 0;
}
AST_TEST_REGISTER(to_json);
AST_TEST_REGISTER(no_to_ami);
AST_TEST_REGISTER(to_ami);
+ AST_TEST_REGISTER(dtor_order);
+ AST_TEST_REGISTER(caching_dtor_order);
return AST_MODULE_LOAD_SUCCESS;
}