"channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize));
}
-STASIS_MESSAGE_TYPE_DEFN(app_end_message_type,
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(end_message_type,
.to_json = stasis_end_to_json);
struct start_message_blob {
struct stasis_message *msg;
int i;
+ if (app_subscribe_channel(app, chan)) {
+ ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+ app_name(app), ast_channel_name(chan));
+ return -1;
+ }
+
payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor);
if (!payload) {
ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
payload->channel = ao2_bump(snapshot);
payload->replace_channel = ao2_bump(replace_channel_snapshot);
- json_blob = ast_json_pack("{s: o, s: []}",
+ json_blob = ast_json_pack("{s: s, s: o, s: []}",
+ "app", app_name(app),
"timestamp", ast_json_timeval(ast_tvnow(), NULL),
"args");
if (!json_blob) {
return -1;
}
- stasis_publish(ast_channel_topic(chan), msg);
+ if (replace_channel_snapshot) {
+ app_unsubscribe_channel_id(app, replace_channel_snapshot->uniqueid);
+ }
+ stasis_publish(ast_app_get_topic(app), msg);
ao2_ref(msg, -1);
return 0;
}
{
struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
struct ast_json *blob;
+ struct stasis_message *msg;
if (sanitize && sanitize->channel
&& sanitize->channel(chan)) {
return -1;
}
- stasis_app_channel_set_stasis_end_published(chan);
remove_masquerade_store(chan);
- ast_channel_publish_blob(chan, app_end_message_type(), blob);
-
+ app_unsubscribe_channel(app, chan);
+ msg = ast_channel_blob_create(chan, end_message_type(), blob);
+ if (msg) {
+ stasis_publish(ast_app_get_topic(app), msg);
+ }
+ ao2_cleanup(msg);
ast_json_unref(blob);
return 0;
}
/* send the StasisEnd message to the app */
+ stasis_app_channel_set_stasis_end_published(new_chan);
app_send_end_msg(control_app(control), new_chan);
/* remove the datastore */
/* send the StasisEnd message to the app */
app_send_end_msg(control_app(control), old_chan);
- /* fixup channel topic forwards */
- if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) {
- ast_log(LOG_ERROR, "Failed to fixup channel topic forwards for %s(%s) owned by %s\n",
- old_snapshot->name, old_snapshot->uniqueid, app_name(control_app(control)));
- }
ao2_cleanup(control);
}
return -1;
}
- res = app_subscribe_channel(app, chan);
- if (res != 0) {
- ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
- app_name, ast_channel_name(chan));
- remove_masquerade_store(chan);
- return -1;
- }
-
res = send_start_msg(app, chan, argc, argv);
if (res != 0) {
ast_log(LOG_ERROR,
ao2_cleanup(app_bridges_playback);
app_bridges_playback = NULL;
- STASIS_MESSAGE_TYPE_CLEANUP(app_end_message_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(end_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(start_message_type);
return 0;
return &app_sanitizer;
}
-void app_end_message_handler(struct stasis_message *message)
-{
- struct ast_channel_blob *payload;
- struct ast_channel_snapshot *snapshot;
- const char *app_name;
- char *channel_uri;
- size_t alloc_size;
- const char *channels[1];
-
- payload = stasis_message_data(message);
- snapshot = payload->snapshot;
- app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app"));
-
- /* +8 is for the length of "channel:" */
- alloc_size = AST_MAX_UNIQUEID + 8;
- channel_uri = ast_alloca(alloc_size);
- snprintf(channel_uri, alloc_size, "channel:%s", snapshot->uniqueid);
-
- channels[0] = channel_uri;
- stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL);
-}
-
static const struct ast_datastore_info stasis_internal_channel_info = {
.type = "stasis-internal-channel",
};
if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) {
return AST_MODULE_LOAD_DECLINE;
}
- if (STASIS_MESSAGE_TYPE_INIT(app_end_message_type) != 0) {
+ if (STASIS_MESSAGE_TYPE_INIT(end_message_type) != 0) {
return AST_MODULE_LOAD_DECLINE;
}
apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
call_forwarded_handler(app, message);
}
- if (stasis_message_type(message) == app_end_message_type()) {
- app_end_message_handler(message);
- }
-
/* By default, send any message that has a JSON representation */
json = stasis_message_to_json(message, stasis_app_get_sanitizer());
if (!json) {
return forwards != NULL;
}
-int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan)
-{
- RAII_VAR(struct app_forwards *, old_forwards, NULL, ao2_cleanup);
- struct app_forwards *new_forwards;
-
- old_forwards = ao2_find(app->forwards, old_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
- if (!old_forwards) {
- return -1;
- }
-
- new_forwards = forwards_create_channel(app, new_chan);
- if (!new_forwards) {
- return -1;
- }
-
- new_forwards->interested = old_forwards->interested;
- ao2_link_flags(app->forwards, new_forwards, 0);
- ao2_cleanup(new_forwards);
-
- /* Clean up old forwards */
- forwards_unsubscribe(old_forwards);
- return 0;
-}
-
static void *channel_find(const struct stasis_app *app, const char *id)
{
return ast_channel_get_by_name(id);
*/
char *app_get_replace_channel_app(struct ast_channel *chan);
-/*!
- * \brief Replace channel topic forwards for the old channel with forwards for the new channel
- *
- * \param app The app that owns the channel
- * \param old_id The unique ID of the channel to be replaced
- * \param new_chan The channel that is replacing the old one
- *
- * \retval zero on success
- * \return non-zero on failure
- */
-int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan);
-
/*!
* \brief Send StasisEnd message to the listening app
*
*/
int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan);
-/*!
- * \brief Handle cleanup related to StasisEnd messages
- *
- * \param message The message for which to clean up
- */
-void app_end_message_handler(struct stasis_message *message);
-
-/*!
- * \brief Accessor for the StasisEnd message type
- */
-struct stasis_message_type *app_end_message_type(void);
-
#endif /* _ASTERISK_RES_STASIS_APP_H */