From: George Joseph Date: Mon, 28 Apr 2025 16:39:50 +0000 (-0600) Subject: Media over Websocket Channel Driver X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5963e624e23bc48866fe98119f74f6d8bf69854a;p=thirdparty%2Fasterisk.git Media over Websocket Channel Driver * Created chan_websocket which can exchange media over both inbound and outbound websockets which the driver will frame and time. See http://s.asterisk.net/mow for more information. * res_http_websocket: Made defines for max message size public and converted a few nuisance verbose messages to debugs. * main/channel.c: Changed an obsolete nuisance error to a debug. * ARI channels: Updated externalMedia to include chan_websocket as a supported transport. UserNote: A new channel driver "chan_websocket" is now available. It can exchange media over both inbound and outbound websockets and will both frame and re-time the media it receives. See http://s.asterisk.net/mow for more information. UserNote: The ARI channels/externalMedia API now includes support for the WebSocket transport provided by chan_websocket. --- diff --git a/channels/chan_websocket.c b/channels/chan_websocket.c new file mode 100644 index 0000000000..58728af32a --- /dev/null +++ b/channels/chan_websocket.c @@ -0,0 +1,1517 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2025, Sangoma Technologies Corporation + * + * George Joseph + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \author George Joseph + * + * \brief Websocket Media Channel + * + * \ingroup channel_drivers + */ + +/*** MODULEINFO + res_http_websocket + res_websocket_client + core + ***/ + +#include "asterisk.h" + +#include "asterisk/app.h" +#include "asterisk/causes.h" +#include "asterisk/channel.h" +#include "asterisk/codec.h" +#include "asterisk/http_websocket.h" +#include "asterisk/format_cache.h" +#include "asterisk/frame.h" +#include "asterisk/lock.h" +#include "asterisk/mod_format.h" +#include "asterisk/module.h" +#include "asterisk/pbx.h" +#include "asterisk/uuid.h" +#include "asterisk/timing.h" +#include "asterisk/translate.h" +#include "asterisk/websocket_client.h" + +static struct ast_websocket_server *ast_ws_server; + +static struct ao2_container *instances = NULL; + +struct websocket_pvt { + enum ast_websocket_type type; + struct ast_websocket_client *client; + struct ast_websocket *websocket; + struct ast_format *native_format; + struct ast_codec *native_codec; + struct ast_format *slin_format; + struct ast_codec *slin_codec; + struct ast_channel *channel; + struct ast_timer *timer; + struct ast_frame silence; + struct ast_trans_pvt *translator; + AST_LIST_HEAD(, ast_frame) frame_queue; + pthread_t outbound_read_thread; + size_t bytes_read; + size_t leftover_len; + char *leftover_data; + int no_auto_answer; + int optimal_frame_size; + int bulk_media_in_progress; + int report_queue_drained; + int frame_queue_length; + int queue_full; + int queue_paused; + char connection_id[0]; +}; + +#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE" +#define MEDIA_WEBSOCKET_CONNECTION_ID "MEDIA_WEBSOCKET_CONNECTION_ID" +#define INCOMING_CONNECTION_ID "INCOMING" + +#define ANSWER_CHANNEL "ANSWER" +#define HANGUP_CHANNEL "HANGUP" +#define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING" +#define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING" +#define FLUSH_MEDIA "FLUSH_MEDIA" +#define GET_DRIVER_STATUS "GET_STATUS" +#define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED" +#define PAUSE_MEDIA "PAUSE_MEDIA" +#define CONTINUE_MEDIA "CONTINUE_MEDIA" + +#define MEDIA_START "MEDIA_START" +#define MEDIA_XON "MEDIA_XON" +#define MEDIA_XOFF "MEDIA_XOFF" +#define QUEUE_DRAINED "QUEUE_DRAINED" +#define DRIVER_STATUS "STATUS" +#define MEDIA_BUFFERING_COMPLETED "MEDIA_BUFFERING_COMPLETED" + +#define QUEUE_LENGTH_MAX 1000 +#define QUEUE_LENGTH_XOFF_LEVEL 900 +#define QUEUE_LENGTH_XON_LEVEL 800 +#define MAX_TEXT_MESSAGE_LEN MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1)) + +/* Forward declarations */ +static struct ast_channel *webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause); +static int webchan_call(struct ast_channel *ast, const char *dest, int timeout); +static struct ast_frame *webchan_read(struct ast_channel *ast); +static int webchan_write(struct ast_channel *ast, struct ast_frame *f); +static int webchan_hangup(struct ast_channel *ast); + +static struct ast_channel_tech websocket_tech = { + .type = "WebSocket", + .description = "Media over WebSocket Channel Driver", + .requester = webchan_request, + .call = webchan_call, + .read = webchan_read, + .write = webchan_write, + .hangup = webchan_hangup, +}; + +static void set_channel_format(struct websocket_pvt * instance, + struct ast_format *fmt) +{ + if (ast_format_cmp(ast_channel_rawreadformat(instance->channel), fmt) + == AST_FORMAT_CMP_NOT_EQUAL) { + ast_channel_set_rawreadformat(instance->channel, fmt); + ast_debug(4, "Switching readformat to %s\n", ast_format_get_name(fmt)); + } +} + +/* + * Reminder... This function gets called by webchan_read which is + * triggered by the channel timer firing. It always gets called + * every 20ms (or whatever the timer is set to) even if there are + * no frames in the queue. + */ +static struct ast_frame *dequeue_frame(struct websocket_pvt *instance) +{ + struct ast_frame *queued_frame = NULL; + SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK, + AST_LIST_UNLOCK); + + /* + * If the queue is paused, don't read a frame. Processing + * will continue down the function and a silence frame will + * be sent in its place. + */ + if (instance->queue_paused) { + return NULL; + } + + /* + * We need to check if we need to send an XON before anything + * else because there are multiple escape paths in this function + * and we don't want to accidentally keep the queue in a "full" + * state. + */ + if (instance->queue_full && instance->frame_queue_length < QUEUE_LENGTH_XON_LEVEL) { + instance->queue_full = 0; + ast_debug(4, "%s: WebSocket sending MEDIA_XON\n", + ast_channel_name(instance->channel)); + ast_websocket_write_string(instance->websocket, MEDIA_XON); + } + + queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list); + + /* + * If there are no frames in the queue, we need to + * return NULL so we can send a silence frame. We also need + * to send the QUEUE_DRAINED notification if we were requested + * to do so. + */ + if (!queued_frame) { + if (instance->report_queue_drained) { + instance->report_queue_drained = 0; + ast_debug(4, "%s: WebSocket sending QUEUE_DRAINED\n", + ast_channel_name(instance->channel)); + ast_websocket_write_string(instance->websocket, QUEUE_DRAINED); + } + return NULL; + } + + /* + * The only way a control frame could be present here is as + * a result of us calling queue_option_frame() in response + * to an incoming TEXT command from the websocket. + * We'll be safe and make sure it's a AST_CONTROL_OPTION + * frame anyway. + * + * It's quite possible that there are multiple control frames + * in a row in the queue so we need to process consecutive ones + * immediately. + * + * In any case, processing a control frame MUST not use up + * a media timeslot so after all control frames have been + * processed, we need to read an audio frame and process it. + */ + while (queued_frame && queued_frame->frametype == AST_FRAME_CONTROL) { + if (queued_frame->subclass.integer == AST_CONTROL_OPTION) { + /* + * We just need to send the data to the websocket. + * The data should already be NULL terminated. + */ + ast_websocket_write_string(instance->websocket, + queued_frame->data.ptr); + ast_debug(4, "%s: WebSocket sending %s\n", + ast_channel_name(instance->channel), (char *)queued_frame->data.ptr); + } + /* + * We do NOT send these to the core so we need to free + * the frame and grab the next one. If it's also a + * control frame, we need to process it otherwise + * continue down in the function. + */ + ast_frame_free(queued_frame, 0); + queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list); + /* + * Jut FYI... We didn't bump the queue length when we added the control + * frames so we don't need to decrement it here. + */ + } + + /* + * If, after reading all control frames, there are no frames + * left in the queue, we need to return NULL so we can send + * a silence frame. + */ + if (!queued_frame) { + return NULL; + } + + instance->frame_queue_length--; + + return queued_frame; +} +/*! + * \internal + * + * Called by the core channel thread each time the instance timer fires. + * + */ +static struct ast_frame *webchan_read(struct ast_channel *ast) +{ + struct websocket_pvt *instance = NULL; + struct ast_frame *native_frame = NULL; + struct ast_frame *slin_frame = NULL; + + instance = ast_channel_tech_pvt(ast); + if (!instance) { + return NULL; + } + + if (ast_timer_get_event(instance->timer) == AST_TIMING_EVENT_EXPIRED) { + ast_timer_ack(instance->timer, 1); + } + + native_frame = dequeue_frame(instance); + + /* + * No frame when the timer fires means we have to create and + * return a silence frame in its place. + */ + if (!native_frame) { + ast_debug(5, "%s: WebSocket read timer fired with no frame available. Returning silence.\n", ast_channel_name(ast)); + set_channel_format(instance, instance->slin_format); + slin_frame = ast_frdup(&instance->silence); + return slin_frame; + } + + /* + * If the frame length is already optimal_frame_size, we can just + * return it. + */ + if (native_frame->datalen == instance->optimal_frame_size) { + set_channel_format(instance, instance->native_format); + return native_frame; + } + + /* + * If we're here, we have a short frame that we need to pad + * with silence. + */ + + if (instance->translator) { + slin_frame = ast_translate(instance->translator, native_frame, 0); + if (!slin_frame) { + ast_log(LOG_WARNING, "%s: Failed to translate %d byte frame\n", + ast_channel_name(ast), native_frame->datalen); + return NULL; + } + ast_frame_free(native_frame, 0); + } else { + /* + * If there was no translator then the native format + * was already slin. + */ + slin_frame = native_frame; + } + + set_channel_format(instance, instance->slin_format); + + /* + * So now we have an slin frame but it's probably still short + * so we create a new data buffer with the correct length + * which is filled with zeros courtesy of ast_calloc. + * We then copy the short frame data into the new buffer + * and set the offset to AST_FRIENDLY_OFFSET so that + * the core can read the data without any issues. + * If the original frame data was mallocd, we need to free the old + * data buffer so we don't leak memory and we need to set + * mallocd to AST_MALLOCD_DATA so that the core knows + * it needs to free the new data buffer when it's done. + */ + + if (slin_frame->datalen != instance->silence.datalen) { + char *old_data = slin_frame->data.ptr; + int old_len = slin_frame->datalen; + int old_offset = slin_frame->offset; + ast_debug(4, "%s: WebSocket read short frame. Expected %d got %d. Filling with silence\n", + ast_channel_name(ast), instance->silence.datalen, + slin_frame->datalen); + + slin_frame->data.ptr = ast_calloc(1, instance->silence.datalen + AST_FRIENDLY_OFFSET); + if (!slin_frame->data.ptr) { + ast_frame_free(slin_frame, 0); + return NULL; + } + slin_frame->data.ptr += AST_FRIENDLY_OFFSET; + slin_frame->offset = AST_FRIENDLY_OFFSET; + memcpy(slin_frame->data.ptr, old_data, old_len); + if (slin_frame->mallocd & AST_MALLOCD_DATA) { + ast_free(old_data - old_offset); + } + slin_frame->mallocd |= AST_MALLOCD_DATA; + slin_frame->datalen = instance->silence.datalen; + slin_frame->samples = instance->silence.samples; + } + + return slin_frame; +} + +static int queue_frame_from_buffer(struct websocket_pvt *instance, + char *buffer, size_t len) +{ + struct ast_frame fr = { 0, }; + struct ast_frame *duped_frame = NULL; + + AST_FRAME_SET_BUFFER(&fr, buffer, 0, len); + fr.frametype = AST_FRAME_VOICE; + fr.subclass.format = instance->native_format; + fr.samples = instance->native_codec->samples_count(&fr); + + duped_frame = ast_frisolate(&fr); + if (!duped_frame) { + ast_log(LOG_WARNING, "%s: Failed to isolate frame\n", + ast_channel_name(instance->channel)); + return -1; + } + + { + SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK, + AST_LIST_UNLOCK); + AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list); + instance->frame_queue_length++; + if (!instance->queue_full && instance->frame_queue_length >= QUEUE_LENGTH_XOFF_LEVEL) { + instance->queue_full = 1; + ast_debug(4, "%s: WebSocket sending %s\n", + ast_channel_name(instance->channel), MEDIA_XOFF); + ast_websocket_write_string(instance->websocket, MEDIA_XOFF); + } + } + + ast_debug(5, "%s: Queued %d byte frame\n", ast_channel_name(instance->channel), + duped_frame->datalen); + + return 0; +} + +static int queue_option_frame(struct websocket_pvt *instance, + char *buffer) +{ + struct ast_frame fr = { 0, }; + struct ast_frame *duped_frame = NULL; + + AST_FRAME_SET_BUFFER(&fr, buffer, 0, strlen(buffer) + 1); + fr.frametype = AST_FRAME_CONTROL; + fr.subclass.integer = AST_CONTROL_OPTION; + + duped_frame = ast_frisolate(&fr); + if (!duped_frame) { + ast_log(LOG_WARNING, "%s: Failed to isolate frame\n", + ast_channel_name(instance->channel)); + return -1; + } + + AST_LIST_LOCK(&instance->frame_queue); + AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list); + AST_LIST_UNLOCK(&instance->frame_queue); + + ast_debug(4, "%s: Queued '%s' option frame\n", + ast_channel_name(instance->channel), buffer); + + return 0; +} + +static int process_text_message(struct websocket_pvt *instance, + char *payload, uint64_t payload_len) +{ + int res = 0; + char *command; + + if (payload_len > MAX_TEXT_MESSAGE_LEN) { + ast_log(LOG_WARNING, "%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n", + ast_channel_name(instance->channel), (int)payload_len, MAX_TEXT_MESSAGE_LEN); + return 0; + } + + /* + * This is safe because the payload buffer is always >= 8K + * even with LOW_MEMORY defined and we've already made sure the + * command is less than 128 bytes. + */ + payload[payload_len] = '\0'; + command = ast_strip(ast_strdupa(payload)); + + ast_debug(4, "%s: WebSocket %s command received\n", + ast_channel_name(instance->channel), command); + + if (ast_strings_equal(command, ANSWER_CHANNEL)) { + ast_queue_control(instance->channel, AST_CONTROL_ANSWER); + + } else if (ast_strings_equal(command, HANGUP_CHANNEL)) { + ast_queue_control(instance->channel, AST_CONTROL_HANGUP); + + } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) { + AST_LIST_LOCK(&instance->frame_queue); + instance->bulk_media_in_progress = 1; + AST_LIST_UNLOCK(&instance->frame_queue); + + } else if (ast_begins_with(command, STOP_MEDIA_BUFFERING)) { + char *id; + char *option; + SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK, + AST_LIST_UNLOCK); + + id = ast_strip(command + strlen(STOP_MEDIA_BUFFERING)); + + ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n", + ast_channel_name(instance->channel), STOP_MEDIA_BUFFERING, id, + (int)instance->leftover_len); + + instance->bulk_media_in_progress = 0; + if (instance->leftover_len > 0) { + res = queue_frame_from_buffer(instance, instance->leftover_data, instance->leftover_len); + if (res != 0) { + return res; + } + } + instance->leftover_len = 0; + res = ast_asprintf(&option, "%s%s%s", MEDIA_BUFFERING_COMPLETED, + S_COR(!ast_strlen_zero(id), " ", ""), S_OR(id, "")); + if (res <= 0 || !option) { + return res; + } + res = queue_option_frame(instance, option); + ast_free(option); + + } else if (ast_strings_equal(command, FLUSH_MEDIA)) { + struct ast_frame *frame = NULL; + AST_LIST_LOCK(&instance->frame_queue); + while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) { + ast_frfree(frame); + } + instance->bulk_media_in_progress = 0; + instance->leftover_len = 0; + AST_LIST_UNLOCK(&instance->frame_queue); + + } else if (ast_strings_equal(payload, REPORT_QUEUE_DRAINED)) { + AST_LIST_LOCK(&instance->frame_queue); + instance->report_queue_drained = 1; + AST_LIST_UNLOCK(&instance->frame_queue); + + } else if (ast_strings_equal(command, GET_DRIVER_STATUS)) { + char *status = NULL; + + res = ast_asprintf(&status, "%s queue_length:%d xon_level:%d xoff_level:%d queue_full:%s bulk_media:%s media_paused:%s", + DRIVER_STATUS, + instance->frame_queue_length, QUEUE_LENGTH_XON_LEVEL, + QUEUE_LENGTH_XOFF_LEVEL, + S_COR(instance->queue_full, "true", "false"), + S_COR(instance->bulk_media_in_progress, "true", "false"), + S_COR(instance->queue_paused, "true", "false") + ); + if (res <= 0 || !status) { + ast_free(status); + res = -1; + } else { + ast_debug(4, "%s: WebSocket status: %s\n", + ast_channel_name(instance->channel), status); + res = ast_websocket_write_string(instance->websocket, status); + ast_free(status); + } + + } else if (ast_strings_equal(payload, PAUSE_MEDIA)) { + AST_LIST_LOCK(&instance->frame_queue); + instance->queue_paused = 1; + AST_LIST_UNLOCK(&instance->frame_queue); + + } else if (ast_strings_equal(payload, CONTINUE_MEDIA)) { + AST_LIST_LOCK(&instance->frame_queue); + instance->queue_paused = 0; + AST_LIST_UNLOCK(&instance->frame_queue); + + } else { + ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n", + ast_channel_name(instance->channel), command); + } + + return res; +} + +static int process_binary_message(struct websocket_pvt *instance, + char *payload, uint64_t payload_len) +{ + char *next_frame_ptr = NULL; + size_t bytes_read = 0; + int res = 0; + size_t bytes_left = 0; + + { + SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK, + AST_LIST_UNLOCK); + if (instance->frame_queue_length >= QUEUE_LENGTH_MAX) { + ast_debug(4, "%s: WebSocket queue is full. Ignoring incoming binary message.\n", + ast_channel_name(instance->channel)); + return 0; + } + } + + next_frame_ptr = payload; + instance->bytes_read += payload_len; + + if (instance->bulk_media_in_progress && instance->leftover_len > 0) { + /* + * We have leftover data from a previous websocket message. + * Try to make a complete frame by appending data from + * the current message to the leftover data. + */ + char *append_ptr = instance->leftover_data + instance->leftover_len; + size_t bytes_needed_for_frame = instance->optimal_frame_size - instance->leftover_len; + /* + * It's possible that even the current message doesn't have enough + * data to make a complete frame. + */ + size_t bytes_avail_to_copy = MIN(bytes_needed_for_frame, payload_len); + + /* + * Append whatever we can to the end of the leftover data + * even if it's not enough to make a complete frame. + */ + memcpy(append_ptr, payload, bytes_avail_to_copy); + + /* + * If leftover data is still short, just return and wait for the + * next websocket message. + */ + if (bytes_avail_to_copy < bytes_needed_for_frame) { + ast_debug(4, "%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n", + ast_channel_name(instance->channel), (int)instance->leftover_len, (int)bytes_avail_to_copy, (int)bytes_needed_for_frame); + instance->leftover_len += bytes_avail_to_copy; + return 0; + } + + res = queue_frame_from_buffer(instance, instance->leftover_data, instance->optimal_frame_size); + if (res < 0) { + return -1; + } + + /* + * We stole data from the current payload so decrement payload_len + * and set the next frame pointer after the data in payload + * we just copied. + */ + payload_len -= bytes_avail_to_copy; + next_frame_ptr = payload + bytes_avail_to_copy; + + ast_debug(5, "%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n", + ast_channel_name(instance->channel), + instance->frame_queue_length, + (int)instance->bytes_read, + (int)(payload_len + bytes_avail_to_copy), + (int)instance->leftover_len, + payload, + next_frame_ptr, + (int)(next_frame_ptr - payload), + (int)payload_len, + (int)bytes_avail_to_copy + ); + + + instance->leftover_len = 0; + } + + if (!instance->bulk_media_in_progress && instance->leftover_len > 0) { + instance->leftover_len = 0; + } + + bytes_left = payload_len; + while (bytes_read < payload_len && bytes_left >= instance->optimal_frame_size) { + res = queue_frame_from_buffer(instance, next_frame_ptr, + instance->optimal_frame_size); + if (res < 0) { + break; + } + bytes_read += instance->optimal_frame_size; + next_frame_ptr += instance->optimal_frame_size; + bytes_left -= instance->optimal_frame_size; + } + + if (instance->bulk_media_in_progress && bytes_left > 0) { + /* + * We have a partial frame. Save the leftover data. + */ + ast_debug(5, "%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n", + ast_channel_name(instance->channel), + (int)instance->bytes_read, + instance->frame_queue_length, + (int)payload_len, + (int)instance->leftover_len, + payload, + next_frame_ptr, + (int)(next_frame_ptr - payload), + (int)bytes_left + ); + memcpy(instance->leftover_data, next_frame_ptr, bytes_left); + instance->leftover_len = bytes_left; + } + + return 0; +} + +static int read_from_ws_and_queue(struct websocket_pvt *instance) +{ + uint64_t payload_len = 0; + char *payload = NULL; + enum ast_websocket_opcode opcode; + int fragmented = 0; + int res = 0; + + if (!instance || !instance->websocket) { + ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", + ast_channel_name(instance->channel)); + return -1; + } + + ast_debug(9, "%s: Waiting for websocket to have data\n", ast_channel_name(instance->channel)); + res = ast_wait_for_input( + ast_websocket_fd(instance->websocket), -1); + if (res <= 0) { + ast_log(LOG_WARNING, "%s: WebSocket read failed: %s\n", + ast_channel_name(instance->channel), strerror(errno)); + return -1; + } + + /* + * We need to lock here to prevent the websocket handle from + * being pulled out from under us if the core sends us a + * hangup request. + */ + ao2_lock(instance); + if (!instance->websocket) { + ao2_unlock(instance); + return -1; + } + + res = ast_websocket_read(instance->websocket, &payload, &payload_len, + &opcode, &fragmented); + ao2_unlock(instance); + if (res) { + return -1; + } + ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel), + (int)payload_len); + + if (opcode == AST_WEBSOCKET_OPCODE_TEXT) { + return process_text_message(instance, payload, payload_len); + } + + if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) { + ast_debug(5, "%s: WebSocket closed by remote\n", + ast_channel_name(instance->channel)); + return -1; + } + + if (opcode != AST_WEBSOCKET_OPCODE_BINARY) { + ast_debug(5, "%s: WebSocket frame type %d not supported. Ignoring.\n", + ast_channel_name(instance->channel), (int)opcode); + return 0; + } + + return process_binary_message(instance, payload, payload_len); +} + +/*! + * \internal + * + * For incoming websocket connections, this function gets called by + * incoming_ws_established_cb() and is run in the http server thread + * handling the websocket connection. + * + * For outgoing websocket connections, this function gets started as + * a background thread by webchan_call(). + */ +static void *read_thread_handler(void *obj) +{ + RAII_VAR(struct websocket_pvt *, instance, obj, ao2_cleanup); + RAII_VAR(char *, command, NULL, ast_free); + int res = 0; + + ast_debug(3, "%s: Read thread started\n", ast_channel_name(instance->channel)); + + /* + * We need to tell the remote app what channel this media is for. + * This is especially important for outbound connections otherwise + * the app won't know who the media is for. + */ + res = ast_asprintf(&command, "%s connection_id:%s channel:%s format:%s optimal_frame_size:%d", MEDIA_START, + instance->connection_id, ast_channel_name(instance->channel), + ast_format_get_name(instance->native_format), + instance->optimal_frame_size); + if (res <= 0 || !command) { + ast_queue_control(instance->channel, AST_CONTROL_HANGUP); + ast_log(LOG_ERROR, "%s: Failed to create MEDIA_START\n", ast_channel_name(instance->channel)); + return NULL; + } + res = ast_websocket_write_string(instance->websocket, command); + if (res != 0) { + ast_log(LOG_ERROR, "%s: Failed to send MEDIA_START\n", ast_channel_name(instance->channel)); + ast_queue_control(instance->channel, AST_CONTROL_HANGUP); + return NULL; + } + ast_debug(3, "%s: Sent %s\n", ast_channel_name(instance->channel), + command); + + if (!instance->no_auto_answer) { + ast_debug(3, "%s: ANSWER by auto_answer\n", ast_channel_name(instance->channel)); + ast_queue_control(instance->channel, AST_CONTROL_ANSWER); + } + + while (read_from_ws_and_queue(instance) == 0) + { + } + + /* + * websocket_hangup will take care of closing the websocket if needed. + */ + ast_debug(3, "%s: HANGUP by websocket close/error\n", ast_channel_name(instance->channel)); + ast_queue_control(instance->channel, AST_CONTROL_HANGUP); + + return NULL; +} + +/*! \brief Function called when we should write a frame to the channel */ +static int webchan_write(struct ast_channel *ast, struct ast_frame *f) +{ + struct websocket_pvt *instance = ast_channel_tech_pvt(ast); + + if (!instance || !instance->websocket) { + ast_log(LOG_WARNING, "%s: WebSocket instance or client not found\n", + ast_channel_name(ast)); + return -1; + } + + if (f->frametype != AST_FRAME_VOICE) { + ast_log(LOG_WARNING, "%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n", + ast_channel_name(ast)); + return -1; + } + if (f->subclass.format != instance->native_format) { + ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format\n", + ast_channel_name(ast), ast_format_get_name(instance->native_format)); + return -1; + } + + return ast_websocket_write(instance->websocket, AST_WEBSOCKET_OPCODE_BINARY, + (char *)f->data.ptr, (uint64_t)f->datalen); +} + +/*! + * \internal + * + * Called by the core to actually call the remote. + */ +static int webchan_call(struct ast_channel *ast, const char *dest, + int timeout) +{ + struct websocket_pvt *instance = ast_channel_tech_pvt(ast); + int nodelay = 1; + enum ast_websocket_result result; + + if (!instance) { + ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", + ast_channel_name(ast)); + return -1; + } + + if (instance->type == AST_WS_TYPE_SERVER) { + ast_debug(3, "%s: Websocket call incoming\n", ast_channel_name(instance->channel)); + return 0; + } + ast_debug(3, "%s: Websocket call outgoing\n", ast_channel_name(instance->channel)); + + if (!instance->client) { + ast_log(LOG_WARNING, "%s: WebSocket client not found\n", + ast_channel_name(ast)); + return -1; + } + + ast_debug(3, "%s: WebSocket call requested to %s. cid: %s\n", + ast_channel_name(ast), dest, instance->connection_id); + + instance->websocket = ast_websocket_client_connect(instance->client, + instance, ast_channel_name(ast), &result); + if (!instance->websocket || result != WS_OK) { + ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n", + ast_channel_name(ast), dest, ast_websocket_result_to_str(result)); + return -1; + } + + if (setsockopt(ast_websocket_fd(instance->websocket), + IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) { + ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno)); + } + + ast_debug(3, "%s: WebSocket connection to %s established\n", + ast_channel_name(ast), dest); + + /* read_thread_handler() will clean up the bump */ + if (ast_pthread_create_detached_background(&instance->outbound_read_thread, NULL, + read_thread_handler, ao2_bump(instance))) { + ast_log(LOG_WARNING, "%s: Failed to create thread.\n", ast_channel_name(ast)); + ao2_cleanup(instance); + return -1; + } + + return 0; +} + +static void websocket_destructor(void *data) +{ + struct websocket_pvt *instance = data; + struct ast_frame *frame = NULL; + ast_debug(3, "%s: WebSocket instance freed\n", instance->connection_id); + + AST_LIST_LOCK(&instance->frame_queue); + while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) { + ast_frfree(frame); + } + AST_LIST_UNLOCK(&instance->frame_queue); + + if (instance->timer) { + ast_timer_close(instance->timer); + instance->timer = NULL; + } + + if (instance->channel) { + ast_channel_unref(instance->channel); + instance->channel = NULL; + } + if (instance->websocket) { + ast_websocket_unref(instance->websocket); + instance->websocket = NULL; + } + + ao2_cleanup(instance->client); + instance->client = NULL; + + ao2_cleanup(instance->native_codec); + instance->native_codec = NULL; + + ao2_cleanup(instance->native_format); + instance->native_format = NULL; + + ao2_cleanup(instance->slin_codec); + instance->slin_codec = NULL; + + ao2_cleanup(instance->slin_format); + instance->slin_format = NULL; + + if (instance->silence.data.ptr) { + ast_free(instance->silence.data.ptr); + instance->silence.data.ptr = NULL; + } + + if (instance->translator) { + ast_translator_free_path(instance->translator); + instance->translator = NULL; + } + + if (instance->leftover_data) { + ast_free(instance->leftover_data); + instance->leftover_data = NULL; + } +} + +struct instance_proxy { + AO2_WEAKPROXY(); + /*! \brief The name of the module owning this sorcery instance */ + char connection_id[0]; +}; + +static void instance_proxy_cb(void *weakproxy, void *data) +{ + struct instance_proxy *proxy = weakproxy; + ast_debug(3, "%s: WebSocket instance removed from instances\n", proxy->connection_id); + ao2_unlink(instances, weakproxy); +} + +static struct websocket_pvt* websocket_new(const char *chan_name, + const char *connection_id, struct ast_format *fmt) +{ + RAII_VAR(struct instance_proxy *, proxy, NULL, ao2_cleanup); + RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup); + char uuid[AST_UUID_STR_LEN]; + enum ast_websocket_type ws_type; + + SCOPED_AO2WRLOCK(locker, instances); + + if (ast_strings_equal(connection_id, INCOMING_CONNECTION_ID)) { + connection_id = ast_uuid_generate_str(uuid, sizeof(uuid)); + ws_type = AST_WS_TYPE_SERVER; + } else { + ws_type = AST_WS_TYPE_CLIENT; + } + + proxy = ao2_weakproxy_alloc(sizeof(*proxy) + strlen(connection_id) + 1, NULL); + if (!proxy) { + return NULL; + } + strcpy(proxy->connection_id, connection_id); /* Safe */ + + instance = ao2_alloc(sizeof(*instance) + strlen(connection_id) + 1, + websocket_destructor); + if (!instance) { + return NULL; + } + strcpy(instance->connection_id, connection_id); /* Safe */ + + instance->type = ws_type; + if (ws_type == AST_WS_TYPE_CLIENT) { + instance->client = ast_websocket_client_retrieve_by_id(instance->connection_id); + if (!instance->client) { + ast_log(LOG_ERROR, "%s: WebSocket client connection '%s' not found\n", + chan_name, instance->connection_id); + return NULL; + } + } + + AST_LIST_HEAD_INIT(&instance->frame_queue); + + /* + * We need the codec to calculate the number of samples in a frame + * so we'll get it once and store it in the instance. + * + * References for native_format and native_codec are now held by the + * instance and will be released when the instance is destroyed. + */ + instance->native_format = fmt; + instance->native_codec = ast_format_get_codec(instance->native_format); + /* + * References for native_format and native_codec are now held by the + * instance and will be released when the instance is destroyed. + */ + instance->optimal_frame_size = + (instance->native_codec->default_ms * instance->native_codec->minimum_bytes) + / instance->native_codec->minimum_ms; + + instance->leftover_data = ast_calloc(1, instance->optimal_frame_size); + if (!instance->leftover_data) { + return NULL; + } + + /* We have exclusive access to proxy and sorcery, no need for locking here. */ + if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) { + return NULL; + } + + if (ao2_weakproxy_subscribe(proxy, instance_proxy_cb, NULL, OBJ_NOLOCK)) { + return NULL; + } + + if (!ao2_link_flags(instances, proxy, OBJ_NOLOCK)) { + ast_log(LOG_ERROR, "%s: Unable to link WebSocket instance to instances\n", + proxy->connection_id); + return NULL; + } + ast_debug(3, "%s: WebSocket instance created and linked\n", proxy->connection_id); + + return ao2_bump(instance); +} + +static int set_instance_translator(struct websocket_pvt *instance) +{ + if (ast_format_cache_is_slinear(instance->native_format)) { + instance->slin_format = ao2_bump(instance->native_format); + instance->slin_codec = ast_format_get_codec(instance->slin_format); + return 0; + } + + instance->slin_format = ao2_bump(ast_format_cache_get_slin_by_rate(instance->native_codec->sample_rate)); + if (!instance->slin_format) { + ast_log(LOG_ERROR, "%s: Unable to get slin format for rate %d\n", + ast_channel_name(instance->channel), instance->native_codec->sample_rate); + return -1; + } + ast_debug(3, "%s: WebSocket channel slin format '%s' Sample rate: %d ptime: %dms\n", + ast_channel_name(instance->channel), ast_format_get_name(instance->slin_format), + ast_format_get_sample_rate(instance->slin_format), + ast_format_get_default_ms(instance->slin_format)); + + instance->translator = ast_translator_build_path(instance->slin_format, instance->native_format); + if (!instance->translator) { + ast_log(LOG_ERROR, "%s: Unable to build translator path from '%s' to '%s'\n", + ast_channel_name(instance->channel), ast_format_get_name(instance->native_format), + ast_format_get_name(instance->slin_format)); + return -1; + } + + instance->slin_codec = ast_format_get_codec(instance->slin_format); + return 0; +} + +static int set_instance_silence_frame(struct websocket_pvt *instance) +{ + instance->silence.frametype = AST_FRAME_VOICE; + instance->silence.datalen = + (instance->slin_codec->default_ms * instance->slin_codec->minimum_bytes) / instance->slin_codec->minimum_ms; + instance->silence.samples = instance->silence.datalen / sizeof(uint16_t); + /* + * Even though we'll calloc the data pointer, we don't mark it as + * mallocd because this frame will be around for a while and we don't + * want it accidentally freed before we're done with it. + */ + instance->silence.mallocd = 0; + instance->silence.offset = 0; + instance->silence.src = __PRETTY_FUNCTION__; + instance->silence.subclass.format = instance->slin_format; + instance->silence.data.ptr = ast_calloc(1, instance->silence.datalen); + if (!instance->silence.data.ptr) { + return -1; + } + + return 0; +} + +static int set_channel_timer(struct websocket_pvt *instance) +{ + int rate = 0; + instance->timer = ast_timer_open(); + if (!instance->timer) { + return -1; + } + /* Rate is the number of ticks per second, not the interval. */ + rate = 1000 / ast_format_get_default_ms(instance->native_format); + ast_debug(3, "%s: WebSocket timer rate %d\n", + ast_channel_name(instance->channel), rate); + ast_timer_set_rate(instance->timer, rate); + /* + * Calling ast_channel_set_fd will cause the channel thread to call + * webchan_read at 'rate' times per second. + */ + ast_channel_set_fd(instance->channel, 0, ast_timer_fd(instance->timer)); + + return 0; +} + +static int set_channel_variables(struct websocket_pvt *instance) +{ + char *pkt_size = NULL; + int res = ast_asprintf(&pkt_size, "%d", instance->optimal_frame_size); + if (res <= 0) { + return -1; + } + + pbx_builtin_setvar_helper(instance->channel, MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE, + pkt_size); + ast_free(pkt_size); + pbx_builtin_setvar_helper(instance->channel, MEDIA_WEBSOCKET_CONNECTION_ID, + instance->connection_id); + + return 0; +} + +enum { + OPT_WS_CODEC = (1 << 0), + OPT_WS_NO_AUTO_ANSWER = (1 << 1), +}; + +enum { + OPT_ARG_WS_CODEC, + OPT_ARG_WS_NO_AUTO_ANSWER, + OPT_ARG_ARRAY_SIZE +}; + +AST_APP_OPTIONS(websocket_options, BEGIN_OPTIONS + AST_APP_OPTION_ARG('c', OPT_WS_CODEC, OPT_ARG_WS_CODEC), + AST_APP_OPTION('n', OPT_WS_NO_AUTO_ANSWER), + END_OPTIONS ); + +static struct ast_channel *webchan_request(const char *type, + struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, + const struct ast_channel *requestor, const char *data, int *cause) +{ + char *parse; + RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup); + struct ast_channel *chan = NULL; + struct ast_format *fmt = NULL; + struct ast_format_cap *caps = NULL; + AST_DECLARE_APP_ARGS(args, + AST_APP_ARG(connection_id); + AST_APP_ARG(options); + ); + struct ast_flags opts = { 0, }; + char *opt_args[OPT_ARG_ARRAY_SIZE]; + const char *requestor_name = requestor ? ast_channel_name(requestor) : "no channel"; + + ast_debug(3, "%s: WebSocket channel requested\n", + requestor_name); + + if (ast_strlen_zero(data)) { + ast_log(LOG_ERROR, "%s: A connection id is required for the 'WebSocket' channel\n", + requestor_name); + goto failure; + } + parse = ast_strdupa(data); + AST_NONSTANDARD_APP_ARGS(args, parse, '/'); + + if (ast_strlen_zero(args.connection_id)) { + ast_log(LOG_ERROR, "%s: connection_id is required for the 'WebSocket' channel\n", + requestor_name); + goto failure; + } + + if (!ast_strlen_zero(args.options) + && ast_app_parse_options(websocket_options, &opts, opt_args, + ast_strdupa(args.options))) { + ast_log(LOG_ERROR, "%s: 'WebSocket' channel options '%s' parse error\n", + requestor_name, args.options); + goto failure; + } + + if (ast_test_flag(&opts, OPT_WS_CODEC) + && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) { + ast_debug(3, "%s: Using specified format %s\n", + requestor_name, opt_args[OPT_ARG_WS_CODEC]); + fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]); + } else { + /* + * If codec wasn't specified in the dial string, + * use the first format in the capabilities. + */ + ast_debug(3, "%s: Using format %s from requesting channel\n", + requestor_name, opt_args[OPT_ARG_WS_CODEC]); + fmt = ast_format_cap_get_format(cap, 0); + } + + if (!fmt) { + ast_log(LOG_WARNING, "%s: No codec found for sending media to connection '%s'\n", + requestor_name, args.connection_id); + goto failure; + } + + instance = websocket_new(requestor_name, args.connection_id, fmt); + if (!instance) { + ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n", + requestor_name); + goto failure; + } + + instance->no_auto_answer = ast_test_flag(&opts, OPT_WS_NO_AUTO_ANSWER); + + chan = ast_channel_alloc(1, AST_STATE_DOWN, "", "", "", "", "", assignedids, + requestor, 0, "WebSocket/%s/%p", args.connection_id, instance); + if (!chan) { + ast_log(LOG_ERROR, "%s: Unable to alloc channel\n", ast_channel_name(requestor)); + goto failure; + } + + ast_debug(3, "%s: WebSocket channel %s allocated for connection %s\n", + ast_channel_name(chan), requestor_name, + instance->connection_id); + + instance->channel = ao2_bump(chan); + ast_channel_tech_set(instance->channel, &websocket_tech); + + if (set_instance_translator(instance) != 0) { + goto failure; + } + + if (set_instance_silence_frame(instance) != 0) { + goto failure; + } + + if (set_channel_timer(instance) != 0) { + goto failure; + } + + if (set_channel_variables(instance) != 0) { + goto failure; + } + + caps = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT); + if (!caps) { + ast_log(LOG_ERROR, "%s: Unable to alloc caps\n", requestor_name); + goto failure; + } + + ast_format_cap_append(caps, instance->native_format, 0); + ast_channel_nativeformats_set(instance->channel, caps); + ast_channel_set_writeformat(instance->channel, instance->native_format); + ast_channel_set_rawwriteformat(instance->channel, instance->native_format); + ast_channel_set_readformat(instance->channel, instance->native_format); + ast_channel_set_rawreadformat(instance->channel, instance->native_format); + ast_channel_tech_pvt_set(chan, ao2_bump(instance)); + ast_channel_unlock(chan); + ao2_cleanup(caps); + + ast_debug(3, "%s: WebSocket channel created to %s\n", + ast_channel_name(chan), args.connection_id); + + return chan; + +failure: + if (chan) { + ast_channel_unlock(chan); + } + *cause = AST_CAUSE_FAILURE; + return NULL; +} + + +/*! + * \internal + * + * Called by the core to hang up the channel. + */ +static int webchan_hangup(struct ast_channel *ast) +{ + struct websocket_pvt *instance = ast_channel_tech_pvt(ast); + + if (!instance) { + return -1; + } + ast_debug(3, "%s: WebSocket call hangup. cid: %s\n", + ast_channel_name(ast), instance->connection_id); + + /* + * We need to lock because read_from_ws_and_queue() is probably waiting + * on the websocket file descriptor and will unblock and immediately try to + * check the websocket and read from it. We don't want to pull the + * websocket out from under it between the check and read. + */ + ao2_lock(instance); + if (instance->websocket) { + ast_websocket_close(instance->websocket, 1000); + ast_websocket_unref(instance->websocket); + instance->websocket = NULL; + } + ast_channel_tech_pvt_set(ast, NULL); + ao2_unlock(instance); + + /* Clean up the reference from adding the instance to the channel */ + ao2_cleanup(instance); + + return 0; +} + +/*! + * \internal + * + * Called by res_http_websocket after a client has connected and + * successfully upgraded from HTTP to WebSocket. + * + * Depends on incoming_ws_http_callback parsing the connection_id from + * the HTTP request and storing it in get_params. + */ +static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, + struct ast_variable *get_params, struct ast_variable *upgrade_headers) +{ + RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref); + struct ast_variable *v; + const char *connection_id = NULL; + struct websocket_pvt *instance = NULL; + int nodelay = 1; + + ast_debug(3, "WebSocket established\n"); + + for (v = upgrade_headers; v; v = v->next) { + ast_debug(4, "Header-> %s: %s\n", v->name, v->value); + } + for (v = get_params; v; v = v->next) { + ast_debug(4, " Param-> %s: %s\n", v->name, v->value); + } + + connection_id = ast_variable_find_in_list(get_params, "CONNECTION_ID"); + if (!connection_id) { + /* + * This can't really happen because websocket_http_callback won't + * let it get this far if it can't add the connection_id to the + * get_params. + * Just in case though... + */ + ast_log(LOG_WARNING, "WebSocket connection id not found\n"); + ast_queue_control(instance->channel, AST_CONTROL_HANGUP); + ast_websocket_close(ast_ws_session, 1000); + return; + } + + instance = ao2_weakproxy_find(instances, connection_id, OBJ_SEARCH_KEY | OBJ_NOLOCK, ""); + if (!instance) { + /* + * This also can't really happen because websocket_http_callback won't + * let it get this far if it can't find the instance. + * Just in case though... + */ + ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id); + ast_queue_control(instance->channel, AST_CONTROL_HANGUP); + ast_websocket_close(ast_ws_session, 1000); + return; + } + instance->websocket = ao2_bump(ast_ws_session); + + if (setsockopt(ast_websocket_fd(instance->websocket), + IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) { + ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on manager connection: %s\n", strerror(errno)); + } + + /* read_thread_handler cleans up the bump */ + read_thread_handler(ao2_bump(instance)); + + ao2_cleanup(instance); + ast_debug(3, "WebSocket closed\n"); +} + +/*! + * \internal + * + * Called by the core http server after a client connects but before + * the upgrade from HTTP to Websocket. We need to save the URI in + * the CONNECTION_ID in a get_param because it contains the connection UUID + * we gave to the client when they used externalMedia to create the channel. + * incoming_ws_established_cb() will use this to retrieve the chan_websocket + * instance. + */ +static int incoming_ws_http_callback(struct ast_tcptls_session_instance *ser, + const struct ast_http_uri *urih, const char *uri, + enum ast_http_method method, struct ast_variable *get_params, + struct ast_variable *headers) +{ + struct ast_http_uri fake_urih = { + .data = ast_ws_server, + }; + int res = 0; + /* + * Normally the http server will destroy the get_params + * when the session ends but if there weren't any initially + * and we create some and add them to the list, the http server + * won't know about it so we have to destroy it ourselves. + */ + int destroy_get_params = (get_params == NULL); + struct ast_variable *v = NULL; + RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup); + + ast_debug(2, "URI: %s Starting\n", uri); + + /* + * The client will have issued the GET request with a URI of + * /media/ + * + * Since this callback is registered for the /media URI prefix the + * http server will strip that off the front of the URI passing in + * only the path components after that in the 'uri' parameter. + * This should leave only the connection id without a leading '/'. + */ + instance = ao2_weakproxy_find(instances, uri, OBJ_SEARCH_KEY | OBJ_NOLOCK, ""); + if (!instance) { + ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", uri); + ast_http_error(ser, 404, "Not found", "WebSocket instance not found"); + return -1; + } + + /* + * We don't allow additional connections using the same connection id. + */ + if (instance->websocket) { + ast_log(LOG_WARNING, "%s: Websocket already connected for channel '%s'\n", + uri, instance->channel ? ast_channel_name(instance->channel) : "unknown"); + ast_http_error(ser, 409, "Conflict", "Another websocket connection exists for this connection id"); + return -1; + } + + v = ast_variable_new("CONNECTION_ID", uri, ""); + if (!v) { + ast_http_error(ser, 500, "Server error", ""); + return -1; + } + ast_variable_list_append(&get_params, v); + + for (v = get_params; v; v = v->next) { + ast_debug(4, " Param-> %s: %s\n", v->name, v->value); + } + + /* + * This will ultimately call internal_ws_established_cb() so + * this function will block until the websocket is closed and + * internal_ws_established_cb() returns; + */ + res = ast_websocket_uri_cb(ser, &fake_urih, uri, method, + get_params, headers); + if (destroy_get_params) { + ast_variables_destroy(get_params); + } + + ast_debug(2, "URI: %s DONE\n", uri); + + return res; +} + +static struct ast_http_uri http_uri = { + .callback = incoming_ws_http_callback, + .description = "Media over Websocket", + .uri = "media", + .has_subtree = 1, + .data = NULL, + .key = __FILE__, + .no_decode_uri = 1, +}; + +/*! \brief Function called when our module is unloaded */ +static int unload_module(void) +{ + ast_http_uri_unlink(&http_uri); + ao2_cleanup(ast_ws_server); + ast_ws_server = NULL; + + ast_channel_unregister(&websocket_tech); + ao2_cleanup(websocket_tech.capabilities); + websocket_tech.capabilities = NULL; + + ao2_cleanup(instances); + instances = NULL; + + return 0; +} + +AO2_STRING_FIELD_HASH_FN(instance_proxy, connection_id) +AO2_STRING_FIELD_CMP_FN(instance_proxy, connection_id) +AO2_STRING_FIELD_SORT_FN(instance_proxy, connection_id) + +/*! \brief Function called when our module is loaded */ +static int load_module(void) +{ + int res = 0; + struct ast_websocket_protocol *protocol; + + if (!(websocket_tech.capabilities = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT))) { + return AST_MODULE_LOAD_DECLINE; + } + + ast_format_cap_append_by_type(websocket_tech.capabilities, AST_MEDIA_TYPE_UNKNOWN); + if (ast_channel_register(&websocket_tech)) { + ast_log(LOG_ERROR, "Unable to register channel class 'WebSocket'\n"); + unload_module(); + return AST_MODULE_LOAD_DECLINE; + } + + instances = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, + AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, 17, instance_proxy_hash_fn, + instance_proxy_sort_fn, instance_proxy_cmp_fn); + if (!instances) { + ast_log(LOG_WARNING, + "Failed to allocate the chan_websocket instance registry\n"); + unload_module(); + return AST_MODULE_LOAD_DECLINE; + } + + ast_ws_server = ast_websocket_server_create(); + if (!ast_ws_server) { + unload_module(); + return AST_MODULE_LOAD_DECLINE; + } + + protocol = ast_websocket_sub_protocol_alloc("media"); + if (!protocol) { + unload_module(); + return AST_MODULE_LOAD_DECLINE; + } + protocol->session_established = incoming_ws_established_cb; + res = ast_websocket_server_add_protocol2(ast_ws_server, protocol); + + ast_http_uri_link(&http_uri); + + return res == 0 ? AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Websocket Media Channel", + .support_level = AST_MODULE_SUPPORT_CORE, + .load = load_module, + .unload = unload_module, + .load_pri = AST_MODPRI_CHANNEL_DRIVER, + .requires = "res_http_websocket,res_websocket_client", +); diff --git a/include/asterisk/http_websocket.h b/include/asterisk/http_websocket.h index 150639fcb5..062f710377 100644 --- a/include/asterisk/http_websocket.h +++ b/include/asterisk/http_websocket.h @@ -77,6 +77,14 @@ enum ast_websocket_opcode { AST_WEBSOCKET_OPCODE_CONTINUATION = 0x0, /*!< Continuation of a previous frame */ }; +#ifdef LOW_MEMORY +/*! \brief Size of the pre-determined buffer for WebSocket frames */ +#define AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE 8192 +#else +/*! \brief Size of the pre-determined buffer for WebSocket frames */ +#define AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE 65535 +#endif + /*! * \brief Opaque structure for WebSocket server. * \since 12 diff --git a/main/channel.c b/main/channel.c index 8c6d8e76fd..374d6974c5 100644 --- a/main/channel.c +++ b/main/channel.c @@ -3527,16 +3527,12 @@ static struct ast_frame *__ast_read(struct ast_channel *chan, int dropaudio, int * The ast_waitfor() code records which of the channel's file * descriptors reported that data is available. In theory, * ast_read() should only be called after ast_waitfor() reports - * that a channel has data available for reading. However, - * there still may be some edge cases throughout the code where - * ast_read() is called improperly. This can potentially cause - * problems, so if this is a developer build, make a lot of - * noise if this happens so that it can be addressed. - * - * One of the potential problems is blocking on a dead channel. + * that a channel has data available for reading but certain + * situations with stasis and ARI could give a false indication. + * For this reason, we don't stop any processing. */ if (ast_channel_fdno(chan) == -1) { - ast_log(LOG_ERROR, + ast_debug(3, "ast_read() on chan '%s' called with no recorded file descriptor.\n", ast_channel_name(chan)); } diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c index bcb2871c4e..1aa83165ca 100644 --- a/res/ari/resource_channels.c +++ b/res/ari/resource_channels.c @@ -44,6 +44,7 @@ #include "asterisk/dial.h" #include "asterisk/max_forwards.h" #include "asterisk/rtp_engine.h" +#include "asterisk/websocket_client.h" #include "resource_channels.h" #include @@ -2179,6 +2180,53 @@ static int external_media_audiosocket_tcp(struct ast_ari_channels_external_media return 0; } +static int external_media_websocket(struct ast_ari_channels_external_media_args *args, + struct ast_variable *variables, + struct ast_ari_response *response) +{ + char *endpoint; + struct ast_channel *chan; + struct varshead *vars; + + if (ast_asprintf(&endpoint, "WebSocket/%s/c(%s)", + args->external_host, + args->format) == -1) { + return 1; + } + + chan = ari_channels_handle_originate_with_id( + endpoint, + NULL, + NULL, + 0, + NULL, + args->app, + args->data, + NULL, + 0, + variables, + args->channel_id, + NULL, + NULL, + args->format, + response); + + ast_free(endpoint); + + if (!chan) { + return 1; + } + + ast_channel_lock(chan); + vars = ast_channel_varshead(chan); + if (vars && !AST_LIST_EMPTY(vars)) { + ast_json_object_set(response->message, "channelvars", ast_json_channel_vars(vars)); + } + ast_channel_unlock(chan); + ast_channel_unref(chan); + return 0; +} + #include "asterisk/config.h" #include "asterisk/netsock2.h" @@ -2209,31 +2257,77 @@ void ast_ari_channels_external_media(struct ast_variable *headers, return; } - if (ast_strlen_zero(args->external_host)) { - ast_ari_response_error(response, 400, "Bad Request", "external_host cannot be empty"); - return; + if (ast_strlen_zero(args->transport)) { + args->transport = "udp"; } - external_host = ast_strdupa(args->external_host); - if (!ast_sockaddr_split_hostport(external_host, &host, &port, PARSE_PORT_REQUIRE)) { - ast_ari_response_error(response, 400, "Bad Request", "external_host must be :"); - return; + if (ast_strlen_zero(args->encapsulation)) { + args->encapsulation = "rtp"; } - - if (ast_strlen_zero(args->format)) { - ast_ari_response_error(response, 400, "Bad Request", "format cannot be empty"); - return; + if (ast_strings_equal(args->transport, "websocket")) { + if (!ast_strings_equal(args->encapsulation, "none")) { + ast_ari_response_error(response, 400, "Bad Request", "encapsulation must be 'none' for websocket transport"); + return; + } } - if (ast_strlen_zero(args->encapsulation)) { - args->encapsulation = "rtp"; + if (ast_strings_equal(args->encapsulation, "rtp")) { + if (!ast_strings_equal(args->transport, "udp")) { + ast_ari_response_error(response, 400, "Bad Request", "transport must be 'udp' for rtp encapsulation"); + return; + } } - if (ast_strlen_zero(args->transport)) { - args->transport = "udp"; + + if (ast_strings_equal(args->encapsulation, "audiosocket")) { + if (!ast_strings_equal(args->transport, "tcp")) { + ast_ari_response_error(response, 400, "Bad Request", "transport must be 'tcp' for audiosocket encapsulation"); + return; + } } + if (ast_strlen_zero(args->connection_type)) { args->connection_type = "client"; } + if (!ast_strings_equal(args->transport, "websocket")) { + if (ast_strings_equal(args->connection_type, "server")) { + ast_ari_response_error(response, 400, "Bad Request", "'server' connection_type can only be used with the websocket transport"); + return; + } + } + + if (ast_strlen_zero(args->external_host)) { + if (ast_strings_equal(args->connection_type, "client")) { + ast_ari_response_error(response, 400, "Bad Request", "external_host is required for all but websocket server connections"); + return; + } else { + /* server is only valid for websocket, enforced above */ + args->external_host = "INCOMING"; + } + } + + if (ast_strings_equal(args->transport, "websocket")) { + if (ast_strings_equal(args->connection_type, "client")) { + struct ast_websocket_client *ws_client = + ast_websocket_client_retrieve_by_id(args->external_host); + ao2_cleanup(ws_client); + if (!ws_client) { + ast_ari_response_error(response, 400, "Bad Request", "external_host must be a valid websocket_client connection id."); + return; + } + } + } else { + external_host = ast_strdupa(args->external_host); + if (!ast_sockaddr_split_hostport(external_host, &host, &port, PARSE_PORT_REQUIRE)) { + ast_ari_response_error(response, 400, "Bad Request", "external_host must be : for all transports other than websocket"); + return; + } + } + + if (ast_strlen_zero(args->format)) { + ast_ari_response_error(response, 400, "Bad Request", "format cannot be empty"); + return; + } + if (ast_strlen_zero(args->direction)) { args->direction = "both"; } @@ -2250,6 +2344,12 @@ void ast_ari_channels_external_media(struct ast_variable *headers, response, 500, "Internal Server Error", "An internal error prevented this request from being handled"); } + } else if (strcasecmp(args->encapsulation, "none") == 0 && strcasecmp(args->transport, "websocket") == 0) { + if (external_media_websocket(args, variables, response)) { + ast_ari_response_error( + response, 500, "Internal Server Error", + "An internal error prevented this request from being handled"); + } } else { ast_ari_response_error( response, 501, "Not Implemented", diff --git a/res/ari/resource_channels.h b/res/ari/resource_channels.h index 8dc7fc9e19..96e2695fa3 100644 --- a/res/ari/resource_channels.h +++ b/res/ari/resource_channels.h @@ -834,13 +834,13 @@ struct ast_ari_channels_external_media_args { const char *app; /*! The "variables" key in the body object holds variable key/value pairs to set on the channel on creation. Other keys in the body object are interpreted as query parameters. Ex. { "endpoint": "SIP/Alice", "variables": { "CALLERID(name)": "Alice" } } */ struct ast_json *variables; - /*! Hostname/ip:port of external host */ + /*! Hostname/ip:port or websocket_client connection ID of external host. May be empty for a websocket server connection. */ const char *external_host; - /*! Payload encapsulation protocol */ + /*! Payload encapsulation protocol. Must be 'none' for the websocket transport. */ const char *encapsulation; /*! Transport protocol */ const char *transport; - /*! Connection type (client/server) */ + /*! Connection type (client/server). 'server' is only valid for the websocket transport. */ const char *connection_type; /*! Format to encode audio in */ const char *format; @@ -863,7 +863,7 @@ int ast_ari_channels_external_media_parse_body( /*! * \brief Start an External Media session. * - * Create a channel to an External Media source/sink. + * Create a channel to an External Media source/sink. The combination of transport and encapsulation will select one of chan_rtp(udp/rtp), chan_audiosocket(tcp/audiosocket) or chan_websocket(websocket/none) channel drivers. * * \param headers HTTP headers * \param args Swagger parameters diff --git a/res/res_ari_channels.c b/res/res_ari_channels.c index 33f50624c9..9c7b25182f 100644 --- a/res/res_ari_channels.c +++ b/res/res_ari_channels.c @@ -2299,7 +2299,7 @@ static void ast_ari_channels_record_cb( case 501: /* Not Implemented */ case 400: /* Invalid parameters */ case 404: /* Channel not found */ - case 409: /* Channel is not in a Stasis application; the channel is currently bridged with other hcannels; A recording with the same name already exists on the system and can not be overwritten because it is in progress or ifExists=fail */ + case 409: /* Channel is not in a Stasis application; the channel is currently bridged with other channels; A recording with the same name already exists on the system and can not be overwritten because it is in progress or ifExists=fail */ case 422: /* The format specified is unknown on this system */ is_valid = 1; break; diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c index b65d9a0cfa..60caf41fef 100644 --- a/res/res_http_websocket.c +++ b/res/res_http_websocket.c @@ -51,27 +51,21 @@ #define MAX_PROTOCOL_BUCKETS 7 #ifdef LOW_MEMORY -/*! \brief Size of the pre-determined buffer for WebSocket frames */ -#define MAXIMUM_FRAME_SIZE 8192 - /*! \brief Default reconstruction size for multi-frame payload reconstruction. If exceeded the next frame will start a * payload. */ -#define DEFAULT_RECONSTRUCTION_CEILING 8192 +#define DEFAULT_RECONSTRUCTION_CEILING AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE /*! \brief Maximum reconstruction size for multi-frame payload reconstruction. */ -#define MAXIMUM_RECONSTRUCTION_CEILING 8192 +#define MAXIMUM_RECONSTRUCTION_CEILING AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE #else -/*! \brief Size of the pre-determined buffer for WebSocket frames */ -#define MAXIMUM_FRAME_SIZE 65535 - /*! \brief Default reconstruction size for multi-frame payload reconstruction. If exceeded the next frame will start a * payload. */ -#define DEFAULT_RECONSTRUCTION_CEILING MAXIMUM_FRAME_SIZE +#define DEFAULT_RECONSTRUCTION_CEILING AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE /*! \brief Maximum reconstruction size for multi-frame payload reconstruction. */ -#define MAXIMUM_RECONSTRUCTION_CEILING MAXIMUM_FRAME_SIZE +#define MAXIMUM_RECONSTRUCTION_CEILING AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE #endif /*! \brief Maximum size of a websocket frame header @@ -100,7 +94,7 @@ struct ast_websocket { struct websocket_client *client; /*!< Client object when connected as a client websocket */ char session_id[AST_UUID_STR_LEN]; /*!< The identifier for the websocket session */ uint16_t close_status_code; /*!< Status code sent in a CLOSE frame upon shutdown */ - char buf[MAXIMUM_FRAME_SIZE]; /*!< Fixed buffer for reading data into */ + char buf[AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE]; /*!< Fixed buffer for reading data into */ }; const char *ast_websocket_type_to_str(enum ast_websocket_type type) @@ -201,7 +195,7 @@ static void session_destroy_fn(void *obj) if (session->stream) { ast_iostream_close(session->stream); session->stream = NULL; - ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from", + ast_debug(3, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from", ast_sockaddr_stringify(&session->remote_address)); } } @@ -279,7 +273,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_server_add_protocol2)(struct ast_websock ao2_link_flags(server->protocols, protocol, OBJ_NOLOCK); ao2_unlock(server->protocols); - ast_verb(5, "WebSocket registered sub-protocol '%s'\n", protocol->name); + ast_debug(1, "WebSocket registered sub-protocol '%s'\n", protocol->name); ao2_ref(protocol, -1); return 0; @@ -301,7 +295,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_server_remove_protocol)(struct ast_webso ao2_unlink(server->protocols, protocol); ao2_ref(protocol, -1); - ast_verb(5, "WebSocket unregistered sub-protocol '%s'\n", name); + ast_debug(1, "WebSocket unregistered sub-protocol '%s'\n", name); return 0; } @@ -672,7 +666,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_read)(struct ast_websocket *session, cha /* Now read the rest of the payload */ *payload = &session->buf[frame_size]; /* payload will start here, at the end of the options, if any */ frame_size = frame_size + (*payload_len); /* final frame size is header + optional headers + payload data */ - if (frame_size > MAXIMUM_FRAME_SIZE) { + if (frame_size > AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE) { ast_log(LOG_WARNING, "Cannot fit huge websocket frame of %zu bytes\n", frame_size); /* The frame won't fit :-( */ ast_websocket_close(session, 1009); @@ -992,7 +986,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan return 0; } - ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version); + ast_debug(3, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version); /* Populate the session with all the needed details */ session->stream = ser->stream; diff --git a/rest-api/api-docs/channels.json b/rest-api/api-docs/channels.json index e4d436632e..04f475b4d4 100644 --- a/rest-api/api-docs/channels.json +++ b/rest-api/api-docs/channels.json @@ -1473,7 +1473,7 @@ }, { "code": 409, - "reason": "Channel is not in a Stasis application; the channel is currently bridged with other hcannels; A recording with the same name already exists on the system and can not be overwritten because it is in progress or ifExists=fail" + "reason": "Channel is not in a Stasis application; the channel is currently bridged with other channels; A recording with the same name already exists on the system and can not be overwritten because it is in progress or ifExists=fail" }, { "code": 422, @@ -1870,7 +1870,7 @@ "17.1.0" ], "summary": "Start an External Media session.", - "notes": "Create a channel to an External Media source/sink.", + "notes": "Create a channel to an External Media source/sink. The combination of transport and encapsulation will select one of chan_rtp(udp/rtp), chan_audiosocket(tcp/audiosocket) or chan_websocket(websocket/none) channel drivers.", "nickname": "externalMedia", "responseClass": "Channel", "parameters": [ @@ -1900,15 +1900,15 @@ }, { "name": "external_host", - "description": "Hostname/ip:port of external host", + "description": "Hostname/ip:port or websocket_client connection ID of external host. May be empty for a websocket server connection.", "paramType": "query", - "required": true, + "required": false, "allowMultiple": false, "dataType": "string" }, { "name": "encapsulation", - "description": "Payload encapsulation protocol", + "description": "Payload encapsulation protocol. Must be 'none' for the websocket transport.", "paramType": "query", "required": false, "allowMultiple": false, @@ -1918,7 +1918,8 @@ "valueType": "LIST", "values": [ "rtp", - "audiosocket" + "audiosocket", + "none" ] } }, @@ -1934,13 +1935,14 @@ "valueType": "LIST", "values": [ "udp", - "tcp" + "tcp", + "websocket" ] } }, { "name": "connection_type", - "description": "Connection type (client/server)", + "description": "Connection type (client/server). 'server' is only valid for the websocket transport.", "paramType": "query", "required": false, "allowMultiple": false, @@ -1949,7 +1951,8 @@ "allowableValues": { "valueType": "LIST", "values": [ - "client" + "client", + "server" ] } },