]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fifo up
authorAnthony Minessale <anthm@freeswitch.org>
Wed, 21 Jul 2010 07:46:35 +0000 (02:46 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Wed, 21 Jul 2010 07:46:35 +0000 (02:46 -0500)
src/mod/applications/mod_fifo/mod_fifo.c
src/switch_loadable_module.c

index 0deeff2a311aa607082b7c89ebe9c436e0d9b87f..e5771b5a11316090ec73e8c5502c27a282d3b714 100644 (file)
@@ -48,6 +48,7 @@ typedef enum {
 
 static outbound_strategy_t default_strategy = NODE_STRATEGY_RINGALL;
 
+static int marker = 1;
 
 typedef struct {
        int nelm;
@@ -57,6 +58,24 @@ typedef struct {
        switch_mutex_t *mutex;
 } fifo_queue_t;
 
+
+
+
+static int check_caller_outbound_call(const char *key);
+static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
+static void del_caller_outbound_call(const char *key);
+static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause);
+static int check_consumer_outbound_call(const char *key);
+static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
+static void del_consumer_outbound_call(const char *key);
+static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause);
+
+
+static int check_bridge_call(const char *key);
+static void add_bridge_call(const char *key);
+static void del_bridge_call(const char *key);
+
+
 switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool) 
 {
        fifo_queue_t *q;
@@ -127,24 +146,34 @@ static int fifo_queue_size(fifo_queue_t *queue)
 
 static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove)
 {
-       int i;
+       int i, j;
 
        switch_mutex_lock(queue->mutex);
 
        if (queue->idx == 0) {
                switch_mutex_unlock(queue->mutex);
-               *pop = NULL;
                return SWITCH_STATUS_FALSE;
        }
 
-       if (remove) {
-               *pop = queue->data[0];
-       } else {
-               switch_event_dup(pop, queue->data[0]);
+       for (j = 0; j < queue->idx; j++) {
+               const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
+               if (uuid && !check_caller_outbound_call(uuid)) {
+                       if (remove) {
+                               *pop = queue->data[j];
+                       } else {
+                               switch_event_dup(pop, queue->data[j]);
+                       }
+                       break;
+               }
        }
 
+       if (j == queue->idx) {
+               switch_mutex_unlock(queue->mutex);
+               return SWITCH_STATUS_FALSE;
+       }
+       
        if (remove) {
-               for (i = 1; i < queue->idx; i++) {
+               for (i = j+1; i < queue->idx; i++) {
                        queue->data[i-1] = queue->data[i];
                        queue->data[i] = NULL;
                        change_pos(queue->data[i-1], i);
@@ -162,10 +191,15 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop,
 
 static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove)
 {
-       int i, j;
+       int i, j, force = 0;
 
        switch_mutex_lock(queue->mutex);
 
+       if (name && *name == '+') {
+               name++;
+               force = 1;
+       }
+
        if (queue->idx == 0 || zstr(name) || zstr(val)) {
                switch_mutex_unlock(queue->mutex);
                return SWITCH_STATUS_FALSE;
@@ -173,7 +207,8 @@ static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *n
 
        for (j = 0; j < queue->idx; j++) {
                const char *j_val = switch_event_get_header(queue->data[j], name);
-               if (j_val && val && !strcmp(j_val, val)) {
+               const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
+               if (j_val && val && !strcmp(j_val, val) && (force || !check_caller_outbound_call(uuid))) {
 
                        if (remove) {
                                *pop = queue->data[j];
@@ -502,8 +537,12 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi
 }
 
 static struct {
-       switch_hash_t *orig_hash;
-       switch_mutex_t *orig_mutex;
+       switch_hash_t *caller_orig_hash;
+       switch_hash_t *consumer_orig_hash;
+       switch_hash_t *bridge_hash;
+       switch_mutex_t *caller_orig_mutex;
+       switch_mutex_t *consumer_orig_mutex;
+       switch_mutex_t *bridge_mutex;
        switch_hash_t *fifo_hash;
        switch_mutex_t *mutex;
        switch_mutex_t *sql_mutex;
@@ -520,6 +559,115 @@ static struct {
 } globals;
 
 
+
+static int check_caller_outbound_call(const char *key)
+{
+       int x = 0;
+
+       switch_mutex_lock(globals.caller_orig_mutex);
+       x = !!switch_core_hash_find(globals.caller_orig_hash, key);
+       switch_mutex_unlock(globals.caller_orig_mutex);
+       return x;
+
+}
+
+
+static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
+{
+       switch_mutex_lock(globals.caller_orig_mutex);
+       switch_core_hash_insert(globals.caller_orig_hash, key, cancel_cause);
+       switch_mutex_unlock(globals.caller_orig_mutex);
+}
+
+static void del_caller_outbound_call(const char *key)
+{
+       switch_mutex_lock(globals.caller_orig_mutex);
+       switch_core_hash_delete(globals.caller_orig_hash, key);
+       switch_mutex_unlock(globals.caller_orig_mutex);
+}
+
+static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause)
+{
+       switch_call_cause_t *cancel_cause = NULL;
+
+       switch_mutex_lock(globals.caller_orig_mutex);
+    if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.caller_orig_hash, key))) {
+               *cancel_cause = cause;
+       }
+    switch_mutex_unlock(globals.caller_orig_mutex);
+
+       fifo_caller_del(key);
+
+}
+
+
+
+static int check_bridge_call(const char *key)
+{
+       int x = 0;
+
+       switch_mutex_lock(globals.bridge_mutex);
+       x = !!switch_core_hash_find(globals.bridge_hash, key);
+       switch_mutex_unlock(globals.bridge_mutex);
+       return x;
+
+}
+
+
+static void add_bridge_call(const char *key)
+{
+       switch_mutex_lock(globals.bridge_mutex);
+       switch_core_hash_insert(globals.bridge_hash, key, (void *)&marker);
+       switch_mutex_unlock(globals.bridge_mutex);
+}
+
+static void del_bridge_call(const char *key)
+{
+       switch_mutex_lock(globals.bridge_mutex);
+       switch_core_hash_delete(globals.bridge_hash, key);
+       switch_mutex_unlock(globals.bridge_mutex);
+}
+
+
+static int check_consumer_outbound_call(const char *key)
+{
+       int x = 0;
+
+       switch_mutex_lock(globals.consumer_orig_mutex);
+       x = !!switch_core_hash_find(globals.consumer_orig_hash, key);
+       switch_mutex_unlock(globals.consumer_orig_mutex);
+       return x;
+
+}
+
+static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
+{
+       switch_mutex_lock(globals.consumer_orig_mutex);
+       switch_core_hash_insert(globals.consumer_orig_hash, key, cancel_cause);
+       switch_mutex_unlock(globals.consumer_orig_mutex);
+}
+
+static void del_consumer_outbound_call(const char *key)
+{
+       switch_mutex_lock(globals.consumer_orig_mutex);
+       switch_core_hash_delete(globals.consumer_orig_hash, key);
+       switch_mutex_unlock(globals.consumer_orig_mutex);
+}
+
+static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause)
+{
+       switch_call_cause_t *cancel_cause = NULL;
+
+       switch_mutex_lock(globals.consumer_orig_mutex);
+    if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.consumer_orig_hash, key))) {
+               *cancel_cause = cause;
+       }
+    switch_mutex_unlock(globals.consumer_orig_mutex);
+
+}
+
+
+
 switch_cache_db_handle_t *fifo_get_db_handle(void)
 {
        switch_cache_db_connection_options_t options = { {0} };
@@ -633,7 +781,7 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
        switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
        cbt.buf = outbound_count;
        cbt.len = sizeof(outbound_count);
-       sql = switch_mprintf("select count(*) from fifo_outbound where taking_calls = 1 and fifo_name = '%q'", name);
+       sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
        fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt);
        if (atoi(outbound_count) > 0) {
                node->has_outbound = 1;
@@ -686,6 +834,7 @@ struct callback_helper {
        switch_memory_pool_t *pool;
        struct call_helper *rows[MAX_ROWS];
        int rowcount;
+       int ready;
 };
 
 
@@ -699,6 +848,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
 
        consumer_session = session;
        consumer_channel = switch_core_session_get_channel(consumer_session);
+       outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
 
        switch (msg->message_id) {
     case SWITCH_MESSAGE_INDICATE_BRIDGE:
@@ -706,6 +856,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
                if ((caller_session = switch_core_session_locate(msg->string_arg))) {
                        caller_channel = switch_core_session_get_channel(caller_session);
                        if (msg->message_id == SWITCH_MESSAGE_INDICATE_BRIDGE) {
+                               cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
                                switch_core_session_soft_lock(caller_session, 5);
                        } else {
                                switch_core_session_soft_unlock(caller_session);
@@ -723,8 +874,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
     default:
                goto end;
     }
-       
-       outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
+
        
        switch (msg->message_id) {
        case SWITCH_MESSAGE_INDICATE_BRIDGE:
@@ -927,33 +1077,6 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
        return SWITCH_STATUS_SUCCESS;
 }
 
-static void add_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
-{
-       switch_mutex_lock(globals.orig_mutex);
-       switch_core_hash_insert(globals.orig_hash, key, cancel_cause);
-       switch_mutex_unlock(globals.orig_mutex);
-}
-
-static void del_outbound_call(const char *key)
-{
-       switch_mutex_lock(globals.orig_mutex);
-       switch_core_hash_delete(globals.orig_hash, key);
-       switch_mutex_unlock(globals.orig_mutex);
-}
-
-static void cancel_outbound_call(const char *key, switch_call_cause_t cause)
-{
-       switch_call_cause_t *cancel_cause = NULL;
-
-       switch_mutex_lock(globals.orig_mutex);
-    if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.orig_hash, key))) {
-               *cancel_cause = cause;
-       }
-    switch_mutex_unlock(globals.orig_mutex);
-
-       fifo_caller_del(key);
-
-}
 
 static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void *obj)
 {
@@ -981,8 +1104,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
     char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
        switch_call_cause_t cancel_cause = 0;
        char *uuid_list = NULL;
-       int connected = 0;
+       int connected = 0, total = 0;
        const char *codec;
+       struct call_helper *rows[MAX_ROWS] = { 0 };
+       int rowcount = 0;
 
     switch_uuid_get(&uuid);
     switch_uuid_format(uuid_str, &uuid);
@@ -997,9 +1122,35 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
        node = switch_core_hash_find(globals.fifo_hash, node_name);
        switch_mutex_unlock(globals.mutex);
 
+       for (i = 0; i < cbh->rowcount; i++) {
+               struct call_helper *h = cbh->rows[i];
+               
+               if (check_consumer_outbound_call(h->uuid) || check_bridge_call(h->uuid)) {
+                       continue;
+               }
+
+               rows[rowcount++] = h;
+               add_consumer_outbound_call(h->uuid, &cancel_cause);
+               total++;
+       }
+
+       for (i = 0; i < rowcount; i++) {
+               struct call_helper *h = rows[i];
+               cbh->rows[i] = h;
+       }
+
+       cbh->rowcount = rowcount;
+       
+       cbh->ready = 1;
+       
+       if (!total) {
+               goto end;
+       }
+
+
        if (node) {
                switch_mutex_lock(node->mutex);
-               node->busy = switch_epoch_time_now(NULL) + 600;
+               //node->busy = switch_epoch_time_now(NULL) + 600;
                node->ring_consumer_count = 1;
                switch_mutex_unlock(node->mutex);
        } else {
@@ -1113,11 +1264,14 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
        for (i = 0; i < cbh->rowcount; i++) {
                struct call_helper *h = cbh->rows[i];
                char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
+               
                fifo_execute_sql(sql, globals.sql_mutex);
                switch_safe_free(sql);
                
        }
 
+       if (!total) goto end;
+
        if ((codec = switch_event_get_header(pop, "variable_sip_use_codec_name"))) {
                const char *rate = switch_event_get_header(pop, "variable_sip_use_codec_rate");
                const char *ptime = switch_event_get_header(pop, "variable_sip_use_codec_ptime");
@@ -1132,11 +1286,11 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
                switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "absolute_codec_string", nstr);
        }
 
-       add_outbound_call(id, &cancel_cause);
+       add_caller_outbound_call(id, &cancel_cause);
 
        status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, &cancel_cause);
 
-       del_outbound_call(id);
+       del_caller_outbound_call(id);
 
        
        if (status != SWITCH_STATUS_SUCCESS || cause != SWITCH_CAUSE_SUCCESS) {
@@ -1217,7 +1371,12 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
        switch_channel_set_caller_extension(channel, extension);
        switch_channel_set_state(channel, CS_EXECUTE);
        switch_channel_wait_for_state(channel, NULL, CS_EXECUTE);
+       switch_channel_wait_for_flag(channel, CF_BRIDGED, SWITCH_TRUE, 5000, NULL);
+
        switch_core_session_rwunlock(session);
+
+
+
        
        for (i = 0; i < cbh->rowcount; i++) {
                struct call_helper *h = cbh->rows[i];
@@ -1228,10 +1387,19 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
 
   end:
 
+       cbh->ready = 1;
+
+       for (i = 0; i < cbh->rowcount; i++) {
+               struct call_helper *h = cbh->rows[i];
+               del_consumer_outbound_call(h->uuid);
+       }
+
        switch_safe_free(originate_string);
        switch_safe_free(uuid_list);
 
-       switch_event_destroy(&ovars);
+       if (ovars) {
+               switch_event_destroy(&ovars);
+       }
 
        if (pop_dup) {
                switch_event_destroy(&pop_dup);
@@ -1240,7 +1408,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
        if (node) {
                switch_mutex_lock(node->mutex);
                node->ring_consumer_count = 0;
-               node->busy = switch_epoch_time_now(NULL) + connected;
+               //node->busy = switch_epoch_time_now(NULL) + connected;
                switch_mutex_unlock(node->mutex);
        }
 
@@ -1273,7 +1441,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
        if (node) {
                switch_mutex_lock(node->mutex);
                node->ring_consumer_count++;
-               node->busy = switch_epoch_time_now(NULL) + 600;
+               //node->busy = switch_epoch_time_now(NULL) + 600;
                switch_mutex_unlock(node->mutex);
        }
 
@@ -1375,7 +1543,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
                if (node->ring_consumer_count-- < 0) {
                        node->ring_consumer_count = 0;
                }
-               node->busy = switch_epoch_time_now(NULL) + connected;
+               //node->busy = switch_epoch_time_now(NULL) + connected;
                switch_mutex_unlock(node->mutex);
        }
        switch_core_destroy_memory_pool(&h->pool);
@@ -1484,10 +1652,15 @@ static void find_consumers(fifo_node_t *node)
                        fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
 
                        if (cbh->rowcount) {
+                               int sanity = 40;
+
                                switch_threadattr_create(&thd_attr, cbh->pool);
                                switch_threadattr_detach_set(thd_attr, 1);
                                switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
                                switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool);
+                               while(--sanity > 0 && !cbh->ready) {
+                                       switch_yield(100000);
+                               }
                        }
 
                }
@@ -1517,16 +1690,18 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
                        switch_hash_this(hi, &var, NULL, &val);
                        if ((node = (fifo_node_t *) val)) {
                                switch_mutex_lock(node->mutex);
-                               if (node->has_outbound && node->ready && switch_epoch_time_now(NULL) > node->busy) {
+                               if (node->has_outbound && node->ready) {// && switch_epoch_time_now(NULL) > node->busy) {
                                        ppl_waiting = node_consumer_wait_count(node);
                                        consumer_total = node->consumer_count;
                                        idle_consumers = node_idle_consumers(node);
-
-                                       /* switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, 
-                                          "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); */
+                                       
+                                       //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG5, 
+                                       //"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", 
+                                       //node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); 
 
                                        if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
                                                find_consumers(node);
+                                               switch_yield(1000000);
                                        }
                                }
                                switch_mutex_unlock(node->mutex);
@@ -1579,7 +1754,7 @@ static void check_ocancel(switch_core_session_t *session)
 
        //channel = switch_core_session_get_channel(session);
 
-       cancel_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
+       cancel_caller_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
 
 }
 
@@ -1693,6 +1868,13 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
 
 }
 
+SWITCH_STANDARD_API(fifo_check_bridge_function)
+{
+       stream->write_function(stream, "%s", (cmd && check_bridge_call(cmd)) ? "true" : "false");
+
+       return SWITCH_STATUS_SUCCESS;
+}
+
 SWITCH_STANDARD_API(fifo_add_outbound_function)
 {
        char *data = NULL, *argv[4] = { 0 };
@@ -1739,6 +1921,7 @@ static void dec_use_count(switch_channel_t *channel)
        long now = (long) switch_epoch_time_now(NULL);
 
        if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) {
+               del_bridge_call(outbound_id);
                sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag where use_count > 0 and uuid='%q'", 
                                                         now, now, outbound_id);
                fifo_execute_sql(sql, globals.sql_mutex);
@@ -1780,6 +1963,8 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
                return;
        }
 
+       add_bridge_call(data);
+
        switch_channel_set_variable(channel, "fifo_outbound_uuid", data);
        
        if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) {
@@ -2344,15 +2529,29 @@ SWITCH_STANDARD_APP(fifo_function)
                        }
 
                        if (node) {
-                               const char *varval;
+                               const char *varval, *check = NULL;
 
+                               check = switch_channel_get_variable(channel, "fifo_bridge_uuid_required");
+                               
                                if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) {
+                                       if (check_bridge_call(varval) && switch_true(check)) {
+                                               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
+                                                                                 switch_channel_get_name(channel));
+                                               goto done;
+                                       }
+
                                        for (x = 0; x < MAX_PRI; x++) {
-                                               if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
-                                                       cancel_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF);
+                                               if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "+unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
+                                                       cancel_caller_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF);
                                                        break;
                                                }
                                        }
+                                       if (!pop && switch_true(check)) {
+                                               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
+                                                                                 switch_channel_get_name(channel));
+                                               
+                                               goto done;
+                                       }
                                }
 
                                if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) {
@@ -2582,12 +2781,18 @@ SWITCH_STANDARD_APP(fifo_function)
                                }
 
                                if (outbound_id) {
+                                       cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
+                                       add_bridge_call(outbound_id);
+
                                        sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'", 
                                                                                 switch_epoch_time_now(NULL), outbound_id);
                                        fifo_execute_sql(sql, globals.sql_mutex);
                                        switch_safe_free(sql);
                                }
 
+                               add_bridge_call(switch_core_session_get_uuid(other_session));
+                               add_bridge_call(switch_core_session_get_uuid(session));
+
                                sql = switch_mprintf("insert into fifo_bridge "
                                                                         "(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
                                                                         "values ('%q','%q','%q','%q','%q','%q',%ld)",
@@ -2604,6 +2809,7 @@ SWITCH_STANDARD_APP(fifo_function)
                                fifo_execute_sql(sql, globals.sql_mutex);
                                switch_safe_free(sql);
 
+                               
                                switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session);
 
                                if (outbound_id) {
@@ -2616,8 +2822,13 @@ SWITCH_STANDARD_APP(fifo_function)
 
                                        fifo_execute_sql(sql, globals.sql_mutex);
                                        switch_safe_free(sql);
+
+                                       del_bridge_call(outbound_id);
+
                                }
 
+                               del_bridge_call(switch_core_session_get_uuid(session));
+                               del_bridge_call(switch_core_session_get_uuid(other_session));
 
 
                                if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
@@ -3779,7 +3990,7 @@ static void fifo_member_del(char *fifo_name, char *originate_string)
 
        cbt.buf = outbound_count;
        cbt.len = sizeof(outbound_count);
-       sql = switch_mprintf("select count(*) from fifo_outbound where taking_calls = 1 and fifo_name = '%q'", node->name);
+       sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", node->name);
        fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
        if (atoi(outbound_count) > 0) {
                node->has_outbound = 1;
@@ -3895,8 +4106,12 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
        switch_core_new_memory_pool(&globals.pool);
        switch_core_hash_init(&globals.fifo_hash, globals.pool);
 
-       switch_core_hash_init(&globals.orig_hash, globals.pool);
-       switch_mutex_init(&globals.orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
+       switch_core_hash_init(&globals.caller_orig_hash, globals.pool);
+       switch_core_hash_init(&globals.consumer_orig_hash, globals.pool);
+       switch_core_hash_init(&globals.bridge_hash, globals.pool);
+       switch_mutex_init(&globals.caller_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
+       switch_mutex_init(&globals.consumer_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
+       switch_mutex_init(&globals.bridge_mutex, SWITCH_MUTEX_NESTED, globals.pool);
 
        switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
        switch_mutex_init(&globals.sql_mutex, SWITCH_MUTEX_NESTED, globals.pool);
@@ -3919,11 +4134,13 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
        SWITCH_ADD_API(commands_api_interface, "fifo", "Return data about a fifo", fifo_api_function, FIFO_API_SYNTAX);
        SWITCH_ADD_API(commands_api_interface, "fifo_member", "Add members to a fifo", fifo_member_api_function, FIFO_MEMBER_API_SYNTAX);
        SWITCH_ADD_API(commands_api_interface, "fifo_add_outbound", "Add outbound members to a fifo", fifo_add_outbound_function, "<node> <url> [<priority>]");
+       SWITCH_ADD_API(commands_api_interface, "fifo_check_bridge", "check if uuid is in a bridge", fifo_check_bridge_function, "<uuid>|<outbound_id>");
        switch_console_set_complete("add fifo list");
        switch_console_set_complete("add fifo list_verbose");
        switch_console_set_complete("add fifo count");
        switch_console_set_complete("add fifo has_outbound");
        switch_console_set_complete("add fifo importance");
+       switch_console_set_complete("add fifo_check_bridge ::console::list_uuid");
 
        start_node_thread(globals.pool);
 
@@ -3960,12 +4177,13 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
                node = (fifo_node_t *) val;
 
                switch_thread_rwlock_wrlock(node->rwlock);
+               switch_mutex_lock(node->mutex);
                for (x = 0; x < MAX_PRI; x++) {
                        while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
                                switch_event_destroy(&pop);
                        }
                }
-
+               switch_mutex_unlock(node->mutex);
                switch_core_hash_delete(globals.fifo_hash, node->name);
                switch_core_hash_destroy(&node->consumer_hash);
                switch_thread_rwlock_unlock(node->rwlock);
index 97cb5e8d701bac925844655f97ae5f49c33d9c80..811383817ca55398624b8c7f0d7e15a2316fe319 100644 (file)
@@ -1627,7 +1627,8 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char *
 {
        switch_api_interface_t *api;
        switch_status_t status;
-
+       char *myarg = NULL, *argp = NULL;
+       
        switch_assert(stream != NULL);
        switch_assert(stream->data != NULL);
        switch_assert(stream->write_function != NULL);
@@ -1636,18 +1637,25 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char *
                switch_event_create(&stream->param_event, SWITCH_EVENT_API);
        }
 
+       if (arg) {
+               myarg = strdup(arg);
+               argp = myarg;
+               while(*argp == ' ') argp++;
+               while(end_of(argp) == ' ') end_of(argp) = '\0';
+       }
+
        if (stream->param_event) {
                if (cmd) {
                        switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command", cmd);
                }
-               if (arg) {
-                       switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command-Argument", arg);
+               if (argp) {
+                       switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command-Argument", argp);
                }
        }
 
 
        if (cmd && (api = switch_loadable_module_get_api_interface(cmd)) != 0) {
-               if ((status = api->function(arg, session, stream)) != SWITCH_STATUS_SUCCESS) {
+               if ((status = api->function(argp, session, stream)) != SWITCH_STATUS_SUCCESS) {
                        stream->write_function(stream, "COMMAND RETURNED ERROR!\n");
                }
                UNPROTECT_INTERFACE(api);
@@ -1660,7 +1668,8 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char *
                switch_event_fire(&stream->param_event);
        }
 
-
+       switch_safe_free(myarg);
+       
        return status;
 }