]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
stasis: Improve topic/subscription names and statistics.
authorJoshua Colp <jcolp@digium.com>
Thu, 7 Mar 2019 12:28:31 +0000 (08:28 -0400)
committerJoshua Colp <jcolp@digium.com>
Mon, 11 Mar 2019 14:39:08 +0000 (11:39 -0300)
Topic names now follow: <subsystem>:<functionality>[/<object>]

This ensures that they are all unique, and also provides better
insight in to what each topic is for.

Subscriber ids now also use the main topic name they are
subscribed to and an incrementing integer as their identifier to
make it easier to understand what the subscription is primarily
responsible for.

Both the CLI commands for listing topic and subscription statistics
now sort to make it a bit easier to see what is going on.

Subscriptions will now show all topics that they are receiving messages
from, not just the main topic they were subscribed to.

ASTERISK-28335

Change-Id: I484e971a38c3640f2bd156282e532eed84bf220d

23 files changed:
apps/app_voicemail.c
include/asterisk/stasis.h
main/app.c
main/cdr.c
main/cel.c
main/channel_internal_api.c
main/devicestate.c
main/endpoints.c
main/manager.c
main/parking.c
main/presencestate.c
main/rtp_engine.c
main/security_events.c
main/stasis.c
main/stasis_bridges.c
main/stasis_cache.c
main/stasis_cache_pattern.c
main/stasis_channels.c
main/stasis_endpoints.c
main/stasis_system.c
main/test.c
res/res_corosync.c
res/stasis/app.c

index 6a29a20c81c1c83eb6e07fcfc2ee510dd11afa79..64167d1ccb8a011e34e66a8debb67afcdc655fb9 100644 (file)
@@ -13361,6 +13361,7 @@ static void mwi_unsub_event_cb(struct stasis_subscription_change *change)
 static void mwi_sub_event_cb(struct stasis_subscription_change *change)
 {
        struct mwi_sub_task *mwist;
+       const char *topic;
        char *context;
        char *mailbox;
 
@@ -13369,7 +13370,9 @@ static void mwi_sub_event_cb(struct stasis_subscription_change *change)
                return;
        }
 
-       if (separate_mailbox(ast_strdupa(stasis_topic_name(change->topic)), &mailbox, &context)) {
+       /* The topic name is prefixed with "mwi:all/" as this is a pool topic */
+       topic = stasis_topic_name(change->topic) + 8;
+       if (separate_mailbox(ast_strdupa(topic), &mailbox, &context)) {
                ast_free(mwist);
                return;
        }
index a009097cc820ba9ceabc9b032e1afd56497febc0..af3e4a6c7ada3bb235a77172109feef4d3df157e 100644 (file)
@@ -520,6 +520,8 @@ struct stasis_topic;
  * from a topic and destroy it. As a result the topic can persist until
  * the last subscriber unsubscribes itself even if there is no
  * publisher.
+ *
+ * \note Topic names should be in the form of <subsystem>:<functionality>[/<object>]
  */
 struct stasis_topic *stasis_topic_create(const char *name);
 
index bc28ef6913cc60345a7f06e71fcc9a825563e436..a73349b986544cbe93159d81dc2485251490100d 100644 (file)
@@ -3356,7 +3356,7 @@ int ast_delete_mwi_state_full(const char *mailbox, const char *context, struct a
                stasis_publish(mailbox_specific_topic, clear_msg);
        }
 
-       stasis_topic_pool_delete_topic(mwi_topic_pool, stasis_topic_name(mailbox_specific_topic));
+       stasis_topic_pool_delete_topic(mwi_topic_pool, mwi_state->uniqueid);
 
        ao2_cleanup(clear_msg);
        return 0;
@@ -3449,7 +3449,7 @@ int app_init(void)
        if (STASIS_MESSAGE_TYPE_INIT(ast_mwi_vm_app_type) != 0) {
                return -1;
        }
-       mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
+       mwi_topic_all = stasis_topic_create("mwi:all");
        if (!mwi_topic_all) {
                return -1;
        }
@@ -3465,7 +3465,7 @@ int app_init(void)
        if (!mwi_topic_pool) {
                return -1;
        }
-       queue_topic_all = stasis_topic_create("stasis_queue_topic");
+       queue_topic_all = stasis_topic_create("queue:all");
        if (!queue_topic_all) {
                return -1;
        }
index ba0f3371eafac21c1aa90a839f2ce1d7fa90092f..7b6fac04a20fccfb53db144b1d0330f31f381873 100644 (file)
@@ -4489,7 +4489,7 @@ int ast_cdr_engine_init(void)
                return -1;
        }
 
-       cdr_topic = stasis_topic_create("cdr_engine");
+       cdr_topic = stasis_topic_create("cdr:aggregator");
        if (!cdr_topic) {
                return -1;
        }
index 4e675813a162a2dba50f62e0e8bb58de6cc228f5..31fd1ca5cdea18645909277dc03e9a7a657a5b7d 100644 (file)
@@ -1442,12 +1442,12 @@ static void cel_engine_cleanup(void)
  */
 static int create_subscriptions(void)
 {
-       cel_aggregation_topic = stasis_topic_create("cel_aggregation_topic");
+       cel_aggregation_topic = stasis_topic_create("cel:aggregator");
        if (!cel_aggregation_topic) {
                return -1;
        }
 
-       cel_topic = stasis_topic_create("cel_topic");
+       cel_topic = stasis_topic_create("cel:misc");
        if (!cel_topic) {
                return -1;
        }
index b5fd87dc487c685f8304b4e90952c2d0ab363209..37f99555288166f717ad6bdd23656d338c5b9241 100644 (file)
@@ -1634,15 +1634,24 @@ int ast_channel_forward_endpoint(struct ast_channel *chan,
 
 int ast_channel_internal_setup_topics(struct ast_channel *chan)
 {
-       const char *topic_name = chan->uniqueid.unique_id;
+       char *topic_name;
+       int ret;
        ast_assert(chan->topics == NULL);
 
-       if (ast_strlen_zero(topic_name)) {
-               topic_name = "<dummy-channel>";
+       if (ast_strlen_zero(chan->uniqueid.unique_id)) {
+               static int dummy_id;
+               ret = ast_asprintf(&topic_name, "channel:dummy-%d", ast_atomic_fetchadd_int(&dummy_id, +1));
+       } else {
+               ret = ast_asprintf(&topic_name, "channel:%s", chan->uniqueid.unique_id);
+       }
+
+       if (ret < 0) {
+               return -1;
        }
 
        chan->topics = stasis_cp_single_create(
                ast_channel_cache_all(), topic_name);
+       ast_free(topic_name);
        if (!chan->topics) {
                return -1;
        }
index 6706725e5e377fcfa183521a554a61c6071d73a4..7d00e9bed2e9519d8d5b124491ec3212ee202556 100644 (file)
@@ -927,7 +927,7 @@ int devstate_init(void)
        if (STASIS_MESSAGE_TYPE_INIT(ast_device_state_message_type) != 0) {
                return -1;
        }
-       device_state_topic_all = stasis_topic_create("ast_device_state_topic");
+       device_state_topic_all = stasis_topic_create("devicestate:all");
        if (!device_state_topic_all) {
                return -1;
        }
index b68f20758cd09b8079f41ee8ecce7cf85fb012f8..e9c2cceb30f23e88c8b5dfd139a159b585ef9873 100644 (file)
@@ -259,9 +259,17 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
        }
 
        if (!ast_strlen_zero(resource)) {
+               char *topic_name;
+               int ret;
+
+               ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
+               if (ret < 0) {
+                       return NULL;
+               }
 
                endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
-                       endpoint->id);
+                       topic_name);
+               ast_free(topic_name);
                if (!endpoint->topics) {
                        return NULL;
                }
@@ -288,8 +296,17 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                endpoint_publish_snapshot(endpoint);
                ao2_link(endpoints, endpoint);
        } else {
+               char *topic_name;
+               int ret;
+
+               ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
+               if (ret < 0) {
+                       return NULL;
+               }
+
                endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
-                       endpoint->id);
+                       topic_name);
+               ast_free(topic_name);
                if (!endpoint->topics) {
                        return NULL;
                }
index 789e779bbe9bf1cdb42185ee3f4efca93380a898..5d66b8a2e54a19b7bb3cec9d6e41862997ca354c 100644 (file)
@@ -8986,7 +8986,7 @@ static int __init_manager(int reload, int by_external_config)
                if (res != 0) {
                        return -1;
                }
-               manager_topic = stasis_topic_create("manager_topic");
+               manager_topic = stasis_topic_create("manager:core");
                if (!manager_topic) {
                        return -1;
                }
index f7f1dfb5c018f7daed396f89b9b2a4157bdaa521..3d383c113cf087ec565701fd53f58de84d899dd8 100644 (file)
@@ -58,7 +58,7 @@ int ast_parking_stasis_init(void)
                return -1;
        }
 
-       parking_topic = stasis_topic_create("ast_parking");
+       parking_topic = stasis_topic_create("parking:all");
        if (!parking_topic) {
                return -1;
        }
index ff4934ade132d0c6f9e59b38ef50ac5fa8ca4aab..8928b627416a882702f3e3f9ed5fa71d380eceb2 100644 (file)
@@ -375,7 +375,7 @@ int ast_presence_state_engine_init(void)
                return -1;
        }
 
-       presence_state_topic_all = stasis_topic_create("ast_presence_state_topic_all");
+       presence_state_topic_all = stasis_topic_create("presence_state:all");
        if (!presence_state_topic_all) {
                return -1;
        }
index db016fc1adc529614021f9c51ee65c594d460426..9b197d7e8f833e5231af8244dea8f25e9c56d87e 100644 (file)
@@ -2681,7 +2681,7 @@ int ast_rtp_engine_init(void)
        ast_rwlock_init(&mime_types_lock);
        ast_rwlock_init(&static_RTP_PT_lock);
 
-       rtp_topic = stasis_topic_create("rtp_topic");
+       rtp_topic = stasis_topic_create("rtp:all");
        if (!rtp_topic) {
                return -1;
        }
index 392b2b45955ffb04259847c59134bacecad8fc1e..ba345ba70defc83a2508a2b3e10f5f8814faf5e5 100644 (file)
@@ -486,7 +486,7 @@ int ast_security_stasis_init(void)
 {
        ast_register_cleanup(security_stasis_cleanup);
 
-       security_topic = stasis_topic_create("ast_security");
+       security_topic = stasis_topic_create("security:all");
        if (!security_topic) {
                return -1;
        }
index e464101d66e232d32b9717481be35533a6af1c8d..9a1a5ad45d4cfec88512764c649b3e340e208405 100644 (file)
@@ -351,6 +351,8 @@ struct stasis_topic_statistics {
        int messages_dispatched;
        /*! \brief The ids of the subscribers to this topic */
        struct ao2_container *subscribers;
+       /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
+       struct stasis_topic *topic;
        /*! \brief Name of the topic */
        char name[0];
 };
@@ -368,6 +370,9 @@ struct stasis_topic {
        struct stasis_topic_statistics *statistics;
 #endif
 
+       /*! Unique incrementing integer for subscriber ids */
+       int subscriber_id;
+
        /*! Name of the topic */
        char name[0];
 };
@@ -414,11 +419,11 @@ static void topic_statistics_destroy(void *obj)
        ao2_cleanup(statistics->subscribers);
 }
 
-static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name)
+static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
 {
        struct stasis_topic_statistics *statistics;
 
-       statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, topic_statistics_destroy);
+       statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
        if (!statistics) {
                return NULL;
        }
@@ -429,7 +434,9 @@ static struct stasis_topic_statistics *stasis_topic_statistics_create(const char
                return NULL;
        }
 
-       strcpy(statistics->name, name); /* SAFE */
+       /* This is strictly used for the pointer address when showing the topic */
+       statistics->topic = topic;
+       strcpy(statistics->name, topic->name); /* SAFE */
        ao2_link(topic_statistics, statistics);
 
        return statistics;
@@ -450,7 +457,7 @@ struct stasis_topic *stasis_topic_create(const char *name)
        res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
        res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
 #ifdef AST_DEVMODE
-       topic->statistics = stasis_topic_statistics_create(name);
+       topic->statistics = stasis_topic_statistics_create(topic);
        if (!topic->name || !topic->statistics || res)
 #else
        if (!topic->name || res)
@@ -479,8 +486,8 @@ struct stasis_subscription_statistics {
        const char *file;
        /*! \brief The function where the subscription originates */
        const char *func;
-       /*! \brief Name of the topic we subscribed to */
-       char *topic;
+       /*! \brief Names of the topics we are subscribed to */
+       struct ao2_container *topics;
        /*! \brief The message type that currently took the longest to process */
        struct stasis_message_type *highest_time_message_type;
        /*! \brief Highest time spent invoking a message */
@@ -497,6 +504,8 @@ struct stasis_subscription_statistics {
        int uses_threadpool;
        /*! \brief The line number where the subscription originates */
        int lineno;
+       /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
+       struct stasis_subscription *sub;
        /*! \brief Unique ID of the subscription */
        char uniqueid[0];
 };
@@ -505,7 +514,7 @@ struct stasis_subscription_statistics {
 /*! \internal */
 struct stasis_subscription {
        /*! Unique ID for this subscription */
-       char uniqueid[AST_UUID_STR_LEN];
+       char *uniqueid;
        /*! Topic subscribed to. */
        struct stasis_topic *topic;
        /*! Mailbox for processing incoming messages. */
@@ -548,6 +557,7 @@ static void subscription_dtor(void *obj)
         * be bad. */
        ast_assert(stasis_subscription_is_done(sub));
 
+       ast_free(sub->uniqueid);
        ao2_cleanup(sub->topic);
        sub->topic = NULL;
        ast_taskprocessor_unreference(sub->mailbox);
@@ -630,26 +640,37 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
 }
 
 #ifdef AST_DEVMODE
-static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid,
-       const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno,
+static void subscription_statistics_destroy(void *obj)
+{
+       struct stasis_subscription_statistics *statistics = obj;
+
+       ao2_cleanup(statistics->topics);
+}
+
+static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
+       int needs_mailbox, int use_thread_pool, const char *file, int lineno,
        const char *func)
 {
        struct stasis_subscription_statistics *statistics;
-       size_t uniqueid_len = strlen(uniqueid) + 1;
 
-       statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL);
+       statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
        if (!statistics) {
                return NULL;
        }
 
+       statistics->topics = ast_str_container_alloc(1);
+       if (!statistics->topics) {
+               ao2_ref(statistics, -1);
+               return NULL;
+       }
+
        statistics->file = file;
        statistics->lineno = lineno;
        statistics->func = func;
        statistics->uses_mailbox = needs_mailbox;
        statistics->uses_threadpool = use_thread_pool;
-       strcpy(statistics->uniqueid, uniqueid); /* SAFE */
-       statistics->topic = statistics->uniqueid + uniqueid_len;
-       strcpy(statistics->topic, topic); /* SAFE */
+       strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
+       statistics->sub = sub;
        ao2_link(subscription_statistics, statistics);
 
        return statistics;
@@ -667,6 +688,7 @@ struct stasis_subscription *internal_stasis_subscribe(
        const char *func)
 {
        struct stasis_subscription *sub;
+       int ret;
 
        if (!topic) {
                return NULL;
@@ -677,12 +699,17 @@ struct stasis_subscription *internal_stasis_subscribe(
        if (!sub) {
                return NULL;
        }
-       ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
 #ifdef AST_DEVMODE
-       sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox,
-               use_thread_pool, file, lineno, func);
-       if (!sub->statistics) {
+       ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
+       sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
+       if (ret < 0 || !sub->statistics) {
+               ao2_ref(sub, -1);
+               return NULL;
+       }
+#else
+       ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
+       if (ret < 0) {
                ao2_ref(sub, -1);
                return NULL;
        }
@@ -1014,6 +1041,7 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
 
 #ifdef AST_DEVMODE
        ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
+       ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
 #endif
 
        ao2_unlock(topic);
@@ -1037,6 +1065,7 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
 #ifdef AST_DEVMODE
        if (!res) {
                ast_str_container_remove(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
+               ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
        }
 #endif
 
@@ -1500,6 +1529,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
 struct topic_pool_entry {
        struct stasis_forward *forward;
        struct stasis_topic *topic;
+       char name[0];
 };
 
 static void topic_pool_entry_dtor(void *obj)
@@ -1511,10 +1541,19 @@ static void topic_pool_entry_dtor(void *obj)
        entry->topic = NULL;
 }
 
-static struct topic_pool_entry *topic_pool_entry_alloc(void)
+static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
 {
-       return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
-               AO2_ALLOC_OPT_LOCK_NOLOCK);
+       struct topic_pool_entry *topic_pool_entry;
+
+       topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
+               topic_pool_entry_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+       if (!topic_pool_entry) {
+               return NULL;
+       }
+
+       strcpy(topic_pool_entry->name, topic_name); /* Safe */
+
+       return topic_pool_entry;
 }
 
 struct stasis_topic_pool {
@@ -1552,7 +1591,7 @@ static int topic_pool_entry_hash(const void *obj, const int flags)
                break;
        case OBJ_SEARCH_OBJECT:
                object = obj;
-               key = stasis_topic_name(object->topic);
+               key = object->name;
                break;
        default:
                /* Hash can only work on something with a full key. */
@@ -1571,10 +1610,10 @@ static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
 
        switch (flags & OBJ_SEARCH_MASK) {
        case OBJ_SEARCH_OBJECT:
-               right_key = stasis_topic_name(object_right->topic);
+               right_key = object_right->name;
                /* Fall through */
        case OBJ_SEARCH_KEY:
-               cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
+               cmp = strcasecmp(object_left->name, right_key);
                break;
        case OBJ_SEARCH_PARTIAL_KEY:
                /* Not supported by container */
@@ -1651,18 +1690,29 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
 {
        RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
        SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
+       char *new_topic_name;
+       int ret;
 
        topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
        if (topic_pool_entry) {
                return topic_pool_entry->topic;
        }
 
-       topic_pool_entry = topic_pool_entry_alloc();
+       topic_pool_entry = topic_pool_entry_alloc(topic_name);
        if (!topic_pool_entry) {
                return NULL;
        }
 
-       topic_pool_entry->topic = stasis_topic_create(topic_name);
+       /* To provide further detail and to ensure that the topic is unique within the scope of the
+        * system we prefix it with the pooling topic name, which should itself already be unique.
+        */
+       ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
+       if (ret < 0) {
+               return NULL;
+       }
+
+       topic_pool_entry->topic = stasis_topic_create(new_topic_name);
+       ast_free(new_topic_name);
        if (!topic_pool_entry->topic) {
                return NULL;
        }
@@ -2084,12 +2134,15 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
 
 #ifdef AST_DEVMODE
 
+AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
+
 /*!
  * \internal
  * \brief CLI command implementation for 'stasis statistics show subscriptions'
  */
 static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
+       struct ao2_container *sorted_subscriptions;
        struct ao2_iterator iter;
        struct stasis_subscription_statistics *statistics;
        int count = 0;
@@ -2114,9 +2167,22 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str
                return CLI_SHOWUSAGE;
        }
 
+       sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
+               stasis_subscription_statistics_sort_fn, NULL);
+       if (!sorted_subscriptions) {
+               ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
+               return CLI_SUCCESS;
+       }
+
+       if (ao2_container_dup(sorted_subscriptions, subscription_statistics, 0)) {
+               ao2_ref(sorted_subscriptions, -1);
+               ast_cli(a->fd, "Could not sort subscription statistics\n");
+               return CLI_SUCCESS;
+       }
+
        ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
 
-       iter = ao2_iterator_init(subscription_statistics, 0);
+       iter = ao2_iterator_init(sorted_subscriptions, 0);
        while ((statistics = ao2_iterator_next(&iter))) {
                ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
                        statistics->lowest_time_invoked, statistics->highest_time_invoked);
@@ -2127,6 +2193,8 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str
        }
        ao2_iterator_destroy(&iter);
 
+       ao2_ref(sorted_subscriptions, -1);
+
        ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
        ast_cli(a->fd, "\n%d subscriptions\n\n", count);
 
@@ -2171,6 +2239,8 @@ static char *subscription_statistics_complete_name(const char *word, int state)
 static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
        struct stasis_subscription_statistics *statistics;
+       struct ao2_iterator i;
+       char *name;
 
        switch (cmd) {
        case CLI_INIT:
@@ -2198,7 +2268,7 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
        }
 
        ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
-       ast_cli(a->fd, "Topic: %s\n", statistics->topic);
+       ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
        ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
        ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
        ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
@@ -2215,25 +2285,38 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
        }
        ao2_unlock(statistics);
 
+       ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
+
+       ast_cli(a->fd, "Subscribed topics:\n");
+       i = ao2_iterator_init(statistics->topics, 0);
+       while ((name = ao2_iterator_next(&i))) {
+               ast_cli(a->fd, "\t%s\n", name);
+               ao2_ref(name, -1);
+       }
+       ao2_iterator_destroy(&i);
+
        ao2_ref(statistics, -1);
 
        return CLI_SUCCESS;
 }
 
+AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
+
 /*!
  * \internal
  * \brief CLI command implementation for 'stasis statistics show topics'
  */
 static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
+       struct ao2_container *sorted_topics;
        struct ao2_iterator iter;
        struct stasis_topic_statistics *statistics;
        int count = 0;
        int not_dispatched = 0;
        int dispatched = 0;
-#define FMT_HEADERS            "%-64s %10s %10s %16s %16s\n"
-#define FMT_FIELDS             "%-64s %10d %10d %16ld %16ld\n"
-#define FMT_FIELDS2            "%-64s %10d %10d\n"
+#define FMT_HEADERS            "%-64s %10s %10s %10s %16s %16s\n"
+#define FMT_FIELDS             "%-64s %10d %10d %10d %16ld %16ld\n"
+#define FMT_FIELDS2            "%-64s %10s %10d %10d\n"
 
        switch (cmd) {
        case CLI_INIT:
@@ -2250,11 +2333,25 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast
                return CLI_SHOWUSAGE;
        }
 
-       ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
+       sorted_topics = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
+               stasis_topic_statistics_sort_fn, NULL);
+       if (!sorted_topics) {
+               ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
+               return CLI_SUCCESS;
+       }
+
+       if (ao2_container_dup(sorted_topics, topic_statistics, 0)) {
+               ao2_ref(sorted_topics, -1);
+               ast_cli(a->fd, "Could not sort topic statistics\n");
+               return CLI_SUCCESS;
+       }
+
+       ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
 
-       iter = ao2_iterator_init(topic_statistics, 0);
+       iter = ao2_iterator_init(sorted_topics, 0);
        while ((statistics = ao2_iterator_next(&iter))) {
-               ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched,
+               ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
+                       statistics->messages_not_dispatched, statistics->messages_dispatched,
                        statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
                not_dispatched += statistics->messages_not_dispatched;
                dispatched += statistics->messages_dispatched;
@@ -2263,7 +2360,9 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast
        }
        ao2_iterator_destroy(&iter);
 
-       ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched);
+       ao2_ref(sorted_topics, -1);
+
+       ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
        ast_cli(a->fd, "\n%d topics\n\n", count);
 
 #undef FMT_HEADERS
@@ -2336,6 +2435,7 @@ static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_
        }
 
        ast_cli(a->fd, "Topic: %s\n", statistics->name);
+       ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
        ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
        ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
        ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
index 0de41973fdcdf64cdbe029aa1e4c3200cb97aa8b..5432df821d48a49329b3c326d3c4132b7045fa3e 100644 (file)
@@ -194,12 +194,22 @@ struct stasis_topic *ast_bridge_topic_all_cached(void)
 
 int bridge_topics_init(struct ast_bridge *bridge)
 {
+       char *topic_name;
+       int ret;
+
        if (ast_strlen_zero(bridge->uniqueid)) {
                ast_log(LOG_ERROR, "Bridge id initialization required\n");
                return -1;
        }
+
+       ret = ast_asprintf(&topic_name, "bridge:%s", bridge->uniqueid);
+       if (ret < 0) {
+               return -1;
+       }
+
        bridge->topics = stasis_cp_single_create(bridge_cache_all,
-               bridge->uniqueid);
+               topic_name);
+       ast_free(topic_name);
        if (!bridge->topics) {
                return -1;
        }
@@ -1290,7 +1300,7 @@ int ast_stasis_bridging_init(void)
 
        ast_register_cleanup(stasis_bridging_cleanup);
 
-       bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all",
+       bridge_cache_all = stasis_cp_all_create("bridge:all",
                bridge_snapshot_get_id);
 
        if (!bridge_cache_all) {
index c7041f1bd2f4bb50cbbc85442062e245de5c2423..541261a87cfe87758b6c9b76811e583826894e1f 100644 (file)
@@ -950,10 +950,11 @@ static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
 {
        struct stasis_caching_topic *caching_topic;
+       static int caching_id;
        char *new_name;
        int ret;
 
-       ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
+       ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
        if (ret < 0) {
                return NULL;
        }
index c2ea76622063512948e9fe8dca944f26b3e80590..b662a1acb6d1f9e8c800863cfce0fecc8aa3c3da 100644 (file)
@@ -69,13 +69,14 @@ struct stasis_cp_all *stasis_cp_all_create(const char *name,
 {
        char *cached_name = NULL;
        struct stasis_cp_all *all;
+       static int cache_id;
 
        all = ao2_t_alloc(sizeof(*all), all_dtor, name);
        if (!all) {
                return NULL;
        }
 
-       ast_asprintf(&cached_name, "%s-cached", name);
+       ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
        if (!cached_name) {
                ao2_ref(all, -1);
 
index 8c7a85f04cfd830a264991599686d8a1d24f3784..d0f502666adebc9654d684af72d8b41283ee87c5 100644 (file)
@@ -1361,7 +1361,7 @@ int ast_stasis_channels_init(void)
 
        ast_register_cleanup(stasis_channels_cleanup);
 
-       channel_cache_all = stasis_cp_all_create("ast_channel_topic_all",
+       channel_cache_all = stasis_cp_all_create("channel:all",
                channel_snapshot_get_id);
        if (!channel_cache_all) {
                return -1;
index cdee048ec273a20a462732a26925620886fcba4a..339443f072dffa7e9b082ec1cbfb5e78eeb42614 100644 (file)
@@ -462,7 +462,7 @@ int ast_endpoint_stasis_init(void)
        int res = 0;
        ast_register_cleanup(endpoints_stasis_cleanup);
 
-       endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
+       endpoint_cache_all = stasis_cp_all_create("endpoint:all",
                endpoint_snapshot_get_id);
        if (!endpoint_cache_all) {
                return -1;
index 4b2963c4c93d2af1fc9f7bd6b6d9de665e1969b1..35bbdcc0a2e0aeaa364d12a4c6d243b35ed804d2 100644 (file)
@@ -376,7 +376,7 @@ int ast_stasis_system_init(void)
 {
        ast_register_cleanup(stasis_system_cleanup);
 
-       system_topic = stasis_topic_create("ast_system");
+       system_topic = stasis_topic_create("system:all");
        if (!system_topic) {
                return 1;
        }
index cd13a32009c5712f71bf77ee8087cb68c90a7f4f..28bae64e6967029eeb480c4c55c422b0610b49af 100644 (file)
@@ -1226,7 +1226,7 @@ int ast_test_init(void)
        ast_register_cleanup(test_cleanup);
 
        /* Create stasis topic */
-       test_suite_topic = stasis_topic_create("test_suite_topic");
+       test_suite_topic = stasis_topic_create("testsuite:all");
        if (!test_suite_topic) {
                return -1;
        }
index b1518c5feb5ca4b7c3d956bd9ea48b9183ef6f45..7a0eefe9cd3d366802f9d753b8181ee27bca2c61 100644 (file)
@@ -1133,7 +1133,7 @@ static int load_module(void)
                goto failed;
        }
 
-       corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
+       corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
        if (!corosync_aggregate_topic) {
                ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
                goto failed;
index 068cef3a70ef39139afb294d1cc5ed2e1a8eff06..336dfdc2b958d1b1d3bb588be8169b18c82809e7 100644 (file)
@@ -962,6 +962,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
        int res = 0;
        size_t context_size = strlen("stasis-") + strlen(name) + 1;
        char context_name[context_size];
+       char *topic_name;
+       int ret;
 
        ast_assert(name != NULL);
        ast_assert(handler != NULL);
@@ -982,7 +984,13 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
                return NULL;
        }
 
-       app->topic = stasis_topic_create(name);
+       ret = ast_asprintf(&topic_name, "ari:application/%s", name);
+       if (ret < 0) {
+               return NULL;
+       }
+
+       app->topic = stasis_topic_create(topic_name);
+       ast_free(topic_name);
        if (!app->topic) {
                return NULL;
        }