]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
add listener event r/w locking FS-3432
authorTamas Cseke <tamas.cseke@virtual-call-center.eu>
Thu, 24 May 2012 08:57:28 +0000 (10:57 +0200)
committerTamas Cseke <tamas.cseke@virtual-call-center.eu>
Thu, 24 May 2012 08:57:28 +0000 (10:57 +0200)
src/mod/event_handlers/mod_erlang_event/handle_msg.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h

index 0b083cd0c050f180b68eddd67b828758f4d20ef2..8e6ba46a62973cf9cc1e35bc57fa53a0c223ab70 100644 (file)
@@ -28,6 +28,7 @@
  * Rob Charlton <rob.charlton@savageminds.com>
  * Darren Schreiber <d@d-man.org>
  * Mike Jerris <mike@jerris.com>
+ * Tamas Cseke <tamas.cseke@virtual-call-center.eu>
  *
  *
  * handle_msg.c -- handle messages received from erlang nodes
@@ -286,7 +287,8 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu
                        switch_set_flag_locked(listener, LFLAG_EVENTS);
                }
 
-               /* TODO - listener write lock */
+               switch_thread_rwlock_wrlock(listener->event_rwlock);
+
                for (i = 1; i < arity; i++) {
                        if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
 
@@ -312,6 +314,8 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
                        }
                }
+               switch_thread_rwlock_unlock(listener->event_rwlock);
+
                ei_x_encode_atom(rbuf, "ok");
        }
        return SWITCH_STATUS_SUCCESS;
@@ -382,7 +386,8 @@ static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x
                int i = 0;
                switch_event_types_t type;
 
-               /* TODO listener write lock */
+               switch_thread_rwlock_wrlock(listener->event_rwlock);
+
                for (i = 1; i < arity; i++) {
                        if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
 
@@ -410,6 +415,8 @@ static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x
                                }
                        }
                }
+
+               switch_thread_rwlock_unlock(listener->event_rwlock);
                ei_x_encode_atom(rbuf, "ok");
        }
        return SWITCH_STATUS_SUCCESS;
@@ -522,11 +529,11 @@ static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg
                        }
                }
                /* update the event subscriptions with the new ones */
+               switch_thread_rwlock_wrlock(listener->event_rwlock);
                memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
-               /* wipe the old hash, and point the pointer at the new one */
-               /* TODO make thread safe */
                switch_core_hash_destroy(&listener->event_hash);
                listener->event_hash = event_hash;
+               switch_thread_rwlock_unlock(listener->event_rwlock);
 
                /* TODO - we should flush any non-matching events from the queue */
                ei_x_encode_atom(rbuf, "ok");
@@ -1031,13 +1038,16 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e
                if (switch_test_flag(listener, LFLAG_EVENTS)) {
                        uint8_t x = 0;
                        switch_clear_flag_locked(listener, LFLAG_EVENTS);
+
+                       switch_thread_rwlock_wrlock(listener->event_rwlock);
                        for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
                                listener->event_list[x] = 0;
                        }
-                       /* wipe the hash */
-                       /* TODO make thread safe*/
-                       switch_core_hash_destroy(&listener->event_hash);
+
+                       switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
                        switch_core_hash_init(&listener->event_hash, listener->pool);
+
+                       switch_thread_rwlock_unlock(listener->event_rwlock);
                        ei_x_encode_atom(rbuf, "ok");
                } else {
                        ei_x_encode_tuple_header(rbuf, 2);
index c2f204ef0add69ff62792b32486d4d3b597757e9..f690fe64532f7479bfcb8adf7bd82e7f487ada6d 100644 (file)
@@ -26,6 +26,7 @@
  * Anthony Minessale II <anthm@freeswitch.org>
  * Andrew Thompson <andrew@hijacked.us>
  * Rob Charlton <rob.charlton@savageminds.com>
+ * Tamas Cseke <tamas.cseke@virtual-call-center.eu>
  *
  *
  * mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket
@@ -200,6 +201,8 @@ static void event_handler(switch_event_t *event)
                        continue;
                }
 
+               switch_thread_rwlock_rdlock(l->event_rwlock);
+
                if (l->event_list[SWITCH_EVENT_ALL]) {
                        send = 1;
                } else if ((l->event_list[event->event_id])) {
@@ -208,6 +211,7 @@ static void event_handler(switch_event_t *event)
                        }
                }
 
+               switch_thread_rwlock_unlock(l->event_rwlock);
 
                if (send) {
                        if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
@@ -815,14 +819,12 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
                        uint8_t x = 0;
                        switch_clear_flag_locked(listener, LFLAG_EVENTS);
 
+                       switch_thread_rwlock_wrlock(listener->event_rwlock);
                        for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
                                listener->event_list[x] = 0;
                        }
-                       /* wipe the hash */
-                       /* XXX this needs to be locked */
-                       /* TODO switch_core_hash_delete_multi_locked  */
-                       switch_core_hash_destroy(&listener->event_hash);
-                       switch_core_hash_init(&listener->event_hash, listener->pool);
+                       switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
+                       switch_thread_rwlock_unlock(listener->event_rwlock);
                }
        }
 }
@@ -1177,7 +1179,6 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
        }
        memset(listener, 0, sizeof(*listener));
 
-       switch_thread_rwlock_create(&listener->rwlock, pool);
        switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool);
        switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool);
 
@@ -1188,7 +1189,11 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
        listener->level = SWITCH_LOG_DEBUG;
        switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
        switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
+
+       switch_thread_rwlock_create(&listener->rwlock, pool);
+       switch_thread_rwlock_create(&listener->event_rwlock, pool);
        switch_thread_rwlock_create(&listener->session_rwlock, listener->pool);
+
        switch_core_hash_init(&listener->event_hash, listener->pool);
        switch_core_hash_init(&listener->sessions, listener->pool);
 
index 016734dd8083cf166251a41d01c6bab44b5cce37..538392278240405791492c632e7d831326831766 100644 (file)
@@ -129,6 +129,7 @@ struct listener {
        uint8_t event_list[SWITCH_EVENT_ALL + 1];
        switch_hash_t *event_hash;
        switch_thread_rwlock_t *rwlock;
+       switch_thread_rwlock_t *event_rwlock;
        switch_thread_rwlock_t *session_rwlock;
        //session_elem_t *session_list;
        switch_hash_t *sessions;