]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
performance tweaks for sip message parsing and event system
authorAnthony Minessale <anthm@freeswitch.org>
Fri, 18 May 2012 01:10:53 +0000 (20:10 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Fri, 18 May 2012 01:10:53 +0000 (20:10 -0500)
src/include/switch_core.h
src/mod/endpoints/mod_sofia/mod_sofia.c
src/mod/endpoints/mod_sofia/mod_sofia.h
src/mod/endpoints/mod_sofia/sofia.c
src/switch_channel.c
src/switch_event.c
src/switch_time.c

index 8f9220d0b9818602c7ca8ad88d29e60bf019e983..9803c6708b6662d1b019b56557306d674586b449 100644 (file)
@@ -2329,6 +2329,8 @@ SWITCH_DECLARE(void) switch_say_file(switch_say_file_handle_t *sh, const char *f
 SWITCH_DECLARE(int) switch_max_file_desc(void);
 SWITCH_DECLARE(void) switch_close_extra_files(int *keep, int keep_ttl);
 SWITCH_DECLARE(switch_status_t) switch_core_thread_set_cpu_affinity(int cpu);
+SWITCH_DECLARE(void) switch_os_yield(void);
+
 SWITCH_END_EXTERN_C
 #endif
 /* For Emacs:
index 164d0e76ffe7f0cc0cf3ab9a536c445ff3a1fe66..6f64078509a3eea4f192d54c99ddb18ba3220e0f 100644 (file)
@@ -5445,7 +5445,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load)
        switch_yield(1500000);
 
        mod_sofia_globals.cpu_count = switch_core_cpu_count();
-       mod_sofia_globals.max_msg_queues = mod_sofia_globals.cpu_count + 1;
+       mod_sofia_globals.max_msg_queues = (mod_sofia_globals.cpu_count / 2) + 1;
+       if (mod_sofia_globals.max_msg_queues < 2) {
+               mod_sofia_globals.max_msg_queues = 2;
+       }
 
        if (mod_sofia_globals.max_msg_queues > SOFIA_MAX_MSG_QUEUE) {
                mod_sofia_globals.max_msg_queues = SOFIA_MAX_MSG_QUEUE;
@@ -5627,11 +5630,12 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_sofia_shutdown)
        }
 
 
-       for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) { 
-               switch_queue_push(mod_sofia_globals.msg_queue[i], NULL);
+       for (i = 0; mod_sofia_globals.msg_queue_thread[i]; i++) {
+               switch_queue_push(mod_sofia_globals.msg_queue, NULL);
        }
 
-       for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) {
+
+       for (i = 0; mod_sofia_globals.msg_queue_thread[i]; i++) {
                switch_status_t st;
                switch_thread_join(&st, mod_sofia_globals.msg_queue_thread[i]);
        }
index b96ff52011cae5b6e7c8e6548bad0e479b086b46..c0fc7dfba45dbec6ec1c46b2a1a6d579740044f4 100644 (file)
@@ -342,7 +342,7 @@ typedef enum {
 } TFLAGS;
 
 #define SOFIA_MAX_MSG_QUEUE 64
-#define SOFIA_MSG_QUEUE_SIZE 250
+#define SOFIA_MSG_QUEUE_SIZE 100
 
 struct mod_sofia_globals {
        switch_memory_pool_t *pool;
@@ -359,7 +359,7 @@ struct mod_sofia_globals {
        char hostname[512];
        switch_queue_t *presence_queue;
        switch_queue_t *mwi_queue;
-       switch_queue_t *msg_queue[SOFIA_MAX_MSG_QUEUE];
+       switch_queue_t *msg_queue;
        switch_thread_t *msg_queue_thread[SOFIA_MAX_MSG_QUEUE];
        int msg_queue_len;
        struct sofia_private destroy_private;
index 0e579ca1f8089e51f0fd4b102a1d08ce772b01b3..8eb122b310cd01a506d74416710d7c1770665064 100644 (file)
@@ -1347,34 +1347,56 @@ void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
        
        nua_handle_unref(nh);
        nua_stack_unref(nua);
+       switch_os_yield();
 }
 
 
+
+static int msg_queue_threads = 0;
+//static int count = 0;
+
 void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj)
 {
        void *pop;
        switch_queue_t *q = (switch_queue_t *) obj;
        int my_id;
 
+
        for (my_id = 0; my_id < mod_sofia_globals.msg_queue_len; my_id++) {
-               if (mod_sofia_globals.msg_queue[my_id] == q) {
+               if (mod_sofia_globals.msg_queue_thread[my_id] == thread) {
                        break;
                }
        }
-
+       
+       switch_mutex_lock(mod_sofia_globals.mutex); 
+       msg_queue_threads++;
+       switch_mutex_unlock(mod_sofia_globals.mutex); 
 
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread %d Started\n", my_id);
 
-       switch_core_thread_set_cpu_affinity(my_id);
 
-       while(switch_queue_pop(q, &pop) == SWITCH_STATUS_SUCCESS && pop) {
-               sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop;
-               sofia_process_dispatch_event(&de);
-               switch_cond_next();
+       for(;;) {
+
+               if (switch_queue_pop(q, &pop) != SWITCH_STATUS_SUCCESS) {
+                       switch_cond_next();
+                       continue;
+               }
+
+               if (pop) {
+                       sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop;
+                       sofia_process_dispatch_event(&de);
+                       switch_os_yield();
+               } else {
+                       break;
+               }
        }
 
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread Ended\n");
 
+       switch_mutex_lock(mod_sofia_globals.mutex); 
+       msg_queue_threads--;
+       switch_mutex_unlock(mod_sofia_globals.mutex); 
+
        return NULL;    
 }
 
@@ -1392,19 +1414,22 @@ void sofia_msg_thread_start(int idx)
                int i;
                mod_sofia_globals.msg_queue_len = idx + 1;
 
+               if (!mod_sofia_globals.msg_queue) {
+                       switch_queue_create(&mod_sofia_globals.msg_queue, SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.cpu_count, mod_sofia_globals.pool);
+               }
+
+
                for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) {
-                       if (!mod_sofia_globals.msg_queue[i]) {
+                       if (!mod_sofia_globals.msg_queue_thread[i]) {
                                switch_threadattr_t *thd_attr = NULL;
 
-                               switch_queue_create(&mod_sofia_globals.msg_queue[i], SOFIA_MSG_QUEUE_SIZE, mod_sofia_globals.pool);
-
                                switch_threadattr_create(&thd_attr, mod_sofia_globals.pool);
                                switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
                                switch_threadattr_priority_increase(thd_attr);
                                switch_thread_create(&mod_sofia_globals.msg_queue_thread[i], 
                                                                         thd_attr, 
                                                                         sofia_msg_thread_run, 
-                                                                        mod_sofia_globals.msg_queue[i]
+                                                                        mod_sofia_globals.msg_queue, 
                                                                         mod_sofia_globals.pool);
                        }
                }
@@ -1413,12 +1438,12 @@ void sofia_msg_thread_start(int idx)
        switch_mutex_unlock(mod_sofia_globals.mutex);
 }
 
-
+//static int foo = 0;
 static void sofia_queue_message(sofia_dispatch_event_t *de)
 {
-       int idx = 0, queued = 0;
+       int launch = 0;
 
-       if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue[0]) {
+       if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue) {
                sofia_process_dispatch_event(&de);
                return;
        }
@@ -1430,25 +1455,18 @@ static void sofia_queue_message(sofia_dispatch_event_t *de)
        }
 
 
- again:
-
-       for (idx = 0; idx < mod_sofia_globals.msg_queue_len; idx++) {
-               if (switch_queue_trypush(mod_sofia_globals.msg_queue[idx], de) == SWITCH_STATUS_SUCCESS) {
-                       queued++;
-                       break;
-               }
+       if ((switch_queue_size(mod_sofia_globals.msg_queue) > (SOFIA_MSG_QUEUE_SIZE * msg_queue_threads))) {
+               launch++;
        }
 
-       if (!queued) {
 
+       if (launch) {
                if (mod_sofia_globals.msg_queue_len < mod_sofia_globals.max_msg_queues) {
                        sofia_msg_thread_start(mod_sofia_globals.msg_queue_len + 1);
-                       goto again;
                }
-               
-               switch_queue_push(mod_sofia_globals.msg_queue[0], de);
        }
-       
+
+       switch_queue_push(mod_sofia_globals.msg_queue, de);
 }
 
 
@@ -1468,6 +1486,7 @@ void sofia_event_callback(nua_event_t event,
                return;
        }
 
+       
 
        switch_mutex_lock(profile->flag_mutex);
        profile->queued_events++;
@@ -1483,6 +1502,13 @@ void sofia_event_callback(nua_event_t event,
        de->nua = nua_stack_ref(nua);
 
        if (event == nua_i_invite && !sofia_private) {
+               int critical = (((SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.max_msg_queues) * 900) / 1000);
+               
+               if (switch_queue_size(mod_sofia_globals.msg_queue) > critical) {
+                       nua_respond(nh, 503, "Maximum Calls In Progress", SIPTAG_RETRY_AFTER_STR("300"), TAG_END());
+                       return;
+               }
+
                if (!(sofia_private = su_alloc(nh->nh_home, sizeof(*sofia_private)))) {
                        abort();
                }
@@ -1516,8 +1542,8 @@ void sofia_event_callback(nua_event_t event,
                }
        }
 
-       
        sofia_queue_message(de);
+       switch_os_yield();
 }
 
 
index f7d7e3aae402ead74cc4e2a74c1996f8f38f7001..7afe342103ba228cae5bab314d9b23a5cdbac618 100644 (file)
@@ -1415,7 +1415,7 @@ SWITCH_DECLARE(void) switch_channel_wait_for_state(switch_channel_t *channel, sw
                        (other_channel && switch_channel_down_nosig(other_channel)) || switch_channel_down(channel)) {
                        break;
                }
-               switch_yield(20000);
+               switch_cond_next();
        }
 }
 
index 23670fac93df0d5e3e4799be89be2e21e4cfc907..38ec16bf262e273325788576ed9bc0b9f1560818 100644 (file)
@@ -35,7 +35,7 @@
 #include <switch.h>
 #include <switch_event.h>
 //#define SWITCH_EVENT_RECYCLE
-#define DISPATCH_QUEUE_LEN 1000
+#define DISPATCH_QUEUE_LEN 100
 //#define DEBUG_DISPATCH_QUEUES
 
 /*! \brief A node to store binded events */
@@ -74,23 +74,20 @@ static switch_mutex_t *BLOCK = NULL;
 static switch_mutex_t *POOL_LOCK = NULL;
 static switch_memory_pool_t *RUNTIME_POOL = NULL;
 static switch_memory_pool_t *THRUNTIME_POOL = NULL;
-#define NUMBER_OF_QUEUES 3
-static switch_thread_t *EVENT_QUEUE_THREADS[NUMBER_OF_QUEUES] = { 0 };
-static switch_queue_t *EVENT_QUEUE[NUMBER_OF_QUEUES] = { 0 };
 static switch_thread_t *EVENT_DISPATCH_QUEUE_THREADS[MAX_DISPATCH_VAL] = { 0 };
 static uint8_t EVENT_DISPATCH_QUEUE_RUNNING[MAX_DISPATCH_VAL] = { 0 };
-static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH_VAL] = { 0 };
-static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN;
+static switch_queue_t *EVENT_DISPATCH_QUEUE = NULL;
 static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL;
 static switch_hash_t *CUSTOM_HASH = NULL;
 static int THREAD_COUNT = 0;
+static int DISPATCH_THREAD_COUNT = 0;
 static int SYSTEM_RUNNING = 0;
 static uint64_t EVENT_SEQUENCE_NR = 0;
 #ifdef SWITCH_EVENT_RECYCLE
 static switch_queue_t *EVENT_RECYCLE_QUEUE = NULL;
 static switch_queue_t *EVENT_HEADER_RECYCLE_QUEUE = NULL;
 #endif
-static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t *pool);
+static void launch_dispatch_threads(uint32_t max, switch_memory_pool_t *pool);
 
 static char *my_dup(const char *s)
 {
@@ -244,9 +241,10 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
 
        switch_mutex_lock(EVENT_QUEUE_MUTEX);
        THREAD_COUNT++;
+       DISPATCH_THREAD_COUNT++;
 
-       for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) {
-               if (EVENT_DISPATCH_QUEUE[my_id] == queue) {
+       for (my_id = 0; my_id < MAX_DISPATCH_VAL; my_id++) {
+               if (EVENT_DISPATCH_QUEUE_THREADS[my_id] == thread) {
                        break;
                }
        }
@@ -254,7 +252,6 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
        EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
        switch_mutex_unlock(EVENT_QUEUE_MUTEX);
        
-       switch_core_thread_set_cpu_affinity(my_id);
 
        for (;;) {
                void *pop = NULL;
@@ -265,7 +262,7 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
                }
 
                if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
-                       break;
+                       continue;
                }
 
                if (!pop) {
@@ -280,6 +277,7 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
        switch_mutex_lock(EVENT_QUEUE_MUTEX);
        EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 0;
        THREAD_COUNT--;
+       DISPATCH_THREAD_COUNT--;
        switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id);
@@ -287,74 +285,41 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
 
 }
 
-
-static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, void *obj)
+static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp)
 {
-       switch_queue_t *queue = (switch_queue_t *) obj;
-       uint32_t index = 0;
-       int my_id = 0;
 
-       switch_mutex_lock(EVENT_QUEUE_MUTEX);
-       THREAD_COUNT++;
-       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+       switch_event_t *event = *eventp;
 
-       for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) {
-               if (EVENT_QUEUE[my_id] == queue) {
-                       break;
-               }
+       if (!SYSTEM_RUNNING) {
+               return SWITCH_STATUS_FALSE;
        }
+       
+       while (event) {
+               int launch = 0;
 
-       for (;;) {
-               void *pop = NULL;
-               switch_event_t *event = NULL;
+               switch_mutex_lock(EVENT_QUEUE_MUTEX);           
 
-               if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
-                       break;
-               }
-
-               if (!pop) {
-                       break;
+               if (switch_queue_size(EVENT_DISPATCH_QUEUE) > (DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) {
+                       launch++;
                }
-
-               if (!SYSTEM_RUNNING) {
-                       break;
-               }
-
-               event = (switch_event_t *) pop;
-
-               while (event) {
-
-                       for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
-                               if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) {
-                                       event = NULL;
-                                       break;
-                               }
-                       }
-
-                       if (event) {
-                               if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
-                                       switch_mutex_lock(EVENT_QUEUE_MUTEX);
-                                       launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
-                                       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
-                               } else {
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Event Thread %d is blocking\n", my_id);
-                                       switch_queue_push(EVENT_DISPATCH_QUEUE[0], event);
-                                       event = NULL;
-                               }
+               
+               if (launch) {
+                       if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
+                               launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, RUNTIME_POOL);
                        }
                }
-       }
-
-       switch_mutex_lock(EVENT_QUEUE_MUTEX);
-       THREAD_COUNT--;
-       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 
-       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread %d Ended.\n", my_id);
-       return NULL;
+               switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 
+               *eventp = NULL;
+               switch_queue_push(EVENT_DISPATCH_QUEUE, event);
+               event = NULL;
+               
+       }
+       
+       return SWITCH_STATUS_SUCCESS;
 }
 
-
 SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event)
 {
        switch_event_types_t e;
@@ -499,20 +464,23 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
        SYSTEM_RUNNING = 0;
        switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 
-       for (x = 0; x < 3; x++) {
-               if (EVENT_QUEUE[x]) {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x);
-                       switch_queue_trypush(EVENT_QUEUE[x], NULL);
-                       switch_queue_interrupt_all(EVENT_QUEUE[x]);
-               }
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
+
+       
+       for(x = 0; x < DISPATCH_THREAD_COUNT; x++) {
+               switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
        }
 
-       for (x = 0; x < SOFT_MAX_DISPATCH; x++) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queue %d\n", x);
-               switch_queue_trypush(EVENT_DISPATCH_QUEUE[x], NULL);
-               switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE[x]);
+       switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE);
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n");
+
+       for(x = 0; x < DISPATCH_THREAD_COUNT; x++) {
+               switch_status_t st;
+               switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
        }
 
+       x = 0;
        while (x < 10000 && THREAD_COUNT) {
                switch_cond_next();
                if (THREAD_COUNT == last) {
@@ -521,37 +489,16 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
                last = THREAD_COUNT;
        }
 
-       for (x = 0; x < SOFT_MAX_DISPATCH; x++) {
+       {
                void *pop = NULL;
                switch_event_t *event = NULL;
-               switch_status_t st;
-
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch thread %d\n", x);
-               switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
 
-               while (switch_queue_trypop(EVENT_DISPATCH_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
+               while (switch_queue_trypop(EVENT_DISPATCH_QUEUE, &pop) == SWITCH_STATUS_SUCCESS && pop) {
                        event = (switch_event_t *) pop;
                        switch_event_destroy(&event);
                }
        }
 
-       for (x = 0; x < NUMBER_OF_QUEUES; x++) {
-               void *pop = NULL;
-               switch_event_t *event = NULL;
-               switch_status_t st;
-
-               if (EVENT_QUEUE_THREADS[x]) {
-
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping queue thread %d\n", x);
-                       switch_thread_join(&st, EVENT_QUEUE_THREADS[x]);
-
-                       while (switch_queue_trypop(EVENT_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
-                               event = (switch_event_t *) pop;
-                               switch_event_destroy(&event);
-                       }
-               }
-       }
-
        for (hi = switch_hash_first(NULL, CUSTOM_HASH); hi; hi = switch_hash_next(hi)) {
                switch_event_subclass_t *subclass;
                switch_hash_this(hi, &var, NULL, &val);
@@ -568,7 +515,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
        return SWITCH_STATUS_SUCCESS;
 }
 
-static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t *pool)
+static void launch_dispatch_threads(uint32_t max, switch_memory_pool_t *pool)
 {
        switch_threadattr_t *thd_attr;
        uint32_t index = 0;
@@ -584,14 +531,14 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t
        }
 
        for (index = SOFT_MAX_DISPATCH; index < max && index < MAX_DISPATCH; index++) {
-               if (EVENT_DISPATCH_QUEUE[index]) {
+               if (EVENT_DISPATCH_QUEUE_THREADS[index]) {
                        continue;
                }
-               switch_queue_create(&EVENT_DISPATCH_QUEUE[index], len, pool);
+
                switch_threadattr_create(&thd_attr, pool);
                switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
                switch_threadattr_priority_increase(thd_attr);
-               switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool);
+               switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE, pool);
                while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);
                if (index == 1) {
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index);
@@ -616,8 +563,10 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
         */
        
        /* don't need any more dispatch threads than we have CPU's*/
-       MAX_DISPATCH = switch_core_cpu_count() + 1;
-
+       MAX_DISPATCH = (switch_core_cpu_count() / 2) + 1;
+       if (MAX_DISPATCH < 2) {
+               MAX_DISPATCH = 2;
+       }
 
        switch_assert(pool != NULL);
        THRUNTIME_POOL = RUNTIME_POOL = pool;
@@ -648,7 +597,9 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
        //switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
        //switch_threadattr_priority_increase(thd_attr);
 
-       launch_dispatch_threads(1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
+       switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool);
+       launch_dispatch_threads(1, RUNTIME_POOL);
+
        //switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL);
        //switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL);
        //switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL);
@@ -1782,30 +1733,11 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con
                (*event)->event_user_data = user_data;
        }
 
-       if (!EVENT_QUEUE_THREADS[(*event)->priority] && (*event)->priority < 3) {
-               switch_threadattr_t *thd_attr;
-
-               switch_queue_create(&EVENT_QUEUE[(*event)->priority], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
-               switch_threadattr_create(&thd_attr, THRUNTIME_POOL);
-               switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
-               switch_threadattr_priority_increase(thd_attr);
-               switch_thread_create(&EVENT_QUEUE_THREADS[(*event)->priority], thd_attr, switch_event_thread, EVENT_QUEUE[(*event)->priority], RUNTIME_POOL);
-       }
-
-       for (;;) {
-               if (switch_queue_trypush(EVENT_QUEUE[(*event)->priority], *event) == SWITCH_STATUS_SUCCESS) {
-                       goto end;
-               }
-
-
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event queue is full!\n");
-               switch_yield(100000);
+       if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
+               switch_event_destroy(event);
+               return SWITCH_STATUS_FALSE;
        }
 
-  end:
-
-       *event = NULL;
-
        return SWITCH_STATUS_SUCCESS;
 }
 
index e7256538f36326fb07998df846b7af424fa1810a..381d2dc5ada6607697ed6e25cc39ea7a0f419a33 100644 (file)
@@ -140,7 +140,7 @@ typedef struct timer_matrix timer_matrix_t;
 
 static timer_matrix_t TIMER_MATRIX[MAX_ELEMENTS + 1];
 
-static void os_yield(void)
+SWITCH_DECLARE(void) switch_os_yield(void)
 {
 #if defined(WIN32)
        SwitchToThread();
@@ -467,7 +467,7 @@ SWITCH_DECLARE(void) switch_sleep(switch_interval_time_t t)
 SWITCH_DECLARE(void) switch_cond_next(void)
 {
        if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) {
-               os_yield();
+               switch_os_yield();
                return;
        }
 #ifdef DISABLE_1MS_COND
@@ -633,7 +633,7 @@ static switch_status_t timer_next(switch_timer_t *timer)
                check_roll();
 
                if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) {
-                       os_yield();
+                       switch_os_yield();
                        globals.use_cond_yield = 0;
                } else {
                        if (globals.use_cond_yield == 1) {
@@ -884,7 +884,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime)
                        }
 
                        if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) {
-                               os_yield();
+                               switch_os_yield();
                        } else {
                                if (tfd > -1 && globals.RUNNING == 1) {
                                        uint64_t exp;