]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
stasis: Improve topic/subscription names and statistics.
authorJoshua Colp <jcolp@digium.com>
Thu, 7 Mar 2019 12:28:31 +0000 (08:28 -0400)
committerJoshua Colp <jcolp@digium.com>
Mon, 11 Mar 2019 14:39:35 +0000 (11:39 -0300)
Topic names now follow: <subsystem>:<functionality>[/<object>]

This ensures that they are all unique, and also provides better
insight in to what each topic is for.

Subscriber ids now also use the main topic name they are
subscribed to and an incrementing integer as their identifier to
make it easier to understand what the subscription is primarily
responsible for.

Both the CLI commands for listing topic and subscription statistics
now sort to make it a bit easier to see what is going on.

Subscriptions will now show all topics that they are receiving messages
from, not just the main topic they were subscribed to.

ASTERISK-28335

Change-Id: I484e971a38c3640f2bd156282e532eed84bf220d

23 files changed:
apps/app_voicemail.c
include/asterisk/stasis.h
main/app.c
main/cdr.c
main/cel.c
main/channel_internal_api.c
main/devicestate.c
main/endpoints.c
main/manager.c
main/parking.c
main/presencestate.c
main/rtp_engine.c
main/security_events.c
main/stasis.c
main/stasis_bridges.c
main/stasis_cache.c
main/stasis_cache_pattern.c
main/stasis_channels.c
main/stasis_endpoints.c
main/stasis_system.c
main/test.c
res/res_corosync.c
res/stasis/app.c

index a5a369148ce4f39c4b36541c475325f61c6794b1..a151f0cb964ef0886c1f106db9f0d26811680a0c 100644 (file)
@@ -13334,6 +13334,7 @@ static void mwi_unsub_event_cb(struct stasis_subscription_change *change)
 static void mwi_sub_event_cb(struct stasis_subscription_change *change)
 {
        struct mwi_sub_task *mwist;
+       const char *topic;
        char *context;
        char *mailbox;
 
@@ -13342,7 +13343,9 @@ static void mwi_sub_event_cb(struct stasis_subscription_change *change)
                return;
        }
 
-       if (separate_mailbox(ast_strdupa(stasis_topic_name(change->topic)), &mailbox, &context)) {
+       /* The topic name is prefixed with "mwi:all/" as this is a pool topic */
+       topic = stasis_topic_name(change->topic) + 8;
+       if (separate_mailbox(ast_strdupa(topic), &mailbox, &context)) {
                ast_free(mwist);
                return;
        }
index 09db9ea912e26bac81a9b163711d6f6fd76c98da..0b229bf7fd8431b032418c5b81a0d58b39a1098d 100644 (file)
@@ -514,6 +514,8 @@ struct stasis_topic;
  * from a topic and destroy it. As a result the topic can persist until
  * the last subscriber unsubscribes itself even if there is no
  * publisher.
+ *
+ * \note Topic names should be in the form of <subsystem>:<functionality>[/<object>]
  */
 struct stasis_topic *stasis_topic_create(const char *name);
 
index ec7449065642129ab9c27dad447a0ba3962de8dc..e8a4d2f4565819dac663588527ab8ac5dbe4b61d 100644 (file)
@@ -3337,7 +3337,7 @@ int ast_delete_mwi_state_full(const char *mailbox, const char *context, struct a
                stasis_publish(mailbox_specific_topic, clear_msg);
        }
 
-       stasis_topic_pool_delete_topic(mwi_topic_pool, stasis_topic_name(mailbox_specific_topic));
+       stasis_topic_pool_delete_topic(mwi_topic_pool, mwi_state->uniqueid);
 
        ao2_cleanup(clear_msg);
        return 0;
@@ -3430,7 +3430,7 @@ int app_init(void)
        if (STASIS_MESSAGE_TYPE_INIT(ast_mwi_vm_app_type) != 0) {
                return -1;
        }
-       mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
+       mwi_topic_all = stasis_topic_create("mwi:all");
        if (!mwi_topic_all) {
                return -1;
        }
@@ -3446,7 +3446,7 @@ int app_init(void)
        if (!mwi_topic_pool) {
                return -1;
        }
-       queue_topic_all = stasis_topic_create("stasis_queue_topic");
+       queue_topic_all = stasis_topic_create("queue:all");
        if (!queue_topic_all) {
                return -1;
        }
index 53f3362827a389b399c222d42e78fbd779d13a3c..f8f038c138b386c3f10c2eb7ee396d470364f8b3 100644 (file)
@@ -4504,7 +4504,7 @@ static int load_module(void)
                return AST_MODULE_LOAD_FAILURE;
        }
 
-       cdr_topic = stasis_topic_create("cdr_engine");
+       cdr_topic = stasis_topic_create("cdr:aggregator");
        if (!cdr_topic) {
                return AST_MODULE_LOAD_FAILURE;
        }
index 95376db6e3c729153d9137e641d8ea4b700e861d..1e77d2589558b4fd033b3f6eb725637ec7f45e77 100644 (file)
@@ -1431,12 +1431,12 @@ static int unload_module(void)
  */
 static int create_subscriptions(void)
 {
-       cel_aggregation_topic = stasis_topic_create("cel_aggregation_topic");
+       cel_aggregation_topic = stasis_topic_create("cel:aggregator");
        if (!cel_aggregation_topic) {
                return -1;
        }
 
-       cel_topic = stasis_topic_create("cel_topic");
+       cel_topic = stasis_topic_create("cel:misc");
        if (!cel_topic) {
                return -1;
        }
index 22a2bb6b313a9385872661dbbb8f9015647687a4..be8fd7c0240da3c4380d4136250aac75ffa577c3 100644 (file)
@@ -1520,14 +1520,23 @@ int ast_channel_forward_endpoint(struct ast_channel *chan,
 
 int ast_channel_internal_setup_topics(struct ast_channel *chan)
 {
-       const char *topic_name = chan->uniqueid.unique_id;
+       char *topic_name;
+       int ret;
        ast_assert(chan->topic == NULL);
 
-       if (ast_strlen_zero(topic_name)) {
-               topic_name = "<dummy-channel>";
+       if (ast_strlen_zero(chan->uniqueid.unique_id)) {
+               static int dummy_id;
+               ret = ast_asprintf(&topic_name, "channel:dummy-%d", ast_atomic_fetchadd_int(&dummy_id, +1));
+       } else {
+               ret = ast_asprintf(&topic_name, "channel:%s", chan->uniqueid.unique_id);
+       }
+
+       if (ret < 0) {
+               return -1;
        }
 
        chan->topic = stasis_topic_create(topic_name);
+       ast_free(topic_name);
        if (!chan->topic) {
                return -1;
        }
index b6c740ce26f995c2fcf30575b8db086531ebe105..ecf255fefeab4a071bb19bd3530a83561a29c60a 100644 (file)
@@ -902,7 +902,7 @@ int devstate_init(void)
        if (STASIS_MESSAGE_TYPE_INIT(ast_device_state_message_type) != 0) {
                return -1;
        }
-       device_state_topic_all = stasis_topic_create("ast_device_state_topic");
+       device_state_topic_all = stasis_topic_create("devicestate:all");
        if (!device_state_topic_all) {
                return -1;
        }
index b95893270b904598566a1b6a3a3a8453a5826bd1..c53e31d49b288bd5d4f093d93b8994d3b470941a 100644 (file)
@@ -255,9 +255,17 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
        }
 
        if (!ast_strlen_zero(resource)) {
+               char *topic_name;
+               int ret;
+
+               ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
+               if (ret < 0) {
+                       return NULL;
+               }
 
                endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
-                       endpoint->id);
+                       topic_name);
+               ast_free(topic_name);
                if (!endpoint->topics) {
                        return NULL;
                }
@@ -284,8 +292,17 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                endpoint_publish_snapshot(endpoint);
                ao2_link(endpoints, endpoint);
        } else {
+               char *topic_name;
+               int ret;
+
+               ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
+               if (ret < 0) {
+                       return NULL;
+               }
+
                endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
-                       endpoint->id);
+                       topic_name);
+               ast_free(topic_name);
                if (!endpoint->topics) {
                        return NULL;
                }
index 0c715e45f2893ebf7d5ba3bce910f52c68d250a0..8e7a8b203f5fcdd99077b51a63d9ae767cbca387 100644 (file)
@@ -8996,7 +8996,7 @@ static int __init_manager(int reload, int by_external_config)
                if (res != 0) {
                        return -1;
                }
-               manager_topic = stasis_topic_create("manager_topic");
+               manager_topic = stasis_topic_create("manager:core");
                if (!manager_topic) {
                        return -1;
                }
index bf0d0b6b78ca983a075dc7f64251afd426e80e2f..d77a767b0f411bc242f7c3a184779deae1d90d36 100644 (file)
@@ -56,7 +56,7 @@ int ast_parking_stasis_init(void)
                return -1;
        }
 
-       parking_topic = stasis_topic_create("ast_parking");
+       parking_topic = stasis_topic_create("parking:all");
        if (!parking_topic) {
                return -1;
        }
index 65b7f69270cbafa4a7dfe5a6211d79f4337cce14..45433b18915f095bd04f5194f73ab66051274a10 100644 (file)
@@ -500,7 +500,7 @@ int ast_presence_state_engine_init(void)
                return -1;
        }
 
-       presence_state_topic_all = stasis_topic_create("ast_presence_state_topic_all");
+       presence_state_topic_all = stasis_topic_create("presence_state:all");
        if (!presence_state_topic_all) {
                return -1;
        }
index fd1613cab865b6c970afe511865490f820edc5df..403b663ca328a59790398056828a31a732cceb67 100644 (file)
@@ -3539,7 +3539,7 @@ int ast_rtp_engine_init(void)
        ast_rwlock_init(&mime_types_lock);
        ast_rwlock_init(&static_RTP_PT_lock);
 
-       rtp_topic = stasis_topic_create("rtp_topic");
+       rtp_topic = stasis_topic_create("rtp:all");
        if (!rtp_topic) {
                return -1;
        }
index 37dce02940fb4d723763459291d367ea3115f623..0328eca359574edeee94f358bdf9d75146e4542e 100644 (file)
@@ -484,7 +484,7 @@ int ast_security_stasis_init(void)
 {
        ast_register_cleanup(security_stasis_cleanup);
 
-       security_topic = stasis_topic_create("ast_security");
+       security_topic = stasis_topic_create("security:all");
        if (!security_topic) {
                return -1;
        }
index fa92eeb134d46cf69cfc2236afec331ee9ae5e94..5f4a147deed9d5ef66d65de60cc6f6d1a2de7b49 100644 (file)
@@ -349,6 +349,8 @@ struct stasis_topic_statistics {
        int messages_dispatched;
        /*! \brief The ids of the subscribers to this topic */
        struct ao2_container *subscribers;
+       /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
+       struct stasis_topic *topic;
        /*! \brief Name of the topic */
        char name[0];
 };
@@ -366,6 +368,9 @@ struct stasis_topic {
        struct stasis_topic_statistics *statistics;
 #endif
 
+       /*! Unique incrementing integer for subscriber ids */
+       int subscriber_id;
+
        /*! Name of the topic */
        char name[0];
 };
@@ -412,11 +417,11 @@ static void topic_statistics_destroy(void *obj)
        ao2_cleanup(statistics->subscribers);
 }
 
-static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name)
+static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
 {
        struct stasis_topic_statistics *statistics;
 
-       statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, topic_statistics_destroy);
+       statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
        if (!statistics) {
                return NULL;
        }
@@ -427,7 +432,9 @@ static struct stasis_topic_statistics *stasis_topic_statistics_create(const char
                return NULL;
        }
 
-       strcpy(statistics->name, name); /* SAFE */
+       /* This is strictly used for the pointer address when showing the topic */
+       statistics->topic = topic;
+       strcpy(statistics->name, topic->name); /* SAFE */
        ao2_link(topic_statistics, statistics);
 
        return statistics;
@@ -448,7 +455,7 @@ struct stasis_topic *stasis_topic_create(const char *name)
        res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
        res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
 #ifdef AST_DEVMODE
-       topic->statistics = stasis_topic_statistics_create(name);
+       topic->statistics = stasis_topic_statistics_create(topic);
        if (!topic->name || !topic->statistics || res)
 #else
        if (!topic->name || res)
@@ -477,8 +484,8 @@ struct stasis_subscription_statistics {
        const char *file;
        /*! \brief The function where the subscription originates */
        const char *func;
-       /*! \brief Name of the topic we subscribed to */
-       char *topic;
+       /*! \brief Names of the topics we are subscribed to */
+       struct ao2_container *topics;
        /*! \brief The message type that currently took the longest to process */
        struct stasis_message_type *highest_time_message_type;
        /*! \brief Highest time spent invoking a message */
@@ -495,6 +502,8 @@ struct stasis_subscription_statistics {
        int uses_threadpool;
        /*! \brief The line number where the subscription originates */
        int lineno;
+       /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
+       struct stasis_subscription *sub;
        /*! \brief Unique ID of the subscription */
        char uniqueid[0];
 };
@@ -503,7 +512,7 @@ struct stasis_subscription_statistics {
 /*! \internal */
 struct stasis_subscription {
        /*! Unique ID for this subscription */
-       char uniqueid[AST_UUID_STR_LEN];
+       char *uniqueid;
        /*! Topic subscribed to. */
        struct stasis_topic *topic;
        /*! Mailbox for processing incoming messages. */
@@ -546,6 +555,7 @@ static void subscription_dtor(void *obj)
         * be bad. */
        ast_assert(stasis_subscription_is_done(sub));
 
+       ast_free(sub->uniqueid);
        ao2_cleanup(sub->topic);
        sub->topic = NULL;
        ast_taskprocessor_unreference(sub->mailbox);
@@ -628,26 +638,37 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
 }
 
 #ifdef AST_DEVMODE
-static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid,
-       const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno,
+static void subscription_statistics_destroy(void *obj)
+{
+       struct stasis_subscription_statistics *statistics = obj;
+
+       ao2_cleanup(statistics->topics);
+}
+
+static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
+       int needs_mailbox, int use_thread_pool, const char *file, int lineno,
        const char *func)
 {
        struct stasis_subscription_statistics *statistics;
-       size_t uniqueid_len = strlen(uniqueid) + 1;
 
-       statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL);
+       statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
        if (!statistics) {
                return NULL;
        }
 
+       statistics->topics = ast_str_container_alloc(1);
+       if (!statistics->topics) {
+               ao2_ref(statistics, -1);
+               return NULL;
+       }
+
        statistics->file = file;
        statistics->lineno = lineno;
        statistics->func = func;
        statistics->uses_mailbox = needs_mailbox;
        statistics->uses_threadpool = use_thread_pool;
-       strcpy(statistics->uniqueid, uniqueid); /* SAFE */
-       statistics->topic = statistics->uniqueid + uniqueid_len;
-       strcpy(statistics->topic, topic); /* SAFE */
+       strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
+       statistics->sub = sub;
        ao2_link(subscription_statistics, statistics);
 
        return statistics;
@@ -665,6 +686,7 @@ struct stasis_subscription *internal_stasis_subscribe(
        const char *func)
 {
        struct stasis_subscription *sub;
+       int ret;
 
        if (!topic) {
                return NULL;
@@ -675,12 +697,17 @@ struct stasis_subscription *internal_stasis_subscribe(
        if (!sub) {
                return NULL;
        }
-       ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
 #ifdef AST_DEVMODE
-       sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox,
-               use_thread_pool, file, lineno, func);
-       if (!sub->statistics) {
+       ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
+       sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
+       if (ret < 0 || !sub->statistics) {
+               ao2_ref(sub, -1);
+               return NULL;
+       }
+#else
+       ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
+       if (ret < 0) {
                ao2_ref(sub, -1);
                return NULL;
        }
@@ -1012,6 +1039,7 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
 
 #ifdef AST_DEVMODE
        ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
+       ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
 #endif
 
        ao2_unlock(topic);
@@ -1035,6 +1063,7 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
 #ifdef AST_DEVMODE
        if (!res) {
                ast_str_container_remove(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
+               ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
        }
 #endif
 
@@ -1498,6 +1527,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
 struct topic_pool_entry {
        struct stasis_forward *forward;
        struct stasis_topic *topic;
+       char name[0];
 };
 
 static void topic_pool_entry_dtor(void *obj)
@@ -1509,10 +1539,19 @@ static void topic_pool_entry_dtor(void *obj)
        entry->topic = NULL;
 }
 
-static struct topic_pool_entry *topic_pool_entry_alloc(void)
+static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
 {
-       return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
-               AO2_ALLOC_OPT_LOCK_NOLOCK);
+       struct topic_pool_entry *topic_pool_entry;
+
+       topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
+               topic_pool_entry_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+       if (!topic_pool_entry) {
+               return NULL;
+       }
+
+       strcpy(topic_pool_entry->name, topic_name); /* Safe */
+
+       return topic_pool_entry;
 }
 
 struct stasis_topic_pool {
@@ -1550,7 +1589,7 @@ static int topic_pool_entry_hash(const void *obj, const int flags)
                break;
        case OBJ_SEARCH_OBJECT:
                object = obj;
-               key = stasis_topic_name(object->topic);
+               key = object->name;
                break;
        default:
                /* Hash can only work on something with a full key. */
@@ -1569,10 +1608,10 @@ static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
 
        switch (flags & OBJ_SEARCH_MASK) {
        case OBJ_SEARCH_OBJECT:
-               right_key = stasis_topic_name(object_right->topic);
+               right_key = object_right->name;
                /* Fall through */
        case OBJ_SEARCH_KEY:
-               cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
+               cmp = strcasecmp(object_left->name, right_key);
                break;
        case OBJ_SEARCH_PARTIAL_KEY:
                /* Not supported by container */
@@ -1649,18 +1688,29 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
 {
        RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
        SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
+       char *new_topic_name;
+       int ret;
 
        topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
        if (topic_pool_entry) {
                return topic_pool_entry->topic;
        }
 
-       topic_pool_entry = topic_pool_entry_alloc();
+       topic_pool_entry = topic_pool_entry_alloc(topic_name);
        if (!topic_pool_entry) {
                return NULL;
        }
 
-       topic_pool_entry->topic = stasis_topic_create(topic_name);
+       /* To provide further detail and to ensure that the topic is unique within the scope of the
+        * system we prefix it with the pooling topic name, which should itself already be unique.
+        */
+       ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
+       if (ret < 0) {
+               return NULL;
+       }
+
+       topic_pool_entry->topic = stasis_topic_create(new_topic_name);
+       ast_free(new_topic_name);
        if (!topic_pool_entry->topic) {
                return NULL;
        }
@@ -2082,12 +2132,15 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
 
 #ifdef AST_DEVMODE
 
+AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
+
 /*!
  * \internal
  * \brief CLI command implementation for 'stasis statistics show subscriptions'
  */
 static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
+       struct ao2_container *sorted_subscriptions;
        struct ao2_iterator iter;
        struct stasis_subscription_statistics *statistics;
        int count = 0;
@@ -2112,9 +2165,22 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str
                return CLI_SHOWUSAGE;
        }
 
+       sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
+               stasis_subscription_statistics_sort_fn, NULL);
+       if (!sorted_subscriptions) {
+               ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
+               return CLI_SUCCESS;
+       }
+
+       if (ao2_container_dup(sorted_subscriptions, subscription_statistics, 0)) {
+               ao2_ref(sorted_subscriptions, -1);
+               ast_cli(a->fd, "Could not sort subscription statistics\n");
+               return CLI_SUCCESS;
+       }
+
        ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
 
-       iter = ao2_iterator_init(subscription_statistics, 0);
+       iter = ao2_iterator_init(sorted_subscriptions, 0);
        while ((statistics = ao2_iterator_next(&iter))) {
                ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
                        statistics->lowest_time_invoked, statistics->highest_time_invoked);
@@ -2125,6 +2191,8 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str
        }
        ao2_iterator_destroy(&iter);
 
+       ao2_ref(sorted_subscriptions, -1);
+
        ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
        ast_cli(a->fd, "\n%d subscriptions\n\n", count);
 
@@ -2169,6 +2237,8 @@ static char *subscription_statistics_complete_name(const char *word, int state)
 static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
        struct stasis_subscription_statistics *statistics;
+       struct ao2_iterator i;
+       char *name;
 
        switch (cmd) {
        case CLI_INIT:
@@ -2196,7 +2266,7 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
        }
 
        ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
-       ast_cli(a->fd, "Topic: %s\n", statistics->topic);
+       ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
        ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
        ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
        ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
@@ -2213,25 +2283,38 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
        }
        ao2_unlock(statistics);
 
+       ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
+
+       ast_cli(a->fd, "Subscribed topics:\n");
+       i = ao2_iterator_init(statistics->topics, 0);
+       while ((name = ao2_iterator_next(&i))) {
+               ast_cli(a->fd, "\t%s\n", name);
+               ao2_ref(name, -1);
+       }
+       ao2_iterator_destroy(&i);
+
        ao2_ref(statistics, -1);
 
        return CLI_SUCCESS;
 }
 
+AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
+
 /*!
  * \internal
  * \brief CLI command implementation for 'stasis statistics show topics'
  */
 static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
+       struct ao2_container *sorted_topics;
        struct ao2_iterator iter;
        struct stasis_topic_statistics *statistics;
        int count = 0;
        int not_dispatched = 0;
        int dispatched = 0;
-#define FMT_HEADERS            "%-64s %10s %10s %16s %16s\n"
-#define FMT_FIELDS             "%-64s %10d %10d %16ld %16ld\n"
-#define FMT_FIELDS2            "%-64s %10d %10d\n"
+#define FMT_HEADERS            "%-64s %10s %10s %10s %16s %16s\n"
+#define FMT_FIELDS             "%-64s %10d %10d %10d %16ld %16ld\n"
+#define FMT_FIELDS2            "%-64s %10s %10d %10d\n"
 
        switch (cmd) {
        case CLI_INIT:
@@ -2248,11 +2331,25 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast
                return CLI_SHOWUSAGE;
        }
 
-       ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
+       sorted_topics = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
+               stasis_topic_statistics_sort_fn, NULL);
+       if (!sorted_topics) {
+               ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
+               return CLI_SUCCESS;
+       }
+
+       if (ao2_container_dup(sorted_topics, topic_statistics, 0)) {
+               ao2_ref(sorted_topics, -1);
+               ast_cli(a->fd, "Could not sort topic statistics\n");
+               return CLI_SUCCESS;
+       }
+
+       ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
 
-       iter = ao2_iterator_init(topic_statistics, 0);
+       iter = ao2_iterator_init(sorted_topics, 0);
        while ((statistics = ao2_iterator_next(&iter))) {
-               ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched,
+               ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
+                       statistics->messages_not_dispatched, statistics->messages_dispatched,
                        statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
                not_dispatched += statistics->messages_not_dispatched;
                dispatched += statistics->messages_dispatched;
@@ -2261,7 +2358,9 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast
        }
        ao2_iterator_destroy(&iter);
 
-       ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched);
+       ao2_ref(sorted_topics, -1);
+
+       ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
        ast_cli(a->fd, "\n%d topics\n\n", count);
 
 #undef FMT_HEADERS
@@ -2334,6 +2433,7 @@ static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_
        }
 
        ast_cli(a->fd, "Topic: %s\n", statistics->name);
+       ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
        ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
        ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
        ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
index cfdf117c4eef3d20a06e92beaeb74c36f38059a5..31d3eac3980af88c510ba506afa2b348e7471bd7 100644 (file)
@@ -294,12 +294,21 @@ static struct ast_bridge_snapshot_update *bridge_snapshot_update_create(
 
 int bridge_topics_init(struct ast_bridge *bridge)
 {
+       char *topic_name;
+       int ret;
+
        if (ast_strlen_zero(bridge->uniqueid)) {
                ast_log(LOG_ERROR, "Bridge id initialization required\n");
                return -1;
        }
 
-       bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
+       ret = ast_asprintf(&topic_name, "bridge:%s", bridge->uniqueid);
+       if (ret < 0) {
+               return -1;
+       }
+
+       bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, topic_name);
+       ast_free(topic_name);
        if (!bridge->topic) {
                return -1;
        }
@@ -1365,7 +1374,7 @@ int ast_stasis_bridging_init(void)
 
        ast_register_cleanup(stasis_bridging_cleanup);
 
-       bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
+       bridge_topic_all = stasis_topic_create("bridge:all");
        if (!bridge_topic_all) {
                return -1;
        }
index ee8a1dd4b7a5252d2ec532aa63a6e7d2654e83d0..6be4bf1916feab8d2d63c64ec23f2347b46e498e 100644 (file)
@@ -948,10 +948,11 @@ static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
 {
        struct stasis_caching_topic *caching_topic;
+       static int caching_id;
        char *new_name;
        int ret;
 
-       ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
+       ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
        if (ret < 0) {
                return NULL;
        }
index 04d816463ce84c8baaf27c4073d581b9dce189c7..463be69ac5f75fd2eee2bd2a3734921f43b305ac 100644 (file)
@@ -67,13 +67,14 @@ struct stasis_cp_all *stasis_cp_all_create(const char *name,
 {
        char *cached_name = NULL;
        struct stasis_cp_all *all;
+       static int cache_id;
 
        all = ao2_t_alloc(sizeof(*all), all_dtor, name);
        if (!all) {
                return NULL;
        }
 
-       ast_asprintf(&cached_name, "%s-cached", name);
+       ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
        if (!cached_name) {
                ao2_ref(all, -1);
 
index d39fb08fbf41570fd74e895e725b370a53190b4d..e8842c1bfe6926db63e2429f1477c88994b64271 100644 (file)
@@ -1658,7 +1658,7 @@ int ast_stasis_channels_init(void)
 
        ast_register_cleanup(stasis_channels_cleanup);
 
-       channel_topic_all = stasis_topic_create("ast_channel_topic_all");
+       channel_topic_all = stasis_topic_create("channel:all");
        if (!channel_topic_all) {
                return -1;
        }
index b3a837b166a71ac4b5b3c4386186074cbdf2d6da..289a90e14f887ba76436c850265ce837a0866fd4 100644 (file)
@@ -460,7 +460,7 @@ int ast_endpoint_stasis_init(void)
        int res = 0;
        ast_register_cleanup(endpoints_stasis_cleanup);
 
-       endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
+       endpoint_cache_all = stasis_cp_all_create("endpoint:all",
                endpoint_snapshot_get_id);
        if (!endpoint_cache_all) {
                return -1;
index 961a2b06ac54ea8de79eaba92237a3ea55c4baa0..4c84f57e6cfccc7523d2d8ffdedc40c19f565bf8 100644 (file)
@@ -374,7 +374,7 @@ int ast_stasis_system_init(void)
 {
        ast_register_cleanup(stasis_system_cleanup);
 
-       system_topic = stasis_topic_create("ast_system");
+       system_topic = stasis_topic_create("system:all");
        if (!system_topic) {
                return 1;
        }
index 2abe6988db8a9f9618d7ffa8a93d7f86a5730cda..32df829d9ff7aed4d306a4075b3fcee2a498db18 100644 (file)
@@ -1224,7 +1224,7 @@ int ast_test_init(void)
        ast_register_cleanup(test_cleanup);
 
        /* Create stasis topic */
-       test_suite_topic = stasis_topic_create("test_suite_topic");
+       test_suite_topic = stasis_topic_create("testsuite:all");
        if (!test_suite_topic) {
                return -1;
        }
index bf172e31837bb05f2e07b4159c8f84a504dc55dd..6e66c4fef03697d60edee36eecf3da4d51714196 100644 (file)
@@ -1131,7 +1131,7 @@ static int load_module(void)
                goto failed;
        }
 
-       corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
+       corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
        if (!corosync_aggregate_topic) {
                ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
                goto failed;
index 0f0923ae97a692f2bf322c3999c4eca737daab2b..a69ca5562c53d6818820a4b5adac7662a6a166aa 100644 (file)
@@ -919,6 +919,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
        int res = 0;
        size_t context_size = strlen("stasis-") + strlen(name) + 1;
        char context_name[context_size];
+       char *topic_name;
+       int ret;
 
        ast_assert(name != NULL);
        ast_assert(handler != NULL);
@@ -939,7 +941,13 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
                return NULL;
        }
 
-       app->topic = stasis_topic_create(name);
+       ret = ast_asprintf(&topic_name, "ari:application/%s", name);
+       if (ret < 0) {
+               return NULL;
+       }
+
+       app->topic = stasis_topic_create(topic_name);
+       ast_free(topic_name);
        if (!app->topic) {
                return NULL;
        }