};
static struct stasis_message_router *agent_router;
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
static int unload_module(void)
{
stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
}
stasis_message_router_unsubscribe_and_join(agent_router);
- topic_forwarder = stasis_unsubscribe(topic_forwarder);
+ topic_forwarder = stasis_forward_cancel(topic_forwarder);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);
struct stasis_subscription *stasis_unsubscribe_and_join(
struct stasis_subscription *subscription);
+struct stasis_forward;
+
/*!
* \brief Create a subscription which forwards all messages from one topic to
* another.
* \return \c NULL on error.
* \since 12
*/
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic);
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
+
/*!
* \brief Get the unique ID for the subscription.
*
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_VECTOR_H
+#define _ASTERISK_VECTOR_H
+
+/*! \file
+ *
+ * \brief Vector container support.
+ *
+ * A vector is a variable length array, with properties that can be useful when
+ * order doesn't matter.
+ * - Appends are asymptotically constant time.
+ * - Unordered removes are constant time.
+ * - Search is linear time
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ * \since 12
+ */
+
+/*! \brief Define a vector structure */
+#define ast_vector(type) \
+ struct { \
+ type *elems; \
+ size_t max; \
+ size_t current; \
+ }
+
+/*!
+ * \brief Initialize a vector
+ *
+ * If \a size is 0, then no space will be allocated until the vector is
+ * appended to.
+ *
+ * \param vec Vector to initialize.
+ * \param size Initial size of the vector.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_init(vec, size) ({ \
+ size_t __size = (size); \
+ size_t alloc_size = __size * sizeof(*(vec).elems); \
+ (vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL; \
+ (vec).current = 0; \
+ if ((vec).elems) { \
+ (vec).max = __size; \
+ } else { \
+ (vec).max = 0; \
+ } \
+ alloc_size == 0 || (vec).elems != NULL ? 0 : -1; \
+})
+
+/*!
+ * \brief Deallocates this vector.
+ *
+ * If any code to free the elements of this vector need to be run, that should
+ * be done prior to this call.
+ *
+ * \param vec Vector to deallocate.
+ */
+#define ast_vector_free(vec) do { \
+ ast_free((vec).elems); \
+ (vec).elems = NULL; \
+ (vec).max = 0; \
+ (vec).current = 0; \
+} while (0)
+
+/*!
+ * \brief Append an element to a vector, growing the vector if needed.
+ *
+ * \param vec Vector to append to.
+ * \param elem Element to append.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_append(vec, elem) ({ \
+ int res = 0; \
+ \
+ if ((vec).current + 1 > (vec).max) { \
+ size_t new_max = (vec).max ? 2 * (vec).max : 1; \
+ typeof((vec).elems) new_elems = ast_realloc( \
+ (vec).elems, new_max * sizeof(*new_elems)); \
+ if (new_elems) { \
+ (vec).elems = new_elems; \
+ (vec).max = new_max; \
+ } else { \
+ res = -1; \
+ } \
+ } \
+ \
+ if (res == 0) { \
+ (vec).elems[(vec).current++] = (elem); \
+ } \
+ res; \
+})
+
+/*!
+ * \brief Remove an element from a vector by index.
+ *
+ * Note that elements in the vector may be reordered, so that the remove can
+ * happen in constant time.
+ *
+ * \param vec Vector to remove from.
+ * \param idx Index of the element to remove.
+ * \return The element that was removed.
+ */
+#define ast_vector_remove_unordered(vec, idx) ({ \
+ typeof((vec).elems[0]) res; \
+ size_t __idx = (idx); \
+ ast_assert(__idx < (vec).current); \
+ res = (vec).elems[__idx]; \
+ (vec).elems[__idx] = (vec).elems[--(vec).current]; \
+ res; \
+})
+
+
+/*!
+ * \brief Remove an element from a vector that matches the given comparison
+ *
+ * \param vec Vector to remove from.
+ * \param value Value to pass into comparator.
+ * \param cmp Comparator function/macros (called as \c cmp(elem, value))
+ * \return 0 if element was removed.
+ * \return Non-zero if element was not in the vector.
+ */
+#define ast_vector_remove_cmp_unordered(vec, value, cmp) ({ \
+ int res = -1; \
+ size_t idx; \
+ typeof(value) __value = (value); \
+ for (idx = 0; idx < (vec).current; ++idx) { \
+ if (cmp((vec).elems[idx], __value)) { \
+ ast_vector_remove_unordered((vec), idx); \
+ res = 0; \
+ break; \
+ } \
+ } \
+ res; \
+})
+
+/*! \brief Default comparator for ast_vector_remove_elem_unordered() */
+#define AST_VECTOR_DEFAULT_CMP(a, b) ((a) == (b))
+
+/*!
+ * \brief Remove an element from a vector.
+ *
+ * \param vec Vector to remove from.
+ * \param elem Element to remove
+ * \return 0 if element was removed.
+ * \return Non-zero if element was not in the vector.
+ */
+#define ast_vector_remove_elem_unordered(vec, elem) ({ \
+ ast_vector_remove_cmp_unordered((vec), (elem), \
+ AST_VECTOR_DEFAULT_CMP); \
+})
+
+/*!
+ * \brief Get the number of elements in a vector.
+ *
+ * \param vec Vector to query.
+ * \return Number of elements in the vector.
+ */
+#define ast_vector_size(vec) (vec).current
+
+/*!
+ * \brief Get an element from a vector.
+ *
+ * \param vec Vector to query.
+ * \param idx Index of the element to get.
+ */
+#define ast_vector_get(vec, idx) ({ \
+ size_t __idx = (idx); \
+ ast_assert(__idx < (vec).current); \
+ (vec).elems[__idx]; \
+})
+
+#endif /* _ASTERISK_VECTOR_H */
static struct stasis_message_router *stasis_router;
/*! \brief Our subscription for bridges */
-static struct stasis_subscription *bridge_subscription;
+static struct stasis_forward *bridge_subscription;
/*! \brief Our subscription for channels */
-static struct stasis_subscription *channel_subscription;
+static struct stasis_forward *channel_subscription;
/*! \brief Our subscription for parking */
-static struct stasis_subscription *parking_subscription;
+static struct stasis_forward *parking_subscription;
/*! \brief The parent topic for all topics we want to aggregate for CDRs */
static struct stasis_topic *cdr_topic;
static void cdr_engine_cleanup(void)
{
- channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
- bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
- parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
+ channel_subscription = stasis_forward_cancel(channel_subscription);
+ bridge_subscription = stasis_forward_cancel(bridge_subscription);
+ parking_subscription = stasis_forward_cancel(parking_subscription);
stasis_message_router_unsubscribe_and_join(stasis_router);
ao2_cleanup(cdr_topic);
cdr_topic = NULL;
static struct stasis_topic *cel_aggregation_topic;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_channel_forwarder;
+static struct stasis_forward *cel_channel_forwarder;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_bridge_forwarder;
+static struct stasis_forward *cel_bridge_forwarder;
/*! Subscription for forwarding the parking topic */
-static struct stasis_subscription *cel_parking_forwarder;
+static struct stasis_forward *cel_parking_forwarder;
/*! Subscription for forwarding the CEL-specific topic */
-static struct stasis_subscription *cel_cel_forwarder;
+static struct stasis_forward *cel_cel_forwarder;
struct stasis_message_type *cel_generic_type(void);
STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
cel_aggregation_topic = NULL;
ao2_cleanup(cel_topic);
cel_topic = NULL;
- cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
- cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
- cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
- cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
+ cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
+ cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
+ cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
+ cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
ast_cli_unregister(&cli_status);
ao2_cleanup(cel_dialstatus_store);
cel_dialstatus_store = NULL;
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_cp_single *topics; /*!< Topic for all channel's events */
- struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
- struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
+ struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
/*! \brief The monotonically increasing integer counter for channel uniqueids */
ast_string_field_free_memory(chan);
- chan->forwarder = stasis_unsubscribe(chan->forwarder);
- chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
+ chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
stasis_cp_single_unsubscribe(chan->topics);
chan->topics = NULL;
static struct stasis_message_router *stasis_router;
/*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
-static struct stasis_subscription *rtp_topic_forwarder;
+static struct stasis_forward *rtp_topic_forwarder;
#define MGR_SHOW_TERMINAL_WIDTH 80
stasis_message_router_unsubscribe_and_join(stasis_router);
stasis_router = NULL;
}
- stasis_unsubscribe_and_join(rtp_topic_forwarder);
+ stasis_forward_cancel(rtp_topic_forwarder);
rtp_topic_forwarder = NULL;
ao2_cleanup(manager_topic);
manager_topic = NULL;
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_bridge_state_string_prefix(
const struct ast_bridge_snapshot *snapshot,
static void manager_bridging_cleanup(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_channel_state_string_prefix(
const struct ast_channel_snapshot *snapshot,
static void manager_channels_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
/*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
/*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
static int exclude_event_cb(const char *key)
static void manager_mwi_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
/*! \brief The \ref stasis subscription returned by the forwarding of the system topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
static void manager_system_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
#include "asterisk.h"
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
+#include "asterisk/vector.h"
/*!
* \page stasis-impl Stasis Implementation Notes
struct stasis_topic {
char *name;
/*! Variable length array of the subscribers */
- struct stasis_subscription **subscribers;
- /*! Allocated length of the subscribers array */
- size_t num_subscribers_max;
- /*! Current size of the subscribers array */
- size_t num_subscribers_current;
+ ast_vector(struct stasis_subscription *) subscribers;
+
+ /*! Topics forwarding into this topic */
+ ast_vector(struct stasis_topic *) upstream_topics;
};
/* Forward declarations for the tightly-coupled subscription object */
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+ struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static void topic_dtor(void *obj)
{
/* Subscribers hold a reference to topics, so they should all be
* unsubscribed before we get here. */
- ast_assert(topic->num_subscribers_current == 0);
+ ast_assert(ast_vector_size(topic->subscribers) == 0);
ast_free(topic->name);
topic->name = NULL;
- ast_free(topic->subscribers);
- topic->subscribers = NULL;
+
+ ast_vector_free(topic->subscribers);
+ ast_vector_free(topic->upstream_topics);
}
struct stasis_topic *stasis_topic_create(const char *name)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ int res = 0;
topic = ao2_alloc(sizeof(*topic), topic_dtor);
return NULL;
}
- topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
- topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
- if (!topic->subscribers) {
+ res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+ res |= ast_vector_init(topic->upstream_topics, 0);
+
+ if (res != 0) {
return NULL;
}
}
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
}
- send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
+ send_subscription_subscribe(topic, sub);
ao2_ref(sub, +1);
return sub;
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
- if (sub) {
- size_t i;
- /* The subscription may be the last ref to this topic. Hold
- * the topic ref open until after the unlock. */
- RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
- ao2_cleanup);
- SCOPED_AO2LOCK(lock_topic, topic);
+ /* The subscription may be the last ref to this topic. Hold
+ * the topic ref open until after the unlock. */
+ RAII_VAR(struct stasis_topic *, topic,
+ ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
- send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
- /* swap [i] with last entry; remove last entry */
- topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
- /* Unsubscribing unrefs the subscription */
- ao2_cleanup(sub);
- return NULL;
- }
- }
+ if (!sub) {
+ return NULL;
+ }
- ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+ /* We have to remove the subscription first, to ensure the unsubscribe
+ * is the final message */
+ if (topic_remove_subscription(sub->topic, sub) != 0) {
+ ast_log(LOG_ERROR,
+ "Internal error: subscription has invalid topic\n");
+ return NULL;
}
+
+ /* Now let everyone know about the unsubscribe */
+ send_subscription_unsubscribe(topic, sub);
+
+ /* Unsubscribing unrefs the subscription */
+ ao2_cleanup(sub);
return NULL;
}
struct stasis_topic *topic = sub->topic;
SCOPED_AO2LOCK(lock_topic, topic);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ if (ast_vector_get(topic->subscribers, i) == sub) {
return 1;
}
}
*/
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
- struct stasis_subscription **subscribers;
+ size_t idx;
SCOPED_AO2LOCK(lock, topic);
- /* Increase list size, if needed */
- if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
- subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
- if (!subscribers) {
- return -1;
- }
- topic->subscribers = subscribers;
- topic->num_subscribers_max *= 2;
- }
-
/* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe
* to release it.
*
* If we bumped the refcount here, the owner would have to unsubscribe
* and cleanup, which is a bit awkward. */
- topic->subscribers[topic->num_subscribers_current++] = sub;
+ ast_vector_append(topic->subscribers, sub);
+
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_add_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
+ }
+
return 0;
}
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+ size_t idx;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_remove_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
+ }
+
+ return ast_vector_remove_elem_unordered(topic->subscribers, sub);
+}
+
/*!
* \internal
* \brief Information needed to dispatch a message to a subscription
return 0;
}
+static void dispatch_message(struct stasis_subscription *sub,
+ struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+ if (sub->mailbox) {
+ struct dispatch *dispatch;
+
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ return;
+ }
+
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
+ /* Push failed; just delete the dispatch.
+ */
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ dispatch_dtor(dispatch);
+ }
+ } else {
+ /* Dispatch directly */
+ subscription_invoke(sub, publisher_topic, message);
+ }
+}
+
void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
{
size_t i;
ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- struct stasis_subscription *sub = topic->subscribers[i];
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
ast_assert(sub != NULL);
- if (sub->mailbox) {
- struct dispatch *dispatch;
-
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_ERROR, "Dropping dispatch\n");
- break;
- }
-
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
- /* Push failed; just delete the dispatch.
- */
- ast_log(LOG_ERROR, "Dropping dispatch\n");
- dispatch_dtor(dispatch);
- }
- } else {
- /* Dispatch directly */
- subscription_invoke(sub, publisher_topic, message);
- }
+ dispatch_message(sub, publisher_topic, message);
}
}
stasis_forward_message(topic, topic, message);
}
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+/*!
+ * \brief Forwarding information
+ *
+ * Any message posted to \a from_topic is forwarded to \a to_topic.
+ *
+ * In cases where both the \a from_topic and \a to_topic need to be locked,
+ * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
+ */
+struct stasis_forward {
+ /*! Originating topic */
+ struct stasis_topic *from_topic;
+ /*! Destination topic */
+ struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
{
- struct stasis_topic *to_topic = data;
- stasis_forward_message(to_topic, topic, message);
+ struct stasis_forward *forward = obj;
- if (stasis_subscription_final_message(sub, message)) {
- ao2_cleanup(to_topic);
+ ao2_cleanup(forward->from_topic);
+ forward->from_topic = NULL;
+ ao2_cleanup(forward->to_topic);
+ forward->to_topic = NULL;
+}
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
+{
+ if (forward) {
+ int idx;
+
+ struct stasis_topic *from = forward->from_topic;
+ struct stasis_topic *to = forward->to_topic;
+
+ SCOPED_AO2LOCK(to_lock, to);
+
+ ast_vector_remove_elem_unordered(to->upstream_topics, from);
+
+ ao2_lock(from);
+ for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
+ topic_remove_subscription(
+ from, ast_vector_get(to->subscribers, idx));
+ }
+ ao2_unlock(from);
}
+
+ ao2_cleanup(forward);
+
+ return NULL;
}
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+ struct stasis_topic *to_topic)
{
- struct stasis_subscription *sub;
+ RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
+
if (!from_topic || !to_topic) {
return NULL;
}
- /* Forwarding subscriptions should dispatch directly instead of having a
- * mailbox. Otherwise, messages forwarded to the same topic from
- * different topics may get reordered. Which is bad.
- */
- sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
- if (sub) {
- /* hold a ref to to_topic for this forwarding subscription */
- ao2_ref(to_topic, +1);
+ forward = ao2_alloc(sizeof(*forward), forward_dtor);
+ if (!forward) {
+ return NULL;
}
- return sub;
+
+ forward->from_topic = ao2_bump(from_topic);
+ forward->to_topic = ao2_bump(to_topic);
+
+ {
+ SCOPED_AO2LOCK(lock, to_topic);
+ int res;
+
+ res = ast_vector_append(to_topic->upstream_topics, from_topic);
+ if (res != 0) {
+ return NULL;
+ }
+
+ {
+ SCOPED_AO2LOCK(lock, from_topic);
+ size_t idx;
+ for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
+ topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
+ }
+ }
+ }
+
+ return ao2_bump(forward);
}
static void subscription_change_dtor(void *obj)
ao2_cleanup(change->topic);
}
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
return change;
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- change = subscription_change_alloc(topic, uniqueid, description);
+ /* This assumes that we have already unsubscribed */
+ ast_assert(stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
if (!change) {
return;
stasis_publish(topic, msg);
}
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+ struct stasis_subscription *sub)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ /* This assumes that we have already unsubscribed */
+ ast_assert(!stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+
+ if (!change) {
+ return;
+ }
+
+ msg = stasis_message_create(stasis_subscription_change_type(), change);
+
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(topic, msg);
+
+ /* Now we have to dispatch to the subscription itself */
+ dispatch_message(sub, topic, msg);
+}
+
struct topic_pool_entry {
- struct stasis_subscription *forward;
+ struct stasis_forward *forward;
struct stasis_topic *topic;
};
static void topic_pool_entry_dtor(void *obj)
{
struct topic_pool_entry *entry = obj;
- entry->forward = stasis_unsubscribe(entry->forward);
+ entry->forward = stasis_forward_cancel(entry->forward);
ao2_cleanup(entry->topic);
entry->topic = NULL;
}
struct stasis_topic *topic_cached;
struct stasis_cache *cache;
- struct stasis_subscription *forward_all_to_cached;
+ struct stasis_forward *forward_all_to_cached;
};
struct stasis_cp_single {
struct stasis_topic *topic;
struct stasis_caching_topic *topic_cached;
- struct stasis_subscription *forward_topic_to_all;
- struct stasis_subscription *forward_cached_to_all;
+ struct stasis_forward *forward_topic_to_all;
+ struct stasis_forward *forward_cached_to_all;
};
static void all_dtor(void *obj)
all->topic_cached = NULL;
ao2_cleanup(all->cache);
all->cache = NULL;
- stasis_unsubscribe_and_join(all->forward_all_to_cached);
+ stasis_forward_cancel(all->forward_all_to_cached);
all->forward_all_to_cached = NULL;
}
return;
}
- stasis_unsubscribe(one->forward_topic_to_all);
+ stasis_forward_cancel(one->forward_topic_to_all);
one->forward_topic_to_all = NULL;
- stasis_unsubscribe(one->forward_cached_to_all);
+ stasis_forward_cancel(one->forward_cached_to_all);
one->forward_cached_to_all = NULL;
stasis_caching_unsubscribe(one->topic_cached);
one->topic_cached = NULL;
int interested;
/*! Forward for the regular topic */
- struct stasis_subscription *topic_forward;
+ struct stasis_forward *topic_forward;
/*! Forward for the caching topic */
- struct stasis_subscription *topic_cached_forward;
+ struct stasis_forward *topic_cached_forward;
/*! Unique id of the object being forwarded */
char id[];
static void forwards_unsubscribe(struct app_forwards *forwards)
{
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
- stasis_unsubscribe(forwards->topic_cached_forward);
+ stasis_forward_cancel(forwards->topic_cached_forward);
forwards->topic_cached_forward = NULL;
}
ast_channel_topic_cached(chan), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
ast_bridge_topic_cached(bridge), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
- RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+ RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
type = stasis_message_type(msg);
ast_test_validate(test, ast_channel_snapshot_type() == type);
+ /* The ordering of the cache clear and endpoint snapshot are
+ * unspecified */
msg = sink->messages[3];
- type = stasis_message_type(msg);
- ast_test_validate(test, stasis_cache_clear_type() == type);
+ if (stasis_message_type(msg) == stasis_cache_clear_type()) {
+ /* Okay; the next message should be the endpoint snapshot */
+ msg = sink->messages[4];
+ }
- msg = sink->messages[4];
type = stasis_message_type(msg);
ast_test_validate(test, ast_endpoint_snapshot_type() == type);
actual_snapshot = stasis_message_data(msg);