]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
cache session threads and reuse them if possible
authorAnthony Minessale <anthm@freeswitch.org>
Tue, 21 Aug 2012 05:14:43 +0000 (00:14 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Tue, 21 Aug 2012 05:14:50 +0000 (00:14 -0500)
src/include/private/switch_core_pvt.h
src/include/switch_core.h
src/include/switch_types.h
src/switch_core.c
src/switch_core_session.c
src/switch_core_state_machine.c
src/switch_event.c

index 2df37b0e053bfce3054799b47f12dd92b6b996b9..95fd120337e2bb51673fbb86f80174d09d732701 100644 (file)
@@ -283,6 +283,11 @@ struct switch_session_manager {
        uint32_t session_count;
        uint32_t session_limit;
        switch_size_t session_id;
+       switch_queue_t *thread_queue;
+       switch_mutex_t *mutex;
+       int ready;
+       int running;
+       int busy;
 };
 
 extern struct switch_session_manager session_manager;
index dcaace27240dc369341bf879a5558afaaf4638f7..1fda42588ccb6e42b7d39a698810655dfa0f63d8 100644 (file)
@@ -690,6 +690,9 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_request_by_name(_In_
 */
 SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(_In_ switch_core_session_t *session);
 
+
+SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session);
+
 /*! 
   \brief Retrieve a pointer to the channel object associated with a given session
   \param session the session to retrieve from
index 381ab22012c8ba28fb5c0fdce7f81fb0368260f8..6a593a879e78ab49702c82ea43d9b30b294945ec 100644 (file)
@@ -323,7 +323,8 @@ typedef enum {
        SCF_SYNC_CLOCK_REQUESTED = (1 << 19),
        SCF_CORE_ODBC_REQ = (1 << 20),
        SCF_DEBUG_SQL = (1 << 21),
-       SCF_API_EXPANSION = (1 << 22)
+       SCF_API_EXPANSION = (1 << 22),
+       SCF_SESSION_THREAD_POOL = (1 << 23)
 } switch_core_flag_enum_t;
 typedef uint32_t switch_core_flag_t;
 
index 49b84f998aefce759e62ade644f775f4cb5f21ee..e259a2d8a8b044528010315ac4223f739c18c605 100644 (file)
@@ -1440,6 +1440,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc
        switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);
        switch_set_flag((&runtime), SCF_CLEAR_SQL);
        switch_set_flag((&runtime), SCF_API_EXPANSION);
+       switch_set_flag((&runtime), SCF_SESSION_THREAD_POOL);
 #ifdef WIN32
        switch_set_flag((&runtime), SCF_THREADED_SYSTEM_EXEC);
 #endif
@@ -1751,6 +1752,12 @@ static void switch_load_core_config(const char *file)
                                        } else {
                                                switch_clear_flag((&runtime), SCF_AUTO_SCHEMAS);
                                        }
+                               } else if (!strcasecmp(var, "session-thread-pool")) {
+                                       if (switch_true(val)) {
+                                               switch_set_flag((&runtime), SCF_SESSION_THREAD_POOL);
+                                       } else {
+                                               switch_clear_flag((&runtime), SCF_SESSION_THREAD_POOL);
+                                       }
                                } else if (!strcasecmp(var, "auto-clear-sql")) {
                                        if (switch_true(val)) {
                                                switch_set_flag((&runtime), SCF_CLEAR_SQL);
index 4113afd1e016e336c9b51e4fbd22f8ac064781a9..4b334315e299dc03473727aa0db683a5ef36d608 100644 (file)
@@ -1451,12 +1451,156 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread(switch_thread_t *thre
        return NULL;
 }
 
+typedef struct switch_thread_pool_node_s {
+       switch_memory_pool_t *pool;
+} switch_thread_pool_node_t;
+
+static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_thread_t *thread, void *obj)
+{
+       switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj;
+       switch_memory_pool_t *pool = node->pool;
+       void *pop;
+       int check = 0;
+
+       switch_mutex_lock(session_manager.mutex);
+       session_manager.running++;
+       switch_mutex_unlock(session_manager.mutex);
+
+       while(session_manager.ready) {
+               switch_status_t check_status;
+
+               if (check) {
+                       check_status = switch_queue_trypop(session_manager.thread_queue, &pop);
+               } else {
+                       check_status = switch_queue_pop(session_manager.thread_queue, &pop);
+               }
+
+               if (check_status == SWITCH_STATUS_SUCCESS) {
+                       switch_core_session_t *session = (switch_core_session_t *) pop;
+
+                       if (!session) break;
+
+                       
+                       switch_mutex_lock(session_manager.mutex);
+                       session_manager.busy++;
+                       switch_mutex_unlock(session_manager.mutex);
+                       
+                       switch_core_session_thread(thread, (void *) session);
+
+                       switch_mutex_lock(session_manager.mutex);
+                       session_manager.busy--;
+                       switch_mutex_unlock(session_manager.mutex);
+
+               } else {
+                       if (check) {
+                               break;
+                       }
+                       check++;
+               }
+       }
+
+       switch_mutex_lock(session_manager.mutex);
+       session_manager.running--;
+       switch_mutex_unlock(session_manager.mutex);
+
+       switch_core_destroy_memory_pool(&pool);
+
+       return NULL;
+}
+
+
+static switch_status_t check_queue(void) 
+{
+       switch_status_t status = SWITCH_STATUS_FALSE;
+       int ttl = 0;
+       int x = 0;
+
+       switch_mutex_lock(session_manager.mutex);
+       ttl = switch_queue_size(session_manager.thread_queue);
+       x = (session_manager.running - session_manager.busy);
+       switch_mutex_unlock(session_manager.mutex);
+
+
+
+       while (x < ttl) {
+               switch_thread_t *thread;
+               switch_threadattr_t *thd_attr;
+               switch_memory_pool_t *pool;
+               switch_thread_pool_node_t *node;
+               
+               switch_core_new_memory_pool(&pool);
+               node = switch_core_alloc(pool, sizeof(*node));
+               node->pool = pool;
+
+               switch_threadattr_create(&thd_attr, node->pool);
+               switch_threadattr_detach_set(thd_attr, 1);
+               switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+
+               if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n");
+                       switch_core_destroy_memory_pool(&pool);
+                       status = SWITCH_STATUS_GENERR;
+               } else {
+                       status = SWITCH_STATUS_SUCCESS;
+               }
+               x++;
+       }
+
+       return status;
+}
+
+
+static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
+{
+       int x = 0;
+
+       while(session_manager.ready) {
+               switch_yield(100000);
+
+               if (++x == 300) {
+                       switch_queue_interrupt_all(session_manager.thread_queue);
+                       x = 0;
+               }
+
+               check_queue();
+       }
+
+       return NULL;
+}
+
+
+SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session)
+{
+       if (session_manager.ready == 1) {
+               switch_thread_t *thread;
+               switch_threadattr_t *thd_attr;
+
+               switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
+               switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
+
+               switch_threadattr_create(&thd_attr, session_manager.memory_pool);
+               switch_threadattr_detach_set(thd_attr, 1);
+               switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+               switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool);
+               session_manager.ready++;
+       }
+
+       switch_queue_push(session_manager.thread_queue, session);
+       check_queue();
+       return SWITCH_STATUS_SUCCESS;
+}
+
+
 SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(switch_core_session_t *session)
 {
        switch_status_t status = SWITCH_STATUS_FALSE;
        switch_thread_t *thread;
-       switch_threadattr_t *thd_attr;;
+       switch_threadattr_t *thd_attr;
 
+       if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) {
+               return switch_core_session_thread_pool_launch(session);
+       }
+       
        switch_threadattr_create(&thd_attr, session->pool);
        switch_threadattr_detach_set(thd_attr, 1);
 
@@ -2042,16 +2186,24 @@ SWITCH_DECLARE(uint32_t) switch_core_sessions_per_second(uint32_t new_limit)
 
 void switch_core_session_init(switch_memory_pool_t *pool)
 {
+
+
        memset(&session_manager, 0, sizeof(session_manager));
        session_manager.session_limit = 1000;
        session_manager.session_id = 1;
        session_manager.memory_pool = pool;
        switch_core_hash_init(&session_manager.session_table, session_manager.memory_pool);
+       session_manager.ready = 1;
+
+
 }
 
 void switch_core_session_uninit(void)
 {
        switch_core_hash_destroy(&session_manager.session_table);
+       session_manager.ready = 0;
+       switch_queue_interrupt_all(session_manager.thread_queue);
+       
 }
 
 SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session)
index 46d49e688301a04df031f2f7d9f5288e91573dd6..da00b6bca5ec0161e4ef74ddbda8f1f3dc5ba601 100644 (file)
@@ -336,7 +336,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session)
        const switch_state_handler_table_t *driver_state_handler = NULL;
        const switch_state_handler_table_t *application_state_handler = NULL;
        int silly = 0;
-       uint32_t new_loops = 5000;
+       //      uint32_t new_loops = 5000;
 
        /*
           Life of the channel. you have channel and pool in your session
@@ -468,6 +468,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session)
                endstate = switch_channel_get_state(session->channel);
 
                if (endstate == switch_channel_get_running_state(session->channel)) {
+                       /**
                        if (endstate == CS_NEW) {
                                switch_cond_next();
                                switch_ivr_parse_all_events(session);
@@ -477,6 +478,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session)
                                        switch_channel_hangup(session->channel, SWITCH_CAUSE_INVALID_CALL_REFERENCE);
                                }
                        } else {
+                       **/
                                switch_ivr_parse_all_events(session);
                                switch_ivr_parse_all_events(session);
 
@@ -490,7 +492,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session)
 
                                switch_ivr_parse_all_events(session);
                                switch_ivr_parse_all_events(session);
-                       }
+                               //}
                }
        }
   done:
index 5b7891d496499e3be4a3f5e901b4a90395e9257c..4a53229af7ab7a9e0ac3f01a4b7b71a7d527f25f 100644 (file)
@@ -307,6 +307,8 @@ static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp
                if (switch_queue_size(EVENT_DISPATCH_QUEUE) > (unsigned int)(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) {
                        launch++;
                }
+
+               switch_mutex_unlock(EVENT_QUEUE_MUTEX);
                
                if (launch) {
                        if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
@@ -314,8 +316,6 @@ static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp
                        }
                }
 
-               switch_mutex_unlock(EVENT_QUEUE_MUTEX);
-
                *eventp = NULL;
                switch_queue_push(EVENT_DISPATCH_QUEUE, event);
                event = NULL;