]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fifo up
authorAnthony Minessale <anthm@freeswitch.org>
Thu, 22 Jul 2010 06:23:06 +0000 (01:23 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Thu, 22 Jul 2010 06:23:06 +0000 (01:23 -0500)
src/mod/applications/mod_fifo/mod_fifo.c

index d4e572030a70ee37ab2f6f926ffc7e4352cc9a93..9af039e86494d3435b659a96cb5b31da0567022f 100644 (file)
@@ -127,7 +127,6 @@ static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr)
 
        queue->data[queue->idx++] = ptr;
 
-
        switch_mutex_unlock(queue->mutex);
 
        return SWITCH_STATUS_SUCCESS;
@@ -144,7 +143,7 @@ static int fifo_queue_size(fifo_queue_t *queue)
        return s;
 }
 
-static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove)
+static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, int remove)
 {
        int i, j;
 
@@ -157,7 +156,7 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop,
 
        for (j = 0; j < queue->idx; j++) {
                const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
-               if (uuid && !check_caller_outbound_call(uuid)) {
+               if (uuid && (remove == 2 || !check_caller_outbound_call(uuid))) {
                        if (remove) {
                                *pop = queue->data[j];
                        } else {
@@ -189,7 +188,7 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop,
 }
 
 
-static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove)
+static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, int remove)
 {
        int i, j, force = 0;
 
@@ -200,6 +199,10 @@ static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *n
                force = 1;
        }
 
+       if (remove == 2) {
+               force = 1;
+       }
+
        if (queue->idx == 0 || zstr(name) || zstr(val)) {
                switch_mutex_unlock(queue->mutex);
                return SWITCH_STATUS_FALSE;
@@ -252,7 +255,10 @@ static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
 
        for (j = 0; j < queue->idx; j++) {
                const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id");
-               if (j_uuid && !strcmp(j_uuid, uuid)) break;
+               if (j_uuid && !strcmp(j_uuid, uuid)) {
+                       switch_event_destroy(&queue->data[j]);
+                       break;
+               }
        }
 
        if (j == queue->idx) {
@@ -556,6 +562,7 @@ static struct {
        char *odbc_pass;
        int node_thread_running;
        switch_odbc_handle_t *master_odbc;
+       int threads;
 } globals;
 
 
@@ -1109,8 +1116,15 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
        struct call_helper *rows[MAX_ROWS] = { 0 };
        int rowcount = 0;
 
+
+       if (!globals.running) return NULL;
+
     switch_uuid_get(&uuid);
     switch_uuid_format(uuid_str, &uuid);
+
+       switch_mutex_lock(globals.mutex);
+       globals.threads++;
+       switch_mutex_unlock(globals.mutex);
        
        if (!cbh->rowcount) {
                goto end;
@@ -1415,6 +1429,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
        
        switch_core_destroy_memory_pool(&cbh->pool);
 
+       switch_mutex_lock(globals.mutex);
+       globals.threads--;
+       switch_mutex_unlock(globals.mutex);
+
        return NULL;
 }
 
@@ -1434,7 +1452,14 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
        switch_event_t *event = NULL;
        char *sql = NULL;
        int connected = 0;
-       
+
+       if (!globals.running) return NULL;      
+
+       switch_mutex_lock(globals.mutex);
+       globals.threads++;
+       switch_mutex_unlock(globals.mutex);
+
+
        switch_mutex_lock(globals.mutex);
        node = switch_core_hash_find(globals.fifo_hash, h->node_name);
        switch_mutex_unlock(globals.mutex);
@@ -1549,6 +1574,10 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
        }
        switch_core_destroy_memory_pool(&h->pool);
 
+       switch_mutex_lock(globals.mutex);
+       globals.threads--;
+       switch_mutex_unlock(globals.mutex);
+
        return NULL;
 }
 
@@ -1639,7 +1668,7 @@ static void find_consumers(fifo_node_t *node)
                        switch_thread_t *thread;
                        switch_threadattr_t *thd_attr = NULL;
                        struct callback_helper *cbh;
-                       switch_memory_pool_t *pool;
+                       switch_memory_pool_t *pool = NULL;
                        
                        switch_core_new_memory_pool(&pool);
                        cbh = switch_core_alloc(pool, sizeof(*cbh));
@@ -2407,7 +2436,6 @@ SWITCH_STANDARD_APP(fifo_function)
                fifo_strategy_t strat = STRAT_WAITING_LONGER;
                const char *url = NULL;
                const char *caller_uuid = NULL;
-               switch_event_t *call_event;
                const char *outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid");
                //const char *track_use_count = switch_channel_get_variable(channel, "fifo_track_use_count");
                //int do_track = switch_true(track_use_count);
@@ -2613,12 +2641,10 @@ SWITCH_STANDARD_APP(fifo_function)
                                continue;
                        }
 
-                       call_event = (switch_event_t *) pop;
-                       pop = NULL;
+                       url = switch_event_get_header(pop, "dial-url");
+                       caller_uuid = switch_core_session_strdup(session, switch_event_get_header(pop, "unique-id"));
+                       switch_event_destroy(&pop);
                        
-                       url = switch_event_get_header(call_event, "dial-url");
-                       caller_uuid = switch_event_get_header(call_event, "unique-id");
-
                        if (url) {
                                switch_call_cause_t cause = SWITCH_CAUSE_NONE;
                                const char *o_announce = NULL;
@@ -2679,6 +2705,7 @@ SWITCH_STANDARD_APP(fifo_function)
                                        switch_channel_event_set_data(channel, event);
                                        switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
                                        switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_pop");
+                                       switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-UUID", switch_core_session_get_uuid(other_session));
                                        switch_event_fire(&event);
                                }
 
@@ -2883,9 +2910,7 @@ SWITCH_STANDARD_APP(fifo_function)
                                send_presence(node);
                                check_cancel(node);
                                switch_core_session_rwunlock(other_session);
-                               if (call_event) {
-                                       switch_event_destroy(&call_event);
-                               }
+
 
                                if (!do_wait || !switch_channel_ready(channel)) {
                                        break;
@@ -3895,7 +3920,7 @@ static switch_status_t load_config(int reload, int del_all)
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
                                switch_thread_rwlock_wrlock(node->rwlock);
                                for (x = 0; x < MAX_PRI; x++) {
-                                       while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
+                                       while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
                                                switch_event_destroy(&pop);
                                        }
                                }
@@ -4171,6 +4196,11 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
        if (globals.node_thread_running) {
                stop_node_thread();
        }
+       
+       while(globals.threads) {
+               switch_cond_next();
+       }
+
 
        while ((hi = switch_hash_first(NULL, globals.fifo_hash))) {
                int x = 0;
@@ -4180,7 +4210,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
                switch_thread_rwlock_wrlock(node->rwlock);
                switch_mutex_lock(node->mutex);
                for (x = 0; x < MAX_PRI; x++) {
-                       while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
+                       while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
                                switch_event_destroy(&pop);
                        }
                }