static switch_thread_t *EVENT_QUEUE_THREADS[NUMBER_OF_QUEUES] = { 0 };
static switch_queue_t *EVENT_QUEUE[NUMBER_OF_QUEUES] = { 0 };
static switch_thread_t *EVENT_DISPATCH_QUEUE_THREADS[MAX_DISPATCH_VAL] = { 0 };
+static uint8_t EVENT_DISPATCH_QUEUE_RUNNING[MAX_DISPATCH_VAL] = { 0 };
static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH_VAL] = { 0 };
static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN;
static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL;
{
switch_queue_t *queue = (switch_queue_t *) obj;
int my_id = 0;
+
switch_mutex_lock(EVENT_QUEUE_MUTEX);
THREAD_COUNT++;
- switch_mutex_unlock(EVENT_QUEUE_MUTEX);
for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) {
if (EVENT_DISPATCH_QUEUE[my_id] == queue) {
}
}
+ EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
+ switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+
for (;;) {
void *pop = NULL;
switch_event_t *event = NULL;
switch_mutex_lock(EVENT_QUEUE_MUTEX);
+ EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
THREAD_COUNT--;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
for (;;) {
void *pop = NULL;
switch_event_t *event = NULL;
+ int loops = 0;
if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
break;
event = (switch_event_t *) pop;
while (event) {
- int max;
- switch_mutex_lock(EVENT_QUEUE_MUTEX);
- max = SOFT_MAX_DISPATCH;
- switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+ if (++loops > 2) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event system overloading\n");
+ switch_yield(1000000);
+ }
- for (index = 0; (int)index < max; index++) {
+ for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) {
event = NULL;
break;
}
if (event) {
- switch_mutex_lock(EVENT_QUEUE_MUTEX);
if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Adding a new event thread #%d\n", SOFT_MAX_DISPATCH + 1);
+ switch_mutex_lock(EVENT_QUEUE_MUTEX);
launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
+ switch_mutex_unlock(EVENT_QUEUE_MUTEX);
} else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event threads maxed out at %d.\n", SOFT_MAX_DISPATCH);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Out of threads!\n");
switch_yield(1000000);
}
- switch_mutex_unlock(EVENT_QUEUE_MUTEX);
}
-
-
- switch_cond_next();
}
}
{
switch_threadattr_t *thd_attr;
uint32_t index = 0;
+ int launched = 0;
+ uint32_t sanity = 200;
if (max > MAX_DISPATCH) {
return;
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_increase(thd_attr);
switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool);
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Create event dispatch thread %d\n", index);
- switch_yield(100000);
+ while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index);
+ launched++;
+ break;
}
SOFT_MAX_DISPATCH = index;