#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/config_options.h"
+#ifdef AST_DEVMODE
+#include "asterisk/cli.h"
+#endif
/*** DOCUMENTATION
<managerEvent language="en_US" name="UserEvent">
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
+#ifdef AST_DEVMODE
+
+/*! The number of buckets to use for topic statistics */
+#define TOPIC_STATISTICS_BUCKETS 57
+
+/*! The number of buckets to use for subscription statistics */
+#define SUBSCRIPTION_STATISTICS_BUCKETS 57
+
+/*! Container which stores statistics for topics */
+static struct ao2_container *topic_statistics;
+
+/*! Container which stores statistics for subscriptions */
+static struct ao2_container *subscription_statistics;
+
+/*! \internal */
+struct stasis_message_type_statistics {
+ /*! \brief The number of messages of this published */
+ int published;
+ /*! \brief The number of messages of this that did not reach a subscriber */
+ int unused;
+ /*! \brief The stasis message type */
+ struct stasis_message_type *message_type;
+};
+
+/*! Lock to protect the message types vector */
+AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
+
+/*! Vector containing message type information */
+static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
+
+/*! \internal */
+struct stasis_topic_statistics {
+ /*! \brief The number of messages that were not dispatched to any subscriber */
+ int messages_not_dispatched;
+ /*! \brief The number of messages that were dispatched to at least 1 subscriber */
+ int messages_dispatched;
+ /*! \brief Highest time spent dispatching messages to subscribers */
+ int64_t highest_time_dispatched;
+ /*! \brief Lowest time spent dispatching messages to subscribers */
+ int64_t lowest_time_dispatched;
+ /*! \brief The number of subscribers to this topic */
+ int subscriber_count;
+ /*! \brief Name of the topic */
+ char name[0];
+};
+#endif
+
/*! \internal */
struct stasis_topic {
- char *name;
/*! Variable length array of the subscribers */
AST_VECTOR(, struct stasis_subscription *) subscribers;
/*! Topics forwarding into this topic */
AST_VECTOR(, struct stasis_topic *) upstream_topics;
+
+#ifdef AST_DEVMODE
+ struct stasis_topic_statistics *statistics;
+#endif
+
+ /*! Name of the topic */
+ char name[0];
};
/* Forward declarations for the tightly-coupled subscription object */
* unsubscribed before we get here. */
ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
- ast_free(topic->name);
- topic->name = NULL;
-
AST_VECTOR_FREE(&topic->subscribers);
AST_VECTOR_FREE(&topic->upstream_topics);
+
+#ifdef AST_DEVMODE
+ if (topic->statistics) {
+ ao2_unlink(topic_statistics, topic->statistics);
+ ao2_ref(topic->statistics, -1);
+ }
+#endif
+}
+
+#ifdef AST_DEVMODE
+static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name)
+{
+ struct stasis_topic_statistics *statistics;
+
+ statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, NULL);
+ if (!statistics) {
+ return NULL;
+ }
+
+ strcpy(statistics->name, name); /* SAFE */
+ ao2_link(topic_statistics, statistics);
+
+ return statistics;
}
+#endif
struct stasis_topic *stasis_topic_create(const char *name)
{
struct stasis_topic *topic;
int res = 0;
- topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
+ topic = ao2_t_alloc(sizeof(*topic) + strlen(name) + 1, topic_dtor, name);
if (!topic) {
return NULL;
}
- topic->name = ast_strdup(name);
+ strcpy(topic->name, name); /* SAFE */
res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
+#ifdef AST_DEVMODE
+ topic->statistics = stasis_topic_statistics_create(name);
+ if (!topic->name || !topic->statistics || res) {
+#else
if (!topic->name || res) {
- ao2_cleanup(topic);
+#endif
+ ao2_ref(topic, -1);
return NULL;
}
return AST_VECTOR_SIZE(&topic->subscribers);
}
+#ifdef AST_DEVMODE
+struct stasis_subscription_statistics {
+ /*! \brief The filename where the subscription originates */
+ const char *file;
+ /*! \brief The line number where the subscription originates */
+ int lineno;
+ /*! \brief The function where the subscription originates */
+ const char *func;
+ /*! \brief The number of messages that were filtered out */
+ int messages_dropped;
+ /*! \brief The number of messages that passed filtering */
+ int messages_passed;
+ /*! \brief Highest time spent invoking a message */
+ int64_t highest_time_invoked;
+ /*! \brief The message type that currently took the longest to process */
+ struct stasis_message_type *highest_time_message_type;
+ /*! \brief Lowest time spent invoking a message */
+ int64_t lowest_time_invoked;
+ /*! \brief Using a mailbox to queue messages */
+ int uses_mailbox;
+ /*! \brief Using stasis threadpool for handling messages */
+ int uses_threadpool;
+ /*! \brief Name of the topic we subscribed to */
+ char *topic;
+ /*! \brief Unique ID of the subscription */
+ char uniqueid[0];
+};
+#endif
+
/*! \internal */
struct stasis_subscription {
/*! Unique ID for this subscription */
enum stasis_subscription_message_formatters accepted_formatters;
/*! The message filter currently in use */
enum stasis_subscription_message_filter filter;
+
+#ifdef AST_DEVMODE
+ /*! Statistics information */
+ struct stasis_subscription_statistics *statistics;
+#endif
};
static void subscription_dtor(void *obj)
ast_cond_destroy(&sub->join_cond);
AST_VECTOR_FREE(&sub->accepted_message_types);
+
+#ifdef AST_DEVMODE
+ if (sub->statistics) {
+ ao2_unlink(subscription_statistics, sub->statistics);
+ ao2_ref(sub->statistics, -1);
+ }
+#endif
}
/*!
{
unsigned int final = stasis_subscription_final_message(sub, message);
int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
+#ifdef AST_DEVMODE
+ struct timeval start;
+ int elapsed;
+
+ start = ast_tvnow();
+#endif
/* Notify that the final message has been received */
if (final) {
ast_cond_signal(&sub->join_cond);
ao2_unlock(sub);
}
+
+#ifdef AST_DEVMODE
+ elapsed = ast_tvdiff_ms(ast_tvnow(), start);
+ if (elapsed > sub->statistics->highest_time_invoked) {
+ sub->statistics->highest_time_invoked = elapsed;
+ ao2_lock(sub->statistics);
+ sub->statistics->highest_time_message_type = stasis_message_type(message);
+ ao2_unlock(sub->statistics);
+ }
+ if (elapsed < sub->statistics->lowest_time_invoked) {
+ sub->statistics->lowest_time_invoked = elapsed;
+ }
+#endif
}
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
{
}
+#ifdef AST_DEVMODE
+static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid,
+ const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno,
+ const char *func)
+{
+ struct stasis_subscription_statistics *statistics;
+ size_t uniqueid_len = strlen(uniqueid) + 1;
+
+ statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL);
+ if (!statistics) {
+ return NULL;
+ }
+
+ statistics->file = file;
+ statistics->lineno = lineno;
+ statistics->func = func;
+ statistics->uses_mailbox = needs_mailbox;
+ statistics->uses_threadpool = use_thread_pool;
+ strcpy(statistics->uniqueid, uniqueid); /* SAFE */
+ statistics->topic = statistics->uniqueid + uniqueid_len;
+ strcpy(statistics->topic, topic); /* SAFE */
+ ao2_link(subscription_statistics, statistics);
+
+ return statistics;
+}
+#endif
+
+#ifdef AST_DEVMODE
+struct stasis_subscription *internal_stasis_subscribe(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data,
+ int needs_mailbox,
+ int use_thread_pool,
+ const char *file,
+ int lineno,
+ const char *func)
+#else
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
int needs_mailbox,
int use_thread_pool)
+#endif
{
struct stasis_subscription *sub;
}
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
+#ifdef AST_DEVMODE
+ sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox,
+ use_thread_pool, file, lineno, func);
+ if (!sub->statistics) {
+ ao2_ref(sub, -1);
+ return NULL;
+ }
+#endif
+
if (needs_mailbox) {
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
return sub;
}
+#ifdef AST_DEVMODE
+struct stasis_subscription *__stasis_subscribe(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data,
+ const char *file,
+ int lineno,
+ const char *func)
+{
+ return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
+}
+#else
struct stasis_subscription *stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
{
return internal_stasis_subscribe(topic, callback, data, 1, 0);
}
+#endif
+#ifdef AST_DEVMODE
+struct stasis_subscription *__stasis_subscribe_pool(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data,
+ const char *file,
+ int lineno,
+ const char *func)
+{
+ return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
+}
+#else
struct stasis_subscription *stasis_subscribe_pool(
struct stasis_topic *topic,
stasis_subscription_cb callback,
{
return internal_stasis_subscribe(topic, callback, data, 1, 1);
}
+#endif
static int sub_cleanup(void *data)
{
topic_add_subscription(
AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
}
+
+#ifdef AST_DEVMODE
+ topic->statistics->subscriber_count += 1;
+#endif
+
ao2_unlock(topic);
return 0;
}
res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
AST_VECTOR_ELEM_CLEANUP_NOOP);
+
+#ifdef AST_DEVMODE
+ if (!res) {
+ topic->statistics->subscriber_count -= 1;
+ }
+#endif
+
ao2_unlock(topic);
return res;
* \param message The message to send
* \param synchronous If non-zero, synchronize on the subscriber receiving
* the message
+ * \retval 0 if message was not dispatched
+ * \retval 1 if message was dispatched
*/
-static void dispatch_message(struct stasis_subscription *sub,
+static unsigned int dispatch_message(struct stasis_subscription *sub,
struct stasis_message *message,
int synchronous)
{
break;
}
- return;
+#ifdef AST_DEVMODE
+ ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
+#endif
+
+ return 0;
} while (0);
+#ifdef AST_DEVMODE
+ ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
+#endif
+
if (!sub->mailbox) {
/* Dispatch directly */
subscription_invoke(sub, message);
- return;
+ return 1;
}
/* Bump the message for the taskprocessor push. This will get de-ref'd
/* Push failed; ugh. */
ast_log(LOG_ERROR, "Dropping async dispatch\n");
ao2_cleanup(message);
+ return 0;
}
} else {
struct sync_task_data std;
ao2_cleanup(message);
ast_mutex_destroy(&std.lock);
ast_cond_destroy(&std.cond);
- return;
+ return 0;
}
ast_mutex_lock(&std.lock);
ast_mutex_destroy(&std.lock);
ast_cond_destroy(&std.cond);
}
+
+ return 1;
}
/*!
struct stasis_message *message, struct stasis_subscription *sync_sub)
{
size_t i;
+ unsigned int dispatched = 0;
+#ifdef AST_DEVMODE
+ int message_type_id = stasis_message_type_id(stasis_message_type(message));
+ struct stasis_message_type_statistics *statistics;
+ struct timeval start;
+ int elapsed;
+#endif
ast_assert(topic != NULL);
ast_assert(message != NULL);
+#ifdef AST_DEVMODE
+ ast_mutex_lock(&message_type_statistics_lock);
+ if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
+ struct stasis_message_type_statistics new_statistics = {
+ .published = 0,
+ };
+ if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
+ ast_mutex_unlock(&message_type_statistics_lock);
+ return;
+ }
+ }
+ statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
+ statistics->message_type = stasis_message_type(message);
+ ast_mutex_unlock(&message_type_statistics_lock);
+
+ ast_atomic_fetchadd_int(&statistics->published, +1);
+#endif
+
/* If there are no subscribers don't bother */
if (!stasis_topic_subscribers(topic)) {
+#ifdef AST_DEVMODE
+ ast_atomic_fetchadd_int(&statistics->unused, +1);
+ ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
+#endif
return;
}
* Make sure we hold onto a reference while dispatching.
*/
ao2_ref(topic, +1);
+#ifdef AST_DEVMODE
+ start = ast_tvnow();
+#endif
ao2_lock(topic);
for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
ast_assert(sub != NULL);
- dispatch_message(sub, message, (sub == sync_sub));
+ dispatched += dispatch_message(sub, message, (sub == sync_sub));
}
ao2_unlock(topic);
+
+#ifdef AST_DEVMODE
+ elapsed = ast_tvdiff_ms(ast_tvnow(), start);
+ if (elapsed > topic->statistics->highest_time_dispatched) {
+ topic->statistics->highest_time_dispatched = elapsed;
+ }
+ if (elapsed < topic->statistics->lowest_time_dispatched) {
+ topic->statistics->lowest_time_dispatched = elapsed;
+ }
+ if (dispatched) {
+ ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
+ } else {
+ ast_atomic_fetchadd_int(&statistics->unused, +1);
+ ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
+ }
+#endif
+
ao2_ref(topic, -1);
}
/*! @} */
+#ifdef AST_DEVMODE
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show subscriptions'
+ */
+static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct ao2_iterator iter;
+ struct stasis_subscription_statistics *statistics;
+ int count = 0;
+ int dropped = 0;
+ int passed = 0;
+#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
+#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
+#define FMT_FIELDS2 "%-64s %10d %10d\n"
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "stasis statistics show subscriptions";
+ e->usage =
+ "Usage: stasis statistics show subscriptions\n"
+ " Shows a list of subscriptions and their general statistics\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc != e->args) {
+ return CLI_SHOWUSAGE;
+ }
+
+ ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
+
+ iter = ao2_iterator_init(subscription_statistics, 0);
+ while ((statistics = ao2_iterator_next(&iter))) {
+ ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
+ statistics->lowest_time_invoked, statistics->highest_time_invoked);
+ dropped += statistics->messages_dropped;
+ passed += statistics->messages_passed;
+ ao2_ref(statistics, -1);
+ ++count;
+ }
+ ao2_iterator_destroy(&iter);
+
+ ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
+ ast_cli(a->fd, "\n%d subscriptions\n\n", count);
+
+#undef FMT_HEADERS
+#undef FMT_FIELDS
+#undef FMT_FIELDS2
+
+ return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI tab completion for subscription statistics names
+ */
+static char *subscription_statistics_complete_name(const char *word, int state)
+{
+ struct stasis_subscription_statistics *statistics;
+ struct ao2_iterator it_statistics;
+ int wordlen = strlen(word);
+ int which = 0;
+ char *result = NULL;
+
+ it_statistics = ao2_iterator_init(subscription_statistics, 0);
+ while ((statistics = ao2_iterator_next(&it_statistics))) {
+ if (!strncasecmp(word, statistics->uniqueid, wordlen)
+ && ++which > state) {
+ result = ast_strdup(statistics->uniqueid);
+ }
+ ao2_ref(statistics, -1);
+ if (result) {
+ break;
+ }
+ }
+ ao2_iterator_destroy(&it_statistics);
+ return result;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show subscription'
+ */
+static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct stasis_subscription_statistics *statistics;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "stasis statistics show subscription";
+ e->usage =
+ "Usage: stasis statistics show subscription <uniqueid>\n"
+ " Show stasis subscription statistics.\n";
+ return NULL;
+ case CLI_GENERATE:
+ if (a->pos == 4) {
+ return subscription_statistics_complete_name(a->word, a->n);
+ } else {
+ return NULL;
+ }
+ }
+
+ if (a->argc != 5) {
+ return CLI_SHOWUSAGE;
+ }
+
+ statistics = ao2_find(subscription_statistics, a->argv[4], OBJ_SEARCH_KEY);
+ if (!statistics) {
+ ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
+ return CLI_FAILURE;
+ }
+
+ ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
+ ast_cli(a->fd, "Topic: %s\n", statistics->topic);
+ ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
+ ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
+ ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
+ ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
+ ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
+ ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
+ ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
+ ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
+ ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
+
+ ao2_lock(statistics);
+ if (statistics->highest_time_message_type) {
+ ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
+ }
+ ao2_unlock(statistics);
+
+ ao2_ref(statistics, -1);
+
+ return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show topics'
+ */
+static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct ao2_iterator iter;
+ struct stasis_topic_statistics *statistics;
+ int count = 0;
+ int not_dispatched = 0;
+ int dispatched = 0;
+#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
+#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
+#define FMT_FIELDS2 "%-64s %10d %10d\n"
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "stasis statistics show topics";
+ e->usage =
+ "Usage: stasis statistics show topics\n"
+ " Shows a list of topics and their general statistics\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc != e->args) {
+ return CLI_SHOWUSAGE;
+ }
+
+ ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
+
+ iter = ao2_iterator_init(topic_statistics, 0);
+ while ((statistics = ao2_iterator_next(&iter))) {
+ ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched,
+ statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
+ not_dispatched += statistics->messages_not_dispatched;
+ dispatched += statistics->messages_dispatched;
+ ao2_ref(statistics, -1);
+ ++count;
+ }
+ ao2_iterator_destroy(&iter);
+
+ ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched);
+ ast_cli(a->fd, "\n%d topics\n\n", count);
+
+#undef FMT_HEADERS
+#undef FMT_FIELDS
+#undef FMT_FIELDS2
+
+ return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI tab completion for topic statistics names
+ */
+static char *topic_statistics_complete_name(const char *word, int state)
+{
+ struct stasis_topic_statistics *statistics;
+ struct ao2_iterator it_statistics;
+ int wordlen = strlen(word);
+ int which = 0;
+ char *result = NULL;
+
+ it_statistics = ao2_iterator_init(topic_statistics, 0);
+ while ((statistics = ao2_iterator_next(&it_statistics))) {
+ if (!strncasecmp(word, statistics->name, wordlen)
+ && ++which > state) {
+ result = ast_strdup(statistics->name);
+ }
+ ao2_ref(statistics, -1);
+ if (result) {
+ break;
+ }
+ }
+ ao2_iterator_destroy(&it_statistics);
+ return result;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show topic'
+ */
+static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct stasis_topic_statistics *statistics;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "stasis statistics show topic";
+ e->usage =
+ "Usage: stasis statistics show topic <name>\n"
+ " Show stasis topic statistics.\n";
+ return NULL;
+ case CLI_GENERATE:
+ if (a->pos == 4) {
+ return topic_statistics_complete_name(a->word, a->n);
+ } else {
+ return NULL;
+ }
+ }
+
+ if (a->argc != 5) {
+ return CLI_SHOWUSAGE;
+ }
+
+ statistics = ao2_find(topic_statistics, a->argv[4], OBJ_SEARCH_KEY);
+ if (!statistics) {
+ ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
+ return CLI_FAILURE;
+ }
+
+ ast_cli(a->fd, "Topic: %s\n", statistics->name);
+ ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
+ ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
+ ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
+ ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
+ ast_cli(a->fd, "Number of subscribers: %d\n", statistics->subscriber_count);
+
+ ao2_ref(statistics, -1);
+
+ return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show messages'
+ */
+static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ int i;
+ int count = 0;
+ int published = 0;
+ int unused = 0;
+#define FMT_HEADERS "%-64s %10s %10s\n"
+#define FMT_FIELDS "%-64s %10d %10d\n"
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "stasis statistics show messages";
+ e->usage =
+ "Usage: stasis statistics show messages\n"
+ " Shows a list of message types and their general statistics\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc != e->args) {
+ return CLI_SHOWUSAGE;
+ }
+
+ ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
+
+ ast_mutex_lock(&message_type_statistics_lock);
+ for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
+ struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
+
+ if (!statistics->message_type) {
+ continue;
+ }
+
+ ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
+ statistics->unused);
+ published += statistics->published;
+ unused += statistics->unused;
+ ++count;
+ }
+ ast_mutex_unlock(&message_type_statistics_lock);
+
+ ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
+ ast_cli(a->fd, "\n%d seen message types\n\n", count);
+
+#undef FMT_HEADERS
+#undef FMT_FIELDS
+
+ return CLI_SUCCESS;
+}
+
+static struct ast_cli_entry cli_stasis_statistics[] = {
+ AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
+ AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
+ AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
+ AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
+ AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
+};
+
+static int subscription_statistics_hash(const void *obj, const int flags)
+{
+ const struct stasis_subscription_statistics *object;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ object = obj;
+ key = object->uniqueid;
+ break;
+ default:
+ /* Hash can only work on something with a full key. */
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_case_hash(key);
+}
+
+static int subscription_statistics_cmp(void *obj, void *arg, int flags)
+{
+ const struct stasis_subscription_statistics *object_left = obj;
+ const struct stasis_subscription_statistics *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = object_right->uniqueid;
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcasecmp(object_left->uniqueid, right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ /* Not supported by container */
+ ast_assert(0);
+ cmp = -1;
+ break;
+ default:
+ /*
+ * What arg points to is specific to this traversal callback
+ * and has no special meaning to astobj2.
+ */
+ cmp = 0;
+ break;
+ }
+ if (cmp) {
+ return 0;
+ }
+ /*
+ * At this point the traversal callback is identical to a sorted
+ * container.
+ */
+ return CMP_MATCH;
+}
+
+static int topic_statistics_hash(const void *obj, const int flags)
+{
+ const struct stasis_topic_statistics *object;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ object = obj;
+ key = object->name;
+ break;
+ default:
+ /* Hash can only work on something with a full key. */
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_case_hash(key);
+}
+
+static int topic_statistics_cmp(void *obj, void *arg, int flags)
+{
+ const struct stasis_topic_statistics *object_left = obj;
+ const struct stasis_topic_statistics *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = object_right->name;
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcasecmp(object_left->name, right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ /* Not supported by container */
+ ast_assert(0);
+ cmp = -1;
+ break;
+ default:
+ /*
+ * What arg points to is specific to this traversal callback
+ * and has no special meaning to astobj2.
+ */
+ cmp = 0;
+ break;
+ }
+ if (cmp) {
+ return 0;
+ }
+ /*
+ * At this point the traversal callback is identical to a sorted
+ * container.
+ */
+ return CMP_MATCH;
+}
+#endif
+
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
+#ifdef AST_DEVMODE
+ ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
+ AST_VECTOR_FREE(&message_type_statistics);
+ ao2_cleanup(subscription_statistics);
+ ao2_cleanup(topic_statistics);
+#endif
ast_threadpool_shutdown(pool);
pool = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
return -1;
}
+#ifdef AST_DEVMODE
+ /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
+ * topic or subscripton.
+ */
+ subscription_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
+ subscription_statistics_hash, 0, subscription_statistics_cmp);
+ if (!subscription_statistics) {
+ return -1;
+ }
+
+ topic_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
+ topic_statistics_hash, 0, topic_statistics_cmp);
+ if (!topic_statistics) {
+ return -1;
+ }
+
+ AST_VECTOR_INIT(&message_type_statistics, 0);
+
+ if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
+ return -1;
+ }
+#endif
+
return 0;
}