]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
improve thread pool logic
authorAnthony Minessale <anthm@freeswitch.org>
Fri, 27 Sep 2013 18:37:05 +0000 (23:37 +0500)
committerAnthony Minessale <anthm@freeswitch.org>
Fri, 27 Sep 2013 18:37:05 +0000 (23:37 +0500)
src/include/private/switch_core_pvt.h
src/switch_core.c
src/switch_core_session.c

index fffd51abac266a2e2cd0e87b3a4a9f37fc94f157..f6d3474386e513394f5ee982bbd47c81da741c7a 100644 (file)
@@ -290,10 +290,14 @@ struct switch_session_manager {
        switch_queue_t *thread_queue;
        switch_thread_t *manager_thread;
        switch_mutex_t *mutex;
+       switch_thread_cond_t *cond;
+       switch_mutex_t *cond_mutex;
+       switch_mutex_t *cond2_mutex;
        int ready;
        int running;
        int busy;
        int popping;
+       int starting;
 };
 
 extern struct switch_session_manager session_manager;
index 2f6cd7b0234e4ae8b6ba2e4cf6888027254ea69d..73c9d056ac5af09f5db52cee218614f260a5637c 100644 (file)
@@ -1792,6 +1792,12 @@ static void switch_load_core_config(const char *file)
        switch_core_hash_insert(runtime.ptimes, "isac", &d_30);
        switch_core_hash_insert(runtime.ptimes, "G723", &d_30);
 
+       if (runtime.cpu_count == 1) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,
+                                                 "Implicitly setting events-use-dispatch based on a single CPU\n");
+               runtime.events_use_dispatch = 1;
+       }
+
        if ((xml = switch_xml_open_cfg(file, &cfg, NULL))) {
                switch_xml_t settings, param;
 
index feeab6023851937f2b90163b6fe9328496f304e3..dca86f555884c35ab11f963277508afea00365ba 100644 (file)
@@ -1588,6 +1588,7 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
        int check = 0;
 
        switch_mutex_lock(session_manager.mutex);
+       session_manager.starting--;
        session_manager.running++;
        switch_mutex_unlock(session_manager.mutex);
 #ifdef DEBUG_THREAD_POOL
@@ -1682,6 +1683,33 @@ static void thread_launch_failure(void)
        switch_mutex_unlock(session_manager.mutex);
 }
 
+static int wake_queue(void)
+{
+       switch_status_t status;
+       int tries = 0;
+
+ top:
+       
+       status = switch_mutex_trylock(session_manager.cond_mutex);
+
+       if (status == SWITCH_STATUS_SUCCESS) {
+               switch_thread_cond_signal(session_manager.cond);
+               switch_mutex_unlock(session_manager.cond_mutex);
+               return 1;
+       } else {
+               if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
+                       switch_mutex_unlock(session_manager.cond2_mutex);
+               } else {
+                       if (++tries < 10) {
+                               switch_cond_next();
+                               goto top;
+                       }
+               }
+       }
+
+       return 0;
+}
+
 static switch_status_t check_queue(void) 
 {
        switch_status_t status = SWITCH_STATUS_FALSE;
@@ -1690,7 +1718,7 @@ static switch_status_t check_queue(void)
 
        switch_mutex_lock(session_manager.mutex);
        ttl = switch_queue_size(session_manager.thread_queue);
-       x = (session_manager.running - session_manager.busy);
+       x = ((session_manager.running + session_manager.starting) - session_manager.busy);
        switch_mutex_unlock(session_manager.mutex);
 
 
@@ -1717,6 +1745,10 @@ static switch_status_t check_queue(void)
                } else {
                        status = SWITCH_STATUS_SUCCESS;
                }
+
+               switch_mutex_lock(session_manager.mutex);
+               session_manager.starting++;
+               switch_mutex_unlock(session_manager.mutex);
                x++;
        }
 
@@ -1726,12 +1758,20 @@ static switch_status_t check_queue(void)
 
 static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
 {
-       int x = 0;
+
+       uint32_t sleep = 10000000;
+       switch_time_t next = switch_micro_time_now() + sleep;
+
+       switch_mutex_lock(session_manager.cond_mutex);
 
        while(session_manager.ready) {
-               switch_yield(100000);
+               int check = 1;
+
+               switch_mutex_lock(session_manager.cond2_mutex);
+               switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, sleep);
+               switch_mutex_unlock(session_manager.cond2_mutex);
 
-               if (++x == 300) {
+               if (switch_micro_time_now() >= next) {
                        if (session_manager.popping) {
 #ifdef DEBUG_THREAD_POOL
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, 
@@ -1739,17 +1779,20 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_t
 #endif
                                switch_queue_interrupt_all(session_manager.thread_queue);
 
-                               x--;
-
-                               continue;
+                               sleep = 100000;
+                               check = 0;
                        } else {
-                               x = 0;
+                               sleep = 10000000;
                        }
                }
 
-               check_queue();
+               if (check) check_queue();
+
+               next = switch_micro_time_now() + sleep;
        }
 
+       switch_mutex_unlock(session_manager.cond_mutex);
+
        while(session_manager.running) {
                switch_queue_interrupt_all(session_manager.thread_queue);
                switch_yield(20000);
@@ -1770,7 +1813,7 @@ SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_d
        *tdp = NULL;
 
        switch_queue_push(session_manager.thread_queue, td);
-       check_queue();
+       wake_queue();
 
        return status;  
 }
@@ -1793,7 +1836,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co
                td->obj = session;
                td->func = switch_core_session_thread;
                switch_queue_push(session_manager.thread_queue, td);
-               check_queue();
+               wake_queue();
        }
        switch_mutex_unlock(session->mutex);
 
@@ -2438,6 +2481,9 @@ void switch_core_session_init(switch_memory_pool_t *pool)
                switch_threadattr_t *thd_attr;
 
                switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
+               switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
+               switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
+               switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
                switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
                switch_threadattr_create(&thd_attr, session_manager.memory_pool);
                switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);