#define MANUAL_QUEUE_NAME "manual_calls"
#define FIFO_EVENT "fifo::info"
-#define FIFO_DELAY_DESTROY 100
+
static switch_status_t load_config(int reload, int del_all);
#define MAX_PRI 10
int ring_timeout;
int default_lag;
char *domain_name;
+ struct fifo_node *next;
};
typedef struct fifo_node fifo_node_t;
return SWITCH_STATUS_SUCCESS;
}
+struct fifo_node;
+
static struct {
switch_hash_t *caller_orig_hash;
switch_hash_t *consumer_orig_hash;
int threads;
switch_thread_t *node_thread;
int debug;
+ struct fifo_node *nodes;
} globals;
switch_mutex_lock(globals.mutex);
switch_core_hash_insert(globals.fifo_hash, name, node);
+ node->next = globals.nodes;
+ globals.nodes = node;
switch_mutex_unlock(globals.mutex);
switch_safe_free(domain_name);
switch_mutex_lock(globals.mutex);
node = switch_core_hash_find(globals.fifo_hash, node_name);
+ switch_thread_rwlock_rdlock(node->rwlock);
switch_mutex_unlock(globals.mutex);
for (i = 0; i < cbh->rowcount; i++) {
node->ring_consumer_count = 0;
node->busy = 0;
switch_mutex_unlock(node->update_mutex);
+ switch_thread_rwlock_unlock(node->rwlock);
}
switch_mutex_lock(globals.mutex);
node = switch_core_hash_find(globals.fifo_hash, h->node_name);
+ switch_thread_rwlock_rdlock(node->rwlock);
switch_mutex_unlock(globals.mutex);
if (node) {
}
node->busy = 0;
switch_mutex_unlock(node->update_mutex);
+ switch_thread_rwlock_unlock(node->rwlock);
}
switch_core_destroy_memory_pool(&h->pool);
static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj)
{
- fifo_node_t *node;
+ fifo_node_t *node, *last, *this_node;
int cur_priority = 1;
globals.node_thread_running = 1;
while (globals.node_thread_running == 1) {
- switch_hash_index_t *hi;
- void *val;
- const void *var;
int ppl_waiting, consumer_total, idle_consumers, found = 0;
switch_mutex_lock(globals.mutex);
if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority);
+ last = NULL;
+ node = globals.nodes;
- restart:
+ while(node) {
+ int x = 0;
+ switch_event_t *pop;
+ int nuke = 0;
+
+ this_node = node;
+ node = node->next;
+
+ if (this_node->ready == 0) {
+
+ for (x = 0; x < MAX_PRI; x++) {
+ while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
+ const char *caller_uuid = switch_event_get_header(pop, "unique-id");
+ switch_ivr_kill_uuid(caller_uuid, SWITCH_CAUSE_MANAGER_REQUEST);
+ switch_event_destroy(&pop);
+ }
+ }
- for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
- switch_hash_this(hi, &var, NULL, &val);
- if ((node = (fifo_node_t *) val)) {
- int x = 0;
- switch_event_t *pop;
+ }
- if (node->ready == FIFO_DELAY_DESTROY) {
- if (switch_thread_rwlock_trywrlock(node->rwlock) == SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
- switch_core_hash_delete(globals.fifo_hash, node->name);
- for (x = 0; x < MAX_PRI; x++) {
- while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
- switch_event_destroy(&pop);
- }
- }
+ if (this_node->ready == 0 && switch_thread_rwlock_trywrlock(this_node->rwlock) == SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", this_node->name);
- node->ready = 0;
- switch_core_hash_destroy(&node->consumer_hash);
- switch_mutex_unlock(node->mutex);
- switch_mutex_unlock(node->update_mutex);
- switch_thread_rwlock_unlock(node->rwlock);
- switch_core_destroy_memory_pool(&node->pool);
- goto restart;
+ for (x = 0; x < MAX_PRI; x++) {
+ while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
+ switch_event_destroy(&pop);
}
}
+ if (last) {
+ last->next = this_node->next;
+ } else {
+ globals.nodes = this_node->next;
+ }
- if (node->outbound_priority == 0) node->outbound_priority = 5;
- if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) {
- ppl_waiting = node_caller_count(node);
- consumer_total = node->consumer_count;
- idle_consumers = node_idle_consumers(node);
+ switch_core_hash_destroy(&this_node->consumer_hash);
+ switch_mutex_unlock(this_node->mutex);
+ switch_mutex_unlock(this_node->update_mutex);
+ switch_thread_rwlock_unlock(this_node->rwlock);
+ switch_core_destroy_memory_pool(&this_node->pool);
+ nuke++;
+ }
- if (globals.debug) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
- "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n",
- node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count, node->outbound_priority);
- }
+ last = this_node;
+ if (nuke) continue;
- if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
- found++;
- find_consumers(node);
- switch_yield(1000000);
- }
+ if (this_node->outbound_priority == 0) this_node->outbound_priority = 5;
+
+ globals.debug = 1;
+
+ if (this_node->has_outbound && !this_node->busy && this_node->outbound_priority == cur_priority) {
+ ppl_waiting = node_caller_count(this_node);
+ consumer_total = this_node->consumer_count;
+ idle_consumers = node_idle_consumers(this_node);
+
+ if (globals.debug) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
+ "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n",
+ this_node->name, ppl_waiting, consumer_total, idle_consumers, this_node->ring_consumer_count, this_node->outbound_priority);
+ }
+
+
+ if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
+ found++;
+ find_consumers(this_node);
+ switch_yield(1000000);
}
}
}
+
if (++cur_priority > 10) {
cur_priority = 1;
node->ready = 1;
}
+ switch_thread_rwlock_rdlock(node->rwlock);
send_presence(node);
+ switch_thread_rwlock_unlock(node->rwlock);
switch_mutex_unlock(globals.mutex);
{
fifo_node_t *node;
switch_event_t *call_event;
+ uint32_t i = 0;
if (priority >= MAX_PRI) {
priority = MAX_PRI - 1;
node = create_node(node_name, 0, globals.sql_mutex);
}
+ switch_thread_rwlock_rdlock(node->rwlock);
+
switch_mutex_unlock(globals.mutex);
switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
fifo_queue_push(node->fifo_list[priority], call_event);
call_event = NULL;
- return fifo_queue_size(node->fifo_list[priority]);
+ i = fifo_queue_size(node->fifo_list[priority]);
+
+ switch_thread_rwlock_unlock(node->rwlock);
+
+ return i;
}
}
switch_thread_rwlock_unlock(node->rwlock);
if (node->ready == 1 && do_destroy) {
- node->ready = FIFO_DELAY_DESTROY;
+ node->ready = 0;
}
}
switch_mutex_unlock(globals.mutex);
switch_mutex_lock(globals.mutex);
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, NULL, NULL, &val);
- if ((node = (fifo_node_t *) val) && node->is_static) {
- node->ready = 0;
+ if ((node = (fifo_node_t *) val) && node->is_static && node->ready == 1) {
+ node->ready = -1;
}
}
switch_mutex_unlock(globals.mutex);
done:
if (reload) {
- switch_hash_index_t *hi;
- void *val;
- switch_event_t *pop;
fifo_node_t *node;
- switch_mutex_lock(globals.mutex);
- top:
- for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
- int x = 0;
- switch_hash_this(hi, NULL, NULL, &val);
- if (!(node = (fifo_node_t *) val) || !node->is_static || node->ready) {
- continue;
- }
-
- if (node_caller_count(node) || node->consumer_count || node_idle_consumers(node)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s removal delayed, still in use.\n", node->name);
- node->ready = FIFO_DELAY_DESTROY;
- } else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
- switch_mutex_lock(node->update_mutex);
- for (x = 0; x < MAX_PRI; x++) {
- while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
- switch_event_destroy(&pop);
- }
- }
+ switch_mutex_lock(globals.mutex);
+ for (node = globals.nodes; node; node = node->next) {
+ if (node->ready == -1) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s queued for removal\n", node->name);
switch_core_hash_delete(globals.fifo_hash, node->name);
- switch_core_hash_destroy(&node->consumer_hash);
- switch_mutex_unlock(node->update_mutex);
- switch_core_destroy_memory_pool(&node->pool);
- goto top;
+ node->ready = 0;
}
}
-
- fifo_caller_del(NULL);
switch_mutex_unlock(globals.mutex);
+ fifo_caller_del(NULL);
}
*/
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
{
- switch_hash_index_t *hi;
- void *val;
switch_event_t *pop = NULL;
- fifo_node_t *node;
+ fifo_node_t *node, *this_node;
switch_mutex_t *mutex = globals.mutex;
switch_event_unbind(&globals.node);
switch_cond_next();
}
+ node = globals.nodes;
- while ((hi = switch_hash_first(NULL, globals.fifo_hash))) {
+ while(node) {
int x = 0;
- switch_hash_this(hi, NULL, NULL, &val);
- node = (fifo_node_t *) val;
- switch_mutex_lock(node->update_mutex);
- switch_mutex_lock(node->mutex);
+ this_node = node;
+ node = node->next;
+
+
+ switch_mutex_lock(this_node->update_mutex);
+ switch_mutex_lock(this_node->mutex);
for (x = 0; x < MAX_PRI; x++) {
- while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
+ while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
- switch_mutex_unlock(node->mutex);
- switch_core_hash_delete(globals.fifo_hash, node->name);
- switch_core_hash_destroy(&node->consumer_hash);
- switch_mutex_unlock(node->update_mutex);
- switch_core_destroy_memory_pool(&node->pool);
+ switch_mutex_unlock(this_node->mutex);
+ switch_core_hash_delete(globals.fifo_hash, this_node->name);
+ switch_core_hash_destroy(&this_node->consumer_hash);
+ switch_mutex_unlock(this_node->update_mutex);
+ switch_core_destroy_memory_pool(&this_node->pool);
}
switch_core_hash_destroy(&globals.fifo_hash);