]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
move session destroy to one place and hangup in others to eliminate races and minimiz...
authorTamas Cseke <tamas.cseke@virtual-call-center.eu>
Mon, 11 Jun 2012 13:03:58 +0000 (15:03 +0200)
committerTamas Cseke <tamas.cseke@virtual-call-center.eu>
Mon, 11 Jun 2012 13:03:58 +0000 (15:03 +0200)
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c

index b1caa67e3a0ff0b33083d92a448b4c7b12bd8922..9960e2499d727f0ca3cec902701f804723157712 100644 (file)
@@ -55,6 +55,8 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_nodename, prefs.nodename);
 static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj);
 static void launch_listener_thread(listener_t *listener);
 
+session_elem_t *find_session_elem_by_uuid(listener_t *listener, const char *uuid);
+
 static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level)
 {
        listener_t *l;
@@ -135,14 +137,7 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
                return;
        }
 
-
-       switch_thread_rwlock_rdlock(listener->session_rwlock);
-       if ((s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) {
-               switch_thread_rwlock_rdlock(s->rwlock);
-       }
-       switch_thread_rwlock_unlock(listener->session_rwlock);
-
-       if (s) {
+       if ((s = (session_elem_t*)find_session_elem_by_uuid(listener, uuid))) {
                int send = 0;
 
                switch_thread_rwlock_rdlock(s->event_rwlock);
@@ -296,7 +291,7 @@ static void remove_listener(listener_t *listener)
 }
 
 /* Search for a listener already talking to the specified node and lock for reading*/
-static listener_t *find_listener_locked(char *nodename)
+static listener_t *find_listener(char *nodename)
 {
        listener_t *l = NULL;
 
@@ -321,14 +316,9 @@ static void add_session_elem_to_listener(listener_t *listener, session_elem_t *s
 
 
 static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element)
-{
-       switch_core_hash_delete(listener->sessions, session_element->uuid_str);
-}
-
-static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element)
 {
        switch_thread_rwlock_wrlock(listener->session_rwlock);
-       remove_session_elem_from_listener(listener, session_element);
+       switch_core_hash_delete(listener->sessions, session_element->uuid_str);
        switch_thread_rwlock_unlock(listener->session_rwlock);
 }
 
@@ -343,10 +333,6 @@ static void destroy_session_elem(session_elem_t *session_element)
        if ((session = switch_core_session_locate(session_element->uuid_str))) {
                switch_channel_t *channel = switch_core_session_get_channel(session);
 
-               if (switch_channel_get_state(channel) < CS_HANGUP) {
-                       switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Outbound session for %s exited unexpectedly!\n", session_element->uuid_str);
-               }
-
                switch_channel_set_private(channel, "_erlang_session_", NULL);
                switch_channel_clear_flag(channel, CF_CONTROLLED);
                switch_core_session_rwunlock(session);
@@ -354,6 +340,19 @@ static void destroy_session_elem(session_elem_t *session_element)
        switch_core_destroy_memory_pool(&session_element->pool);
 }
 
+session_elem_t *find_session_elem_by_uuid(listener_t *listener, const char *uuid)
+{
+       session_elem_t *session = NULL;
+       
+       switch_thread_rwlock_rdlock(listener->session_rwlock);
+       if ((session = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) {
+               switch_thread_rwlock_rdlock(session->rwlock);
+       }
+
+       switch_thread_rwlock_unlock(listener->session_rwlock);
+
+       return session;
+ }
 
 session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
 {
@@ -362,14 +361,17 @@ session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
        void *val = NULL;
        session_elem_t *session = NULL;
 
+       switch_thread_rwlock_rdlock(listener->session_rwlock);
        for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
                switch_hash_this(iter, &key, NULL, &val);
                
                if (((session_elem_t*)val)->process.type == ERLANG_PID && !ei_compare_pids(pid, &((session_elem_t*)val)->process.pid)) {
                        session = (session_elem_t*)val;
+                       switch_thread_rwlock_rdlock(session->rwlock);
                        break;
                }
        }
+       switch_thread_rwlock_unlock(listener->session_rwlock);
 
        return session;
 }
@@ -536,6 +538,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t *
 
        if (!(session = switch_core_session_locate(session_element->uuid_str))) {
                switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Can't locate session %s\n", session_element->uuid_str);
+               switch_event_destroy(&call_event);
                return SWITCH_STATUS_FALSE;
        }
 
@@ -672,19 +675,16 @@ static switch_status_t check_attached_sessions(listener_t *listener)
                }
        }
        switch_thread_rwlock_unlock(listener->session_rwlock);
-       /* release the read lock and get a write lock */
-       switch_thread_rwlock_wrlock(listener->session_rwlock);
-       /* do the deferred remove */
 
+       /* do the deferred remove */
        for (header = event->headers; header; header = header->next) {
-               if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) {
+               if ((sp = (session_elem_t*)find_session_elem_by_uuid(listener, header->value))) {
                        remove_session_elem_from_listener(listener, sp);
+                       switch_thread_rwlock_unlock(sp->rwlock);
                        destroy_session_elem(sp);
                }
        }
 
-       switch_thread_rwlock_unlock(listener->session_rwlock);
-
        /* remove the temporary event */
        switch_event_destroy(&event);
 
@@ -783,13 +783,23 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
 
        remove_binding(NULL, pid);      /* TODO - why don't we pass the listener as the first argument? */
 
-       /* TODO - eliminate session destroy races and we shouldn't lock the session hash */
-       switch_thread_rwlock_wrlock(listener->session_rwlock);
        if ((s = find_session_elem_by_pid(listener, pid))) {
-               remove_session_elem_from_listener(listener, s);
-               destroy_session_elem(s);
+               switch_core_session_t *session = NULL;
+
+               if ((session = switch_core_session_locate(s->uuid_str))) {
+                       switch_channel_t *channel = switch_core_session_get_channel(session);
+
+                       if (switch_channel_get_state(channel) < CS_HANGUP) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Outbound session exited unexpectedly %s!\n", s->uuid_str);
+                       }
+
+                       switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
+                       switch_core_session_rwunlock(session);
+               }
+
+               switch_thread_rwlock_unlock(s->rwlock);
+
        }
-       switch_thread_rwlock_wrlock(listener->session_rwlock);
 
 
        if (listener->log_process.type == ERLANG_PID && !ei_compare_pids(&listener->log_process.pid, pid)) {
@@ -1214,7 +1224,6 @@ static listener_t *new_outbound_listener_locked(char *node)
 
                        return NULL;
                }
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node);
                listener = new_listener(&ec, clientfd);
                listener->peer_nodename = switch_core_strdup(listener->pool, node);
        }
@@ -1410,10 +1419,11 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
 
        switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000);
        if (!p->pid) {
+               switch_channel_t *channel = switch_core_session_get_channel(session);
+
                p->state = reply_timeout;
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s %s\n", hash, session_element->uuid_str);
-               remove_session_elem_from_listener_locked(listener, session_element);
-               destroy_session_elem(session_element);
+               switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
                return NULL;
        }
 
@@ -1500,7 +1510,7 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node);
 
        /* first work out if there is a listener already talking to the node we want to talk to */
-       listener = find_listener_locked(node);
+       listener = find_listener(node);
        /* if there is no listener, then create one */
        if (!listener) {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n");
@@ -1567,7 +1577,7 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
        ei_x_encode_atom(&buf, "freeswitch_sendmsg");
        _ei_x_encode_string(&buf, argv[2]);
 
-       listener = find_listener_locked(node);
+       listener = find_listener(node);
        if (!listener) {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node);
                listener = new_outbound_listener_locked(node);