]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Stasis: Fix StasisEnd message ordering
authorKinsey Moore <kmoore@digium.com>
Thu, 13 Nov 2014 15:42:28 +0000 (15:42 +0000)
committerKinsey Moore <kmoore@digium.com>
Thu, 13 Nov 2014 15:42:28 +0000 (15:42 +0000)
This change corrects message ordering in cases where a channel-related
message can be received after a Stasis/ARI application has received the
StasisEnd message. The StasisEnd message was being passed to
applications directly without waiting for the channel topic to empty.

As a result of this fix, other bugs were also identified and fixed:
* StasisStart messages were also being sent directly to apps and are
  now routed through the stasis message bus properly
* Masquerade monitor datastores were being removed at the incorrect
  time in some cases and were causing StasisEnd messages to not be sent
* General refactoring where necessary for the above
* Unsubscription on StasisEnd timing changes to prevent additional
  messages from following the StasisEnd when they shouldn't

A channel sanitization function pointer was added to reduce processing
and AO2 lookups.

Review: https://reviewboard.asterisk.org/r/4163/
ASTERISK-24501 #close
Reported by: Matt Jordan

git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@427788 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/stasis.h
include/asterisk/stasis_app.h
res/res_stasis.c
res/stasis/app.c
res/stasis/app.h
res/stasis/stasis_bridge.c

index 4c4052c144f80b1e512578f0384f897ed5891266..55ebb45148ad7cdebf848a74d04bd0c327b5d9e1 100644 (file)
@@ -221,6 +221,17 @@ struct stasis_message_sanitizer {
         * \retval zero if the channel should remain in the message
         */
        int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
+
+       /*!
+        * \brief Callback which determines whether a channel should be sanitized from
+        * a message based on the channel
+        *
+        * \param chan The channel to be checked
+        *
+        * \retval non-zero if the channel should be left out of the message
+        * \retval zero if the channel should remain in the message
+        */
+       int (*channel)(const struct ast_channel *chan);
 };
 
 /*!
index 5a7593d67781af506a0e77773f6b704639486028..4373a62728f12033688c7202f95892277a6df11a 100644 (file)
@@ -799,11 +799,6 @@ void stasis_app_unref(void);
  */
 struct stasis_message_sanitizer *stasis_app_get_sanitizer(void);
 
-/*!
- * \brief Stasis message type for a StasisEnd event
- */
-struct stasis_message_type *ast_stasis_end_message_type(void);
-
 /*!
  * \brief Indicate that this channel has had a StasisEnd published for it
  *
index b848fafad36fabc97a5a99fdcf562ae06a61bd79..47ce1200c640dceec80a34cf37a87a5093d37421 100644 (file)
@@ -108,30 +108,68 @@ struct ao2_container *app_bridges_moh;
 
 struct ao2_container *app_bridges_playback;
 
-static struct ast_json *stasis_end_json_payload(struct ast_channel_snapshot *snapshot,
+static struct ast_json *stasis_end_to_json(struct stasis_message *message,
                const struct stasis_message_sanitizer *sanitize)
 {
+       struct ast_channel_blob *payload = stasis_message_data(message);
+
+       if (sanitize && sanitize->channel_snapshot &&
+                       sanitize->channel_snapshot(payload->snapshot)) {
+               return NULL;
+       }
+
        return ast_json_pack("{s: s, s: o, s: o}",
                "type", "StasisEnd",
                "timestamp", ast_json_timeval(ast_tvnow(), NULL),
-               "channel", ast_channel_snapshot_to_json(snapshot, sanitize));
+               "channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize));
 }
 
-static struct ast_json *stasis_end_to_json(struct stasis_message *message,
+STASIS_MESSAGE_TYPE_DEFN(app_end_message_type,
+       .to_json = stasis_end_to_json);
+
+struct start_message_blob {
+       struct ast_channel_snapshot *channel;           /*!< Channel that is entering Stasis() */
+       struct ast_channel_snapshot *replace_channel;   /*!< Channel that is being replaced (optional) */
+       struct ast_json *blob;                          /*!< JSON blob containing timestamp and args */
+};
+
+static struct ast_json *stasis_start_to_json(struct stasis_message *message,
                const struct stasis_message_sanitizer *sanitize)
 {
-       struct ast_channel_blob *payload = stasis_message_data(message);
+       struct start_message_blob *payload = stasis_message_data(message);
+       struct ast_json *msg;
 
        if (sanitize && sanitize->channel_snapshot &&
-                       sanitize->channel_snapshot(payload->snapshot)) {
+                       sanitize->channel_snapshot(payload->channel)) {
                return NULL;
        }
 
-       return stasis_end_json_payload(payload->snapshot, sanitize);
+       msg = ast_json_pack("{s: s, s: O, s: O, s: o}",
+               "type", "StasisStart",
+               "timestamp", ast_json_object_get(payload->blob, "timestamp"),
+               "args", ast_json_object_get(payload->blob, "args"),
+               "channel", ast_channel_snapshot_to_json(payload->channel, NULL));
+       if (!msg) {
+               ast_log(LOG_ERROR, "Failed to pack JSON for StasisStart message\n");
+               return NULL;
+       }
+
+       if (payload->replace_channel) {
+               int res = ast_json_object_set(msg, "replace_channel",
+                       ast_channel_snapshot_to_json(payload->replace_channel, NULL));
+
+               if (res) {
+                       ast_json_unref(msg);
+                       ast_log(LOG_ERROR, "Failed to append JSON for StasisStart message\n");
+                       return NULL;
+               }
+       }
+
+       return msg;
 }
 
-STASIS_MESSAGE_TYPE_DEFN(ast_stasis_end_message_type,
-       .to_json = stasis_end_to_json);
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(start_message_type,
+       .to_json = stasis_start_to_json);
 
 const char *stasis_app_name(const struct stasis_app *app)
 {
@@ -862,51 +900,64 @@ char *app_get_replace_channel_app(struct ast_channel *chan)
        return replace_channel_app;
 }
 
-static int send_start_msg_snapshots(struct stasis_app *app,
+static void start_message_blob_dtor(void *obj)
+{
+       struct start_message_blob *payload = obj;
+
+       ao2_cleanup(payload->channel);
+       ao2_cleanup(payload->replace_channel);
+       ast_json_unref(payload->blob);
+}
+
+static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app *app,
        int argc, char *argv[], struct ast_channel_snapshot *snapshot,
        struct ast_channel_snapshot *replace_channel_snapshot)
 {
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+       RAII_VAR(struct ast_json *, json_blob, NULL, ast_json_unref);
        struct ast_json *json_args;
-       struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
+       RAII_VAR(struct start_message_blob *, payload, NULL, ao2_cleanup);
+       struct stasis_message *msg;
        int i;
 
-       if (sanitize && sanitize->channel_snapshot
-               && sanitize->channel_snapshot(snapshot)) {
-               return 0;
-       }
-
-       msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
-               "type", "StasisStart",
-               "timestamp", ast_json_timeval(ast_tvnow(), NULL),
-               "args",
-               "channel", ast_channel_snapshot_to_json(snapshot, NULL));
-       if (!msg) {
+       payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor);
+       if (!payload) {
+               ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
                return -1;
        }
 
-       if (replace_channel_snapshot) {
-               int res = ast_json_object_set(msg, "replace_channel",
-                       ast_channel_snapshot_to_json(replace_channel_snapshot, NULL));
+       payload->channel = ao2_bump(snapshot);
+       payload->replace_channel = ao2_bump(replace_channel_snapshot);
 
-               if (res) {
-                       return -1;
-               }
+       json_blob = ast_json_pack("{s: o, s: []}",
+               "timestamp", ast_json_timeval(ast_tvnow(), NULL),
+               "args");
+       if (!json_blob) {
+               ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
+               return -1;
        }
 
        /* Append arguments to args array */
-       json_args = ast_json_object_get(msg, "args");
+       json_args = ast_json_object_get(json_blob, "args");
        ast_assert(json_args != NULL);
        for (i = 0; i < argc; ++i) {
                int r = ast_json_array_append(json_args,
                                              ast_json_string_create(argv[i]));
                if (r != 0) {
-                       ast_log(LOG_ERROR, "Error appending start message\n");
+                       ast_log(LOG_ERROR, "Error appending to StasisStart message\n");
                        return -1;
                }
        }
 
-       app_send(app, msg);
+       payload->blob = ast_json_ref(json_blob);
+
+       msg = stasis_message_create(start_message_type(), payload);
+       if (!msg) {
+               ast_log(LOG_ERROR, "Error sending StasisStart message\n");
+               return -1;
+       }
+
+       stasis_publish(ast_channel_topic(chan), msg);
+       ao2_ref(msg, -1);
        return 0;
 }
 
@@ -928,31 +979,36 @@ static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
        if (!snapshot) {
                return -1;
        }
-       return send_start_msg_snapshots(app, argc, argv, snapshot, replace_channel_snapshot);
+       return send_start_msg_snapshots(chan, app, argc, argv, snapshot, replace_channel_snapshot);
 }
 
-static int send_end_msg_snapshot(struct stasis_app *app, struct ast_channel_snapshot *snapshot)
+static void remove_masquerade_store(struct ast_channel *chan);
+
+int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
 {
        struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
-       struct ast_json *msg;
+       struct ast_json *blob;
 
-       if (sanitize && sanitize->channel_snapshot
-               && sanitize->channel_snapshot(snapshot)) {
+       if (sanitize && sanitize->channel
+               && sanitize->channel(chan)) {
                return 0;
        }
 
-       msg = stasis_end_json_payload(snapshot, sanitize);
-       if (!msg) {
+       blob = ast_json_pack("{s: s}", "app", app_name(app));
+       if (!blob) {
+               ast_log(LOG_ERROR, "Error packing JSON for StasisEnd message\n");
                return -1;
        }
 
-       app_send(app, msg);
-       ast_json_unref(msg);
+       stasis_app_channel_set_stasis_end_published(chan);
+       remove_masquerade_store(chan);
+       ast_channel_publish_blob(chan, app_end_message_type(), blob);
+
+       ast_json_unref(blob);
+
        return 0;
 }
 
-static void remove_masquerade_store(struct ast_channel *chan);
-
 static int masq_match_cb(void *obj, void *data, int flags)
 {
        struct stasis_app_control *control = obj;
@@ -968,32 +1024,22 @@ static int masq_match_cb(void *obj, void *data, int flags)
 
 static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
 {
-       struct ast_channel_snapshot *snapshot;
        struct stasis_app_control *control;
 
-       /* grab a snapshot */
-       snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
-       if (!snapshot) {
-               ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
-               return;
-       }
-
        /* find control */
        control = ao2_callback(app_controls, 0, masq_match_cb, old_chan);
        if (!control) {
                ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n");
-               ao2_cleanup(snapshot);
                return;
        }
 
        /* send the StasisEnd message to the app */
-       send_end_msg_snapshot(control_app(control), snapshot);
+       app_send_end_msg(control_app(control), new_chan);
 
        /* remove the datastore */
        remove_masquerade_store(old_chan);
 
        ao2_cleanup(control);
-       ao2_cleanup(snapshot);
 }
 
 static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
@@ -1032,10 +1078,10 @@ static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct
 
 
        /* send the StasisStart with replace_channel to the app */
-       send_start_msg_snapshots(control_app(control), 0, NULL, new_snapshot,
+       send_start_msg_snapshots(new_chan, control_app(control), 0, NULL, new_snapshot,
                old_snapshot);
        /* send the StasisEnd message to the app */
-       send_end_msg_snapshot(control_app(control), old_snapshot);
+       app_send_end_msg(control_app(control), old_chan);
 
        /* fixup channel topic forwards */
        if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) {
@@ -1090,33 +1136,6 @@ static void remove_masquerade_store(struct ast_channel *chan)
        ast_datastore_free(datastore);
 }
 
-static int send_end_msg(struct stasis_app *app, struct ast_channel *chan)
-{
-       struct ast_channel_snapshot *snapshot;
-       int res = 0;
-
-       ast_assert(chan != NULL);
-
-       /* A masquerade has occurred and this message will be wrong so it
-        * has already been sent elsewhere. */
-       if (!has_masquerade_store(chan)) {
-               return 0;
-       }
-
-       /* Set channel info */
-       snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
-       if (!snapshot) {
-               return -1;
-       }
-
-       if (send_end_msg_snapshot(app, snapshot)) {
-               res = -1;
-       }
-
-       ao2_cleanup(snapshot);
-       return res;
-}
-
 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
 {
        while (!control_is_done(control)) {
@@ -1232,18 +1251,18 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
                return -1;
        }
 
-       res = send_start_msg(app, chan, argc, argv);
+       res = app_subscribe_channel(app, chan);
        if (res != 0) {
-               ast_log(LOG_ERROR,
-                       "Error sending start message to '%s'\n", app_name);
+               ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+                       app_name, ast_channel_name(chan));
                remove_masquerade_store(chan);
                return -1;
        }
 
-       res = app_subscribe_channel(app, chan);
+       res = send_start_msg(app, chan, argc, argv);
        if (res != 0) {
-               ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
-                       app_name, ast_channel_name(chan));
+               ast_log(LOG_ERROR,
+                       "Error sending start message to '%s'\n", app_name);
                remove_masquerade_store(chan);
                return -1;
        }
@@ -1327,9 +1346,9 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
 
        /* Only publish a stasis_end event if it hasn't already been published */
        if (!stasis_app_channel_is_stasis_end_published(chan)) {
-               app_unsubscribe_channel(app, chan);
-               res = send_end_msg(app, chan);
-               remove_masquerade_store(chan);
+               /* A masquerade has occurred and this message will be wrong so it
+                * has already been sent elsewhere. */
+               res = has_masquerade_store(chan) && app_send_end_msg(app, chan);
                if (res != 0) {
                        ast_log(LOG_ERROR,
                                "Error sending end message to %s\n", app_name);
@@ -1849,15 +1868,8 @@ void stasis_app_unref(void)
        ast_module_unref(ast_module_info->self);
 }
 
-/*!
- * \brief Subscription to StasisEnd events
- */
-struct stasis_subscription *stasis_end_sub;
-
 static int unload_module(void)
 {
-       stasis_end_sub = stasis_unsubscribe(stasis_end_sub);
-
        stasis_app_unregister_event_sources();
 
        messaging_cleanup();
@@ -1878,7 +1890,8 @@ static int unload_module(void)
        ao2_cleanup(app_bridges_playback);
        app_bridges_playback = NULL;
 
-       STASIS_MESSAGE_TYPE_CLEANUP(ast_stasis_end_message_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(app_end_message_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(start_message_type);
 
        return 0;
 }
@@ -1892,6 +1905,15 @@ static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapsho
        return 1;
 }
 
+/* \brief Sanitization callback for channels */
+static int channel_sanitizer(const struct ast_channel *chan)
+{
+       if (!chan || !(ast_channel_tech(chan)->properties & AST_CHAN_TP_INTERNAL)) {
+               return 0;
+       }
+       return 1;
+}
+
 /* \brief Sanitization callback for channel unique IDs */
 static int channel_id_sanitizer(const char *id)
 {
@@ -1904,6 +1926,7 @@ static int channel_id_sanitizer(const char *id)
 struct stasis_message_sanitizer app_sanitizer = {
        .channel_id = channel_id_sanitizer,
        .channel_snapshot = channel_snapshot_sanitizer,
+       .channel = channel_sanitizer,
 };
 
 struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
@@ -1911,21 +1934,7 @@ struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
        return &app_sanitizer;
 }
 
-static void remove_masquerade_store_by_name(const char *channel_name)
-{
-       struct ast_channel *chan;
-
-       chan = ast_channel_get_by_name(channel_name);
-       if (!chan) {
-               return;
-       }
-
-       remove_masquerade_store(chan);
-       ast_channel_unref(chan);
-}
-
-static void check_for_stasis_end(void *data, struct stasis_subscription *sub,
-               struct stasis_message *message)
+void app_end_message_handler(struct stasis_message *message)
 {
        struct ast_channel_blob *payload;
        struct ast_channel_snapshot *snapshot;
@@ -1934,10 +1943,6 @@ static void check_for_stasis_end(void *data, struct stasis_subscription *sub,
        size_t alloc_size;
        const char *channels[1];
 
-       if (stasis_message_type(message) != ast_stasis_end_message_type()) {
-               return;
-       }
-
        payload = stasis_message_data(message);
        snapshot = payload->snapshot;
        app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app"));
@@ -1949,8 +1954,6 @@ static void check_for_stasis_end(void *data, struct stasis_subscription *sub,
 
        channels[0] = channel_uri;
        stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL);
-
-       remove_masquerade_store_by_name(snapshot->name);
 }
 
 static const struct ast_datastore_info stasis_internal_channel_info = {
@@ -2023,7 +2026,10 @@ int stasis_app_channel_is_internal(struct ast_channel *chan)
 
 static int load_module(void)
 {
-       if (STASIS_MESSAGE_TYPE_INIT(ast_stasis_end_message_type) != 0) {
+       if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) {
+               return AST_MODULE_LOAD_DECLINE;
+       }
+       if (STASIS_MESSAGE_TYPE_INIT(app_end_message_type) != 0) {
                return AST_MODULE_LOAD_DECLINE;
        }
        apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
@@ -2049,12 +2055,6 @@ static int load_module(void)
 
        stasis_app_register_event_sources();
 
-       stasis_end_sub = stasis_subscribe(ast_channel_topic_all(), check_for_stasis_end, NULL);
-       if (!stasis_end_sub) {
-               unload_module();
-               return AST_MODULE_LOAD_DECLINE;
-       }
-
        return AST_MODULE_LOAD_SUCCESS;
 }
 
index aa35e368ff74e2d6db37a183f304d050e80a278b..e3bd6ff0c0e02a9d65a013e6ebd319bf87545574 100644 (file)
@@ -302,6 +302,10 @@ static void sub_default_handler(void *data, struct stasis_subscription *sub,
                call_forwarded_handler(app, message);
        }
 
+       if (stasis_message_type(message) == app_end_message_type()) {
+               app_end_message_handler(message);
+       }
+
        /* By default, send any message that has a JSON representation */
        json = stasis_message_to_json(message, stasis_app_get_sanitizer());
        if (!json) {
index 1ab6097a78e89167d6d1a9686daf3e1031ab3ad0..63143e0268bcdd51c81ff5749f2921ada9d9b8f3 100644 (file)
@@ -270,4 +270,27 @@ char *app_get_replace_channel_app(struct ast_channel *chan);
  */
 int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan);
 
+/*!
+ * \brief Send StasisEnd message to the listening app
+ *
+ * \param app The app that owns the channel
+ * \param chan The channel for which the message is being sent
+ *
+ * \retval zero on success
+ * \return non-zero on failure
+ */
+int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan);
+
+/*!
+ * \brief Handle cleanup related to StasisEnd messages
+ *
+ * \param message The message for which to clean up
+ */
+void app_end_message_handler(struct stasis_message *message);
+
+/*!
+ * \brief Accessor for the StasisEnd message type
+ */
+struct stasis_message_type *app_end_message_type(void);
+
 #endif /* _ASTERISK_RES_STASIS_APP_H */
index 7229a87d5a4d5df46bc946f52d1f7627172a0b22..fd984ab8648bffb3a04cfca86447f1a7755cd9d5 100644 (file)
@@ -164,7 +164,6 @@ static int bridge_stasis_moving(struct ast_bridge_channel *bridge_channel, void
 {
        if (src->v_table == &bridge_stasis_v_table &&
                        dst->v_table != &bridge_stasis_v_table) {
-               RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
                RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup);
                struct ast_channel *chan;
 
@@ -176,11 +175,7 @@ static int bridge_stasis_moving(struct ast_bridge_channel *bridge_channel, void
                        return -1;
                }
 
-               blob = ast_json_pack("{s: s}", "app", app_name(control_app(control)));
-
-               stasis_app_channel_set_stasis_end_published(chan);
-
-               ast_channel_publish_blob(chan, ast_stasis_end_message_type(), blob);
+               app_send_end_msg(control_app(control), chan);
        }
 
        return -1;