static void destroy_node_handler(ei_node_t *ei_node) {
int pending = 0;
void *pop;
+ switch_memory_pool_t *pool = ei_node->pool;
switch_clear_flag(ei_node, LFLAG_RUNNING);
switch_mutex_destroy(ei_node->event_streams_mutex);
- switch_core_destroy_memory_pool(&ei_node->pool);
+ switch_core_destroy_memory_pool(&pool);
}
static switch_status_t add_to_ei_nodes(ei_node_t *this_ei_node) {
ei_node_t *ei_node = acs->ei_node;
ei_send_msg_t *send_msg;
- switch_malloc(send_msg, sizeof(*send_msg));
- memcpy(&send_msg->pid, &acs->pid, sizeof(erlang_pid));
-
if(!switch_test_flag(ei_node, LFLAG_RUNNING) || !switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Ignoring command while shuting down\n");
switch_atomic_dec(&ei_node->pending_bgapi);
return NULL;
}
+ switch_malloc(send_msg, sizeof(*send_msg));
+ memcpy(&send_msg->pid, &acs->pid, sizeof(erlang_pid));
+
ei_x_new_with_version(&send_msg->buf);
ei_x_encode_tuple_header(&send_msg->buf, 3);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "log|%s|building xferext extension: %s %s\n", uuid, app_name, app_arg);
+ switch_safe_free(app_name);
}
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "log|%s|transfered call to xferext extension\n", uuid);
return erlang_response_badarg(rbuf);
}
+ if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) {
+ return erlang_response_baduuid(rbuf);
+ }
+
switch_uuid_get(&cmd_uuid);
switch_uuid_format(cmd_uuid_str, &cmd_uuid);
switch_event_create(&event, SWITCH_EVENT_COMMAND);
if (build_event(event, buf) != SWITCH_STATUS_SUCCESS) {
+ switch_core_session_rwunlock(session);
return erlang_response_badarg(rbuf);
}
log_sendmsg_request(uuid_str, event);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event-uuid", cmd_uuid_str);
- if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) {
- return erlang_response_baduuid(rbuf);
- }
switch_core_session_queue_private_event(session, &event, SWITCH_FALSE);
switch_core_session_rwunlock(session);
return erlang_response_badarg(rbuf);
}
+ if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) {
+ return erlang_response_baduuid(rbuf);
+ }
+
switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE);
if (build_event(event, buf) != SWITCH_STATUS_SUCCESS) {
+ switch_core_session_rwunlock(session);
return erlang_response_badarg(rbuf);
}
log_sendmsg_request(uuid_str, event);
- if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) {
- return erlang_response_baduuid(rbuf);
- }
switch_core_session_queue_private_event(session, &event, SWITCH_FALSE);
switch_core_session_rwunlock(session);
}
static switch_status_t handle_request_api4(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) {
- char cmd[MAXATOMLEN + 1];
- char *arg;
+ char cmd[MAXATOMLEN + 1];
+ char *arg;
switch_stream_handle_t stream = { 0 };
SWITCH_STANDARD_STREAM(stream);
switch_event_create(&stream.param_event, SWITCH_EVENT_API);
- if (ei_decode_atom_safe(buf->buff, &buf->index, cmd)) {
- return erlang_response_badarg(rbuf);
- }
+ if (ei_decode_atom_safe(buf->buff, &buf->index, cmd)) {
+ return erlang_response_badarg(rbuf);
+ }
- if (ei_decode_string_or_binary(buf->buff, &buf->index, &arg)) {
- return erlang_response_badarg(rbuf);
- }
+ if (ei_decode_string_or_binary(buf->buff, &buf->index, &arg)) {
+ return erlang_response_badarg(rbuf);
+ }
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "exec: %s(%s)\n", cmd, arg);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "exec: %s(%s)\n", cmd, arg);
- if (rbuf) {
- char *reply;
+ if (rbuf) {
+ char *reply;
switch_status_t status;
status = api_exec_stream(cmd, arg, &stream, &reply);
ei_encode_switch_event_headers(rbuf, stream.param_event);
}
- switch_safe_free(reply);
- }
+ switch_safe_free(reply);
+ }
if (stream.param_event) {
switch_event_fire(&stream.param_event);
}
- switch_safe_free(arg);
+ switch_safe_free(arg);
switch_safe_free(stream.data);
- return SWITCH_STATUS_SUCCESS;
+ return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_request_json_api(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf)
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "parse_error");
_ei_x_encode_string(rbuf, parse_end);
+ switch_safe_free(arg);
return status;
}
if ((uuid = cJSON_GetObjectCstr(jcmd, "uuid"))) {
if (!(session = switch_core_session_locate(uuid))) {
+ cJSON_Delete(jcmd);
+ switch_safe_free(arg);
return erlang_response_baduuid(rbuf);
}
}
}
for (i = 1; i <= length; i++) {
-
if (ei_decode_atom_safe(buf->buff, &buf->index, event_name)) {
switch_mutex_unlock(ei_node->event_streams_mutex);
return erlang_response_badarg(rbuf);
/* {'$gen_call', {_, _}, {_, _}} = Buf */
} else if (arity == 3 && !strncmp(atom, "$gen_call", 9)) {
switch_status_t status;
- ei_send_msg_t *send_msg;
+ ei_send_msg_t *send_msg = NULL;
erlang_ref ref;
switch_malloc(send_msg, sizeof(*send_msg));
-
- ei_x_new(&send_msg->buf);
-
ei_x_new_with_version(&send_msg->buf);
/* ...{_, _}, {_, _}} = Buf */
/* is_tuple(Type) */
if (type != ERL_SMALL_TUPLE_EXT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call message of an unexpected type (ensure you are using Kazoo v2.14+).\n");
+ ei_x_free(&send_msg->buf);
+ switch_safe_free(send_msg);
return SWITCH_STATUS_GENERR;
}
/* ...pid(), _}, {_, _}} = Buf */
if (ei_decode_pid(buf->buff, &buf->index, &send_msg->pid)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call without a reply pid (ensure you are using Kazoo v2.14+).\n");
+ ei_x_free(&send_msg->buf);
+ switch_safe_free(send_msg);
return SWITCH_STATUS_GENERR;
}
/* ...ref()}, {_, _}} = Buf */
if (ei_decode_ref(buf->buff, &buf->index, &ref)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call without a reply tag (ensure you are using Kazoo v2.14+).\n");
+ ei_x_free(&send_msg->buf);
+ switch_safe_free(send_msg);
return SWITCH_STATUS_GENERR;
}
status = handle_kazoo_request(ei_node, &msg->from, buf, &send_msg->buf);
if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error queuing reply\n");
ei_x_free(&send_msg->buf);
switch_safe_free(send_msg);
}
static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg *msg, ei_x_buff *buf) {
int version, size, type, arity;
char atom[MAXATOMLEN + 1];
- ei_send_msg_t *send_msg;
+ ei_send_msg_t *send_msg = NULL;
erlang_ref ref;
- switch_malloc(send_msg, sizeof(*send_msg));
-
- ei_x_new(&send_msg->buf);
-
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message, attempting to reply\n");
+ switch_malloc(send_msg, sizeof(*send_msg));
+ ei_x_new_with_version(&send_msg->buf);
+
buf->index = 0;
ei_decode_version(buf->buff, &buf->index, &version);
ei_get_type(buf->buff, &buf->index, &type, &size);
/* is_tuple(Buff) */
if (type != ERL_SMALL_TUPLE_EXT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message of an unexpected type\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
ei_decode_tuple_header(buf->buff, &buf->index, &arity);
/* {_, _, _} = Buf */
if (arity != 3) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel tuple has an unexpected arity\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
/* {'$gen_call', _, _} = Buf */
if (ei_decode_atom_safe(buf->buff, &buf->index, atom) || strncmp(atom, "$gen_call", 9)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message tuple does not begin with the atom '$gen_call'\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
ei_get_type(buf->buff, &buf->index, &type, &size);
/* {_, Sender, _}=Buff, is_tuple(Sender) */
if (type != ERL_SMALL_TUPLE_EXT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Second element of the net_kernel tuple is an unexpected type\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
ei_decode_tuple_header(buf->buff, &buf->index, &arity);
/* {_, _}=Sender */
if (arity != 2) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Second element of the net_kernel message has an unexpected arity\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
/* {Pid, Ref}=Sender */
if (ei_decode_pid(buf->buff, &buf->index, &send_msg->pid) || ei_decode_ref(buf->buff, &buf->index, &ref)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to decode erlang pid or ref of the net_kernel tuple second element\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
ei_get_type(buf->buff, &buf->index, &type, &size);
/* {_, _, Request}=Buff, is_tuple(Request) */
if (type != ERL_SMALL_TUPLE_EXT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Third element of the net_kernel message is an unexpected type\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
ei_decode_tuple_header(buf->buff, &buf->index, &arity);
/* {_, _}=Request */
if (arity != 2) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Third element of the net_kernel message has an unexpected arity\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
/* {is_auth, _}=Request */
if (ei_decode_atom_safe(buf->buff, &buf->index, atom) || strncmp(atom, "is_auth", MAXATOMLEN)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "The net_kernel message third element does not begin with the atom 'is_auth'\n");
- return SWITCH_STATUS_GENERR;
+ goto error;
}
/* To ! {Tag, Reply} */
- ei_x_new_with_version(&send_msg->buf);
ei_x_encode_tuple_header(&send_msg->buf, 2);
ei_x_encode_ref(&send_msg->buf, &ref);
ei_x_encode_atom(&send_msg->buf, "yes");
if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) {
- ei_x_free(&send_msg->buf);
- switch_safe_free(send_msg);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "unable to queue net kernel message\n");
+ goto error;
}
return SWITCH_STATUS_SUCCESS;
+
+error:
+ ei_x_free(&send_msg->buf);
+ switch_safe_free(send_msg);
+ return SWITCH_STATUS_GENERR;
}
static switch_status_t handle_erl_send(ei_node_t *ei_node, erlang_msg *msg, ei_x_buff *buf) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting erlang receive handler %p: %s (%s:%d)\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port);
while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
- void *pop;
+ void *pop = NULL;
if (switch_queue_pop_timeout(ei_node->received_msgs, &pop, 100000) == SWITCH_STATUS_SUCCESS) {
ei_received_msg_t *received_msg = (ei_received_msg_t *) pop;
while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
int status;
int send_msg_count = 0;
- void *pop;
+ void *pop = NULL;
if (!received_msg) {
switch_malloc(received_msg, sizeof(*received_msg));
/* create a new buf for the erlang message and a rbuf for the reply */
if(kazoo_globals.receive_msg_preallocate > 0) {
- received_msg->buf.buff = malloc(kazoo_globals.receive_msg_preallocate);
+ switch_malloc(received_msg->buf.buff, kazoo_globals.receive_msg_preallocate);
received_msg->buf.buffsz = kazoo_globals.receive_msg_preallocate;
received_msg->buf.index = 0;
if(received_msg->buf.buff == NULL) {
} else {
ei_x_new(&received_msg->buf);
}
+ } else {
+ received_msg->buf.index = 0;
}
while (++send_msg_count <= kazoo_globals.send_msg_batch
case ERL_MSG:
fault_count = 0;
+ if (kazoo_globals.receive_msg_preallocate > 0 && received_msg->buf.buffsz > kazoo_globals.receive_msg_preallocate) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "increased received message buffer size to %d\n", received_msg->buf.buffsz);
+ }
+
if (switch_queue_trypush(ei_node->received_msgs, received_msg) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "failed to push erlang received message from %s <%d.%d.%d> into queue\n", received_msg->msg.from.node, received_msg->msg.from.creation, received_msg->msg.from.num, received_msg->msg.from.serial);
ei_x_free(&received_msg->buf);
switch_safe_free(received_msg);
}
- if (kazoo_globals.receive_msg_preallocate > 0 && received_msg->buf.buffsz > kazoo_globals.receive_msg_preallocate) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "increased received message buffer size to %d\n", received_msg->buf.buffsz);
- }
-
received_msg = NULL;
break;
case ERL_ERROR:
destroy_node_handler(ei_node);
switch_atomic_dec(&kazoo_globals.threads);
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown Complete for erlang node handler %p: %s (%s:%d)\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port);
+
return NULL;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "New erlang connection from node %s (%s:%d) -> (%s:%d)\n", ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port, ei_node->local_ip, ei_node->local_port);
- for(i = 0; i < kazoo_globals.num_worker_threads; i++) {
+ for(i = 0; i < kazoo_globals.node_worker_threads; i++) {
switch_threadattr_create(&thd_attr, ei_node->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);