return NULL;
}
+typedef struct switch_thread_pool_node_s {
+ switch_memory_pool_t *pool;
+} switch_thread_pool_node_t;
+
+static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_thread_t *thread, void *obj)
+{
+ switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj;
+ switch_memory_pool_t *pool = node->pool;
+ void *pop;
+ int check = 0;
+
+ switch_mutex_lock(session_manager.mutex);
+ session_manager.running++;
+ switch_mutex_unlock(session_manager.mutex);
+
+ while(session_manager.ready) {
+ switch_status_t check_status;
+
+ if (check) {
+ check_status = switch_queue_trypop(session_manager.thread_queue, &pop);
+ } else {
+ check_status = switch_queue_pop(session_manager.thread_queue, &pop);
+ }
+
+ if (check_status == SWITCH_STATUS_SUCCESS) {
+ switch_core_session_t *session = (switch_core_session_t *) pop;
+
+ if (!session) break;
+
+
+ switch_mutex_lock(session_manager.mutex);
+ session_manager.busy++;
+ switch_mutex_unlock(session_manager.mutex);
+
+ switch_core_session_thread(thread, (void *) session);
+
+ switch_mutex_lock(session_manager.mutex);
+ session_manager.busy--;
+ switch_mutex_unlock(session_manager.mutex);
+
+ } else {
+ if (check) {
+ break;
+ }
+ check++;
+ }
+ }
+
+ switch_mutex_lock(session_manager.mutex);
+ session_manager.running--;
+ switch_mutex_unlock(session_manager.mutex);
+
+ switch_core_destroy_memory_pool(&pool);
+
+ return NULL;
+}
+
+
+static switch_status_t check_queue(void)
+{
+ switch_status_t status = SWITCH_STATUS_FALSE;
+ int ttl = 0;
+ int x = 0;
+
+ switch_mutex_lock(session_manager.mutex);
+ ttl = switch_queue_size(session_manager.thread_queue);
+ x = (session_manager.running - session_manager.busy);
+ switch_mutex_unlock(session_manager.mutex);
+
+
+
+ while (x < ttl) {
+ switch_thread_t *thread;
+ switch_threadattr_t *thd_attr;
+ switch_memory_pool_t *pool;
+ switch_thread_pool_node_t *node;
+
+ switch_core_new_memory_pool(&pool);
+ node = switch_core_alloc(pool, sizeof(*node));
+ node->pool = pool;
+
+ switch_threadattr_create(&thd_attr, node->pool);
+ switch_threadattr_detach_set(thd_attr, 1);
+ switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+
+ if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n");
+ switch_core_destroy_memory_pool(&pool);
+ status = SWITCH_STATUS_GENERR;
+ } else {
+ status = SWITCH_STATUS_SUCCESS;
+ }
+ x++;
+ }
+
+ return status;
+}
+
+
+static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
+{
+ int x = 0;
+
+ while(session_manager.ready) {
+ switch_yield(100000);
+
+ if (++x == 300) {
+ switch_queue_interrupt_all(session_manager.thread_queue);
+ x = 0;
+ }
+
+ check_queue();
+ }
+
+ return NULL;
+}
+
+
+SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session)
+{
+ if (session_manager.ready == 1) {
+ switch_thread_t *thread;
+ switch_threadattr_t *thd_attr;
+
+ switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
+ switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
+
+ switch_threadattr_create(&thd_attr, session_manager.memory_pool);
+ switch_threadattr_detach_set(thd_attr, 1);
+ switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+ switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool);
+ session_manager.ready++;
+ }
+
+ switch_queue_push(session_manager.thread_queue, session);
+ check_queue();
+ return SWITCH_STATUS_SUCCESS;
+}
+
+
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(switch_core_session_t *session)
{
switch_status_t status = SWITCH_STATUS_FALSE;
switch_thread_t *thread;
- switch_threadattr_t *thd_attr;;
+ switch_threadattr_t *thd_attr;
+ if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) {
+ return switch_core_session_thread_pool_launch(session);
+ }
+
switch_threadattr_create(&thd_attr, session->pool);
switch_threadattr_detach_set(thd_attr, 1);
void switch_core_session_init(switch_memory_pool_t *pool)
{
+
+
memset(&session_manager, 0, sizeof(session_manager));
session_manager.session_limit = 1000;
session_manager.session_id = 1;
session_manager.memory_pool = pool;
switch_core_hash_init(&session_manager.session_table, session_manager.memory_pool);
+ session_manager.ready = 1;
+
+
}
void switch_core_session_uninit(void)
{
switch_core_hash_destroy(&session_manager.session_table);
+ session_manager.ready = 0;
+ switch_queue_interrupt_all(session_manager.thread_queue);
+
}
SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session)