]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
ARI: Add the ability to subscribe to all events 93/1193/4
authorMatt Jordan <mjordan@digium.com>
Fri, 4 Sep 2015 17:25:07 +0000 (12:25 -0500)
committerMatt Jordan <mjordan@digium.com>
Tue, 22 Sep 2015 18:27:14 +0000 (13:27 -0500)
This patch adds the ability to subscribe to all events. There are two possible
ways to accomplish this:
(1) On initial WebSocket connection. This patch adds a new query parameter,
    'subscribeAll'. If present and True, Asterisk will subscribe the
    applications to all ARI events.
(2) Via the applications resource. When subscribing in this manner, an ARI
    client should merely specify a blank resource name, i.e., 'channels:'
    instead of 'channels:12354'. This will subscribe the application to all
    resources of the 'channels' type.

ASTERISK-24870 #close

Change-Id: I4a943b4db24442cf28bc64b24bfd541249790ad6

CHANGES
include/asterisk/stasis_app.h
res/ari/resource_events.c
res/ari/resource_events.h
res/res_ari_events.c
res/res_stasis.c
res/stasis/app.c
res/stasis/app.h
res/stasis/messaging.c
rest-api/api-docs/events.json

diff --git a/CHANGES b/CHANGES
index c24919517b62cde87b3a8ed6f5e6362d358ac331..0c467a803c4d2559ca12ed1459f50a2dd1d0b950 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -19,6 +19,21 @@ Dialplan Functions
    return the SIP Call-ID associated with the INVITE request that established
    the PJSIP channel.
 
+ARI
+------------------
+ * Added the ability to subscribe to all ARI events in Asterisk, regardless
+   of whether the application 'controls' the resource. This is useful for
+   scenarios where an ARI application merely wants to observe the system,
+   as opposed to control it. There are two ways to accomplish this:
+   (1) Via the WebSocket connection URI. A new query paramter, 'subscribeAll',
+       has been added that, when present and True, will subscribe all
+       specified applications to all ARI event sources in Asterisk.
+   (2) Via the applications resource. An ARI client can, at any time, subscribe
+       to all resources in an event source merely by not providing an explicit
+       resource. For example, subscribing to an event source of 'channels:'
+       as opposed to 'channels:12345' will subscribe the application to all
+       channels.
+
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 13.4.0 to Asterisk 13.5.0 ------------
 ------------------------------------------------------------------------------
index 567670b69293056dda10df8c969c42ce91ff3637..f2b07e0bfebb93eba2150023dc92cb49fd2780bc 100644 (file)
@@ -91,6 +91,21 @@ struct ao2_container *stasis_app_get_all(void);
  */
 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data);
 
+/*!
+ * \brief Register a new Stasis application that receives all Asterisk events.
+ *
+ * If an application is already registered with the given name, the old
+ * application is sent a 'replaced' message and unregistered.
+ *
+ * \param app_name Name of this application.
+ * \param handler Callback for application messages.
+ * \param data Data blob to pass to the callback. Must be AO2 managed.
+ *
+ * \return 0 for success
+ * \return -1 for error.
+ */
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data);
+
 /*!
  * \brief Unregister a Stasis application.
  * \param app_name Name of the application to unregister.
index 09bcafc2d1e82ca1ea72e28e3531cc69acedbea2..71d54b49469aa1e0c05ebca62f1a2450c2e70ea1 100644 (file)
@@ -148,9 +148,11 @@ static void app_handler(void *data, const char *app_name,
  * \brief Register for all of the apps given.
  * \param session Session info struct.
  * \param app_name Name of application to register.
+ * \param register_handler Pointer to the application registration handler
  */
 static int session_register_app(struct event_session *session,
-                                const char *app_name)
+                                const char *app_name,
+                                int (* register_handler)(const char *, stasis_app_cb handler, void *data))
 {
        SCOPED_AO2LOCK(lock, session);
 
@@ -167,7 +169,7 @@ static int session_register_app(struct event_session *session,
                return -1;
        }
 
-       stasis_app_register(app_name, app_handler, session);
+       register_handler(app_name, app_handler, session);
 
        return 0;
 }
@@ -178,6 +180,7 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session
 {
        int res = 0;
        size_t i, j;
+       int (* register_handler)(const char *, stasis_app_cb handler, void *data);
 
        ast_debug(3, "/events WebSocket attempted\n");
 
@@ -186,13 +189,19 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session
                return -1;
        }
 
+       if (args->subscribe_all) {
+               register_handler = &stasis_app_register_all;
+       } else {
+               register_handler = &stasis_app_register;
+       }
+
        for (i = 0; i < args->app_count; ++i) {
                if (ast_strlen_zero(args->app[i])) {
                        res = -1;
                        break;
                }
 
-               res |= stasis_app_register(args->app[i], app_handler, NULL);
+               res |= register_handler(args->app[i], app_handler, NULL);
        }
 
        if (res) {
@@ -213,6 +222,7 @@ void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websock
        struct ast_json *msg;
        int res;
        size_t i;
+       int (* register_handler)(const char *, stasis_app_cb handler, void *data);
 
        ast_debug(3, "/events WebSocket connection\n");
 
@@ -222,12 +232,18 @@ void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websock
                return;
        }
 
+       if (args->subscribe_all) {
+               register_handler = &stasis_app_register_all;
+       } else {
+               register_handler = &stasis_app_register;
+       }
+
        res = 0;
        for (i = 0; i < args->app_count; ++i) {
                if (ast_strlen_zero(args->app[i])) {
                        continue;
                }
-               res |= session_register_app(session, args->app[i]);
+               res |= session_register_app(session, args->app[i], register_handler);
        }
 
        if (ao2_container_count(session->websocket_apps) == 0) {
index 2b631819b266ac93fa9cb23564ed58d778f03db3..c4826995896f6ec99bcd12ffd00b21a853a1b4a1 100644 (file)
@@ -47,6 +47,8 @@ struct ast_ari_events_event_websocket_args {
        size_t app_count;
        /*! Parsing context for app. */
        char *app_parse;
+       /*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */
+       int subscribe_all;
 };
 
 /*!
index 4265385116a9925b40f1077de717829857bce6f4..65bd38d5bcba6a9969f1aa9899e38b9ce0211bda 100644 (file)
@@ -110,6 +110,9 @@ static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_sess
                                args.app[j] = (vals[j]);
                        }
                } else
+               if (strcmp(i->name, "subscribeAll") == 0) {
+                       args.subscribe_all = ast_true(i->value);
+               } else
                {}
        }
 
@@ -208,6 +211,9 @@ static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocke
                                args.app[j] = (vals[j]);
                        }
                } else
+               if (strcmp(i->name, "subscribeAll") == 0) {
+                       args.subscribe_all = ast_true(i->value);
+               } else
                {}
        }
 
index fc34fa36f99dfcd8601d8216fe7a8bbbe783b4fb..25866d9bba546ab6c9c9cfcdbee60c5f3224ea5b 100644 (file)
@@ -109,6 +109,11 @@ struct ao2_container *app_bridges_moh;
 
 struct ao2_container *app_bridges_playback;
 
+/*!
+ * \internal \brief List of registered event sources.
+ */
+AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
+
 static struct ast_json *stasis_end_to_json(struct stasis_message *message,
                const struct stasis_message_sanitizer *sanitize)
 {
@@ -1469,7 +1474,7 @@ struct ao2_container *stasis_app_get_all(void)
        return ao2_bump(apps);
 }
 
-int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
 {
        RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
 
@@ -1482,8 +1487,20 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
        if (app) {
                app_update(app, handler, data);
        } else {
-               app = app_create(app_name, handler, data);
+               app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
                if (app) {
+                       if (all_events) {
+                               struct stasis_app_event_source *source;
+                               SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+                               AST_LIST_TRAVERSE(&event_sources, source, next) {
+                                       if (!source->subscribe) {
+                                               continue;
+                                       }
+
+                                       source->subscribe(app, NULL);
+                               }
+                       }
                        ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
                } else {
                        ao2_unlock(apps_registry);
@@ -1499,6 +1516,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
        return 0;
 }
 
+int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+{
+       return __stasis_app_register(app_name, handler, data, 0);
+}
+
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
+{
+       return __stasis_app_register(app_name, handler, data, 1);
+}
+
 void stasis_app_unregister(const char *app_name)
 {
        RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@@ -1526,11 +1553,6 @@ void stasis_app_unregister(const char *app_name)
        cleanup();
 }
 
-/*!
- * \internal \brief List of registered event sources.
- */
-AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
-
 void stasis_app_register_event_source(struct stasis_app_event_source *obj)
 {
        SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
@@ -1727,8 +1749,8 @@ static enum stasis_app_subscribe_res app_subscribe(
 
        ast_debug(3, "%s: Checking %s\n", app_name, uri);
 
-       if (!event_source->find ||
-           (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
+       if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
+           (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
                ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
                return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
        }
@@ -2062,6 +2084,7 @@ static int load_module(void)
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
+       .load_pri = AST_MODPRI_APP_DEPEND,
        .support_level = AST_MODULE_SUPPORT_CORE,
        .load = load_module,
        .unload = unload_module,
index caa27abfc2ce7316c98116bd8a75324de35dab91..3539182410adcfb98b113b0d25f0a690280be429 100644 (file)
@@ -38,6 +38,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/stasis_message_router.h"
 
+#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
+#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
+#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
+
 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
 
 struct stasis_app {
@@ -47,12 +51,16 @@ struct stasis_app {
        struct stasis_message_router *router;
        /*! Router for handling messages to the bridge all \a topic. */
        struct stasis_message_router *bridge_router;
+       /*! Optional router for handling endpoint messages in 'all' subscriptions */
+       struct stasis_message_router *endpoint_router;
        /*! Container of the channel forwards to this app's topic. */
        struct ao2_container *forwards;
        /*! Callback function for this application. */
        stasis_app_cb handler;
        /*! Opaque data to hand to callback function. */
        void *data;
+       /*! Subscription model for the application */
+       enum stasis_app_subscription_model subscription_model;
        /*! Name of the Stasis application */
        char name[];
 };
@@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app,
 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
        struct ast_channel *chan)
 {
-       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+       struct app_forwards *forwards;
 
-       if (!app || !chan) {
+       if (!app) {
                return NULL;
        }
 
-       forwards = forwards_create(app, ast_channel_uniqueid(chan));
+       forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
        if (!forwards) {
                return NULL;
        }
 
        forwards->forward_type = FORWARD_CHANNEL;
-       forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
-               app->topic);
-       if (!forwards->topic_forward) {
-               return NULL;
+       if (chan) {
+               forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+                       app->topic);
        }
-
        forwards->topic_cached_forward = stasis_forward_all(
-               ast_channel_topic_cached(chan), app->topic);
-       if (!forwards->topic_cached_forward) {
+               chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
+               app->topic);
+
+       if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
                /* Half-subscribed is a bad thing */
-               stasis_forward_cancel(forwards->topic_forward);
-               forwards->topic_forward = NULL;
+               forwards_unsubscribe(forwards);
+               ao2_ref(forwards, -1);
                return NULL;
        }
 
-       ao2_ref(forwards, +1);
        return forwards;
 }
 
@@ -156,69 +163,101 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app,
 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
        struct ast_bridge *bridge)
 {
-       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+       struct app_forwards *forwards;
 
-       if (!app || !bridge) {
+       if (!app) {
                return NULL;
        }
 
-       forwards = forwards_create(app, bridge->uniqueid);
+       forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
        if (!forwards) {
                return NULL;
        }
 
        forwards->forward_type = FORWARD_BRIDGE;
-       forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
-               app->topic);
-       if (!forwards->topic_forward) {
-               return NULL;
+       if (bridge) {
+               forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+                       app->topic);
        }
-
        forwards->topic_cached_forward = stasis_forward_all(
-               ast_bridge_topic_cached(bridge), app->topic);
-       if (!forwards->topic_cached_forward) {
+               bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
+               app->topic);
+
+       if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
                /* Half-subscribed is a bad thing */
-               stasis_forward_cancel(forwards->topic_forward);
-               forwards->topic_forward = NULL;
+               forwards_unsubscribe(forwards);
+               ao2_ref(forwards, -1);
                return NULL;
        }
 
-       ao2_ref(forwards, +1);
        return forwards;
 }
 
+static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
+       struct stasis_message *message)
+{
+       struct stasis_app *app = data;
+
+       stasis_publish(app->topic, message);
+}
+
 /*! Forward a endpoint's topics to an app */
 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
        struct ast_endpoint *endpoint)
 {
-       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
 
-       if (!app || !endpoint) {
+       struct app_forwards *forwards;
+       int ret = 0;
+
+       if (!app) {
                return NULL;
        }
 
-       forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
+       forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
        if (!forwards) {
                return NULL;
        }
 
        forwards->forward_type = FORWARD_ENDPOINT;
-       forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
-               app->topic);
-       if (!forwards->topic_forward) {
-               return NULL;
-       }
+       if (endpoint) {
+               forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
+                       app->topic);
+               forwards->topic_cached_forward = stasis_forward_all(
+                       ast_endpoint_topic_cached(endpoint), app->topic);
+
+               if (!forwards->topic_forward || !forwards->topic_cached_forward) {
+                       /* Half-subscribed is a bad thing */
+                       forwards_unsubscribe(forwards);
+                       ao2_ref(forwards, -1);
+                       return NULL;
+               }
+       } else {
+               /* Since endpoint subscriptions also subscribe to channels, in the case
+                * of all endpoint subscriptions, we only want messages for the endpoints.
+                * As such, we route those particular messages and then re-publish them
+                * on the app's topic.
+                */
+               ast_assert(app->endpoint_router == NULL);
+               app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
+               if (!app->endpoint_router) {
+                       forwards_unsubscribe(forwards);
+                       ao2_ref(forwards, -1);
+                       return NULL;
+               }
 
-       forwards->topic_cached_forward = stasis_forward_all(
-               ast_endpoint_topic_cached(endpoint), app->topic);
-       if (!forwards->topic_cached_forward) {
-               /* Half-subscribed is a bad thing */
-               stasis_forward_cancel(forwards->topic_forward);
-               forwards->topic_forward = NULL;
-               return NULL;
+               ret |= stasis_message_router_add(app->endpoint_router,
+                       ast_endpoint_state_type(), endpoint_state_cb, app);
+               ret |= stasis_message_router_add(app->endpoint_router,
+                       ast_endpoint_contact_state_type(), endpoint_state_cb, app);
+
+               if (ret) {
+                       ao2_ref(app->endpoint_router, -1);
+                       app->endpoint_router = NULL;
+                       ao2_ref(forwards, -1);
+                       return NULL;
+               }
        }
 
-       ao2_ref(forwards, +1);
        return forwards;
 }
 
@@ -260,6 +299,7 @@ static void app_dtor(void *obj)
 
        ast_assert(app->router == NULL);
        ast_assert(app->bridge_router == NULL);
+       ast_assert(app->endpoint_router == NULL);
 
        ao2_cleanup(app->topic);
        app->topic = NULL;
@@ -793,7 +833,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub,
        }
 }
 
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
 {
        RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
        size_t size;
@@ -806,10 +846,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
 
        size = sizeof(*app) + strlen(name) + 1;
        app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
-
        if (!app) {
                return NULL;
        }
+       app->subscription_model = subscription_model;
 
        app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
                AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
@@ -877,7 +917,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
        return app;
 }
 
-struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
+{
        return app->topic;
 }
 
@@ -930,6 +971,8 @@ void app_shutdown(struct stasis_app *app)
        app->router = NULL;
        stasis_message_router_unsubscribe(app->bridge_router);
        app->bridge_router = NULL;
+       stasis_message_router_unsubscribe(app->endpoint_router);
+       app->endpoint_router = NULL;
 }
 
 int app_is_active(struct stasis_app *app)
@@ -1029,34 +1072,47 @@ struct ast_json *app_to_json(const struct stasis_app *app)
 
 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
 {
+       struct app_forwards *forwards;
+       SCOPED_AO2LOCK(lock, app->forwards);
        int res;
 
-       if (!app || !chan) {
+       if (!app) {
                return -1;
-       } else {
-               RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-               SCOPED_AO2LOCK(lock, app->forwards);
+       }
 
-               forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
-                       OBJ_SEARCH_KEY | OBJ_NOLOCK);
-               if (!forwards) {
-                       /* Forwards not found, create one */
-                       forwards = forwards_create_channel(app, chan);
-                       if (!forwards) {
-                               return -1;
-                       }
+       /* If subscribed to all, don't subscribe again */
+       forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (forwards) {
+               ao2_ref(forwards, -1);
+               return 0;
+       }
 
-                       res = ao2_link_flags(app->forwards, forwards,
-                               OBJ_NOLOCK);
-                       if (!res) {
-                               return -1;
-                       }
+       forwards = ao2_find(app->forwards,
+               chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
+               OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!forwards) {
+               /* Forwards not found, create one */
+               forwards = forwards_create_channel(app, chan);
+               if (!forwards) {
+                       return -1;
                }
 
-               ++forwards->interested;
-               ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
-               return 0;
+               res = ao2_link_flags(app->forwards, forwards,
+                       OBJ_NOLOCK);
+               if (!res) {
+                       ao2_ref(forwards, -1);
+                       return -1;
+               }
        }
+
+       ++forwards->interested;
+       ast_debug(3, "Channel '%s' is %d interested in %s\n",
+               chan ? ast_channel_uniqueid(chan) : "ALL",
+               forwards->interested,
+               app->name);
+
+       ao2_ref(forwards, -1);
+       return 0;
 }
 
 static int subscribe_channel(struct stasis_app *app, void *obj)
@@ -1069,6 +1125,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
        RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
        SCOPED_AO2LOCK(lock, app->forwards);
 
+       if (!id) {
+               if (!strcmp(kind, "bridge")) {
+                       id = BRIDGE_ALL;
+               } else if (!strcmp(kind, "channel")) {
+                       id = CHANNEL_ALL;
+               } else if (!strcmp(kind, "endpoint")) {
+                       id = ENDPOINT_ALL;
+               } else {
+                       ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
+                       return -1;
+               }
+       }
+
        forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
        if (!forwards) {
                ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
@@ -1095,16 +1164,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
 
 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
 {
-       if (!app || !chan) {
+       if (!app) {
                return -1;
        }
 
-       return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
+       return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
 }
 
 int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
 {
-       if (!app || !channel_id) {
+       if (!app) {
                return -1;
        }
 
@@ -1114,6 +1183,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
 {
        RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (ast_strlen_zero(channel_id)) {
+               channel_id = CHANNEL_ALL;
+       }
        forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
        return forwards != NULL;
 }
@@ -1133,28 +1206,39 @@ struct stasis_app_event_source channel_event_source = {
 
 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
 {
-       if (!app || !bridge) {
+       struct app_forwards *forwards;
+       SCOPED_AO2LOCK(lock, app->forwards);
+
+       if (!app) {
                return -1;
-       } else {
-               RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-               SCOPED_AO2LOCK(lock, app->forwards);
+       }
 
-               forwards = ao2_find(app->forwards, bridge->uniqueid,
-                       OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       /* If subscribed to all, don't subscribe again */
+       forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (forwards) {
+               ao2_ref(forwards, -1);
+               return 0;
+       }
 
+       forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
+               OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!forwards) {
+               /* Forwards not found, create one */
+               forwards = forwards_create_bridge(app, bridge);
                if (!forwards) {
-                       /* Forwards not found, create one */
-                       forwards = forwards_create_bridge(app, bridge);
-                       if (!forwards) {
-                               return -1;
-                       }
-                       ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+                       return -1;
                }
-
-               ++forwards->interested;
-               ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
-               return 0;
+               ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
        }
+
+       ++forwards->interested;
+       ast_debug(3, "Bridge '%s' is %d interested in %s\n",
+               bridge ? bridge->uniqueid : "ALL",
+               forwards->interested,
+               app->name);
+
+       ao2_ref(forwards, -1);
+       return 0;
 }
 
 static int subscribe_bridge(struct stasis_app *app, void *obj)
@@ -1164,16 +1248,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj)
 
 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
 {
-       if (!app || !bridge) {
+       if (!app) {
                return -1;
        }
 
-       return app_unsubscribe_bridge_id(app, bridge->uniqueid);
+       return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
 }
 
 int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
 {
-       if (!app || !bridge_id) {
+       if (!app) {
                return -1;
        }
 
@@ -1182,9 +1266,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
 
 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
 {
-       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-       forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
-       return forwards != NULL;
+       struct app_forwards *forwards;
+       SCOPED_AO2LOCK(lock, app->forwards);
+
+       forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (forwards) {
+               ao2_ref(forwards, -1);
+               return 1;
+       }
+
+       if (ast_strlen_zero(bridge_id)) {
+               bridge_id = BRIDGE_ALL;
+       }
+
+       forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (forwards) {
+               ao2_ref(forwards, -1);
+               return 1;
+       }
+
+       return 0;
 }
 
 static void *bridge_find(const struct stasis_app *app, const char *id)
@@ -1202,31 +1303,43 @@ struct stasis_app_event_source bridge_event_source = {
 
 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
 {
-       if (!app || !endpoint) {
+       struct app_forwards *forwards;
+       SCOPED_AO2LOCK(lock, app->forwards);
+
+       if (!app) {
                return -1;
-       } else {
-               RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-               SCOPED_AO2LOCK(lock, app->forwards);
+       }
 
-               forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
-                       OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       /* If subscribed to all, don't subscribe again */
+       forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (forwards) {
+               ao2_ref(forwards, -1);
+               return 0;
+       }
 
+       forwards = ao2_find(app->forwards,
+               endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
+               OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!forwards) {
+               /* Forwards not found, create one */
+               forwards = forwards_create_endpoint(app, endpoint);
                if (!forwards) {
-                       /* Forwards not found, create one */
-                       forwards = forwards_create_endpoint(app, endpoint);
-                       if (!forwards) {
-                               return -1;
-                       }
-                       ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
-
-                       /* Subscribe for messages */
-                       messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
+                       return -1;
                }
+               ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
 
-               ++forwards->interested;
-               ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
-               return 0;
+               /* Subscribe for messages */
+               messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
        }
+
+       ++forwards->interested;
+       ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
+               endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
+               forwards->interested,
+               app->name);
+
+       ao2_ref(forwards, -1);
+       return 0;
 }
 
 static int subscribe_endpoint(struct stasis_app *app, void *obj)
@@ -1236,7 +1349,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj)
 
 int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
 {
-       if (!app || !endpoint_id) {
+       if (!app) {
                return -1;
        }
 
@@ -1246,6 +1359,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
 {
        RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (ast_strlen_zero(endpoint_id)) {
+               endpoint_id = ENDPOINT_ALL;
+       }
        forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
        return forwards != NULL;
 }
index 59574f5849ad648c44c6b3d1c0f2718123b22df6..2c8db1ccd3b3a2ce2e1e9319cb8603dee67de9ca 100644 (file)
  */
 struct stasis_app;
 
+enum stasis_app_subscription_model {
+       /*
+        * \brief An application must manually subscribe to each
+        * resource that it cares about. This is the default approach.
+        */
+       STASIS_APP_SUBSCRIBE_MANUAL,
+       /*
+        * \brief An application is automatically subscribed to all
+        * resources in Asterisk, even if it does not control them.
+        */
+       STASIS_APP_SUBSCRIBE_ALL
+};
+
 /*!
  * \brief Create a res_stasis application.
  *
@@ -45,7 +58,7 @@ struct stasis_app;
  * \return New \c res_stasis application.
  * \return \c NULL on error.
  */
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data);
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model);
 
 /*!
  * \brief Tears down an application.
index fd7cf9f7bb35ccad473892736b53bf41baed917f..229a3a6462be7db773d938e01b34a2e30bcf8bc0 100644 (file)
@@ -37,6 +37,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/test.h"
 #include "messaging.h"
 
+/*!
+ * \brief Subscription to all technologies
+ */
+#define TECH_WILDCARD "__AST_ALL_TECH"
+
 /*!
  * \brief Number of buckets for the \ref endpoint_subscriptions container
  */
@@ -219,10 +224,14 @@ static int has_destination_cb(const struct ast_msg *msg)
        for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
                sub = AST_VECTOR_GET(&tech_subscriptions, i);
 
-               if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
-                           || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+               if (!sub) {
+                       continue;
+               }
+
+               if (!strcmp(sub->token, TECH_WILDCARD)
+                   || !strncasecmp(sub->token, buf, strlen(sub->token))
+                   || !strncasecmp(sub->token, buf, strlen(sub->token))) {
                        ast_rwlock_unlock(&tech_subscriptions_lock);
-                       sub = NULL; /* No ref bump! */
                        goto match;
                }
 
@@ -231,6 +240,7 @@ static int has_destination_cb(const struct ast_msg *msg)
 
        sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
        if (sub) {
+               ao2_ref(sub, -1);
                goto match;
        }
 
@@ -238,7 +248,6 @@ static int has_destination_cb(const struct ast_msg *msg)
        return 0;
 
 match:
-       ao2_cleanup(sub);
        return 1;
 }
 
@@ -301,7 +310,8 @@ static int handle_msg_cb(struct ast_msg *msg)
                        continue;
                }
 
-               if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+               if (!strcmp(sub->token, TECH_WILDCARD)
+                   || !strncasecmp(sub->token, buf, strlen(sub->token))) {
                        ast_rwlock_unlock(&tech_subscriptions_lock);
                        ao2_bump(sub);
                        endpoint_name = buf;
@@ -374,7 +384,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
 {
        struct message_subscription *sub = NULL;
 
-       if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+       if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
                sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
        } else {
                int i;
@@ -383,7 +393,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
                for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
                        sub = AST_VECTOR_GET(&tech_subscriptions, i);
 
-                       if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+                       if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
                                ao2_bump(sub);
                                break;
                        }
@@ -400,10 +410,6 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
        RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
 
        endpoint = ast_endpoint_find_by_id(endpoint_id);
-       if (!endpoint) {
-               return;
-       }
-
        sub = get_subscription(endpoint);
        if (!sub) {
                return;
@@ -417,11 +423,11 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
 
        AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
        if (AST_VECTOR_SIZE(&sub->applications) == 0) {
-               if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+               if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
                        ao2_unlink(endpoint_subscriptions, sub);
                } else {
                        ast_rwlock_wrlock(&tech_subscriptions_lock);
-                       AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+                       AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD,
                                messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
                        ast_rwlock_unlock(&tech_subscriptions_lock);
                }
@@ -429,9 +435,9 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
        ao2_unlock(sub);
        ao2_ref(sub, -1);
 
-       ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+       ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
        ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
-               app_name, ast_endpoint_get_id(endpoint));
+               app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
 }
 
 static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
@@ -442,12 +448,12 @@ static struct message_subscription *get_or_create_subscription(struct ast_endpoi
                return sub;
        }
 
-       sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+       sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD);
        if (!sub) {
                return NULL;
        }
 
-       if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+       if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
                ao2_link(endpoint_subscriptions, sub);
        } else {
                ast_rwlock_wrlock(&tech_subscriptions_lock);
@@ -482,9 +488,9 @@ int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *
        AST_VECTOR_APPEND(&sub->applications, tuple);
        ao2_unlock(sub);
 
-       ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+       ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
        ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
-               app_name, ast_endpoint_get_id(endpoint));
+               app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
 
        return 0;
 }
index 8d74900f2787ec6990989e7674df42cb25d54f39..6276fc224497c1c3b058eff8be7cb4fb6b8b4aad 100644 (file)
                                                        "required": true,
                                                        "allowMultiple": true,
                                                        "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "subscribeAll",
+                                                       "description": "Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'.",
+                                                       "paramType": "query",
+                                                       "required": false,
+                                                       "allowMultiple": false,
+                                                       "dataType": "boolean"
                                                }
                                        ]
                                }