]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
ARI/res_stasis: Subscribe to both Local channel halves when originating to app
authorMatthew Jordan <mjordan@digium.com>
Mon, 7 Jul 2014 02:13:13 +0000 (02:13 +0000)
committerMatthew Jordan <mjordan@digium.com>
Mon, 7 Jul 2014 02:13:13 +0000 (02:13 +0000)
This patch fixes two bugs:

1. When originating a channel into a Stasis application, we already create a
   subscription for the channel that is going into our Stasis app.
   Unfortunately, when you create a Local channel and pass it off to a Stasis
   app, you really aren't creating just one channel: you're creating two. This
   patch snags the second half of the Local channel pair (assuming it is a
   Local channel pair, but luckily core_local is kind about such assumptions)
   and subscribes to it as well.

2. Subscriptions are a bit sticky right now. If a subscription is made, the
   'interest' count gets bumped on the Stasis subscription - but unless
   something explicitly unsubscribes the channel, said subscription sticks
   around. This is not much of a problem is a user is creating the subscription
   - if they made it, they must want it. However, when we are creating
   implicit subscriptions, we need to make sure something clears them out.
   This patch takes a pessimistic approach: it watches the cache updates
   coming from Stasis and, if we notice that the cache just cleared out an
   object, we delete our subscription object. This keeps our ao2 container of
   Stasis forwards in an application from growing out of hand; it also is a
   bit more forgiving for end users who may not realize they were supposed to
   unsubscribe from that channel that just hung up.

Review: https://reviewboard.asterisk.org/r/3710/
ASTERISK-23939 #close

git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@418089 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/stasis_app.h
res/ari/resource_channels.c
res/res_stasis.c
res/stasis/app.c

index 334155a5b29724f6417d723c83caae9797d009d5..a7b2040346fc12db6d5d3aa5ba930aeec91be439 100644 (file)
@@ -297,6 +297,21 @@ enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
        const char **event_source_uris, int event_sources_count,
        struct ast_json **json);
 
+/*!
+ * \brief Directly subscribe an application to a channel
+ *
+ * \param app_name Name of the application to subscribe.
+ * \param chan The channel to subscribe to
+ *
+ * \return \ref stasis_app_subscribe_res return code.
+ *
+ * \note This method can be used when you already hold a channel and its
+ *       lock. This bypasses the channel lookup that would normally be
+ *       performed by \ref stasis_app_subscribe.
+ */
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+       struct ast_channel *chan);
+
 /*! @} */
 
 /*! @{ */
index 6cc00ce418ba55dbc82be7eddc92485577527e48..393609298f0bed313837ad1723e668e67cded8dd 100644 (file)
@@ -42,6 +42,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_app_snoop.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/causes.h"
+#include "asterisk/core_local.h"
 #include "resource_channels.h"
 
 #include <limits.h>
@@ -775,6 +776,7 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint,
        struct ast_format tmp_fmt;
        char *stuff;
        struct ast_channel *chan;
+       struct ast_channel *local_peer;
        RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
        struct ast_assigned_ids assignedids = {
                .uniqueid = args_channel_id,
@@ -859,20 +861,24 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint,
                return;
        }
 
-       snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
-       ast_channel_unlock(chan);
+       /* See if this is a Local channel and if so, get the peer */
+       local_peer = ast_local_get_peer(chan);
 
        if (!ast_strlen_zero(args_app)) {
-               /* channel: + channel ID + null terminator */
-               char uri[9 + strlen(ast_channel_uniqueid(chan))];
-               const char *uris[1] = { uri, };
-
-               sprintf(uri, "channel:%s", ast_channel_uniqueid(chan));
-               stasis_app_subscribe(args_app, uris, 1, NULL);
+               stasis_app_subscribe_channel(args_app, chan);
+               if (local_peer) {
+                       stasis_app_subscribe_channel(args_app, local_peer);
+               }
        }
 
+       snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
+       ast_channel_unlock(chan);
+
        ast_ari_response_ok(response, ast_channel_snapshot_to_json(snapshot, NULL));
        ast_channel_unref(chan);
+       if (local_peer) {
+               ast_channel_unref(local_peer);
+       }
 }
 
 void ast_ari_channels_originate_with_id(struct ast_variable *headers,
index 0184d209ca38344ef8deb2e3dbc250de764f4023..ff74245037415a691ac72682b41a6a951117cd96 100644 (file)
@@ -1225,6 +1225,29 @@ static enum stasis_app_subscribe_res app_handle_subscriptions(
        return STASIS_ASR_OK;
 }
 
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+       struct ast_channel *chan)
+{
+       RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
+       int res;
+
+       if (!app) {
+               return STASIS_ASR_APP_NOT_FOUND;
+       }
+
+       ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
+
+       res = app_subscribe_channel(app, chan);
+       if (res != 0) {
+               ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+                       app_name, ast_channel_uniqueid(chan));
+               return STASIS_ASR_INTERNAL_ERROR;
+       }
+
+       return STASIS_ASR_OK;
+}
+
+
 /*!
  * \internal
  * \brief Subscribe an app to an event source.
index 4dcb635efef290ca9b38ca5df06206f9d638c8e3..41f6ccf65969d3a714021bddaa4737cb3cf13db5 100644 (file)
@@ -36,6 +36,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/stasis_message_router.h"
 
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
+
 struct stasis_app {
        /*! Aggregation topic for this application. */
        struct stasis_topic *topic;
@@ -449,7 +451,7 @@ static struct ast_json *channel_callerid(
 static channel_snapshot_monitor channel_monitors[] = {
        channel_state,
        channel_dialplan,
-       channel_callerid
+       channel_callerid,
 };
 
 static void sub_channel_update_handler(void *data,
@@ -486,6 +488,10 @@ static void sub_channel_update_handler(void *data,
                        app_send(app, msg);
                }
        }
+
+       if (!new_snapshot && old_snapshot) {
+               unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
+       }
 }
 
 static struct ast_json *simple_endpoint_event(
@@ -513,6 +519,7 @@ static void sub_endpoint_update_handler(void *data,
        struct stasis_app *app = data;
        struct stasis_cache_update *update;
        struct ast_endpoint_snapshot *new_snapshot;
+       struct ast_endpoint_snapshot *old_snapshot;
        const struct timeval *tv;
 
        ast_assert(stasis_message_type(message) == stasis_cache_update_type());
@@ -522,17 +529,22 @@ static void sub_endpoint_update_handler(void *data,
        ast_assert(update->type == ast_endpoint_snapshot_type());
 
        new_snapshot = stasis_message_data(update->new_snapshot);
-       tv = update->new_snapshot ?
-               stasis_message_timestamp(update->new_snapshot) :
-               stasis_message_timestamp(message);
+       old_snapshot = stasis_message_data(update->old_snapshot);
 
-       json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+       if (new_snapshot) {
+               tv = stasis_message_timestamp(update->new_snapshot);
 
-       if (!json) {
-               return;
+               json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+               if (!json) {
+                       return;
+               }
+
+               app_send(app, json);
        }
 
-       app_send(app, json);
+       if (!new_snapshot && old_snapshot) {
+               unsubscribe(app, "endpoint", old_snapshot->id, 1);
+       }
 }
 
 static struct ast_json *simple_bridge_event(
@@ -580,11 +592,13 @@ static void sub_bridge_update_handler(void *data,
                json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
        }
 
-       if (!json) {
-               return;
+       if (json) {
+               app_send(app, json);
        }
 
-       app_send(app, json);
+       if (!new_snapshot && old_snapshot) {
+               unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
+       }
 }
 
 
@@ -982,7 +996,7 @@ static int subscribe_channel(struct stasis_app *app, void *obj)
        return app_subscribe_channel(app, obj);
 }
 
-static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
 {
        RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
        SCOPED_AO2LOCK(lock, app->forwards);
@@ -997,7 +1011,7 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
        forwards->interested--;
 
        ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
-       if (forwards->interested == 0) {
+       if (forwards->interested == 0 || terminate) {
                /* No one is interested any more; unsubscribe */
                ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
                forwards_unsubscribe(forwards);
@@ -1024,7 +1038,7 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
                return -1;
        }
 
-       return unsubscribe(app, "channel", channel_id);
+       return unsubscribe(app, "channel", channel_id, 0);
 }
 
 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
@@ -1093,7 +1107,7 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
                return -1;
        }
 
-       return unsubscribe(app, "bridge", bridge_id);
+       return unsubscribe(app, "bridge", bridge_id, 0);
 }
 
 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
@@ -1153,7 +1167,7 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
                return -1;
        }
 
-       return unsubscribe(app, "endpoint", endpoint_id);
+       return unsubscribe(app, "endpoint", endpoint_id, 0);
 }
 
 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)