nua_handle_unref(nh);
nua_stack_unref(nua);
+ switch_os_yield();
}
+
+static int msg_queue_threads = 0;
+//static int count = 0;
+
void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj)
{
void *pop;
switch_queue_t *q = (switch_queue_t *) obj;
int my_id;
+
for (my_id = 0; my_id < mod_sofia_globals.msg_queue_len; my_id++) {
- if (mod_sofia_globals.msg_queue[my_id] == q) {
+ if (mod_sofia_globals.msg_queue_thread[my_id] == thread) {
break;
}
}
-
+
+ switch_mutex_lock(mod_sofia_globals.mutex);
+ msg_queue_threads++;
+ switch_mutex_unlock(mod_sofia_globals.mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread %d Started\n", my_id);
- switch_core_thread_set_cpu_affinity(my_id);
- while(switch_queue_pop(q, &pop) == SWITCH_STATUS_SUCCESS && pop) {
- sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop;
- sofia_process_dispatch_event(&de);
- switch_cond_next();
+ for(;;) {
+
+ if (switch_queue_pop(q, &pop) != SWITCH_STATUS_SUCCESS) {
+ switch_cond_next();
+ continue;
+ }
+
+ if (pop) {
+ sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop;
+ sofia_process_dispatch_event(&de);
+ switch_os_yield();
+ } else {
+ break;
+ }
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread Ended\n");
+ switch_mutex_lock(mod_sofia_globals.mutex);
+ msg_queue_threads--;
+ switch_mutex_unlock(mod_sofia_globals.mutex);
+
return NULL;
}
int i;
mod_sofia_globals.msg_queue_len = idx + 1;
+ if (!mod_sofia_globals.msg_queue) {
+ switch_queue_create(&mod_sofia_globals.msg_queue, SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.cpu_count, mod_sofia_globals.pool);
+ }
+
+
for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) {
- if (!mod_sofia_globals.msg_queue[i]) {
+ if (!mod_sofia_globals.msg_queue_thread[i]) {
switch_threadattr_t *thd_attr = NULL;
- switch_queue_create(&mod_sofia_globals.msg_queue[i], SOFIA_MSG_QUEUE_SIZE, mod_sofia_globals.pool);
-
switch_threadattr_create(&thd_attr, mod_sofia_globals.pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_increase(thd_attr);
switch_thread_create(&mod_sofia_globals.msg_queue_thread[i],
thd_attr,
sofia_msg_thread_run,
- mod_sofia_globals.msg_queue[i],
+ mod_sofia_globals.msg_queue,
mod_sofia_globals.pool);
}
}
switch_mutex_unlock(mod_sofia_globals.mutex);
}
-
+//static int foo = 0;
static void sofia_queue_message(sofia_dispatch_event_t *de)
{
- int idx = 0, queued = 0;
+ int launch = 0;
- if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue[0]) {
+ if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue) {
sofia_process_dispatch_event(&de);
return;
}
}
- again:
-
- for (idx = 0; idx < mod_sofia_globals.msg_queue_len; idx++) {
- if (switch_queue_trypush(mod_sofia_globals.msg_queue[idx], de) == SWITCH_STATUS_SUCCESS) {
- queued++;
- break;
- }
+ if ((switch_queue_size(mod_sofia_globals.msg_queue) > (SOFIA_MSG_QUEUE_SIZE * msg_queue_threads))) {
+ launch++;
}
- if (!queued) {
+ if (launch) {
if (mod_sofia_globals.msg_queue_len < mod_sofia_globals.max_msg_queues) {
sofia_msg_thread_start(mod_sofia_globals.msg_queue_len + 1);
- goto again;
}
-
- switch_queue_push(mod_sofia_globals.msg_queue[0], de);
}
-
+
+ switch_queue_push(mod_sofia_globals.msg_queue, de);
}
return;
}
+
switch_mutex_lock(profile->flag_mutex);
profile->queued_events++;
de->nua = nua_stack_ref(nua);
if (event == nua_i_invite && !sofia_private) {
+ int critical = (((SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.max_msg_queues) * 900) / 1000);
+
+ if (switch_queue_size(mod_sofia_globals.msg_queue) > critical) {
+ nua_respond(nh, 503, "Maximum Calls In Progress", SIPTAG_RETRY_AFTER_STR("300"), TAG_END());
+ return;
+ }
+
if (!(sofia_private = su_alloc(nh->nh_home, sizeof(*sofia_private)))) {
abort();
}
}
}
-
sofia_queue_message(de);
+ switch_os_yield();
}
#include <switch.h>
#include <switch_event.h>
//#define SWITCH_EVENT_RECYCLE
-#define DISPATCH_QUEUE_LEN 1000
+#define DISPATCH_QUEUE_LEN 100
//#define DEBUG_DISPATCH_QUEUES
/*! \brief A node to store binded events */
static switch_mutex_t *POOL_LOCK = NULL;
static switch_memory_pool_t *RUNTIME_POOL = NULL;
static switch_memory_pool_t *THRUNTIME_POOL = NULL;
-#define NUMBER_OF_QUEUES 3
-static switch_thread_t *EVENT_QUEUE_THREADS[NUMBER_OF_QUEUES] = { 0 };
-static switch_queue_t *EVENT_QUEUE[NUMBER_OF_QUEUES] = { 0 };
static switch_thread_t *EVENT_DISPATCH_QUEUE_THREADS[MAX_DISPATCH_VAL] = { 0 };
static uint8_t EVENT_DISPATCH_QUEUE_RUNNING[MAX_DISPATCH_VAL] = { 0 };
-static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH_VAL] = { 0 };
-static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN;
+static switch_queue_t *EVENT_DISPATCH_QUEUE = NULL;
static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL;
static switch_hash_t *CUSTOM_HASH = NULL;
static int THREAD_COUNT = 0;
+static int DISPATCH_THREAD_COUNT = 0;
static int SYSTEM_RUNNING = 0;
static uint64_t EVENT_SEQUENCE_NR = 0;
#ifdef SWITCH_EVENT_RECYCLE
static switch_queue_t *EVENT_RECYCLE_QUEUE = NULL;
static switch_queue_t *EVENT_HEADER_RECYCLE_QUEUE = NULL;
#endif
-static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t *pool);
+static void launch_dispatch_threads(uint32_t max, switch_memory_pool_t *pool);
static char *my_dup(const char *s)
{
switch_mutex_lock(EVENT_QUEUE_MUTEX);
THREAD_COUNT++;
+ DISPATCH_THREAD_COUNT++;
- for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) {
- if (EVENT_DISPATCH_QUEUE[my_id] == queue) {
+ for (my_id = 0; my_id < MAX_DISPATCH_VAL; my_id++) {
+ if (EVENT_DISPATCH_QUEUE_THREADS[my_id] == thread) {
break;
}
}
EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
- switch_core_thread_set_cpu_affinity(my_id);
for (;;) {
void *pop = NULL;
}
if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
- break;
+ continue;
}
if (!pop) {
switch_mutex_lock(EVENT_QUEUE_MUTEX);
EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 0;
THREAD_COUNT--;
+ DISPATCH_THREAD_COUNT--;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id);
}
-
-static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, void *obj)
+static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp)
{
- switch_queue_t *queue = (switch_queue_t *) obj;
- uint32_t index = 0;
- int my_id = 0;
- switch_mutex_lock(EVENT_QUEUE_MUTEX);
- THREAD_COUNT++;
- switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+ switch_event_t *event = *eventp;
- for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) {
- if (EVENT_QUEUE[my_id] == queue) {
- break;
- }
+ if (!SYSTEM_RUNNING) {
+ return SWITCH_STATUS_FALSE;
}
+
+ while (event) {
+ int launch = 0;
- for (;;) {
- void *pop = NULL;
- switch_event_t *event = NULL;
+ switch_mutex_lock(EVENT_QUEUE_MUTEX);
- if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
- break;
- }
-
- if (!pop) {
- break;
+ if (switch_queue_size(EVENT_DISPATCH_QUEUE) > (DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) {
+ launch++;
}
-
- if (!SYSTEM_RUNNING) {
- break;
- }
-
- event = (switch_event_t *) pop;
-
- while (event) {
-
- for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
- if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) {
- event = NULL;
- break;
- }
- }
-
- if (event) {
- if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
- switch_mutex_lock(EVENT_QUEUE_MUTEX);
- launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
- switch_mutex_unlock(EVENT_QUEUE_MUTEX);
- } else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Event Thread %d is blocking\n", my_id);
- switch_queue_push(EVENT_DISPATCH_QUEUE[0], event);
- event = NULL;
- }
+
+ if (launch) {
+ if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
+ launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, RUNTIME_POOL);
}
}
- }
-
- switch_mutex_lock(EVENT_QUEUE_MUTEX);
- THREAD_COUNT--;
- switch_mutex_unlock(EVENT_QUEUE_MUTEX);
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread %d Ended.\n", my_id);
- return NULL;
+ switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+ *eventp = NULL;
+ switch_queue_push(EVENT_DISPATCH_QUEUE, event);
+ event = NULL;
+
+ }
+
+ return SWITCH_STATUS_SUCCESS;
}
-
SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event)
{
switch_event_types_t e;
SYSTEM_RUNNING = 0;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
- for (x = 0; x < 3; x++) {
- if (EVENT_QUEUE[x]) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x);
- switch_queue_trypush(EVENT_QUEUE[x], NULL);
- switch_queue_interrupt_all(EVENT_QUEUE[x]);
- }
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
+
+
+ for(x = 0; x < DISPATCH_THREAD_COUNT; x++) {
+ switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
}
- for (x = 0; x < SOFT_MAX_DISPATCH; x++) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queue %d\n", x);
- switch_queue_trypush(EVENT_DISPATCH_QUEUE[x], NULL);
- switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE[x]);
+ switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE);
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n");
+
+ for(x = 0; x < DISPATCH_THREAD_COUNT; x++) {
+ switch_status_t st;
+ switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
}
+ x = 0;
while (x < 10000 && THREAD_COUNT) {
switch_cond_next();
if (THREAD_COUNT == last) {
last = THREAD_COUNT;
}
- for (x = 0; x < SOFT_MAX_DISPATCH; x++) {
+ {
void *pop = NULL;
switch_event_t *event = NULL;
- switch_status_t st;
-
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch thread %d\n", x);
- switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
- while (switch_queue_trypop(EVENT_DISPATCH_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
+ while (switch_queue_trypop(EVENT_DISPATCH_QUEUE, &pop) == SWITCH_STATUS_SUCCESS && pop) {
event = (switch_event_t *) pop;
switch_event_destroy(&event);
}
}
- for (x = 0; x < NUMBER_OF_QUEUES; x++) {
- void *pop = NULL;
- switch_event_t *event = NULL;
- switch_status_t st;
-
- if (EVENT_QUEUE_THREADS[x]) {
-
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping queue thread %d\n", x);
- switch_thread_join(&st, EVENT_QUEUE_THREADS[x]);
-
- while (switch_queue_trypop(EVENT_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
- event = (switch_event_t *) pop;
- switch_event_destroy(&event);
- }
- }
- }
-
for (hi = switch_hash_first(NULL, CUSTOM_HASH); hi; hi = switch_hash_next(hi)) {
switch_event_subclass_t *subclass;
switch_hash_this(hi, &var, NULL, &val);
return SWITCH_STATUS_SUCCESS;
}
-static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t *pool)
+static void launch_dispatch_threads(uint32_t max, switch_memory_pool_t *pool)
{
switch_threadattr_t *thd_attr;
uint32_t index = 0;
}
for (index = SOFT_MAX_DISPATCH; index < max && index < MAX_DISPATCH; index++) {
- if (EVENT_DISPATCH_QUEUE[index]) {
+ if (EVENT_DISPATCH_QUEUE_THREADS[index]) {
continue;
}
- switch_queue_create(&EVENT_DISPATCH_QUEUE[index], len, pool);
+
switch_threadattr_create(&thd_attr, pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_increase(thd_attr);
- switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool);
+ switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE, pool);
while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);
if (index == 1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index);
*/
/* don't need any more dispatch threads than we have CPU's*/
- MAX_DISPATCH = switch_core_cpu_count() + 1;
-
+ MAX_DISPATCH = (switch_core_cpu_count() / 2) + 1;
+ if (MAX_DISPATCH < 2) {
+ MAX_DISPATCH = 2;
+ }
switch_assert(pool != NULL);
THRUNTIME_POOL = RUNTIME_POOL = pool;
//switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
//switch_threadattr_priority_increase(thd_attr);
- launch_dispatch_threads(1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
+ switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool);
+ launch_dispatch_threads(1, RUNTIME_POOL);
+
//switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL);
//switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL);
//switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL);
(*event)->event_user_data = user_data;
}
- if (!EVENT_QUEUE_THREADS[(*event)->priority] && (*event)->priority < 3) {
- switch_threadattr_t *thd_attr;
-
- switch_queue_create(&EVENT_QUEUE[(*event)->priority], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
- switch_threadattr_create(&thd_attr, THRUNTIME_POOL);
- switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
- switch_threadattr_priority_increase(thd_attr);
- switch_thread_create(&EVENT_QUEUE_THREADS[(*event)->priority], thd_attr, switch_event_thread, EVENT_QUEUE[(*event)->priority], RUNTIME_POOL);
- }
-
- for (;;) {
- if (switch_queue_trypush(EVENT_QUEUE[(*event)->priority], *event) == SWITCH_STATUS_SUCCESS) {
- goto end;
- }
-
-
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event queue is full!\n");
- switch_yield(100000);
+ if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
+ switch_event_destroy(event);
+ return SWITCH_STATUS_FALSE;
}
- end:
-
- *event = NULL;
-
return SWITCH_STATUS_SUCCESS;
}