static outbound_strategy_t default_strategy = NODE_STRATEGY_RINGALL;
+static int marker = 1;
typedef struct {
int nelm;
switch_mutex_t *mutex;
} fifo_queue_t;
+
+
+
+static int check_caller_outbound_call(const char *key);
+static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
+static void del_caller_outbound_call(const char *key);
+static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause);
+static int check_consumer_outbound_call(const char *key);
+static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
+static void del_consumer_outbound_call(const char *key);
+static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause);
+
+
+static int check_bridge_call(const char *key);
+static void add_bridge_call(const char *key);
+static void del_bridge_call(const char *key);
+
+
switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool)
{
fifo_queue_t *q;
static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove)
{
- int i;
+ int i, j;
switch_mutex_lock(queue->mutex);
if (queue->idx == 0) {
switch_mutex_unlock(queue->mutex);
- *pop = NULL;
return SWITCH_STATUS_FALSE;
}
- if (remove) {
- *pop = queue->data[0];
- } else {
- switch_event_dup(pop, queue->data[0]);
+ for (j = 0; j < queue->idx; j++) {
+ const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
+ if (uuid && !check_caller_outbound_call(uuid)) {
+ if (remove) {
+ *pop = queue->data[j];
+ } else {
+ switch_event_dup(pop, queue->data[j]);
+ }
+ break;
+ }
}
+ if (j == queue->idx) {
+ switch_mutex_unlock(queue->mutex);
+ return SWITCH_STATUS_FALSE;
+ }
+
if (remove) {
- for (i = 1; i < queue->idx; i++) {
+ for (i = j+1; i < queue->idx; i++) {
queue->data[i-1] = queue->data[i];
queue->data[i] = NULL;
change_pos(queue->data[i-1], i);
static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove)
{
- int i, j;
+ int i, j, force = 0;
switch_mutex_lock(queue->mutex);
+ if (name && *name == '+') {
+ name++;
+ force = 1;
+ }
+
if (queue->idx == 0 || zstr(name) || zstr(val)) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
for (j = 0; j < queue->idx; j++) {
const char *j_val = switch_event_get_header(queue->data[j], name);
- if (j_val && val && !strcmp(j_val, val)) {
+ const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
+ if (j_val && val && !strcmp(j_val, val) && (force || !check_caller_outbound_call(uuid))) {
if (remove) {
*pop = queue->data[j];
}
static struct {
- switch_hash_t *orig_hash;
- switch_mutex_t *orig_mutex;
+ switch_hash_t *caller_orig_hash;
+ switch_hash_t *consumer_orig_hash;
+ switch_hash_t *bridge_hash;
+ switch_mutex_t *caller_orig_mutex;
+ switch_mutex_t *consumer_orig_mutex;
+ switch_mutex_t *bridge_mutex;
switch_hash_t *fifo_hash;
switch_mutex_t *mutex;
switch_mutex_t *sql_mutex;
} globals;
+
+static int check_caller_outbound_call(const char *key)
+{
+ int x = 0;
+
+ switch_mutex_lock(globals.caller_orig_mutex);
+ x = !!switch_core_hash_find(globals.caller_orig_hash, key);
+ switch_mutex_unlock(globals.caller_orig_mutex);
+ return x;
+
+}
+
+
+static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
+{
+ switch_mutex_lock(globals.caller_orig_mutex);
+ switch_core_hash_insert(globals.caller_orig_hash, key, cancel_cause);
+ switch_mutex_unlock(globals.caller_orig_mutex);
+}
+
+static void del_caller_outbound_call(const char *key)
+{
+ switch_mutex_lock(globals.caller_orig_mutex);
+ switch_core_hash_delete(globals.caller_orig_hash, key);
+ switch_mutex_unlock(globals.caller_orig_mutex);
+}
+
+static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause)
+{
+ switch_call_cause_t *cancel_cause = NULL;
+
+ switch_mutex_lock(globals.caller_orig_mutex);
+ if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.caller_orig_hash, key))) {
+ *cancel_cause = cause;
+ }
+ switch_mutex_unlock(globals.caller_orig_mutex);
+
+ fifo_caller_del(key);
+
+}
+
+
+
+static int check_bridge_call(const char *key)
+{
+ int x = 0;
+
+ switch_mutex_lock(globals.bridge_mutex);
+ x = !!switch_core_hash_find(globals.bridge_hash, key);
+ switch_mutex_unlock(globals.bridge_mutex);
+ return x;
+
+}
+
+
+static void add_bridge_call(const char *key)
+{
+ switch_mutex_lock(globals.bridge_mutex);
+ switch_core_hash_insert(globals.bridge_hash, key, (void *)&marker);
+ switch_mutex_unlock(globals.bridge_mutex);
+}
+
+static void del_bridge_call(const char *key)
+{
+ switch_mutex_lock(globals.bridge_mutex);
+ switch_core_hash_delete(globals.bridge_hash, key);
+ switch_mutex_unlock(globals.bridge_mutex);
+}
+
+
+static int check_consumer_outbound_call(const char *key)
+{
+ int x = 0;
+
+ switch_mutex_lock(globals.consumer_orig_mutex);
+ x = !!switch_core_hash_find(globals.consumer_orig_hash, key);
+ switch_mutex_unlock(globals.consumer_orig_mutex);
+ return x;
+
+}
+
+static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
+{
+ switch_mutex_lock(globals.consumer_orig_mutex);
+ switch_core_hash_insert(globals.consumer_orig_hash, key, cancel_cause);
+ switch_mutex_unlock(globals.consumer_orig_mutex);
+}
+
+static void del_consumer_outbound_call(const char *key)
+{
+ switch_mutex_lock(globals.consumer_orig_mutex);
+ switch_core_hash_delete(globals.consumer_orig_hash, key);
+ switch_mutex_unlock(globals.consumer_orig_mutex);
+}
+
+static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause)
+{
+ switch_call_cause_t *cancel_cause = NULL;
+
+ switch_mutex_lock(globals.consumer_orig_mutex);
+ if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.consumer_orig_hash, key))) {
+ *cancel_cause = cause;
+ }
+ switch_mutex_unlock(globals.consumer_orig_mutex);
+
+}
+
+
+
switch_cache_db_handle_t *fifo_get_db_handle(void)
{
switch_cache_db_connection_options_t options = { {0} };
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
cbt.buf = outbound_count;
cbt.len = sizeof(outbound_count);
- sql = switch_mprintf("select count(*) from fifo_outbound where taking_calls = 1 and fifo_name = '%q'", name);
+ sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt);
if (atoi(outbound_count) > 0) {
node->has_outbound = 1;
switch_memory_pool_t *pool;
struct call_helper *rows[MAX_ROWS];
int rowcount;
+ int ready;
};
consumer_session = session;
consumer_channel = switch_core_session_get_channel(consumer_session);
+ outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
switch (msg->message_id) {
case SWITCH_MESSAGE_INDICATE_BRIDGE:
if ((caller_session = switch_core_session_locate(msg->string_arg))) {
caller_channel = switch_core_session_get_channel(caller_session);
if (msg->message_id == SWITCH_MESSAGE_INDICATE_BRIDGE) {
+ cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
switch_core_session_soft_lock(caller_session, 5);
} else {
switch_core_session_soft_unlock(caller_session);
default:
goto end;
}
-
- outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
+
switch (msg->message_id) {
case SWITCH_MESSAGE_INDICATE_BRIDGE:
return SWITCH_STATUS_SUCCESS;
}
-static void add_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
-{
- switch_mutex_lock(globals.orig_mutex);
- switch_core_hash_insert(globals.orig_hash, key, cancel_cause);
- switch_mutex_unlock(globals.orig_mutex);
-}
-
-static void del_outbound_call(const char *key)
-{
- switch_mutex_lock(globals.orig_mutex);
- switch_core_hash_delete(globals.orig_hash, key);
- switch_mutex_unlock(globals.orig_mutex);
-}
-
-static void cancel_outbound_call(const char *key, switch_call_cause_t cause)
-{
- switch_call_cause_t *cancel_cause = NULL;
-
- switch_mutex_lock(globals.orig_mutex);
- if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.orig_hash, key))) {
- *cancel_cause = cause;
- }
- switch_mutex_unlock(globals.orig_mutex);
-
- fifo_caller_del(key);
-
-}
static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void *obj)
{
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
switch_call_cause_t cancel_cause = 0;
char *uuid_list = NULL;
- int connected = 0;
+ int connected = 0, total = 0;
const char *codec;
+ struct call_helper *rows[MAX_ROWS] = { 0 };
+ int rowcount = 0;
switch_uuid_get(&uuid);
switch_uuid_format(uuid_str, &uuid);
node = switch_core_hash_find(globals.fifo_hash, node_name);
switch_mutex_unlock(globals.mutex);
+ for (i = 0; i < cbh->rowcount; i++) {
+ struct call_helper *h = cbh->rows[i];
+
+ if (check_consumer_outbound_call(h->uuid) || check_bridge_call(h->uuid)) {
+ continue;
+ }
+
+ rows[rowcount++] = h;
+ add_consumer_outbound_call(h->uuid, &cancel_cause);
+ total++;
+ }
+
+ for (i = 0; i < rowcount; i++) {
+ struct call_helper *h = rows[i];
+ cbh->rows[i] = h;
+ }
+
+ cbh->rowcount = rowcount;
+
+ cbh->ready = 1;
+
+ if (!total) {
+ goto end;
+ }
+
+
if (node) {
switch_mutex_lock(node->mutex);
- node->busy = switch_epoch_time_now(NULL) + 600;
+ //node->busy = switch_epoch_time_now(NULL) + 600;
node->ring_consumer_count = 1;
switch_mutex_unlock(node->mutex);
} else {
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
+
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
}
+ if (!total) goto end;
+
if ((codec = switch_event_get_header(pop, "variable_sip_use_codec_name"))) {
const char *rate = switch_event_get_header(pop, "variable_sip_use_codec_rate");
const char *ptime = switch_event_get_header(pop, "variable_sip_use_codec_ptime");
switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "absolute_codec_string", nstr);
}
- add_outbound_call(id, &cancel_cause);
+ add_caller_outbound_call(id, &cancel_cause);
status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, &cancel_cause);
- del_outbound_call(id);
+ del_caller_outbound_call(id);
if (status != SWITCH_STATUS_SUCCESS || cause != SWITCH_CAUSE_SUCCESS) {
switch_channel_set_caller_extension(channel, extension);
switch_channel_set_state(channel, CS_EXECUTE);
switch_channel_wait_for_state(channel, NULL, CS_EXECUTE);
+ switch_channel_wait_for_flag(channel, CF_BRIDGED, SWITCH_TRUE, 5000, NULL);
+
switch_core_session_rwunlock(session);
+
+
+
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
end:
+ cbh->ready = 1;
+
+ for (i = 0; i < cbh->rowcount; i++) {
+ struct call_helper *h = cbh->rows[i];
+ del_consumer_outbound_call(h->uuid);
+ }
+
switch_safe_free(originate_string);
switch_safe_free(uuid_list);
- switch_event_destroy(&ovars);
+ if (ovars) {
+ switch_event_destroy(&ovars);
+ }
if (pop_dup) {
switch_event_destroy(&pop_dup);
if (node) {
switch_mutex_lock(node->mutex);
node->ring_consumer_count = 0;
- node->busy = switch_epoch_time_now(NULL) + connected;
+ //node->busy = switch_epoch_time_now(NULL) + connected;
switch_mutex_unlock(node->mutex);
}
if (node) {
switch_mutex_lock(node->mutex);
node->ring_consumer_count++;
- node->busy = switch_epoch_time_now(NULL) + 600;
+ //node->busy = switch_epoch_time_now(NULL) + 600;
switch_mutex_unlock(node->mutex);
}
if (node->ring_consumer_count-- < 0) {
node->ring_consumer_count = 0;
}
- node->busy = switch_epoch_time_now(NULL) + connected;
+ //node->busy = switch_epoch_time_now(NULL) + connected;
switch_mutex_unlock(node->mutex);
}
switch_core_destroy_memory_pool(&h->pool);
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
if (cbh->rowcount) {
+ int sanity = 40;
+
switch_threadattr_create(&thd_attr, cbh->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool);
+ while(--sanity > 0 && !cbh->ready) {
+ switch_yield(100000);
+ }
}
}
switch_hash_this(hi, &var, NULL, &val);
if ((node = (fifo_node_t *) val)) {
switch_mutex_lock(node->mutex);
- if (node->has_outbound && node->ready && switch_epoch_time_now(NULL) > node->busy) {
+ if (node->has_outbound && node->ready) {// && switch_epoch_time_now(NULL) > node->busy) {
ppl_waiting = node_consumer_wait_count(node);
consumer_total = node->consumer_count;
idle_consumers = node_idle_consumers(node);
-
- /* switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
- "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); */
+
+ //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG5,
+ //"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n",
+ //node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count);
if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
find_consumers(node);
+ switch_yield(1000000);
}
}
switch_mutex_unlock(node->mutex);
//channel = switch_core_session_get_channel(session);
- cancel_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
+ cancel_caller_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
}
}
+SWITCH_STANDARD_API(fifo_check_bridge_function)
+{
+ stream->write_function(stream, "%s", (cmd && check_bridge_call(cmd)) ? "true" : "false");
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
SWITCH_STANDARD_API(fifo_add_outbound_function)
{
char *data = NULL, *argv[4] = { 0 };
long now = (long) switch_epoch_time_now(NULL);
if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) {
+ del_bridge_call(outbound_id);
sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag where use_count > 0 and uuid='%q'",
now, now, outbound_id);
fifo_execute_sql(sql, globals.sql_mutex);
return;
}
+ add_bridge_call(data);
+
switch_channel_set_variable(channel, "fifo_outbound_uuid", data);
if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) {
}
if (node) {
- const char *varval;
+ const char *varval, *check = NULL;
+ check = switch_channel_get_variable(channel, "fifo_bridge_uuid_required");
+
if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) {
+ if (check_bridge_call(varval) && switch_true(check)) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
+ switch_channel_get_name(channel));
+ goto done;
+ }
+
for (x = 0; x < MAX_PRI; x++) {
- if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
- cancel_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF);
+ if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "+unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
+ cancel_caller_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF);
break;
}
}
+ if (!pop && switch_true(check)) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
+ switch_channel_get_name(channel));
+
+ goto done;
+ }
}
if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) {
}
if (outbound_id) {
+ cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
+ add_bridge_call(outbound_id);
+
sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'",
switch_epoch_time_now(NULL), outbound_id);
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
}
+ add_bridge_call(switch_core_session_get_uuid(other_session));
+ add_bridge_call(switch_core_session_get_uuid(session));
+
sql = switch_mprintf("insert into fifo_bridge "
"(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
"values ('%q','%q','%q','%q','%q','%q',%ld)",
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
+
switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session);
if (outbound_id) {
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
+
+ del_bridge_call(outbound_id);
+
}
+ del_bridge_call(switch_core_session_get_uuid(session));
+ del_bridge_call(switch_core_session_get_uuid(other_session));
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
cbt.buf = outbound_count;
cbt.len = sizeof(outbound_count);
- sql = switch_mprintf("select count(*) from fifo_outbound where taking_calls = 1 and fifo_name = '%q'", node->name);
+ sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", node->name);
fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
if (atoi(outbound_count) > 0) {
node->has_outbound = 1;
switch_core_new_memory_pool(&globals.pool);
switch_core_hash_init(&globals.fifo_hash, globals.pool);
- switch_core_hash_init(&globals.orig_hash, globals.pool);
- switch_mutex_init(&globals.orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
+ switch_core_hash_init(&globals.caller_orig_hash, globals.pool);
+ switch_core_hash_init(&globals.consumer_orig_hash, globals.pool);
+ switch_core_hash_init(&globals.bridge_hash, globals.pool);
+ switch_mutex_init(&globals.caller_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
+ switch_mutex_init(&globals.consumer_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
+ switch_mutex_init(&globals.bridge_mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_mutex_init(&globals.sql_mutex, SWITCH_MUTEX_NESTED, globals.pool);
SWITCH_ADD_API(commands_api_interface, "fifo", "Return data about a fifo", fifo_api_function, FIFO_API_SYNTAX);
SWITCH_ADD_API(commands_api_interface, "fifo_member", "Add members to a fifo", fifo_member_api_function, FIFO_MEMBER_API_SYNTAX);
SWITCH_ADD_API(commands_api_interface, "fifo_add_outbound", "Add outbound members to a fifo", fifo_add_outbound_function, "<node> <url> [<priority>]");
+ SWITCH_ADD_API(commands_api_interface, "fifo_check_bridge", "check if uuid is in a bridge", fifo_check_bridge_function, "<uuid>|<outbound_id>");
switch_console_set_complete("add fifo list");
switch_console_set_complete("add fifo list_verbose");
switch_console_set_complete("add fifo count");
switch_console_set_complete("add fifo has_outbound");
switch_console_set_complete("add fifo importance");
+ switch_console_set_complete("add fifo_check_bridge ::console::list_uuid");
start_node_thread(globals.pool);
node = (fifo_node_t *) val;
switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->mutex);
for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
-
+ switch_mutex_unlock(node->mutex);
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock);