{
switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj;
switch_memory_pool_t *pool = node->pool;
- void *pop;
- int check = 0;
- switch_mutex_lock(session_manager.mutex);
- session_manager.starting--;
- session_manager.running++;
- switch_mutex_unlock(session_manager.mutex);
#ifdef DEBUG_THREAD_POOL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Started\n", (long) thread);
#endif
- while(session_manager.ready) {
- switch_status_t check_status;
-
- pop = NULL;
-
- if (check) {
- check_status = switch_queue_trypop(session_manager.thread_queue, &pop);
- } else {
- switch_mutex_lock(session_manager.mutex);
- session_manager.popping++;
- switch_mutex_unlock(session_manager.mutex);
-
- check_status = switch_queue_pop(session_manager.thread_queue, &pop);
-
- switch_mutex_lock(session_manager.mutex);
- session_manager.popping--;
- switch_mutex_unlock(session_manager.mutex);
- }
-
- if (check_status == SWITCH_STATUS_SUCCESS && pop) {
+ while (1) {
+ void *pop;
+ switch_status_t check_status = switch_queue_pop_timeout(session_manager.thread_queue, &pop, apr_time_from_sec(5));
+ if (check_status == SWITCH_STATUS_SUCCESS) {
switch_thread_data_t *td = (switch_thread_data_t *) pop;
if (!td) break;
- switch_mutex_lock(session_manager.mutex);
- session_manager.busy++;
- switch_mutex_unlock(session_manager.mutex);
#ifdef DEBUG_THREAD_POOL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) thread);
#endif
-
td->func(thread, td->obj);
if (td->pool) {
switch_mutex_lock(session_manager.mutex);
session_manager.busy--;
switch_mutex_unlock(session_manager.mutex);
-
} else {
- if (check) {
+ switch_mutex_lock(session_manager.mutex);
+ if (!switch_status_is_timeup(check_status) || session_manager.running > session_manager.busy) {
+ if (!--session_manager.running) {
+ switch_thread_cond_signal(session_manager.cond);
+ }
+ switch_mutex_unlock(session_manager.mutex);
break;
}
- check++;
+ switch_mutex_unlock(session_manager.mutex);
}
}
#ifdef DEBUG_THREAD_POOL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Ended\n", (long) thread);
#endif
- switch_mutex_lock(session_manager.mutex);
- session_manager.running--;
- switch_mutex_unlock(session_manager.mutex);
-
switch_core_destroy_memory_pool(&pool);
-
return NULL;
}
switch_mutex_unlock(session_manager.mutex);
}
-static int wake_queue(void)
-{
- switch_status_t status;
- int tries = 0;
-
- top:
-
- status = switch_mutex_trylock(session_manager.cond_mutex);
-
- if (status == SWITCH_STATUS_SUCCESS) {
- switch_thread_cond_signal(session_manager.cond);
- switch_mutex_unlock(session_manager.cond_mutex);
- return 1;
- } else {
- if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
- switch_mutex_unlock(session_manager.cond2_mutex);
- } else {
- if (++tries < 10) {
- switch_cond_next();
- goto top;
- }
- }
- }
-
- return 0;
-}
-
static switch_status_t check_queue(void)
{
switch_status_t status = SWITCH_STATUS_FALSE;
- int ttl = 0;
- int x = 0;
-
switch_mutex_lock(session_manager.mutex);
- ttl = switch_queue_size(session_manager.thread_queue);
- x = ((session_manager.running + session_manager.starting) - session_manager.busy);
+ if (session_manager.running >= ++session_manager.busy) {
+ switch_mutex_unlock(session_manager.mutex);
+ return SWITCH_STATUS_SUCCESS;
+ }
+ ++session_manager.running;
switch_mutex_unlock(session_manager.mutex);
-
- while (x < ttl) {
+ {
switch_thread_t *thread;
switch_threadattr_t *thd_attr;
switch_memory_pool_t *pool;
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_LOW);
if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) {
+ switch_mutex_lock(session_manager.mutex);
+ if (!--session_manager.running) {
+ switch_thread_cond_signal(session_manager.cond);
+ }
+ switch_mutex_unlock(session_manager.mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n");
switch_core_destroy_memory_pool(&pool);
status = SWITCH_STATUS_GENERR;
} else {
status = SWITCH_STATUS_SUCCESS;
}
-
- switch_mutex_lock(session_manager.mutex);
- session_manager.starting++;
- switch_mutex_unlock(session_manager.mutex);
- x++;
}
-
return status;
}
-static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
-{
-
- uint32_t sleep = 10000000;
- switch_time_t next = switch_micro_time_now() + sleep;
-
- switch_mutex_lock(session_manager.cond_mutex);
-
- while(session_manager.ready) {
- int check = 1;
- int ttl = 0;
- uint32_t xsleep = sleep;
-
- switch_mutex_lock(session_manager.mutex);
- ttl = switch_queue_size(session_manager.thread_queue);
- switch_mutex_unlock(session_manager.mutex);
-
-
- if (!ttl) {
- xsleep = 10000;
- }
-
- if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
- switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, xsleep);
- switch_mutex_unlock(session_manager.cond2_mutex);
- }
-
-
- if (switch_micro_time_now() >= next) {
- if (session_manager.popping) {
-#ifdef DEBUG_THREAD_POOL
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
- "Thread pool: running:%d busy:%d popping:%d\n", session_manager.running, session_manager.busy, session_manager.popping);
-#endif
- switch_queue_interrupt_all(session_manager.thread_queue);
-
- sleep = 100000;
- check = 0;
- } else {
- sleep = 10000000;
- }
- }
-
- if (check) check_queue();
-
- next = switch_micro_time_now() + sleep;
- }
-
- switch_mutex_unlock(session_manager.cond_mutex);
-
- while(session_manager.running) {
- switch_queue_interrupt_all(session_manager.thread_queue);
- switch_yield(20000);
- }
-
-
- return NULL;
-}
-
SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
td = *tdp;
*tdp = NULL;
- switch_queue_push(session_manager.thread_queue, td);
- wake_queue();
+ status = switch_queue_push(session_manager.thread_queue, td);
+ check_queue();
return status;
}
} else if (switch_test_flag(session, SSF_THREAD_STARTED)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot launch thread again after it has already been run!\n");
} else {
- status = SWITCH_STATUS_SUCCESS;
switch_set_flag(session, SSF_THREAD_RUNNING);
switch_set_flag(session, SSF_THREAD_STARTED);
td = switch_core_session_alloc(session, sizeof(*td));
td->obj = session;
td->func = switch_core_session_thread;
- switch_queue_push(session_manager.thread_queue, td);
- wake_queue();
+ status = switch_queue_push(session_manager.thread_queue, td);
+ check_queue();
}
switch_mutex_unlock(session->mutex);
session_manager.session_id = 1;
session_manager.memory_pool = pool;
switch_core_hash_init(&session_manager.session_table);
-
- if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) {
- switch_threadattr_t *thd_attr;
-
- switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
- switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
- switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
- switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
- switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
- switch_threadattr_create(&thd_attr, session_manager.memory_pool);
- switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
- session_manager.ready = 1;
- switch_thread_create(&session_manager.manager_thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool);
- }
-
+ switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_DEFAULT, session_manager.memory_pool);
+ switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
+ switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
}
void switch_core_session_uninit(void)
{
- int sanity = 100;
- switch_status_t st = SWITCH_STATUS_FALSE;
-
- session_manager.ready = 0;
- wake_queue();
-
- while(session_manager.running && --sanity > 0) {
- switch_queue_interrupt_all(session_manager.thread_queue);
- switch_yield(100000);
- }
-
- switch_thread_join(&st, session_manager.manager_thread);
+ switch_queue_term(session_manager.thread_queue);
+ switch_mutex_lock(session_manager.mutex);
+ if (session_manager.running)
+ switch_thread_cond_timedwait(session_manager.cond, session_manager.mutex, apr_time_from_sec(10));
+ switch_mutex_unlock(session_manager.mutex);
switch_core_hash_destroy(&session_manager.session_table);
-
}
SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session)
SWITCH_DECLARE(void) switch_core_session_debug_pool(switch_stream_handle_t *stream)
{
stream->write_function(stream, "Thread pool: running:%d busy:%d popping:%d\n",
- session_manager.running, session_manager.busy, session_manager.popping);
+ session_manager.running, session_manager.busy, session_manager.running - session_manager.busy);
}
/* For Emacs: