pthread_t outbound_read_thread;
size_t bytes_read;
size_t leftover_len;
+ char *remote_addr;
char *uri_params;
char *leftover_data;
enum webchan_control_msg_format control_msg_format;
char connection_id[0];
};
+/*
+ * These are the indexes in the channel's file descriptor array
+ * not the file descriptors themselves.
+ */
+#define WS_TIMER_FDNO (AST_EXTENDED_FDS + 1)
+#define WS_WEBSOCKET_FDNO (AST_EXTENDED_FDS + 2)
+
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"
#define MEDIA_WEBSOCKET_CONNECTION_ID "MEDIA_WEBSOCKET_CONNECTION_ID"
#define INCOMING_CONNECTION_ID "INCOMING"
#define MAX_TEXT_MESSAGE_LEN MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))
/* Forward declarations */
+static int read_from_ws_and_queue(struct websocket_pvt *instance);
+static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause,
+ enum ast_websocket_status_code tech_cause, int line, const char *function);
static struct ast_channel *webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
static int webchan_call(struct ast_channel *ast, const char *dest, int timeout);
static struct ast_frame *webchan_read(struct ast_channel *ast);
static int webchan_hangup(struct ast_channel *ast);
static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration);
+#define websocket_request_hangup(_instance, _cause, _tech) \
+ _websocket_request_hangup(_instance, _cause, _tech, __LINE__, __FUNCTION__)
+
static struct ast_channel_tech websocket_tech = {
.type = "WebSocket",
.description = "Media over WebSocket Channel Driver",
({ \
int _res = -1; \
char *_payload = _create_event_ ## _event(_instance, ##__VA_ARGS__); \
- ao2_lock(instance); \
if (_payload && _instance->websocket) { \
_res = ast_websocket_write_string(_instance->websocket, _payload); \
if (_res != 0) { \
ast_log(LOG_ERROR, "%s: Unable to send event %s\n", \
ast_channel_name(instance->channel), _payload); \
} else { \
- ast_debug(4, "%s: Sent %s\n", \
+ ast_debug(3, "%s: Sent %s\n", \
ast_channel_name(instance->channel), _payload); \
}\
ast_free(_payload); \
} \
- ao2_unlock(instance); \
(_res); \
})
/*!
* \internal
*
- * Called by the core channel thread each time the instance timer fires.
+ * There are two file descriptors on this channel that can trigger
+ * this function...
+ *
+ * The timer fd (WS_TIMER_FDNO) which gets triggered at a constant
+ * rate determined by the format. In this case, we need to pull a
+ * frame OFF the queue and return it to the core.
+ *
+ * The websocket fd (WS_WEBSOCKET_FDNO) which gets triggered when
+ * there's incoming data to read from the websocket. In this case,
+ * we read the data and put it ON the queue. We'll return a null frame.
*
*/
static struct ast_frame *webchan_read(struct ast_channel *ast)
struct websocket_pvt *instance = NULL;
struct ast_frame *native_frame = NULL;
struct ast_frame *slin_frame = NULL;
+ int fdno = ast_channel_fdno(ast);
instance = ast_channel_tech_pvt(ast);
if (!instance) {
return NULL;
}
+ if (fdno == WS_WEBSOCKET_FDNO) {
+ read_from_ws_and_queue(instance);
+ return &ast_null_frame;
+ }
+ if (fdno != WS_TIMER_FDNO) {
+ return &ast_null_frame;
+ }
+
if (ast_timer_get_event(instance->timer) == AST_TIMING_EVENT_EXPIRED) {
ast_timer_ack(instance->timer, 1);
}
ast_queue_control(instance->channel, AST_CONTROL_ANSWER);
} else if (ast_strings_equal(command, HANGUP_CHANNEL)) {
- ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
+ websocket_request_hangup(instance, AST_CAUSE_NORMAL, AST_WEBSOCKET_STATUS_NORMAL);
} else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
if (instance->passthrough) {
int fragmented = 0;
int res = 0;
- if (!instance || !instance->websocket) {
- ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
- ast_channel_name(instance->channel));
- return -1;
- }
-
- ast_debug(9, "%s: Waiting for websocket to have data\n", ast_channel_name(instance->channel));
- res = ast_wait_for_input(
- ast_websocket_fd(instance->websocket), -1);
- if (res <= 0) {
- ast_log(LOG_WARNING, "%s: WebSocket read failed: %s\n",
- ast_channel_name(instance->channel), strerror(errno));
- return -1;
- }
-
- /*
- * We need to lock here to prevent the websocket handle from
- * being pulled out from under us if the core sends us a
- * hangup request.
- */
- ao2_lock(instance);
if (!instance->websocket) {
- ao2_unlock(instance);
+ ast_log(LOG_WARNING, "%s: WebSocket session not found\n",
+ ast_channel_name(instance->channel));
return -1;
}
res = ast_websocket_read(instance->websocket, &payload, &payload_len,
&opcode, &fragmented);
- ao2_unlock(instance);
+
if (res) {
+ ast_debug(3, "%s: WebSocket read error\n",
+ ast_channel_name(instance->channel));
+ websocket_request_hangup(instance, AST_CAUSE_NETWORK_OUT_OF_ORDER, AST_WEBSOCKET_STATUS_GOING_AWAY);
return -1;
}
ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel),
}
if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
- ast_debug(5, "%s: WebSocket closed by remote\n",
+ ast_debug(3, "%s: WebSocket closed by remote\n",
ast_channel_name(instance->channel));
+ websocket_request_hangup(instance, AST_CAUSE_NORMAL, AST_WEBSOCKET_STATUS_GOING_AWAY);
return -1;
}
if (opcode != AST_WEBSOCKET_OPCODE_BINARY) {
- ast_debug(5, "%s: WebSocket frame type %d not supported. Ignoring.\n",
+ ast_log(LOG_WARNING, "%s: WebSocket frame type %d not supported\n",
ast_channel_name(instance->channel), (int)opcode);
+ websocket_request_hangup(instance, AST_CAUSE_FAILURE, AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA);
return 0;
}
return process_binary_message(instance, payload, payload_len);
}
-/*!
- * \internal
- *
- * For incoming websocket connections, this function gets called by
- * incoming_ws_established_cb() and is run in the http server thread
- * handling the websocket connection.
- *
- * For outgoing websocket connections, this function gets started as
- * a background thread by webchan_call().
- */
-static void *read_thread_handler(void *obj)
+static int websocket_handoff_to_channel(struct websocket_pvt *instance)
{
- RAII_VAR(struct websocket_pvt *, instance, obj, ao2_cleanup);
int res = 0;
+ int nodelay = 1;
+ struct ast_sockaddr *remote_addr = ast_websocket_remote_address(instance->websocket);
- ast_debug(3, "%s: Read thread started\n", ast_channel_name(instance->channel));
+ instance->remote_addr = ast_strdup(ast_sockaddr_stringify(remote_addr));
+ ast_debug(3, "%s: WebSocket connection with %s established\n",
+ ast_channel_name(instance->channel), instance->remote_addr);
+
+ if (setsockopt(ast_websocket_fd(instance->websocket),
+ IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
+ ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
+ }
+
+ ast_channel_set_fd(instance->channel, WS_WEBSOCKET_FDNO, ast_websocket_fd(instance->websocket));
res = send_event(instance, MEDIA_START);
if (res != 0 ) {
- ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
- return NULL;
+ if (instance->type == AST_WS_TYPE_SERVER) {
+ websocket_request_hangup(instance, AST_CAUSE_NETWORK_OUT_OF_ORDER, AST_WEBSOCKET_STATUS_GOING_AWAY);
+ } else {
+ /*
+ * We were called by webchan_call so just need to set causes.
+ * The core will hangup the channel.
+ */
+ ast_channel_tech_hangupcause_set(instance->channel, AST_WEBSOCKET_STATUS_GOING_AWAY);
+ ast_channel_hangupcause_set(instance->channel, AST_CAUSE_NETWORK_OUT_OF_ORDER);
+ }
+ return -1;
}
if (!instance->no_auto_answer) {
ast_queue_control(instance->channel, AST_CONTROL_ANSWER);
}
- while (read_from_ws_and_queue(instance) == 0)
- {
- }
+ return 0;
+}
- /*
- * websocket_hangup will take care of closing the websocket if needed.
- */
- ast_debug(3, "%s: HANGUP by websocket close/error\n", ast_channel_name(instance->channel));
- ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
+static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause,
+ enum ast_websocket_status_code tech_cause, int line, const char *function)
+{
+ if (!instance || !instance->channel) {
+ return;
+ }
+ ast_debug(3, "%s:%s: Hangup requested from %s line %d. cause: %s(%d) tech_cause: %s(%d)",
+ ast_channel_name(instance->channel), instance->remote_addr,
+ function, line,
+ ast_cause2str(ast_cause), ast_cause, ast_websocket_status_to_str(tech_cause), tech_cause);
- return NULL;
+ if (tech_cause) {
+ ast_channel_tech_hangupcause_set(instance->channel, tech_cause);
+ }
+ ast_queue_hangup_with_cause(instance->channel, ast_cause);
}
/*! \brief Function called when we should write a frame to the channel */
* \internal
*
* Called by the core to actually call the remote.
+ * The core will hang up the channel if a non-zero is returned.
+ * We just need to set hangup causes if appropriate.
*/
static int webchan_call(struct ast_channel *ast, const char *dest,
int timeout)
{
struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
- int nodelay = 1;
enum ast_websocket_result result;
if (!instance) {
ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
ast_channel_name(ast));
+ ast_channel_hangupcause_set(ast, AST_CAUSE_FAILURE);
return -1;
}
if (!instance->client) {
ast_log(LOG_WARNING, "%s: WebSocket client not found\n",
ast_channel_name(ast));
+ ast_channel_hangupcause_set(ast, AST_CAUSE_FAILURE);
return -1;
}
if (!instance->websocket || result != WS_OK) {
ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n",
ast_channel_name(ast), dest, ast_websocket_result_to_str(result));
+ ast_channel_hangupcause_set(ast, AST_CAUSE_NO_ROUTE_DESTINATION);
return -1;
}
- if (setsockopt(ast_websocket_fd(instance->websocket),
- IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
- ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
- }
-
- ast_debug(3, "%s: WebSocket connection to %s established\n",
- ast_channel_name(ast), dest);
-
- /* read_thread_handler() will clean up the bump */
- if (ast_pthread_create_detached_background(&instance->outbound_read_thread, NULL,
- read_thread_handler, ao2_bump(instance))) {
- ast_log(LOG_WARNING, "%s: Failed to create thread.\n", ast_channel_name(ast));
- ao2_cleanup(instance);
- return -1;
- }
-
- return 0;
+ return websocket_handoff_to_channel(instance);
}
static void websocket_destructor(void *data)
}
ast_free(instance->uri_params);
+ ast_free(instance->remote_addr);
}
struct instance_proxy {
* Calling ast_channel_set_fd will cause the channel thread to call
* webchan_read at 'rate' times per second.
*/
- ast_channel_set_fd(instance->channel, 0, ast_timer_fd(instance->timer));
+ ast_channel_set_fd(instance->channel, WS_TIMER_FDNO, ast_timer_fd(instance->timer));
return 0;
}
ast_debug(3, "%s: WebSocket call hangup. cid: %s\n",
ast_channel_name(ast), instance->connection_id);
- /*
- * We need to lock because read_from_ws_and_queue() is probably waiting
- * on the websocket file descriptor and will unblock and immediately try to
- * check the websocket and read from it. We don't want to pull the
- * websocket out from under it between the check and read.
- */
- ao2_lock(instance);
if (instance->websocket) {
- ast_websocket_close(instance->websocket, 1000);
+ ast_websocket_close(instance->websocket, ast_channel_tech_hangupcause(ast) ?: 1000);
ast_websocket_unref(instance->websocket);
instance->websocket = NULL;
}
ast_channel_tech_pvt_set(ast, NULL);
- ao2_unlock(instance);
/* Clean up the reference from adding the instance to the channel */
ao2_cleanup(instance);
struct ast_variable *v;
const char *connection_id = NULL;
struct websocket_pvt *instance = NULL;
- int nodelay = 1;
ast_debug(3, "WebSocket established\n");
* Just in case though...
*/
ast_log(LOG_WARNING, "WebSocket connection id not found\n");
- ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
- ast_websocket_close(ast_ws_session, 1000);
+ websocket_request_hangup(instance, AST_CAUSE_FAILURE, AST_WEBSOCKET_STATUS_INTERNAL_ERROR);
+ ast_websocket_close(ast_ws_session, AST_WEBSOCKET_STATUS_INTERNAL_ERROR);
return;
}
* Just in case though...
*/
ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id);
- ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
- ast_websocket_close(ast_ws_session, 1000);
+ websocket_request_hangup(instance, AST_CAUSE_FAILURE, AST_WEBSOCKET_STATUS_INTERNAL_ERROR);
+ ast_websocket_close(ast_ws_session, AST_WEBSOCKET_STATUS_INTERNAL_ERROR);
return;
}
instance->websocket = ao2_bump(ast_ws_session);
- if (setsockopt(ast_websocket_fd(instance->websocket),
- IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
- ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on manager connection: %s\n", strerror(errno));
- }
-
- /* read_thread_handler cleans up the bump */
- read_thread_handler(ao2_bump(instance));
-
+ websocket_handoff_to_channel(instance);
ao2_cleanup(instance);
- ast_debug(3, "WebSocket closed\n");
+ /*
+ * The instance is the channel's responsibility now.
+ * We just return here.
+ */
}
/*!