]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
add session event r/w locking FS-3432
authorTamas Cseke <tamas.cseke@virtual-call-center.eu>
Thu, 24 May 2012 09:44:12 +0000 (11:44 +0200)
committerTamas Cseke <tamas.cseke@virtual-call-center.eu>
Thu, 24 May 2012 09:44:12 +0000 (11:44 +0200)
src/mod/event_handlers/mod_erlang_event/handle_msg.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h

index 8e6ba46a62973cf9cc1e35bc57fa53a0c223ab70..944263b2f5c41d28547af7c48f3e75641009b82b 100644 (file)
@@ -337,10 +337,11 @@ static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg
                        switch_event_types_t type;
                        int i = 0;
 
+                       switch_thread_rwlock_wrlock(session->event_rwlock);
+
                        for (i = 1; i < arity; i++) {
                                if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
 
-                                       /* TODO session write locking */
                                        if (custom) {
                                                switch_core_hash_insert(session->event_hash, atom, MARKER);
                                        } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
@@ -363,6 +364,9 @@ static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg
                                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
                                }
                        }
+
+                       switch_thread_rwlock_unlock(session->event_rwlock);
+                       
                        ei_x_encode_atom(rbuf, "ok");
                } else {
                        ei_x_encode_tuple_header(rbuf, 2);
@@ -437,7 +441,8 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_
                        int i = 0;
                        switch_event_types_t type;
 
-                       /* TODO session write lock */
+                       switch_thread_rwlock_wrlock(session->event_rwlock);
+
                        for (i = 1; i < arity; i++) {
                                if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
 
@@ -464,6 +469,8 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_
                                        }
                                }
                        }
+                       switch_thread_rwlock_unlock(session->event_rwlock);
+
                        ei_x_encode_atom(rbuf, "ok");
                } else { /* no session for this pid */
                        ei_x_encode_tuple_header(rbuf, 2);
@@ -590,12 +597,15 @@ static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_
                                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
                                }
                        }
+
                        /* update the event subscriptions with the new ones */
+                       switch_thread_rwlock_wrlock(session->event_rwlock);
                        memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
                        /* wipe the old hash, and point the pointer at the new one */
-                       /* TODO make thread safe*/
                        switch_core_hash_destroy(&session->event_hash);
                        session->event_hash = event_hash;
+                       switch_thread_rwlock_unlock(session->event_rwlock);
+
                        /* TODO - we should flush any non-matching events from the queue */
                        ei_x_encode_atom(rbuf, "ok");
                } else { /* no session for this pid */
@@ -1045,7 +1055,6 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e
                        }
 
                        switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
-                       switch_core_hash_init(&listener->event_hash, listener->pool);
 
                        switch_thread_rwlock_unlock(listener->event_rwlock);
                        ei_x_encode_atom(rbuf, "ok");
@@ -1062,13 +1071,15 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e
 
                        /*purge the event queue */
                        while (switch_queue_trypop(session->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
+
+                       switch_thread_rwlock_wrlock(session->event_rwlock);
                        for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
                                session->event_list[x] = 0;
                        }
                        /* wipe the hash */
-                       /* TODO make thread safe*/
-                       switch_core_hash_destroy(&session->event_hash);
-                       switch_core_hash_init(&session->event_hash, session->pool);
+                       switch_core_hash_delete_multi(session->event_hash, NULL, NULL);
+                       switch_thread_rwlock_unlock(session->event_rwlock);
+
                        ei_x_encode_atom(rbuf, "ok");
                } else {
                        ei_x_encode_tuple_header(rbuf, 2);
index daa31d773af6cc1eea573f8c1a93dc0d0bc20922..fa1ea869821a8d45be45bf8ad2a72e15821933d0 100644 (file)
@@ -144,6 +144,9 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
 
        if (s) {
                int send = 0;
+
+               switch_thread_rwlock_rdlock(s->event_rwlock);
+
                if (s->event_list[SWITCH_EVENT_ALL]) {
                        send = 1;
                } else if ((s->event_list[event->event_id])) {
@@ -152,6 +155,8 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
                        }
                }
 
+               switch_thread_rwlock_unlock(s->event_rwlock);
+
                if (send) {
                        switch_log_printf(SWITCH_CHANNEL_UUID_LOG(s->uuid_str), SWITCH_LOG_DEBUG, "Sending event %s to attached session %s\n",
                                        switch_event_name(event->event_id), s->uuid_str);
@@ -1316,6 +1321,7 @@ session_elem_t *session_elem_create(listener_t *listener, switch_core_session_t
        }
 
        switch_thread_rwlock_create(&session_element->rwlock, session_element->pool);
+       switch_thread_rwlock_create(&session_element->event_rwlock, session_element->pool);
 
        session_element->event_list[SWITCH_EVENT_ALL] = 1; /* defaults to everything */
 
index 2e9fed865f9655fde752685eea4bd761f2df216b..d5f890d0a1e9addd6274879bc22f131618eed9aa 100644 (file)
@@ -84,6 +84,7 @@ struct session_elem {
        struct erlang_process process;
        switch_queue_t *event_queue;
        switch_thread_rwlock_t *rwlock;
+       switch_thread_rwlock_t *event_rwlock;
        switch_channel_state_t channel_state;
        switch_memory_pool_t *pool;
        uint8_t event_list[SWITCH_EVENT_ALL + 1];