]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
chan_websocket: Add ability to place a MARK in the media stream.
authorGeorge Joseph <gjoseph@sangoma.com>
Wed, 5 Nov 2025 21:27:32 +0000 (14:27 -0700)
committergithub-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Wed, 12 Nov 2025 21:24:27 +0000 (21:24 +0000)
Also cleaned up a few unused #if blocks, and started sending a few ERROR
events back to the apps.

Resolves: #1574

DeveloperNote: Apps can now send a `MARK_MEDIA` command with an optional
`correlation_id` parameter to chan_websocket which will be placed in the
media frame queue. When that frame is dequeued after all intervening media
has been played to the core, chan_websocket will send a
`MEDIA_MARK_PROCESSED` event to the app with the same correlation_id
(if any).

channels/chan_websocket.c

index 4c5f92cc65bc50eaa69ad2b0757e7e364fdc170b..8235ccebc34b11744df84c7c0a8048d76c5d56ff 100644 (file)
@@ -112,22 +112,13 @@ struct websocket_pvt {
 #define HANGUP_CHANNEL "HANGUP"
 #define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING"
 #define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING"
+#define MARK_MEDIA "MARK_MEDIA"
 #define FLUSH_MEDIA "FLUSH_MEDIA"
 #define GET_DRIVER_STATUS "GET_STATUS"
 #define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED"
 #define PAUSE_MEDIA "PAUSE_MEDIA"
 #define CONTINUE_MEDIA "CONTINUE_MEDIA"
 
-#if 0
-#define MEDIA_START "MEDIA_START"
-#define MEDIA_XON "MEDIA_XON"
-#define MEDIA_XOFF "MEDIA_XOFF"
-#define QUEUE_DRAINED "QUEUE_DRAINED"
-#define DRIVER_STATUS "STATUS"
-#define MEDIA_BUFFERING_COMPLETED "MEDIA_BUFFERING_COMPLETED"
-#define DTMF_END "DTMF_END"
-#endif
-
 #define QUEUE_LENGTH_MAX 1000
 #define QUEUE_LENGTH_XOFF_LEVEL 900
 #define QUEUE_LENGTH_XON_LEVEL 800
@@ -272,6 +263,36 @@ static char *_create_event_MEDIA_BUFFERING_COMPLETED(struct websocket_pvt *insta
        return payload;
 }
 
+/*!
+ * \internal
+ * \brief Print the MEDIA_MARK_PROCESSED event.
+ * \warning Do not call directly.
+ */
+static char *_create_event_MEDIA_MARK_PROCESSED(struct websocket_pvt *instance,
+       const char *id)
+{
+       char *payload = NULL;
+       if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
+               struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
+                       "event", "MEDIA_MARK_PROCESSED",
+                       "channel_id", ast_channel_uniqueid(instance->channel),
+                       "correlation_id", S_OR(id, "")
+                       );
+               if (!msg) {
+                       return NULL;
+               }
+               payload = ast_json_dump_string_format(msg, AST_JSON_COMPACT);
+               ast_json_unref(msg);
+       } else {
+               ast_asprintf(&payload, "%s%s%s",
+                       "MEDIA_MARK_PROCESSED",
+                       S_COR(id, " ",""), S_OR(id, ""));
+
+       }
+
+       return payload;
+}
+
 /*!
  * \internal
  * \brief Print the DTMF_END event.
@@ -345,15 +366,27 @@ static char *_create_event_STATUS(struct websocket_pvt *instance)
  * \brief Print the ERROR event.
  * \warning Do not call directly.
  */
-static char *_create_event_ERROR(struct websocket_pvt *instance,
-       const char *error_text)
+static __attribute__ ((format (gnu_printf, 2, 3))) char *_create_event_ERROR(
+       struct websocket_pvt *instance, const char *format, ...)
 {
        char *payload = NULL;
+       char *error_text = NULL;
+       va_list ap;
+       int res = 0;
+
+       va_start(ap, format);
+       res = ast_vasprintf(&error_text, format, ap);
+       va_end(ap);
+       if (res < 0 || !error_text) {
+               return NULL;
+       }
+
        if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
                struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
                        "event", "ERROR",
                        "channel_id", ast_channel_uniqueid(instance->channel),
                        "error_text", error_text);
+               ast_free(error_text);
                if (!msg) {
                        return NULL;
                }
@@ -362,6 +395,7 @@ static char *_create_event_ERROR(struct websocket_pvt *instance,
        } else {
                ast_asprintf(&payload, "%s channel_id:%s error_text:%s",
                        "ERROR", ast_channel_uniqueid(instance->channel), error_text);
+               ast_free(error_text);
        }
 
        return payload;
@@ -722,6 +756,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
 
        } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
                if (instance->passthrough) {
+                       send_event(instance, ERROR, "%s not supported in passthrough mode", command);
                        ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
                                ast_channel_name(instance->channel), command);
                        return 0;
@@ -743,6 +778,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
                }
 
                if (instance->passthrough) {
+                       send_event(instance, ERROR, "%s not supported in passthrough mode", command);
                        ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
                                ast_channel_name(instance->channel), command);
                        return 0;
@@ -767,10 +803,40 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
                res = queue_option_frame(instance, option);
                ast_free(option);
 
+       } else if (ast_strings_equal(command, MARK_MEDIA)) {
+               const char *id;
+               char *option;
+               SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
+                       AST_LIST_UNLOCK);
+
+               if (instance->passthrough) {
+                       send_event(instance, ERROR, "%s not supported in passthrough mode", command);
+                       ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
+                               ast_channel_name(instance->channel), command);
+                       return 0;
+               }
+
+               if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
+                       id = ast_json_object_string_get(json, "correlation_id");
+               } else {
+                       id = data;
+               }
+
+               ast_debug(4, "%s: %s %s\n",
+                       ast_channel_name(instance->channel), MARK_MEDIA, id);
+
+               option = create_event(instance, MEDIA_MARK_PROCESSED, id);
+               if (!option) {
+                       return -1;
+               }
+               res = queue_option_frame(instance, option);
+               ast_free(option);
+
        } else if (ast_strings_equal(command, FLUSH_MEDIA)) {
                struct ast_frame *frame = NULL;
 
                if (instance->passthrough) {
+                       send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode");
                        ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
                                ast_channel_name(instance->channel), command);
                        return 0;
@@ -787,6 +853,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
 
        } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
                if (instance->passthrough) {
+                       send_event(instance, ERROR, "%s not supported in passthrough mode", command);
                        ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
                                ast_channel_name(instance->channel), command);
                        return 0;
@@ -801,6 +868,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
 
        } else if (ast_strings_equal(command, PAUSE_MEDIA)) {
                if (instance->passthrough) {
+                       send_event(instance, ERROR, "%s not supported in passthrough mode", command);
                        ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
                                ast_channel_name(instance->channel), command);
                        return 0;
@@ -811,6 +879,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
 
        } else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
                if (instance->passthrough) {
+                       send_event(instance, ERROR, "%s not supported in passthrough mode", command);
                        ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
                                ast_channel_name(instance->channel), command);
                        return 0;
@@ -1515,7 +1584,8 @@ static struct ast_channel *webchan_request(const char *type,
        );
        struct ast_flags opts = { 0, };
        char *opt_args[OPT_ARG_ARRAY_SIZE];
-       const char *requestor_name = requestor ? ast_channel_name(requestor) : "no channel";
+       const char *requestor_name = requestor ? ast_channel_name(requestor) :
+               (assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : "<unknown>");
        RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
 
        global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
@@ -1547,16 +1617,12 @@ static struct ast_channel *webchan_request(const char *type,
 
        if (ast_test_flag(&opts, OPT_WS_CODEC)
                && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
-               ast_debug(3, "%s: Using specified format %s\n",
-                       requestor_name, opt_args[OPT_ARG_WS_CODEC]);
                fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
        } else {
                /*
                 * If codec wasn't specified in the dial string,
                 * use the first format in the capabilities.
                 */
-               ast_debug(3, "%s: Using format %s from requesting channel\n",
-                       requestor_name, opt_args[OPT_ARG_WS_CODEC]);
                fmt = ast_format_cap_get_format(cap, 0);
        }
 
@@ -1566,6 +1632,10 @@ static struct ast_channel *webchan_request(const char *type,
                goto failure;
        }
 
+       ast_debug(3, "%s: Using format %s from %s\n",
+               requestor_name, ast_format_get_name(fmt),
+               ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester");
+
        instance = websocket_new(requestor_name, args.connection_id, fmt);
        if (!instance) {
                ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",