]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
Merge pull request #475 in FS/freeswitch from ~ARTURZ/freeswitch:FS-8142-switch_core_...
authorAnthony Minessale II <anthony.minessale@gmail.com>
Wed, 9 Sep 2015 16:38:14 +0000 (11:38 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Wed, 9 Sep 2015 17:37:52 +0000 (12:37 -0500)
* commit 'cd4c3188e4f715ff129dc4eea1a4ba50140c2a42':
  FS-8142 Fix a thread cache thread-safety and caching

src/include/private/switch_core_pvt.h
src/switch_core_session.c

index 4b8be156b80a0d7cbef89fcd14ece1c2d16121ef..4fa4f8e4c7cbc596dba4b3ab4d06a29141410031 100644 (file)
@@ -290,16 +290,10 @@ struct switch_session_manager {
        uint32_t session_limit;
        switch_size_t session_id;
        switch_queue_t *thread_queue;
-       switch_thread_t *manager_thread;
        switch_mutex_t *mutex;
        switch_thread_cond_t *cond;
-       switch_mutex_t *cond_mutex;
-       switch_mutex_t *cond2_mutex;
-       int ready;
        int running;
        int busy;
-       int popping;
-       int starting;
 };
 
 extern struct switch_session_manager session_manager;
index a0e1f01f34d39ba4442841b7cb1c6919e5c00453..9882fc675a017234ee36c822275efc69d3c2dfa2 100644 (file)
@@ -1655,47 +1655,21 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
 {
        switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj;
        switch_memory_pool_t *pool = node->pool;
-       void *pop;
-       int check = 0;
 
-       switch_mutex_lock(session_manager.mutex);
-       session_manager.starting--;
-       session_manager.running++;
-       switch_mutex_unlock(session_manager.mutex);
 #ifdef DEBUG_THREAD_POOL
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Started\n", (long) thread);
 #endif
-       while(session_manager.ready) {
-               switch_status_t check_status;
-
-               pop = NULL;
-
-               if (check) {
-                       check_status = switch_queue_trypop(session_manager.thread_queue, &pop);
-               } else {
-                       switch_mutex_lock(session_manager.mutex);
-                       session_manager.popping++;
-                       switch_mutex_unlock(session_manager.mutex);
-                       
-                       check_status = switch_queue_pop(session_manager.thread_queue, &pop);
-
-                       switch_mutex_lock(session_manager.mutex);
-                       session_manager.popping--;
-                       switch_mutex_unlock(session_manager.mutex);
-               }
-
-               if (check_status == SWITCH_STATUS_SUCCESS && pop) {
+       while (1) {
+               void *pop;
+               switch_status_t check_status = switch_queue_pop_timeout(session_manager.thread_queue, &pop, apr_time_from_sec(5));
+               if (check_status == SWITCH_STATUS_SUCCESS) {
                        switch_thread_data_t *td = (switch_thread_data_t *) pop;
                        
                        if (!td) break;
 
-                       switch_mutex_lock(session_manager.mutex);
-                       session_manager.busy++;
-                       switch_mutex_unlock(session_manager.mutex);
 #ifdef DEBUG_THREAD_POOL                       
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) thread);
 #endif
-
                        td->func(thread, td->obj);
 
                        if (td->pool) {
@@ -1711,23 +1685,22 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
                        switch_mutex_lock(session_manager.mutex);
                        session_manager.busy--;
                        switch_mutex_unlock(session_manager.mutex);
-
                } else {
-                       if (check) {
+                       switch_mutex_lock(session_manager.mutex);
+                       if (!switch_status_is_timeup(check_status) || session_manager.running > session_manager.busy) {
+                               if (!--session_manager.running) {
+                                       switch_thread_cond_signal(session_manager.cond);
+                               }
+                               switch_mutex_unlock(session_manager.mutex);
                                break;
                        }
-                       check++;
+                       switch_mutex_unlock(session_manager.mutex);
                }
        }
 #ifdef DEBUG_THREAD_POOL
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Ended\n", (long) thread);
 #endif
-       switch_mutex_lock(session_manager.mutex);
-       session_manager.running--;
-       switch_mutex_unlock(session_manager.mutex);
-
        switch_core_destroy_memory_pool(&pool);
-
        return NULL;
 }
 
@@ -1754,46 +1727,18 @@ static void thread_launch_failure(void)
        switch_mutex_unlock(session_manager.mutex);
 }
 
-static int wake_queue(void)
-{
-       switch_status_t status;
-       int tries = 0;
-
- top:
-       
-       status = switch_mutex_trylock(session_manager.cond_mutex);
-
-       if (status == SWITCH_STATUS_SUCCESS) {
-               switch_thread_cond_signal(session_manager.cond);
-               switch_mutex_unlock(session_manager.cond_mutex);
-               return 1;
-       } else {
-               if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
-                       switch_mutex_unlock(session_manager.cond2_mutex);
-               } else {
-                       if (++tries < 10) {
-                               switch_cond_next();
-                               goto top;
-                       }
-               }
-       }
-
-       return 0;
-}
-
 static switch_status_t check_queue(void) 
 {
        switch_status_t status = SWITCH_STATUS_FALSE;
-       int ttl = 0;
-       int x = 0;
-
        switch_mutex_lock(session_manager.mutex);
-       ttl = switch_queue_size(session_manager.thread_queue);
-       x = ((session_manager.running + session_manager.starting) - session_manager.busy);
+       if (session_manager.running >= ++session_manager.busy) {
+               switch_mutex_unlock(session_manager.mutex);
+               return SWITCH_STATUS_SUCCESS;
+       }
+       ++session_manager.running;
        switch_mutex_unlock(session_manager.mutex);
 
-
-       while (x < ttl) {
+       {
                switch_thread_t *thread;
                switch_threadattr_t *thd_attr;
                switch_memory_pool_t *pool;
@@ -1809,6 +1754,11 @@ static switch_status_t check_queue(void)
                switch_threadattr_priority_set(thd_attr, SWITCH_PRI_LOW);
 
                if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) {
+                       switch_mutex_lock(session_manager.mutex);
+                       if (!--session_manager.running) {
+                               switch_thread_cond_signal(session_manager.cond);
+                       }
+                       switch_mutex_unlock(session_manager.mutex);
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n");
                        switch_core_destroy_memory_pool(&pool);
                        status = SWITCH_STATUS_GENERR;
@@ -1816,76 +1766,11 @@ static switch_status_t check_queue(void)
                } else {
                        status = SWITCH_STATUS_SUCCESS;
                }
-
-               switch_mutex_lock(session_manager.mutex);
-               session_manager.starting++;
-               switch_mutex_unlock(session_manager.mutex);
-               x++;
        }
-
        return status;
 }
 
 
-static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
-{
-
-       uint32_t sleep = 10000000;
-       switch_time_t next = switch_micro_time_now() + sleep;
-
-       switch_mutex_lock(session_manager.cond_mutex);
-
-       while(session_manager.ready) {
-               int check = 1;
-               int ttl = 0;
-               uint32_t xsleep = sleep;
-
-               switch_mutex_lock(session_manager.mutex);
-               ttl = switch_queue_size(session_manager.thread_queue);
-               switch_mutex_unlock(session_manager.mutex);
-
-
-               if (!ttl) {
-                       xsleep = 10000;
-               }
-
-               if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
-                       switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, xsleep);
-                       switch_mutex_unlock(session_manager.cond2_mutex);
-               }
-               
-
-               if (switch_micro_time_now() >= next) {
-                       if (session_manager.popping) {
-#ifdef DEBUG_THREAD_POOL
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, 
-                                                                 "Thread pool: running:%d busy:%d popping:%d\n", session_manager.running, session_manager.busy, session_manager.popping);
-#endif
-                               switch_queue_interrupt_all(session_manager.thread_queue);
-
-                               sleep = 100000;
-                               check = 0;
-                       } else {
-                               sleep = 10000000;
-                       }
-               }
-
-               if (check) check_queue();
-
-               next = switch_micro_time_now() + sleep;
-       }
-
-       switch_mutex_unlock(session_manager.cond_mutex);
-
-       while(session_manager.running) {
-               switch_queue_interrupt_all(session_manager.thread_queue);
-               switch_yield(20000);
-       }
-
-       
-       return NULL;
-}
-
 SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp)
 {
        switch_status_t status = SWITCH_STATUS_SUCCESS;
@@ -1896,8 +1781,8 @@ SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_d
        td = *tdp;
        *tdp = NULL;
 
-       switch_queue_push(session_manager.thread_queue, td);
-       wake_queue();
+       status = switch_queue_push(session_manager.thread_queue, td);
+       check_queue();
 
        return status;  
 }
@@ -1913,14 +1798,13 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co
        } else if (switch_test_flag(session, SSF_THREAD_STARTED)) {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot launch thread again after it has already been run!\n");
        } else {
-               status = SWITCH_STATUS_SUCCESS;
                switch_set_flag(session, SSF_THREAD_RUNNING);
                switch_set_flag(session, SSF_THREAD_STARTED);
                td = switch_core_session_alloc(session, sizeof(*td));
                td->obj = session;
                td->func = switch_core_session_thread;
-               switch_queue_push(session_manager.thread_queue, td);
-               wake_queue();
+               status = switch_queue_push(session_manager.thread_queue, td);
+               check_queue();
        }
        switch_mutex_unlock(session->mutex);
 
@@ -2560,39 +2444,19 @@ void switch_core_session_init(switch_memory_pool_t *pool)
        session_manager.session_id = 1;
        session_manager.memory_pool = pool;
        switch_core_hash_init(&session_manager.session_table);
-       
-       if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) {
-               switch_threadattr_t *thd_attr;
-
-               switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
-               switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
-               switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
-               switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
-               switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
-               switch_threadattr_create(&thd_attr, session_manager.memory_pool);
-               switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
-               session_manager.ready = 1;
-               switch_thread_create(&session_manager.manager_thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool);            
-       }
-
+       switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_DEFAULT, session_manager.memory_pool);
+       switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
+       switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
 }
 
 void switch_core_session_uninit(void)
 {
-       int sanity = 100;
-       switch_status_t st = SWITCH_STATUS_FALSE;
-
-       session_manager.ready = 0;
-       wake_queue();
-
-       while(session_manager.running && --sanity > 0) {
-               switch_queue_interrupt_all(session_manager.thread_queue);
-               switch_yield(100000);
-       }
-
-       switch_thread_join(&st, session_manager.manager_thread);
+       switch_queue_term(session_manager.thread_queue);
+       switch_mutex_lock(session_manager.mutex);
+       if (session_manager.running)
+               switch_thread_cond_timedwait(session_manager.cond, session_manager.mutex, apr_time_from_sec(10));
+       switch_mutex_unlock(session_manager.mutex);
        switch_core_hash_destroy(&session_manager.session_table);
-       
 }
 
 SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session)
@@ -3049,7 +2913,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_refresh_video(switch_core_se
 SWITCH_DECLARE(void) switch_core_session_debug_pool(switch_stream_handle_t *stream)
 {
        stream->write_function(stream, "Thread pool: running:%d busy:%d popping:%d\n",
-               session_manager.running, session_manager.busy, session_manager.popping);
+               session_manager.running, session_manager.busy, session_manager.running - session_manager.busy);
 }
 
 /* For Emacs: