queue->data[queue->idx++] = ptr;
-
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
return s;
}
-static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove)
+static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, int remove)
{
int i, j;
for (j = 0; j < queue->idx; j++) {
const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
- if (uuid && !check_caller_outbound_call(uuid)) {
+ if (uuid && (remove == 2 || !check_caller_outbound_call(uuid))) {
if (remove) {
*pop = queue->data[j];
} else {
}
-static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove)
+static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, int remove)
{
int i, j, force = 0;
force = 1;
}
+ if (remove == 2) {
+ force = 1;
+ }
+
if (queue->idx == 0 || zstr(name) || zstr(val)) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
for (j = 0; j < queue->idx; j++) {
const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id");
- if (j_uuid && !strcmp(j_uuid, uuid)) break;
+ if (j_uuid && !strcmp(j_uuid, uuid)) {
+ switch_event_destroy(&queue->data[j]);
+ break;
+ }
}
if (j == queue->idx) {
char *odbc_pass;
int node_thread_running;
switch_odbc_handle_t *master_odbc;
+ int threads;
} globals;
struct call_helper *rows[MAX_ROWS] = { 0 };
int rowcount = 0;
+
+ if (!globals.running) return NULL;
+
switch_uuid_get(&uuid);
switch_uuid_format(uuid_str, &uuid);
+
+ switch_mutex_lock(globals.mutex);
+ globals.threads++;
+ switch_mutex_unlock(globals.mutex);
if (!cbh->rowcount) {
goto end;
switch_core_destroy_memory_pool(&cbh->pool);
+ switch_mutex_lock(globals.mutex);
+ globals.threads--;
+ switch_mutex_unlock(globals.mutex);
+
return NULL;
}
switch_event_t *event = NULL;
char *sql = NULL;
int connected = 0;
-
+
+ if (!globals.running) return NULL;
+
+ switch_mutex_lock(globals.mutex);
+ globals.threads++;
+ switch_mutex_unlock(globals.mutex);
+
+
switch_mutex_lock(globals.mutex);
node = switch_core_hash_find(globals.fifo_hash, h->node_name);
switch_mutex_unlock(globals.mutex);
}
switch_core_destroy_memory_pool(&h->pool);
+ switch_mutex_lock(globals.mutex);
+ globals.threads--;
+ switch_mutex_unlock(globals.mutex);
+
return NULL;
}
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
struct callback_helper *cbh;
- switch_memory_pool_t *pool;
+ switch_memory_pool_t *pool = NULL;
switch_core_new_memory_pool(&pool);
cbh = switch_core_alloc(pool, sizeof(*cbh));
fifo_strategy_t strat = STRAT_WAITING_LONGER;
const char *url = NULL;
const char *caller_uuid = NULL;
- switch_event_t *call_event;
const char *outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid");
//const char *track_use_count = switch_channel_get_variable(channel, "fifo_track_use_count");
//int do_track = switch_true(track_use_count);
continue;
}
- call_event = (switch_event_t *) pop;
- pop = NULL;
+ url = switch_event_get_header(pop, "dial-url");
+ caller_uuid = switch_core_session_strdup(session, switch_event_get_header(pop, "unique-id"));
+ switch_event_destroy(&pop);
- url = switch_event_get_header(call_event, "dial-url");
- caller_uuid = switch_event_get_header(call_event, "unique-id");
-
if (url) {
switch_call_cause_t cause = SWITCH_CAUSE_NONE;
const char *o_announce = NULL;
switch_channel_event_set_data(channel, event);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_pop");
+ switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-UUID", switch_core_session_get_uuid(other_session));
switch_event_fire(&event);
}
send_presence(node);
check_cancel(node);
switch_core_session_rwunlock(other_session);
- if (call_event) {
- switch_event_destroy(&call_event);
- }
+
if (!do_wait || !switch_channel_ready(channel)) {
break;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
switch_thread_rwlock_wrlock(node->rwlock);
for (x = 0; x < MAX_PRI; x++) {
- while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
+ while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
if (globals.node_thread_running) {
stop_node_thread();
}
+
+ while(globals.threads) {
+ switch_cond_next();
+ }
+
while ((hi = switch_hash_first(NULL, globals.fifo_hash))) {
int x = 0;
switch_thread_rwlock_wrlock(node->rwlock);
switch_mutex_lock(node->mutex);
for (x = 0; x < MAX_PRI; x++) {
- while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
+ while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}