git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fix race deleting queues at a bad time
author Anthony Minessale <anthm@freeswitch.org>
Tue, 25 Oct 2011 16:54:01 +0000 (11:54 -0500)
committer Anthony Minessale <anthm@freeswitch.org>
Tue, 25 Oct 2011 16:54:01 +0000 (11:54 -0500)
src/mod/applications/mod_fifo/mod_fifo.c

index f86c7686b84bf50ba890922de9858d2b92f1add2..e45684134ad9bf75bb03288d1a24fa76020be708 100644 (file)
@@ -37,7 +37,7 @@ SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL);
 
 #define MANUAL_QUEUE_NAME "manual_calls"
 #define FIFO_EVENT "fifo::info"
-#define FIFO_DELAY_DESTROY 100
+
 static switch_status_t load_config(int reload, int del_all);
 #define MAX_PRI 10
 
@@ -312,6 +312,7 @@ struct fifo_node {
        int ring_timeout;
        int default_lag;
        char *domain_name;
+       struct fifo_node *next;
 };
 
 typedef struct fifo_node fifo_node_t;
@@ -564,6 +565,8 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi
        return SWITCH_STATUS_SUCCESS;
 }
 
+struct fifo_node;
+
 static struct {
        switch_hash_t *caller_orig_hash;
        switch_hash_t *consumer_orig_hash;
@@ -587,6 +590,7 @@ static struct {
        int threads;
        switch_thread_t *node_thread;
        int debug;
+       struct fifo_node *nodes;
 } globals;
 
 
@@ -856,6 +860,8 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
 
        switch_mutex_lock(globals.mutex);
        switch_core_hash_insert(globals.fifo_hash, name, node);
+       node->next = globals.nodes;
+       globals.nodes = node;
        switch_mutex_unlock(globals.mutex);
 
        switch_safe_free(domain_name);
@@ -1213,6 +1219,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
 
        switch_mutex_lock(globals.mutex);
        node = switch_core_hash_find(globals.fifo_hash, node_name);
+       switch_thread_rwlock_rdlock(node->rwlock);
        switch_mutex_unlock(globals.mutex);
 
        for (i = 0; i < cbh->rowcount; i++) {
@@ -1498,6 +1505,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
                node->ring_consumer_count = 0;
                node->busy = 0;
                switch_mutex_unlock(node->update_mutex);
+               switch_thread_rwlock_unlock(node->rwlock);
        }
 
 
@@ -1554,6 +1562,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
 
        switch_mutex_lock(globals.mutex);
        node = switch_core_hash_find(globals.fifo_hash, h->node_name);
+       switch_thread_rwlock_rdlock(node->rwlock);
        switch_mutex_unlock(globals.mutex);
 
        if (node) {
@@ -1661,6 +1670,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
                }
                node->busy = 0;
                switch_mutex_unlock(node->update_mutex);
+               switch_thread_rwlock_unlock(node->rwlock);
        }
        switch_core_destroy_memory_pool(&h->pool);
 
@@ -1792,73 +1802,93 @@ static void find_consumers(fifo_node_t *node)
 
 static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj)
 {
-       fifo_node_t *node;
+       fifo_node_t *node, *last, *this_node;
        int cur_priority = 1;
 
        globals.node_thread_running = 1;
 
        while (globals.node_thread_running == 1) {
-               switch_hash_index_t *hi;
-               void *val;
-               const void *var;
                int ppl_waiting, consumer_total, idle_consumers, found = 0;
 
                switch_mutex_lock(globals.mutex);
 
                if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority);
 
+               last = NULL;
+               node = globals.nodes;
 
-       restart:
+               while(node) {
+                       int x = 0;
+                       switch_event_t *pop;
+                       int nuke = 0;
+                       
+                       this_node = node;
+                       node = node->next;
+
+                       if (this_node->ready == 0) {
+                               
+                               for (x = 0; x < MAX_PRI; x++) {
+                                       while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
+                                               const char *caller_uuid = switch_event_get_header(pop, "unique-id");
+                                               switch_ivr_kill_uuid(caller_uuid, SWITCH_CAUSE_MANAGER_REQUEST);
+                                               switch_event_destroy(&pop);
+                                       }
+                               }
 
-               for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
-                       switch_hash_this(hi, &var, NULL, &val);
-                       if ((node = (fifo_node_t *) val)) {
-                               int x = 0;
-                               switch_event_t *pop;
+                       }
 
-                               if (node->ready == FIFO_DELAY_DESTROY) {
-                                       if (switch_thread_rwlock_trywrlock(node->rwlock) == SWITCH_STATUS_SUCCESS) {
-                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
-                                               switch_core_hash_delete(globals.fifo_hash, node->name);
 
-                                               for (x = 0; x < MAX_PRI; x++) {
-                                                       while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
-                                                               switch_event_destroy(&pop);
-                                                       }
-                                               }
+                       if (this_node->ready == 0 && switch_thread_rwlock_trywrlock(this_node->rwlock) == SWITCH_STATUS_SUCCESS) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", this_node->name);
 
-                                               node->ready = 0;
-                                               switch_core_hash_destroy(&node->consumer_hash);
-                                               switch_mutex_unlock(node->mutex);
-                                               switch_mutex_unlock(node->update_mutex);
-                                               switch_thread_rwlock_unlock(node->rwlock);
-                                               switch_core_destroy_memory_pool(&node->pool);
-                                               goto restart;
+                               for (x = 0; x < MAX_PRI; x++) {
+                                       while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
+                                               switch_event_destroy(&pop);
                                        }
                                }
 
+                               if (last) {
+                                       last->next = this_node->next;
+                               } else {
+                                       globals.nodes = this_node->next;
+                               }
 
-                               if (node->outbound_priority == 0) node->outbound_priority = 5;
-                               if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) {
-                                       ppl_waiting = node_caller_count(node);
-                                       consumer_total = node->consumer_count;
-                                       idle_consumers = node_idle_consumers(node);
+                               switch_core_hash_destroy(&this_node->consumer_hash);
+                               switch_mutex_unlock(this_node->mutex);
+                               switch_mutex_unlock(this_node->update_mutex);
+                               switch_thread_rwlock_unlock(this_node->rwlock);
+                               switch_core_destroy_memory_pool(&this_node->pool);
+                               nuke++;
+                       }
 
-                                       if (globals.debug) {
-                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
-                                                                                 "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n",
-                                                                                 node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count, node->outbound_priority);
-                                       }
+                       last = this_node;
 
+                       if (nuke) continue;
 
-                                       if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
-                                               found++;
-                                               find_consumers(node);
-                                               switch_yield(1000000);
-                                       }
+                       if (this_node->outbound_priority == 0) this_node->outbound_priority = 5;
+
+                       globals.debug = 1;
+
+                       if (this_node->has_outbound && !this_node->busy && this_node->outbound_priority == cur_priority) {
+                               ppl_waiting = node_caller_count(this_node);
+                               consumer_total = this_node->consumer_count;
+                               idle_consumers = node_idle_consumers(this_node);
+
+                               if (globals.debug) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
+                                                                         "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n",
+                                                                         this_node->name, ppl_waiting, consumer_total, idle_consumers, this_node->ring_consumer_count, this_node->outbound_priority);
+                               }
+
+
+                               if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
+                                       found++;
+                                       find_consumers(this_node);
+                                       switch_yield(1000000);
                                }
                        }
                }
+       
 
                if (++cur_priority > 10) {
                        cur_priority = 1;
@@ -2001,7 +2031,9 @@ static void pres_event_handler(switch_event_t *event)
                node->ready = 1;
        }
 
+       switch_thread_rwlock_rdlock(node->rwlock);
        send_presence(node);
+       switch_thread_rwlock_unlock(node->rwlock);
 
        switch_mutex_unlock(globals.mutex);
 
@@ -2013,6 +2045,7 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
 {
        fifo_node_t *node;
        switch_event_t *call_event;
+       uint32_t i = 0;
 
        if (priority >= MAX_PRI) {
                priority = MAX_PRI - 1;
@@ -2026,6 +2059,8 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
                node = create_node(node_name, 0, globals.sql_mutex);
        }
 
+       switch_thread_rwlock_rdlock(node->rwlock);
+
        switch_mutex_unlock(globals.mutex);
 
        switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
@@ -2034,7 +2069,11 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
        fifo_queue_push(node->fifo_list[priority], call_event);
        call_event = NULL;
 
-       return fifo_queue_size(node->fifo_list[priority]);
+       i = fifo_queue_size(node->fifo_list[priority]);
+
+       switch_thread_rwlock_unlock(node->rwlock);
+
+       return i;
 
 }
 
@@ -3190,7 +3229,7 @@ SWITCH_STANDARD_APP(fifo_function)
                }
                switch_thread_rwlock_unlock(node->rwlock);
                if (node->ready == 1 && do_destroy) {
-                       node->ready = FIFO_DELAY_DESTROY;
+                       node->ready = 0;
                }
        }
        switch_mutex_unlock(globals.mutex);
@@ -4005,8 +4044,8 @@ static switch_status_t load_config(int reload, int del_all)
                switch_mutex_lock(globals.mutex);
                for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
                        switch_hash_this(hi, NULL, NULL, &val);
-                       if ((node = (fifo_node_t *) val) && node->is_static) {
-                               node->ready = 0;
+                       if ((node = (fifo_node_t *) val) && node->is_static && node->ready == 1) {
+                               node->ready = -1;
                        }
                }
                switch_mutex_unlock(globals.mutex);
@@ -4190,41 +4229,18 @@ static switch_status_t load_config(int reload, int del_all)
   done:
 
        if (reload) {
-               switch_hash_index_t *hi;
-               void *val;
-               switch_event_t *pop;
                fifo_node_t *node;
-               switch_mutex_lock(globals.mutex);
-         top:
-               for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
-                       int x = 0;
-                       switch_hash_this(hi, NULL, NULL, &val);
-                       if (!(node = (fifo_node_t *) val) || !node->is_static || node->ready) {
-                               continue;
-                       }
-
-                       if (node_caller_count(node) || node->consumer_count || node_idle_consumers(node)) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s removal delayed, still in use.\n", node->name);
-                               node->ready = FIFO_DELAY_DESTROY;
-                       } else {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
-                               switch_mutex_lock(node->update_mutex);
-                               for (x = 0; x < MAX_PRI; x++) {
-                                       while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
-                                               switch_event_destroy(&pop);
-                                       }
-                               }
 
+               switch_mutex_lock(globals.mutex);
+               for (node = globals.nodes; node; node = node->next) {
+                       if (node->ready == -1) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s queued for removal\n", node->name);
                                switch_core_hash_delete(globals.fifo_hash, node->name);
-                               switch_core_hash_destroy(&node->consumer_hash);
-                               switch_mutex_unlock(node->update_mutex);
-                               switch_core_destroy_memory_pool(&node->pool);
-                               goto top;
+                               node->ready = 0;
                        }
                }
-
-               fifo_caller_del(NULL);
                switch_mutex_unlock(globals.mutex);
+               fifo_caller_del(NULL);
        }
 
 
@@ -4472,10 +4488,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
 */
 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
 {
-       switch_hash_index_t *hi;
-       void *val;
        switch_event_t *pop = NULL;
-       fifo_node_t *node;
+       fifo_node_t *node, *this_node;
        switch_mutex_t *mutex = globals.mutex;
 
        switch_event_unbind(&globals.node);
@@ -4492,24 +4506,27 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
                switch_cond_next();
        }
 
+       node = globals.nodes;
 
-       while ((hi = switch_hash_first(NULL, globals.fifo_hash))) {
+       while(node) {
                int x = 0;
-               switch_hash_this(hi, NULL, NULL, &val);
-               node = (fifo_node_t *) val;
 
-               switch_mutex_lock(node->update_mutex);
-               switch_mutex_lock(node->mutex);
+               this_node = node;
+               node = node->next;
+
+               
+               switch_mutex_lock(this_node->update_mutex);
+               switch_mutex_lock(this_node->mutex);
                for (x = 0; x < MAX_PRI; x++) {
-                       while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
+                       while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
                                switch_event_destroy(&pop);
                        }
                }
-               switch_mutex_unlock(node->mutex);
-               switch_core_hash_delete(globals.fifo_hash, node->name);
-               switch_core_hash_destroy(&node->consumer_hash);
-               switch_mutex_unlock(node->update_mutex);
-               switch_core_destroy_memory_pool(&node->pool);
+               switch_mutex_unlock(this_node->mutex);
+               switch_core_hash_delete(globals.fifo_hash, this_node->name);
+               switch_core_hash_destroy(&this_node->consumer_hash);
+               switch_mutex_unlock(this_node->update_mutex);
+               switch_core_destroy_memory_pool(&this_node->pool);
        }
 
        switch_core_hash_destroy(&globals.fifo_hash);