static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj);
static void launch_listener_thread(listener_t *listener);
+session_elem_t *find_session_elem_by_uuid(listener_t *listener, const char *uuid);
+
static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level)
{
listener_t *l;
return;
}
-
- switch_thread_rwlock_rdlock(listener->session_rwlock);
- if ((s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) {
- switch_thread_rwlock_rdlock(s->rwlock);
- }
- switch_thread_rwlock_unlock(listener->session_rwlock);
-
- if (s) {
+ if ((s = (session_elem_t*)find_session_elem_by_uuid(listener, uuid))) {
int send = 0;
switch_thread_rwlock_rdlock(s->event_rwlock);
}
/* Search for a listener already talking to the specified node and lock for reading*/
-static listener_t *find_listener_locked(char *nodename)
+static listener_t *find_listener(char *nodename)
{
listener_t *l = NULL;
static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element)
-{
- switch_core_hash_delete(listener->sessions, session_element->uuid_str);
-}
-
-static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element)
{
switch_thread_rwlock_wrlock(listener->session_rwlock);
- remove_session_elem_from_listener(listener, session_element);
+ switch_core_hash_delete(listener->sessions, session_element->uuid_str);
switch_thread_rwlock_unlock(listener->session_rwlock);
}
if ((session = switch_core_session_locate(session_element->uuid_str))) {
switch_channel_t *channel = switch_core_session_get_channel(session);
- if (switch_channel_get_state(channel) < CS_HANGUP) {
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Outbound session for %s exited unexpectedly!\n", session_element->uuid_str);
- }
-
switch_channel_set_private(channel, "_erlang_session_", NULL);
switch_channel_clear_flag(channel, CF_CONTROLLED);
switch_core_session_rwunlock(session);
switch_core_destroy_memory_pool(&session_element->pool);
}
+session_elem_t *find_session_elem_by_uuid(listener_t *listener, const char *uuid)
+{
+ session_elem_t *session = NULL;
+
+ switch_thread_rwlock_rdlock(listener->session_rwlock);
+ if ((session = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) {
+ switch_thread_rwlock_rdlock(session->rwlock);
+ }
+
+ switch_thread_rwlock_unlock(listener->session_rwlock);
+
+ return session;
+ }
session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
{
void *val = NULL;
session_elem_t *session = NULL;
+ switch_thread_rwlock_rdlock(listener->session_rwlock);
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
switch_hash_this(iter, &key, NULL, &val);
if (((session_elem_t*)val)->process.type == ERLANG_PID && !ei_compare_pids(pid, &((session_elem_t*)val)->process.pid)) {
session = (session_elem_t*)val;
+ switch_thread_rwlock_rdlock(session->rwlock);
break;
}
}
+ switch_thread_rwlock_unlock(listener->session_rwlock);
return session;
}
if (!(session = switch_core_session_locate(session_element->uuid_str))) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Can't locate session %s\n", session_element->uuid_str);
+ switch_event_destroy(&call_event);
return SWITCH_STATUS_FALSE;
}
}
}
switch_thread_rwlock_unlock(listener->session_rwlock);
- /* release the read lock and get a write lock */
- switch_thread_rwlock_wrlock(listener->session_rwlock);
- /* do the deferred remove */
+ /* do the deferred remove */
for (header = event->headers; header; header = header->next) {
- if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) {
+ if ((sp = (session_elem_t*)find_session_elem_by_uuid(listener, header->value))) {
remove_session_elem_from_listener(listener, sp);
+ switch_thread_rwlock_unlock(sp->rwlock);
destroy_session_elem(sp);
}
}
- switch_thread_rwlock_unlock(listener->session_rwlock);
-
/* remove the temporary event */
switch_event_destroy(&event);
remove_binding(NULL, pid); /* TODO - why don't we pass the listener as the first argument? */
- /* TODO - eliminate session destroy races and we shouldn't lock the session hash */
- switch_thread_rwlock_wrlock(listener->session_rwlock);
if ((s = find_session_elem_by_pid(listener, pid))) {
- remove_session_elem_from_listener(listener, s);
- destroy_session_elem(s);
+ switch_core_session_t *session = NULL;
+
+ if ((session = switch_core_session_locate(s->uuid_str))) {
+ switch_channel_t *channel = switch_core_session_get_channel(session);
+
+ if (switch_channel_get_state(channel) < CS_HANGUP) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Outbound session exited unexpectedly %s!\n", s->uuid_str);
+ }
+
+ switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
+ switch_core_session_rwunlock(session);
+ }
+
+ switch_thread_rwlock_unlock(s->rwlock);
+
}
- switch_thread_rwlock_wrlock(listener->session_rwlock);
if (listener->log_process.type == ERLANG_PID && !ei_compare_pids(&listener->log_process.pid, pid)) {
return NULL;
}
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node);
listener = new_listener(&ec, clientfd);
listener->peer_nodename = switch_core_strdup(listener->pool, node);
}
switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000);
if (!p->pid) {
+ switch_channel_t *channel = switch_core_session_get_channel(session);
+
p->state = reply_timeout;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s %s\n", hash, session_element->uuid_str);
- remove_session_elem_from_listener_locked(listener, session_element);
- destroy_session_elem(session_element);
+ switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
return NULL;
}
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node);
/* first work out if there is a listener already talking to the node we want to talk to */
- listener = find_listener_locked(node);
+ listener = find_listener(node);
/* if there is no listener, then create one */
if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n");
ei_x_encode_atom(&buf, "freeswitch_sendmsg");
_ei_x_encode_string(&buf, argv[2]);
- listener = find_listener_locked(node);
+ listener = find_listener(node);
if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node);
listener = new_outbound_listener_locked(node);