struct fifo_node {
char *name;
switch_mutex_t *mutex;
+ switch_mutex_t *update_mutex;
fifo_queue_t *fifo_list[MAX_PRI];
switch_hash_t *consumer_hash;
int outbound_priority;
switch_core_hash_init(&node->consumer_hash, node->pool);
switch_thread_rwlock_create(&node->rwlock, node->pool);
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
+ switch_mutex_init(&node->update_mutex, SWITCH_MUTEX_NESTED, node->pool);
cbt.buf = outbound_count;
cbt.len = sizeof(outbound_count);
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
if (node) {
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
node->busy = 0;
node->ring_consumer_count = 1;
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
} else {
goto end;
}
cbh->ready = 1;
if (node) {
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
node->ring_consumer_count = 0;
node->busy = 0;
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
}
switch_mutex_unlock(globals.mutex);
if (node) {
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
node->ring_consumer_count++;
node->busy = 0;
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
}
switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
switch_event_destroy(&ovars);
if (node) {
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
if (node->ring_consumer_count-- < 0) {
node->ring_consumer_count = 0;
}
node->busy = 0;
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
}
switch_core_destroy_memory_pool(&h->pool);
if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority);
+ restart:
+
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val);
if ((node = (fifo_node_t *) val)) {
+
+ if (node->ready == FIFO_DELAY_DESTROY) {
+ int doit = 0;
+
+ switch_mutex_lock(node->update_mutex);
+ doit = node->consumer_count == 0 && node_caller_count(node) == 0;
+ switch_mutex_unlock(node->update_mutex);
+
+ if (doit) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
+ switch_core_hash_delete(globals.fifo_hash, node->name);
+
+ node->ready = 0;
+ switch_mutex_lock(node->mutex);
+ switch_core_hash_destroy(&node->consumer_hash);
+ switch_mutex_unlock(node->mutex);
+ switch_mutex_unlock(node->update_mutex);
+ switch_core_destroy_memory_pool(&node->pool);
+ goto restart;
+ }
+
+ }
+
+
if (node->outbound_priority == 0) node->outbound_priority = 5;
if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) {
ppl_waiting = node_caller_count(node);
if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
node = create_node(nlist[i], importance, globals.sql_mutex);
node->ready = 1;
+ switch_thread_rwlock_rdlock(node->rwlock);
}
node_list[node_count++] = node;
}
switch_channel_answer(channel);
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
p = atoi(pri);
switch_channel_set_variable(channel, "fifo_priority", tmp);
}
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
ts = switch_micro_time_now();
switch_time_exp_lt(&tm, ts);
}
switch_mutex_lock(globals.mutex);
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
node_remove_uuid(node, uuid);
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
send_presence(node);
check_cancel(node);
switch_mutex_unlock(globals.mutex);
}
if (pop && !node_caller_count(node)) {
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
node->start_waiting = 0;
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
}
}
done:
- switch_mutex_lock(globals.mutex);
- if (node && node->ready == FIFO_DELAY_DESTROY) {
- int doit = 0;
-
- switch_thread_rwlock_wrlock(node->rwlock);
- doit = node->consumer_count == 0 && node_caller_count(node) == 0;
+ if (node) {
switch_thread_rwlock_unlock(node->rwlock);
-
- if (doit) {
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
- switch_core_hash_delete(globals.fifo_hash, node->name);
-
- node->ready = 0;
- switch_mutex_lock(node->mutex);
- switch_core_hash_destroy(&node->consumer_hash);
- switch_mutex_unlock(node->mutex);
- switch_thread_rwlock_unlock(node->rwlock);
- switch_core_destroy_memory_pool(&node->pool);
- }
-
}
- switch_mutex_unlock(globals.mutex);
-
switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
switch_hash_this(hi, &var, NULL, &val);
node = (fifo_node_t *) val;
len = node_caller_count(node);
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
stream->write_function(stream, "%s:%d:%d:%d\n", (char *) var, node->consumer_count, node_caller_count(node), len);
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
x++;
}
}
} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
len = node_caller_count(node);
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node_caller_count(node), len);
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
} else {
stream->write_function(stream, "none\n");
}
switch_hash_this(hi, &var, NULL, &val);
node = (fifo_node_t *) val;
len = node_caller_count(node);
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound);
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
x++;
}
}
} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
len = node_caller_count(node);
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound);
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
} else {
stream->write_function(stream, "none\n");
}
node->ready = FIFO_DELAY_DESTROY;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->consumer_hash);
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
switch_core_destroy_memory_pool(&node->pool);
goto top;
}
switch_hash_this(hi, NULL, NULL, &val);
node = (fifo_node_t *) val;
- switch_thread_rwlock_wrlock(node->rwlock);
+ switch_mutex_lock(node->update_mutex);
switch_mutex_lock(node->mutex);
for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_mutex_unlock(node->mutex);
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->consumer_hash);
- switch_thread_rwlock_unlock(node->rwlock);
+ switch_mutex_unlock(node->update_mutex);
switch_core_destroy_memory_pool(&node->pool);
}