static switch_status_t load_config(int reload, int del_all);
#define MAX_PRI 10
+typedef enum {
+ NODE_STRATEGY_INVALID = -1,
+ NODE_STRATEGY_RINGALL = 0,
+ NODE_STRATEGY_ENTERPRISE
+} outbound_strategy_t;
+
+static outbound_strategy_t default_strategy = NODE_STRATEGY_ENTERPRISE;
+
+
+typedef struct {
+ int nelm;
+ int idx;
+ switch_event_t **data;
+ switch_memory_pool_t *pool;
+ switch_mutex_t *mutex;
+} fifo_queue_t;
+
+switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool)
+{
+ fifo_queue_t *q;
+
+ q = switch_core_alloc(pool, sizeof(*q));
+ q->pool = pool;
+ q->nelm = size - 1;
+ q->data = switch_core_alloc(pool, size * sizeof(switch_event_t *));
+ switch_mutex_init(&q->mutex, SWITCH_MUTEX_NESTED, pool);
+
+ *queue = q;
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+
+static void change_pos(switch_event_t *event, int pos)
+{
+ const char *uuid = switch_event_get_header(event, "unique-id");
+ switch_core_session_t *session;
+ switch_channel_t *channel;
+ char tmp[30] = "";
+
+ if (zstr(uuid)) return;
+
+ if (!(session = switch_core_session_locate(uuid))) {
+ return;
+ }
+
+ channel = switch_core_session_get_channel(session);
+
+ switch_snprintf(tmp, sizeof(tmp), "%d", pos);
+ switch_channel_set_variable(channel, "fifo_position", tmp);
+ switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "fifo_position", tmp);
+
+ switch_core_session_rwunlock(session);
+
+
+}
+
+static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr)
+{
+ switch_mutex_lock(queue->mutex);
+
+ if (queue->idx == queue->nelm) {
+ switch_mutex_unlock(queue->mutex);
+ return SWITCH_STATUS_FALSE;
+ }
+
+ queue->data[queue->idx++] = ptr;
+
+
+ switch_mutex_unlock(queue->mutex);
+
+ return SWITCH_STATUS_SUCCESS;
+
+}
+
+static int fifo_queue_size(fifo_queue_t *queue)
+{
+ int s;
+ switch_mutex_lock(queue->mutex);
+ s = queue->idx;
+ switch_mutex_unlock(queue->mutex);
+
+ return s;
+}
+
+static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove)
+{
+ int i;
+
+ switch_mutex_lock(queue->mutex);
+
+ if (queue->idx == 0) {
+ switch_mutex_unlock(queue->mutex);
+ *pop = NULL;
+ return SWITCH_STATUS_FALSE;
+ }
+
+ *pop = queue->data[0];
+
+ if (remove) {
+ for (i = 1; i < queue->idx; i++) {
+ queue->data[i-1] = queue->data[i];
+ queue->data[i] = NULL;
+ change_pos(queue->data[i-1], i);
+ }
+
+ queue->idx--;
+ }
+
+ switch_mutex_unlock(queue->mutex);
+
+ return SWITCH_STATUS_SUCCESS;
+
+}
+
+
+static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove)
+{
+ int i, j;
+
+ switch_mutex_lock(queue->mutex);
+
+ if (queue->idx == 0 || zstr(name) || zstr(val)) {
+ switch_mutex_unlock(queue->mutex);
+ return SWITCH_STATUS_FALSE;
+ }
+
+ for (j = 0; j < queue->idx; j++) {
+ const char *j_val = switch_event_get_header(queue->data[j], name);
+ if (j_val && val && !strcmp(j_val, val)) {
+ *pop = queue->data[j];
+ break;
+ }
+ }
+
+ if (j == queue->idx) {
+ switch_mutex_unlock(queue->mutex);
+ return SWITCH_STATUS_FALSE;
+ }
+
+ if (remove) {
+ for (i = j+1; i < queue->idx; i++) {
+ queue->data[i-1] = queue->data[i];
+ queue->data[i] = NULL;
+ change_pos(queue->data[i-1], i);
+ }
+
+ queue->idx--;
+ }
+
+ switch_mutex_unlock(queue->mutex);
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+
+static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
+{
+ int i, j;
+
+ switch_mutex_lock(queue->mutex);
+
+ if (queue->idx == 0 || zstr(uuid)) {
+ switch_mutex_unlock(queue->mutex);
+ return SWITCH_STATUS_FALSE;
+ }
+
+ for (j = 0; j < queue->idx; j++) {
+ const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id");
+ if (j_uuid && !strcmp(j_uuid, uuid)) break;
+ }
+
+ if (j == queue->idx) {
+ switch_mutex_unlock(queue->mutex);
+ return SWITCH_STATUS_FALSE;
+ }
+
+ for (i = j+1; i < queue->idx; i++) {
+ queue->data[i-1] = queue->data[i];
+ queue->data[i] = NULL;
+ change_pos(queue->data[i-1], i);
+ }
+
+ queue->idx--;
+
+ switch_mutex_unlock(queue->mutex);
+
+ return SWITCH_STATUS_SUCCESS;
+
+}
+
+
+
struct fifo_node {
char *name;
switch_mutex_t *mutex;
- switch_queue_t *fifo_list[MAX_PRI];
- switch_hash_t *caller_hash;
+ fifo_queue_t *fifo_list[MAX_PRI];
switch_hash_t *consumer_hash;
int caller_count;
int consumer_count;
switch_memory_pool_t *pool;
int has_outbound;
int ready;
+ int busy;
int is_static;
+ outbound_strategy_t outbound_strategy;
};
typedef struct fifo_node fifo_node_t;
};
typedef struct callback callback_t;
+static const char *strat_parse(outbound_strategy_t s)
+{
+ switch (s) {
+ case NODE_STRATEGY_RINGALL:
+ return "ringall";
+ case NODE_STRATEGY_ENTERPRISE:
+ return "enterprise";
+ default:
+ break;
+ }
+
+ return "invalid";
+}
+
+static outbound_strategy_t parse_strat(const char *name)
+{
+ if (!strcasecmp(name, "ringall")) {
+ return NODE_STRATEGY_RINGALL;
+ }
+
+ if (!strcasecmp(name, "enterprise")) {
+ return NODE_STRATEGY_ENTERPRISE;
+ }
+
+ return NODE_STRATEGY_INVALID;
+}
+
static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames)
{
callback_t *cbt = (callback_t *) pArg;
int i, len = 0;
for (i = 0; i < MAX_PRI; i++) {
- len += switch_queue_size(node->fifo_list[i]);
+ len += fifo_queue_size(node->fifo_list[i]);
}
return len;
static void node_remove_uuid(fifo_node_t *node, const char *uuid)
{
- int i, len = 0, done = 0;
- void *pop = NULL;
+ int i = 0;
for (i = 0; i < MAX_PRI; i++) {
- if (!(len = switch_queue_size(node->fifo_list[i]))) {
- continue;
- }
- while (len) {
- if (switch_queue_trypop(node->fifo_list[i], &pop) == SWITCH_STATUS_SUCCESS && pop) {
- if (!done && !strcmp((char *) pop, uuid)) {
- free(pop);
- done++;
- } else {
- switch_queue_push(node->fifo_list[i], pop);
- }
- }
- len--;
- }
+ fifo_queue_popfly(node->fifo_list[i], uuid);
}
if (!node_consumer_wait_count(node)) {
break;
}
for (x = 0; x < MAX_PRI; x++) {
- total += switch_queue_size(node->fifo_list[x]);
+ total += fifo_queue_size(node->fifo_list[x]);
}
}
options.odbc_options.user = globals.odbc_user;
options.odbc_options.pass = globals.odbc_pass;
- if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_ODBC, &options) != SWITCH_STATUS_SUCCESS)
+ if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_ODBC, &options) != SWITCH_STATUS_SUCCESS) {
dbh = NULL;
+ }
return dbh;
} else {
options.core_db_options.db_path = globals.dbname;
- if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_CORE_DB, &options) != SWITCH_STATUS_SUCCESS)
+ if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_CORE_DB, &options) != SWITCH_STATUS_SUCCESS) {
dbh = NULL;
+ }
return dbh;
}
}
node = switch_core_alloc(pool, sizeof(*node));
node->pool = pool;
-
+ node->outbound_strategy = default_strategy;
node->name = switch_core_strdup(node->pool, name);
for (x = 0; x < MAX_PRI; x++) {
- switch_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, node->pool);
+ fifo_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, node->pool);
switch_assert(node->fifo_list[x]);
}
- switch_core_hash_init(&node->caller_hash, node->pool);
+
switch_core_hash_init(&node->consumer_hash, node->pool);
switch_thread_rwlock_create(&node->rwlock, node->pool);
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
switch_memory_pool_t *pool;
};
+#define MAX_ROWS 2048
+struct callback_helper {
+ int need;
+ switch_memory_pool_t *pool;
+ struct call_helper *rows[MAX_ROWS];
+ int rowcount;
+};
+
+
+
+static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void *obj)
+{
+ struct callback_helper *cbh = (struct callback_helper *) obj;
+ char *node_name;
+ int i = 0;
+ int timeout = 0;
+ char sql[256] = "";
+ switch_stream_handle_t stream = { 0 };
+ fifo_node_t *node = NULL;
+ char *originate_string;
+ switch_event_t *ovars = NULL;
+ switch_status_t status;
+ switch_core_session_t *session = NULL;
+ switch_call_cause_t cause = SWITCH_CAUSE_NONE;
+ char *app_name = NULL, *arg = NULL;
+ switch_caller_extension_t *extension = NULL;
+ switch_channel_t *channel;
+ char *cid_name = NULL, *cid_num = NULL, *id = NULL;
+ switch_event_t *pop = NULL;
+ fifo_queue_t *q = NULL;
+ int x = 0;
+
+ if (!cbh->rowcount) {
+ goto end;
+ }
+
+ node_name = cbh->rows[0]->node_name;
+
+ switch_mutex_lock(globals.mutex);
+ node = switch_core_hash_find(globals.fifo_hash, node_name);
+ switch_mutex_unlock(globals.mutex);
+
+ if (node) {
+ switch_mutex_lock(node->mutex);
+ node->busy = 1;
+ node->ring_consumer_count++;
+ switch_mutex_unlock(node->mutex);
+ } else {
+ goto end;
+ }
+
+ SWITCH_STANDARD_STREAM(stream);
+
+ switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
+ switch_assert(ovars);
+
+
+ for (i = 0; i < cbh->rowcount; i++) {
+ struct call_helper *h = cbh->rows[i];
+ char *parsed = NULL;
+
+ switch_event_create_brackets(h->originate_string, '{', '}', ',', &ovars, &parsed);
+
+ if (!h->timeout) h->timeout = 60;
+ if (timeout < h->timeout) timeout = h->timeout;
+
+ stream.write_function(&stream, "[leg_timeout=%d,fifo_outbound_uuid=%s]%s,", h->timeout, h->uuid, parsed ? parsed : h->originate_string);
+ switch_safe_free(parsed);
+
+ switch_snprintf(sql, sizeof(sql), "update fifo_outbound set use_count=use_count+1 where uuid='%s'", h->uuid);
+ fifo_execute_sql(sql, globals.sql_mutex);
+ }
+
+
+ originate_string = (char *) stream.data;
+
+ if (originate_string) {
+ end_of(originate_string) = '\0';
+ }
+
+ if (!timeout) timeout = 60;
+
+ for (x = 0; x < MAX_PRI; x++) {
+ q = node->fifo_list[x];
+ switch_mutex_lock(q->mutex);
+
+ if (fifo_queue_pop_nameval(q, "variable_fifo_vip", "true", &pop, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop) {
+ goto found;
+ }
+ switch_mutex_unlock(q->mutex);
+ q = NULL;
+ }
+
+ if (!pop) {
+ for (x = 0; x < MAX_PRI; x++) {
+ q = node->fifo_list[x];
+ switch_mutex_lock(q->mutex);
+
+ if (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop) {
+ goto found;
+ }
+ }
+ switch_mutex_unlock(q->mutex);
+ q = NULL;
+ }
+
+ found:
+
+
+ if (!q) goto end;
+
+ if (!pop) {
+ if (q) switch_mutex_unlock(q->mutex);
+ goto end;
+ }
+
+ if ((cid_name = switch_event_get_header(pop, "caller-caller-id-name"))) {
+ switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", cid_name);
+ }
+
+ if ((cid_num = switch_event_get_header(pop, "caller-caller-id-number"))) {
+ switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_number", cid_num);
+ }
+
+ if ((id = switch_event_get_header(pop, "unique-id"))) {
+ switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "fifo_bridge_uuid", id);
+ }
+
+ switch_mutex_unlock(q->mutex);
+
+ status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL);
+ free(originate_string);
+
+ if (status != SWITCH_STATUS_SUCCESS) {
+ for (i = 0; i < cbh->rowcount; i++) {
+ struct call_helper *h = cbh->rows[i];
+ switch_snprintf(sql, sizeof(sql),
+ "update fifo_outbound set use_count=use_count-1, outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag where uuid='%s'",
+ (long) switch_epoch_time_now(NULL), h->uuid);
+ fifo_execute_sql(sql, globals.sql_mutex);
+ }
+ goto end;
+ }
+
+ channel = switch_core_session_get_channel(session);
+ switch_channel_set_variable(channel, "fifo_pop_order", NULL);
+
+ switch_core_event_hook_add_state_change(session, hanguphook);
+ app_name = "fifo";
+ arg = switch_core_session_sprintf(session, "%s out wait", node_name);
+ extension = switch_caller_extension_new(session, app_name, arg);
+ switch_caller_extension_add_application(session, extension, app_name, arg);
+ switch_channel_set_caller_extension(channel, extension);
+ switch_channel_set_state(channel, CS_EXECUTE);
+ switch_core_session_rwunlock(session);
+
+ end:
+
+ switch_event_destroy(&ovars);
+ if (node) {
+ switch_mutex_lock(node->mutex);
+ if (node->ring_consumer_count-- < 0) {
+ node->ring_consumer_count = 0;
+ }
+ node->busy = 0;
+ switch_mutex_unlock(node->mutex);
+ }
+
+ switch_core_destroy_memory_pool(&cbh->pool);
+
+ return NULL;
+}
+
static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
{
struct call_helper *h = (struct call_helper *) obj;
return NULL;
}
-static int place_call_callback(void *pArg, int argc, char **argv, char **columnNames)
+static int place_call_ringall_callback(void *pArg, int argc, char **argv, char **columnNames)
+{
+ struct callback_helper *cbh = (struct callback_helper *) pArg;
+ struct call_helper *h;
+
+ h = switch_core_alloc(cbh->pool, sizeof(*h));
+ h->pool = cbh->pool;
+ h->uuid = switch_core_strdup(h->pool, argv[0]);
+ h->node_name = switch_core_strdup(h->pool, argv[1]);
+ h->originate_string = switch_core_strdup(h->pool, argv[2]);
+ h->timeout = atoi(argv[5]);
+
+ cbh->rows[cbh->rowcount++] = h;
+
+ cbh->need--;
+
+ if (cbh->rowcount == MAX_ROWS) return -1;
+
+ return cbh->need ? 0 : -1;
+
+}
+
+static int place_call_enterprise_callback(void *pArg, int argc, char **argv, char **columnNames)
{
int *need = (int *) pArg;
static void find_consumers(fifo_node_t *node)
{
- int need = node_consumer_wait_count(node);
char *sql;
+
+
sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
"next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname "
"from fifo_outbound where taking_calls = 1 and "
"(fifo_name = '%q') and (use_count < simo_count) and (next_avail = 0 or next_avail <= %ld) "
"order by next_avail", node->name, (long) switch_epoch_time_now(NULL));
- switch_assert(sql);
- fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_callback, &need);
- free(sql);
+ switch(node->outbound_strategy) {
+ case NODE_STRATEGY_ENTERPRISE:
+ {
+ int need = node_consumer_wait_count(node);
+ fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_enterprise_callback, &need);
+
+ }
+ break;
+ case NODE_STRATEGY_RINGALL:
+ {
+ switch_thread_t *thread;
+ switch_threadattr_t *thd_attr = NULL;
+ struct callback_helper *cbh;
+ switch_memory_pool_t *pool;
+
+ switch_core_new_memory_pool(&pool);
+ cbh = switch_core_alloc(pool, sizeof(*cbh));
+ cbh->pool = pool;
+ cbh->need = node_consumer_wait_count(node);
+ fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
+
+ switch_threadattr_create(&thd_attr, cbh->pool);
+ switch_threadattr_detach_set(thd_attr, 1);
+ switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+ switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool);
+
+ }
+ break;
+ default:
+ break;
+ }
+
+
+ switch_safe_free(sql);
}
static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj)
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val);
if ((node = (fifo_node_t *) val)) {
- if (node->has_outbound && node->ready) {
+ if (node->has_outbound && node->ready && !node->busy) {
switch_mutex_lock(node->mutex);
ppl_waiting = node_consumer_wait_count(node);
consumer_total = node->consumer_count;
static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32_t priority)
{
fifo_node_t *node;
- char *str;
+ switch_event_t *call_event;
if (priority >= MAX_PRI) {
priority = MAX_PRI - 1;
switch_mutex_unlock(globals.mutex);
- str = switch_mprintf("dial:%s", url);
- switch_queue_push(node->fifo_list[priority], (void *) str);
+ switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
+ switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "dial-url", url);
+
+ fifo_queue_push(node->fifo_list[priority], call_event);
+ call_event = NULL;
- return switch_queue_size(node->fifo_list[priority]);
+ return fifo_queue_size(node->fifo_list[priority]);
}
int freq = 30;
int ftmp = 0;
int to = 60;
+ switch_event_t *call_event;
if (orbit_exten) {
char *ot;
switch_mutex_lock(node->mutex);
node->caller_count++;
- switch_core_hash_insert(node->caller_hash, uuid, session);
-
if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
p = atoi(pri);
}
switch_event_fire(&event);
}
- switch_queue_push(node->fifo_list[p], (void *) strdup(uuid));
- switch_snprintf(tmp, sizeof(tmp), "%d", switch_queue_size(node->fifo_list[p]));
+ switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
+ switch_channel_event_set_data(channel, call_event);
+
+
+ fifo_queue_push(node->fifo_list[p], call_event);
+ call_event = NULL;
+ switch_snprintf(tmp, sizeof(tmp), "%d", fifo_queue_size(node->fifo_list[p]));
switch_channel_set_variable(channel, "fifo_position", tmp);
if (!pri) {
switch_mutex_lock(node->mutex);
node_remove_uuid(node, uuid);
node->caller_count--;
- switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex);
send_presence(node);
check_cancel(node);
goto done;
} else { /* consumer */
- void *pop = NULL;
+ switch_event_t *pop = NULL;
switch_frame_t *read_frame;
switch_status_t status;
- char *uuid;
switch_core_session_t *other_session;
switch_input_args_t args = { 0 };
const char *pop_order = NULL;
char buf[5] = "";
const char *strat_str = switch_channel_get_variable(channel, "fifo_strategy");
fifo_strategy_t strat = STRAT_WAITING_LONGER;
- char *url = NULL;
-
+ const char *url = NULL;
+ const char *caller_uuid = NULL;
+ switch_event_t *call_event;
+
if (!zstr(strat_str)) {
if (!strcasecmp(strat_str, "more_ppl")) {
strat = STRAT_MORE_PPL;
int x = 0, winner = -1;
switch_time_t longest = (0xFFFFFFFFFFFFFFFFULL / 2);
uint32_t importance = 0, waiting = 0, most_waiting = 0;
-
+
pop = NULL;
if (moh && do_wait) {
}
if (node) {
- if (custom_pop) {
+ const char *varval;
+
+ if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) {
for (x = 0; x < MAX_PRI; x++) {
- if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) {
+ if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
- } else {
+ }
+
+ if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) {
+ for (x = 0; x < MAX_PRI; x++) {
+ if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_skill",
+ varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
+ break;
+ }
+ }
+ }
+
+ if (!pop) {
for (x = 0; x < MAX_PRI; x++) {
- if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
+ if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_vip", "true",
+ &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
}
+ if (!pop) {
+ if (custom_pop) {
+ for (x = 0; x < MAX_PRI; x++) {
+ if (fifo_queue_pop(node->fifo_list[pop_array[x]], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
+ break;
+ }
+ }
+ } else {
+ for (x = 0; x < MAX_PRI; x++) {
+ if (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
+ break;
+ }
+ }
+ }
+ }
+
if (pop && !node_consumer_wait_count(node)) {
switch_mutex_lock(node->mutex);
node->start_waiting = 0;
continue;
}
- uuid = (char *) pop;
+ call_event = (switch_event_t *) pop;
pop = NULL;
+
+ url = switch_event_get_header(call_event, "dial-url");
+ caller_uuid = switch_event_get_header(call_event, "unique-id");
- if (!strncasecmp(uuid, "dial:", 5)) {
+ if (url) {
switch_call_cause_t cause = SWITCH_CAUSE_NONE;
const char *o_announce = NULL;
- url = uuid + 5;
-
+
if ((o_announce = switch_channel_get_variable(channel, "fifo_outbound_announce"))) {
switch_ivr_play_file(session, NULL, o_announce, NULL);
}
switch_event_fire(&event);
}
url = NULL;
- free(uuid);
- uuid = strdup(switch_core_session_get_uuid(other_session));
+ caller_uuid = switch_core_session_strdup(session, switch_core_session_get_uuid(other_session));
}
} else {
- if ((other_session = switch_core_session_locate(uuid))) {
+ if ((other_session = switch_core_session_locate(caller_uuid))) {
switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_channel_event_set_data(other_channel, event);
switch_mutex_lock(node->mutex);
node->caller_count--;
- switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex);
send_presence(node);
check_cancel(node);
switch_time_exp_lt(&tm, ts);
switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
switch_channel_set_variable(channel, "fifo_status", "TALKING");
- switch_channel_set_variable(channel, "fifo_target", uuid);
+ switch_channel_set_variable(channel, "fifo_target", caller_uuid);
switch_channel_set_variable(channel, "fifo_timestamp", date);
switch_channel_set_variable(other_channel, "fifo_status", "TALKING");
switch_mutex_lock(node->mutex);
node->caller_count--;
- switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex);
send_presence(node);
check_cancel(node);
switch_core_session_rwunlock(other_session);
- switch_safe_free(uuid);
+ if (call_event) {
+ switch_event_destroy(&call_event);
+ }
if (!do_wait || !switch_channel_ready(channel)) {
break;
switch_thread_rwlock_wrlock(node->rwlock);
node->ready = 0;
switch_mutex_lock(node->mutex);
- switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash);
switch_mutex_unlock(node->mutex);
switch_thread_rwlock_unlock(node->rwlock);
switch_xml_set_attr_d(x_out, "lag", argv[6]);
switch_xml_set_attr_d(x_out, "outbound-call-count", argv[10]);
switch_xml_set_attr_d(x_out, "outbound-fail-count", argv[11]);
- switch_xml_set_attr_d(x_out, "taking-calls", argv[12]);
- switch_xml_set_attr_d(x_out, "status", argv[13]);
+ switch_xml_set_attr_d(x_out, "taking-calls", argv[13]);
+ switch_xml_set_attr_d(x_out, "status", argv[14]);
switch_xml_set_attr_d(x_out, "next-available", expires);
switch_xml_set_txt_d(x_out, argv[2]);
return cc_off;
}
+
+static int xml_caller(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
+{
+ switch_xml_t x_tmp, x_caller, x_cp, variables;
+ int i, x;
+ switch_core_session_t *session;
+ switch_channel_t *channel;
+
+ x_tmp = switch_xml_add_child_d(xml, container, cc_off++);
+ switch_assert(x_tmp);
+
+ for (x = 0; x < MAX_PRI; x++) {
+ fifo_queue_t *q = node->fifo_list[x];
+
+ switch_mutex_lock(q->mutex);
+
+ for (i = 0; i < q->idx; i++) {
+
+ int c_off = 0, d_off = 0;
+ const char *status;
+ const char *ts;
+ const char *uuid = switch_event_get_header(q->data[i], "unique-id");
+
+ if (!uuid) {
+ continue;
+ }
+
+ if (!(session = switch_core_session_locate(uuid))) {
+ continue;
+ }
+
+ channel = switch_core_session_get_channel(session);
+ x_caller = switch_xml_add_child_d(x_tmp, tag, c_off++);
+ switch_assert(x_caller);
+
+ switch_xml_set_attr_d(x_caller, "uuid", switch_core_session_get_uuid(session));
+
+ if ((status = switch_channel_get_variable(channel, "fifo_status"))) {
+ switch_xml_set_attr_d(x_caller, "status", status);
+ }
+
+ if ((ts = switch_channel_get_variable(channel, "fifo_timestamp"))) {
+ switch_xml_set_attr_d(x_caller, "timestamp", ts);
+ }
+
+ if ((ts = switch_channel_get_variable(channel, "fifo_target"))) {
+ switch_xml_set_attr_d(x_caller, "target", ts);
+ }
+
+ if ((ts = switch_channel_get_variable(channel, "fifo_position"))) {
+ switch_xml_set_attr_d(x_caller, "position", ts);
+ }
+
+ if (!(x_cp = switch_xml_add_child_d(x_caller, "caller_profile", d_off++))) {
+ abort();
+ }
+ if (verbose) {
+ d_off += switch_ivr_set_xml_profile_data(x_cp, switch_channel_get_caller_profile(channel), d_off);
+
+ if (!(variables = switch_xml_add_child_d(x_caller, "variables", c_off++))) {
+ abort();
+ }
+
+ switch_ivr_set_xml_chan_vars(variables, channel, c_off);
+ }
+
+ switch_core_session_rwunlock(session);
+ session = NULL;
+ }
+
+ switch_mutex_unlock(q->mutex);
+ }
+
+ return cc_off;
+}
+
static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int verbose)
{
switch_xml_t x_fifo;
switch_snprintf(tmp, sizeof(buffer), "%u", node->importance);
switch_xml_set_attr_d(x_fifo, "importance", tmp);
+ switch_xml_set_attr_d(x_fifo, "outbound_strategy", strat_parse(node->outbound_strategy));
+
cc_off = xml_outbound(x_fifo, node, "outbound", "member", cc_off, verbose);
- cc_off = xml_hash(x_fifo, node->caller_hash, "callers", "caller", cc_off, verbose);
+ cc_off = xml_caller(x_fifo, node, "callers", "caller", cc_off, verbose);
cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose);
}
var = (char *) switch_xml_attr_soft(param, "name");
val = (char *) switch_xml_attr_soft(param, "value");
+ if (!strcasecmp(var, "outbound-strategy") && !zstr(val)) {
+ default_strategy = parse_strat(val);
+ }
+
if (!strcasecmp(var, "odbc-dsn") && !zstr(val)) {
if (switch_odbc_available()) {
globals.odbc_dsn = switch_core_strdup(globals.pool, val);
if ((fifos = switch_xml_child(cfg, "fifos"))) {
for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) {
- const char *name;
+ const char *name, *outbound_strategy;
const char *importance;
int imp = 0;
int simo_i = 1;
fifo_node_t *node;
name = switch_xml_attr(fifo, "name");
+ outbound_strategy = switch_xml_attr(fifo, "outbound_strategy");
if ((importance = switch_xml_attr(fifo, "importance"))) {
if ((imp = atoi(importance)) < 0) {
switch_mutex_lock(node->mutex);
+ if (outbound_strategy) {
+ node->outbound_strategy = parse_strat(outbound_strategy);
+ }
+
for (member = switch_xml_child(fifo, "member"); member; member = member->next) {
const char *simo = switch_xml_attr_soft(member, "simo");
const char *lag = switch_xml_attr_soft(member, "lag");
if (reload) {
switch_hash_index_t *hi;
- void *val, *pop;
+ void *val;
+ switch_event_t *pop;
fifo_node_t *node;
switch_mutex_lock(globals.mutex);
top:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
switch_thread_rwlock_wrlock(node->rwlock);
for (x = 0; x < MAX_PRI; x++) {
- while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) {
- free(pop);
+ while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
+ switch_event_destroy(&pop);
}
}
switch_core_hash_delete(globals.fifo_hash, node->name);
- switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock);
switch_core_destroy_memory_pool(&node->pool);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
{
switch_hash_index_t *hi;
- void *val, *pop;
+ void *val;
+ switch_event_t *pop = NULL;
fifo_node_t *node;
switch_memory_pool_t *pool = globals.pool;
switch_mutex_t *mutex = globals.mutex;
switch_thread_rwlock_wrlock(node->rwlock);
for (x = 0; x < MAX_PRI; x++) {
- while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) {
- free(pop);
+ while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
+ switch_event_destroy(&pop);
}
}
switch_core_hash_delete(globals.fifo_hash, node->name);
- switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock);
switch_core_destroy_memory_pool(&node->pool);