]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
listener r/w locking FS-3432
authorTamas Cseke <tamas.cseke@virtual-call-center.eu>
Thu, 24 May 2012 08:30:48 +0000 (10:30 +0200)
committerTamas Cseke <tamas.cseke@virtual-call-center.eu>
Thu, 24 May 2012 08:30:48 +0000 (10:30 +0200)
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h

index cfa1d631151f0a3ad0d99183d53f2936a973900a..c2f204ef0add69ff62792b32486d4d3b597757e9 100644 (file)
@@ -44,6 +44,7 @@ SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_eve
 static switch_memory_pool_t *module_pool = NULL;
 
 static void remove_listener(listener_t *listener);
+static void destroy_listener(listener_t *listener);
 static switch_status_t state_handler(switch_core_session_t *session);
 
 SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
@@ -59,7 +60,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
 
        switch_thread_rwlock_rdlock(globals.listener_rwlock);
        for (l = listen_list.listeners; l; l = l->next) {
-               /* TODO listener read lock */
+
                if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) {
 
                        switch_log_node_t *dnode = switch_log_node_dup(node);
@@ -80,6 +81,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
                                l->lost_logs++;
                        }
                }
+
        }
        switch_thread_rwlock_unlock(globals.listener_rwlock);
 
@@ -192,8 +194,6 @@ static void event_handler(switch_event_t *event)
                   one of them should receive the event as well
                 */
 
-               /* TODO need read locking */
-               
                send_event_to_attached_sessions(l, event);
 
                if (!switch_test_flag(l, LFLAG_EVENTS)) {
@@ -255,21 +255,21 @@ static void close_socket(int *sock)
 }
 
 
-/*static void add_listener(listener_t *listener)*/
-/*{*/
-       /* add me to the listeners so I get events */
-       /*switch_thread_rwlock_wrlock(globals.listener_rwlock);*/
-       /*listener->next = listen_list.listeners;*/
-       /*listen_list.listeners = listener;*/
-       /*switch_thread_rwlock_unlock(globals.listener_rwlock);*/
-/*}*/
+static void add_listener(listener_t *listener)
+{
+       /*      add me to the listeners so I get events */
+       switch_thread_rwlock_wrlock(globals.listener_rwlock);
+       listener->next = listen_list.listeners;
+       listen_list.listeners = listener;
+       switch_thread_rwlock_unlock(globals.listener_rwlock);
+}
 
 
-/* TODO lock */
 static void remove_listener(listener_t *listener)
 {
        listener_t *l, *last = NULL;
 
+       switch_thread_rwlock_wrlock(globals.listener_rwlock);
        for (l = listen_list.listeners; l; l = l->next) {
                if (l == listener) {
                        if (last) {
@@ -280,17 +280,18 @@ static void remove_listener(listener_t *listener)
                }
                last = l;
        }
+       switch_thread_rwlock_unlock(globals.listener_rwlock);
 }
 
-/* Search for a listener already talking to the specified node */
-static listener_t *find_listener(char *nodename)
+/* Search for a listener already talking to the specified node and lock for reading*/
+static listener_t *find_listener_locked(char *nodename)
 {
        listener_t *l = NULL;
 
        switch_thread_rwlock_rdlock(globals.listener_rwlock);
        for (l = listen_list.listeners; l; l = l->next) {
                if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) {
-                       /* TODO listener rwlock */
+                       switch_thread_rwlock_rdlock(l->rwlock);
                        break;
                }
        }
@@ -976,10 +977,6 @@ static switch_bool_t check_inbound_acl(listener_t *listener)
 static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
 {
        listener_t *listener = (listener_t *) obj;
-       session_elem_t *s = NULL;
-       const void *key;
-       void *value;
-       switch_hash_index_t *iter;
 
        switch_mutex_lock(globals.listener_count_mutex);
        prefs.threads++;
@@ -994,50 +991,14 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip);      /*, listener->remote_port); */
                }
 
-               /*add_listener(listener);*/
+               add_listener(listener);
                listener_main_loop(listener);
        }
 
-       /* clean up */
-
-       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
-
-       listener->dead = 1; /* mark it as dead */
-
-       /* TODO - release write lock */
-       switch_thread_rwlock_wrlock(globals.listener_rwlock);
-       remove_listener(listener);
-       switch_thread_rwlock_unlock(globals.listener_rwlock);
-
-       switch_thread_rwlock_wrlock(listener->rwlock);
-
-       if (listener->sockfd) {
-               close_socket(&listener->sockfd);
-       }
-
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n");
-       /* TODO make listener destroy function and move there */
-       switch_core_hash_destroy(&listener->event_hash);
 
-       /* remove any bindings for this connection */
-       remove_binding(listener, NULL);
-
-       /* clean up all the attached sessions */
-       switch_thread_rwlock_wrlock(listener->session_rwlock);
-       for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
-               switch_hash_this(iter, &key, NULL, &value);
-               s = (session_elem_t*)value;
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str);
-               remove_session_elem_from_listener(listener, s);
-               destroy_session_elem(s);
-       }
-       switch_thread_rwlock_unlock(listener->session_rwlock);
-       switch_thread_rwlock_unlock(listener->rwlock);
-
-       if (listener->pool) {
-               switch_memory_pool_t *pool = listener->pool;
-               switch_core_destroy_memory_pool(&pool);
-       }
+       remove_listener(listener);
+       destroy_listener(listener);
 
        switch_mutex_lock(globals.listener_count_mutex);
        prefs.threads--;
@@ -1220,8 +1181,6 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
        switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool);
        switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool);
 
-       /* TODO remove */
-       listener->dead = 0; /* born alive */
        listener->sockfd = clientfd;
        listener->pool = pool;
        listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode));
@@ -1233,49 +1192,16 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
        switch_core_hash_init(&listener->event_hash, listener->pool);
        switch_core_hash_init(&listener->sessions, listener->pool);
 
-       /* TODO listener rdlock */
-       listener->next = listen_list.listeners;
-       listen_list.listeners = listener;
-
        return listener;
 }
 
-/*TODO we don't need bottleneck*/
-static listener_t *new_listener_locked(struct ei_cnode_s *ec, int clientfd)
-{
-       listener_t *res;
-       switch_thread_rwlock_wrlock(globals.listener_rwlock);
-       res = new_listener(ec, clientfd);
-       switch_thread_rwlock_unlock(globals.listener_rwlock);
-       return res;
-}
 
-/* TODO new session??? */
-static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session)
+static listener_t *new_outbound_listener_locked(char *node)
 {
        listener_t *listener = NULL;
        struct ei_cnode_s ec;
        int clientfd;
 
-       /* TODO find listener func */
-       switch_thread_rwlock_wrlock(globals.listener_rwlock);
-       for (listener = listen_list.listeners; listener; listener = listener->next) {
-               if (!strncmp(node, listener->peer_nodename, MAXNODELEN)) {
-                       break;
-               }
-       }
-
-       if (listener && listener->dead) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "found dead listener for %s\n", node);
-               remove_listener(listener); /* remove the dead listener and continue adding one */
-       } else if (listener) {
-               switch_thread_rwlock_unlock(globals.listener_rwlock);
-
-               *new_session = SWITCH_FALSE;
-               return listener;
-       }
-
-
        if (SWITCH_STATUS_SUCCESS == initialise_ei(&ec)) {
 #ifdef WIN32
                WSASetLastError(0);
@@ -1284,7 +1210,7 @@ static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session)
 #endif
                if ((clientfd = ei_connect(&ec, node)) < 0) {
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n", node, erl_errno, errno);
-                       switch_thread_rwlock_unlock(globals.listener_rwlock);
+
                        return NULL;
                }
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node);
@@ -1292,12 +1218,49 @@ static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session)
                listener->peer_nodename = switch_core_strdup(listener->pool, node);
        }
 
-       switch_thread_rwlock_unlock(globals.listener_rwlock);
-       *new_session = SWITCH_TRUE;
+       switch_thread_rwlock_rdlock(listener->rwlock);
 
        return listener;
 }
 
+void destroy_listener(listener_t * listener)
+{
+       session_elem_t *s = NULL;
+       const void *key;
+       void *value;
+       switch_hash_index_t *iter;
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
+       switch_thread_rwlock_wrlock(listener->rwlock);
+
+       if (listener->sockfd) {
+               close_socket(&listener->sockfd);
+       }
+
+       switch_core_hash_destroy(&listener->event_hash);
+
+       /* remove any bindings for this connection */
+       remove_binding(listener, NULL);
+
+       /* clean up all the attached sessions */
+       switch_thread_rwlock_wrlock(listener->session_rwlock);
+       for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
+               switch_hash_this(iter, &key, NULL, &value);
+               s = (session_elem_t*)value;
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str);
+               remove_session_elem_from_listener(listener, s);
+               destroy_session_elem(s);
+       }
+       switch_thread_rwlock_unlock(listener->session_rwlock);
+       switch_thread_rwlock_unlock(listener->rwlock);
+
+       if (listener->pool) {
+               switch_memory_pool_t *pool = listener->pool;
+               switch_core_destroy_memory_pool(&pool);
+       }
+
+}
+
 static switch_status_t state_handler(switch_core_session_t *session)
 {
        switch_channel_t *channel = switch_core_session_get_channel(session);
@@ -1499,7 +1462,6 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
        char *argv[80] = { 0 }, *argv2[80] = { 0 };
        char *mydata, *myarg;
        char uuid[SWITCH_UUID_FORMATTED_LENGTH + 1];
-       switch_bool_t new_session = SWITCH_FALSE;
        session_elem_t *session_element = NULL;
 
        /* process app arguments */
@@ -1541,31 +1503,19 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node);
 
        /* first work out if there is a listener already talking to the node we want to talk to */
-       listener = find_listener(node);
+       listener = find_listener_locked(node);
        /* if there is no listener, then create one */
        if (!listener) {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n");
-               listener = new_outbound_listener(node, &new_session);
-               /* XXX new_session isn't accurate now */
+               if ((listener = new_outbound_listener_locked(node))) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n");
+                       launch_listener_thread(listener);
+               }
        } else {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for session\n");
-               /* TODO don't we need to connect ? */
        }
 
-       /* TODO it's too late */
-       switch_thread_rwlock_rdlock(globals.listener_rwlock);
-
-       if (listener && !listener->dead) {
-               /* prevent the listener_run thread from destroying the listener out from under us */
-               /* get the listener lock */
-               switch_thread_rwlock_rdlock(listener->rwlock);
-               /* release the global listener lock, since the listener can't be freed without the listener lock */
-               switch_thread_rwlock_unlock(globals.listener_rwlock);
-
-               if (new_session == SWITCH_TRUE) {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n");
-                       launch_listener_thread(listener);
-               }
+       if (listener) {
 
                if (module && function) {
                        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n");
@@ -1575,10 +1525,8 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
                        session_element = attach_call_to_registered_process(listener, reg_name, session);
                }
 
-               /* should be safe now */
                switch_thread_rwlock_unlock(listener->rwlock);
 
-
                if (session_element) {
                        switch_ivr_park(session, NULL);
                }
@@ -1600,7 +1548,6 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
        char *mydata;
        ei_x_buff buf;
        listener_t *listener;
-       switch_bool_t new_session;
 
        ei_x_new_with_version(&buf);
 
@@ -1623,16 +1570,18 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
        ei_x_encode_atom(&buf, "freeswitch_sendmsg");
        _ei_x_encode_string(&buf, argv[2]);
 
-       listener = find_listener(node);
+       listener = find_listener_locked(node);
        if (!listener) {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node);
-               listener = new_outbound_listener(node, &new_session);
+               listener = new_outbound_listener_locked(node);
        } else {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for sendmsg to %s\n", node);
        }
 
-       if (listener && !listener->dead) {
+       if (listener) {
                ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index);
+
+               switch_thread_rwlock_unlock(listener->rwlock);
        }
 }
 
@@ -1927,7 +1876,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
                        continue;
                }
 
-               listener = new_listener_locked(&ec, clientfd);
+               listener = new_listener(&ec, clientfd);
                if (listener) {
                        /* store the IP and node name we are talking with */
                        switch_inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip));
index 775cdf74f3e7ff04d9f5b1c027cb268e9e39f859..016734dd8083cf166251a41d01c6bab44b5cce37 100644 (file)
@@ -114,7 +114,6 @@ struct listener {
 #else
        int sockfd;
 #endif
-       uint8_t dead;
        struct ei_cnode_s *ec;
        struct erlang_process log_process;
        struct erlang_process event_process;