From: excurso
Date: Thu, 27 Mar 2025 14:29:11 +0000 (+0000)
Subject: Re: Coverity report Oct 31st, 2024 (Issue #40991)
X-Git-Tag: tor-0.4.9.2-alpha~6^2~1^2
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ee9b3c127c180ab24a55d44268a848fdf54347f5;p=thirdparty%2Ftor.git

Re: Coverity report Oct 31st, 2024 (Issue #40991)
---

diff --git a/changes/ticket40991 b/changes/ticket40991
new file mode 100644
index 0000000000..c8826cff60
--- /dev/null
+++ b/changes/ticket40991
@@ -0,0 +1,4 @@
+  o Minor bugfixes (threads, memory):
+    - Rework start and exit of worker threads.
+    - Improve cleanup of resources used by threads.
+      Fixes bug 40991; bugfix on 0.4.8.13-dev.
diff --git a/src/lib/evloop/workqueue.c b/src/lib/evloop/workqueue.c
index 17ab44e3ab..59a6cf13cd 100644
--- a/src/lib/evloop/workqueue.c
+++ b/src/lib/evloop/workqueue.c
@@ -78,6 +78,8 @@ struct threadpool_t {
 
   /** Number of elements in threads. */
   int n_threads;
+  /** Number of elements to be created in threads. */
+  int n_threads_max;
 
   /** Mutex to protect all the above fields. */
   tor_mutex_t lock;
@@ -88,6 +90,11 @@ struct threadpool_t {
   void *(*new_thread_state_fn)(void*);
   void (*free_thread_state_fn)(void*);
   void *new_thread_state_arg;
+
+  /** Used for signalling the worker threads to exit. */
+  int exit;
+  /** Mutex for controlling worker threads' startup and exit. */
+  tor_mutex_t control_lock;
 };
 
 /** Used to put a workqueue_priority_t value into a bitfield. */
@@ -270,13 +277,34 @@ worker_thread_extract_next_work(workerthread_t *thread)
 static void
 worker_thread_main(void *thread_)
 {
+  static int n_worker_threads_running = 0;
   workerthread_t *thread = thread_;
   threadpool_t *pool = thread->in_pool;
   workqueue_entry_t *work;
   workqueue_reply_t result;
 
+  tor_mutex_acquire(&pool->control_lock);
+
+  log_debug(LD_GENERAL, "Worker thread %u/%u has started [TID: %lu].",
+            n_worker_threads_running + 1, pool->n_threads_max,
+            tor_get_thread_id());
+
+  if (++n_worker_threads_running == pool->n_threads_max)
+    tor_cond_signal_one(&pool->condition);
+
+  tor_mutex_release(&pool->control_lock);
+
+  /* Wait until all worker threads have started; the main thread holds
+   * pool->lock until then, so this acquire blocks. */
   tor_mutex_acquire(&pool->lock);
+
+  log_debug(LD_GENERAL, "Worker thread has entered the work loop [TID: %lu].",
+            tor_get_thread_id());
+
   while (1) {
+    /* Exit the thread when signaled to exit. */
+    if (pool->exit)
+      goto exit;
+
     /* lock must be held at this point. */
     while (worker_thread_has_work(thread)) {
       /* lock must be held at this point. */
@@ -290,11 +318,12 @@ worker_thread_main(void *thread_)
 
         workqueue_reply_t r = update_fn(thread->state, arg);
 
-        if (r != WQ_RPL_REPLY) {
-          return;
-        }
-
         tor_mutex_acquire(&pool->lock);
+
+        /* We may need to exit the thread. */
+        if (r != WQ_RPL_REPLY)
+          goto exit;
+
         continue;
       }
       work = worker_thread_extract_next_work(thread);
@@ -309,11 +338,11 @@ worker_thread_main(void *thread_)
       /* Queue the reply for the main thread. */
       queue_reply(thread->reply_queue, work);
 
-      /* We may need to exit the thread. */
-      if (result != WQ_RPL_REPLY) {
-        return;
-      }
       tor_mutex_acquire(&pool->lock);
+
+      /* We may need to exit the thread. */
+      if (result != WQ_RPL_REPLY)
+        goto exit;
     }
     /* At this point the lock is held, and there is no work in this thread's
      * queue. */
@@ -325,6 +354,19 @@ worker_thread_main(void *thread_)
       log_warn(LD_GENERAL, "Fail tor_cond_wait.");
     }
   }
+
+exit:
+  /* At this point pool->lock must be held. */
+
+  log_debug(LD_GENERAL, "Worker thread %u/%u has exited [TID: %lu].",
+            pool->n_threads_max - n_worker_threads_running + 1,
+            pool->n_threads_max, tor_get_thread_id());
+
+  if (--n_worker_threads_running == 0)
+    /* Let the main thread know that the last worker thread has exited. */
+    tor_mutex_release(&pool->control_lock);
+
+  tor_mutex_release(&pool->lock);
 }
 
 /** Put a reply on the reply queue. The reply must not currently be on
@@ -516,12 +558,17 @@ threadpool_start_threads(threadpool_t *pool, int n)
   if (n > MAX_THREADS)
     n = MAX_THREADS;
 
+  tor_mutex_acquire(&pool->control_lock);
   tor_mutex_acquire(&pool->lock);
 
   if (pool->n_threads < n)
     pool->threads = tor_reallocarray(pool->threads,
                                      sizeof(workerthread_t*), n);
 
+  int status = 0;
+  pool->n_threads_max = n;
+  log_debug(LD_GENERAL, "Starting worker threads...");
+
   while (pool->n_threads < n) {
     /* For half of our threads, we'll choose lower priorities permissively;
      * for the other half, we'll stick more strictly to higher priorities.
@@ -536,16 +583,80 @@ threadpool_start_threads(threadpool_t *pool, int n)
       //LCOV_EXCL_START
       tor_assert_nonfatal_unreached();
       pool->free_thread_state_fn(state);
-      tor_mutex_release(&pool->lock);
-      return -1;
+      status = -1;
+      goto check_status;
       //LCOV_EXCL_STOP
     }
     thr->index = pool->n_threads;
     pool->threads[pool->n_threads++] = thr;
   }
+
+  struct timeval tv = {.tv_sec = 30, .tv_usec = 0};
+
+  /* Wait for the last launched thread to confirm that it has started.
+   * Wait at most 30 seconds. */
+  status = tor_cond_wait(&pool->condition, &pool->control_lock, &tv);
+
+check_status:
+  switch (status) {
+  case 0:
+    log_debug(LD_GENERAL, "Finished starting worker threads.");
+    break;
+  case -1:
+    log_warn(LD_GENERAL, "Failed to confirm worker threads' startup.");
+    break;
+  case 1:
+    log_warn(LD_GENERAL, "Failed to confirm worker threads' "
+                         "startup after timeout.");
+    FALLTHROUGH;
+  default:
+    status = -1;
+  }
+
+  log_debug(LD_GENERAL, "Signaled the worker threads to enter the work loop.");
+
+  /* If we had an error, let the worker threads (if any) exit directly. */
+  if (status != 0) {
+    pool->exit = 1;
+    log_debug(LD_GENERAL, "Signaled the worker threads to exit...");
+  }
+
+  /* Let worker threads enter the work loop. */
   tor_mutex_release(&pool->lock);
-  return 0;
+
+  /* pool->control_lock stays locked. This is required for the main thread
+   * to wait for the worker threads to exit on shutdown. */
+
+  return status;
+}
+
+/** Stop all worker threads. */
+static void
+threadpool_stop_threads(threadpool_t *pool)
+{
+  tor_mutex_acquire(&pool->lock);
+
+  if (pool->exit == 0) {
+    /* Signal the worker threads to exit. */
+    pool->exit = 1;
+    /* If worker threads are waiting for work, let them continue to exit. */
+    tor_cond_signal_all(&pool->condition);
+
+    log_debug(LD_GENERAL, "Signaled worker threads to exit. "
                          "Waiting for them to exit...");
+  }
+
+  tor_mutex_release(&pool->lock);
+
+  /* Wait until all worker threads have exited; pool->control_lock has
+   * been held by the main thread since threadpool_start_threads(). */
+  tor_mutex_acquire(&pool->control_lock);
+  /* Unlock is required, else the main thread hangs on mutex uninit. */
+  tor_mutex_release(&pool->control_lock);
+
+  /* If this message appears in the log before all threads have confirmed
+   * their exit, then pool->control_lock was not held as expected. */
+  log_debug(LD_GENERAL, "All worker threads have exited.");
 }
 
 /**
@@ -566,6 +677,9 @@ threadpool_new(int n_threads,
   pool = tor_malloc_zero(sizeof(threadpool_t));
   tor_mutex_init_nonrecursive(&pool->lock);
   tor_cond_init(&pool->condition);
+  tor_mutex_init_nonrecursive(&pool->control_lock);
+  pool->exit = 0;
+
   unsigned i;
   for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
     TOR_TAILQ_INIT(&pool->work[i]);
@@ -579,8 +693,6 @@ threadpool_new(int n_threads,
   if (threadpool_start_threads(pool, n_threads) < 0) {
     //LCOV_EXCL_START
     tor_assert_nonfatal_unreached();
-    tor_cond_uninit(&pool->condition);
-    tor_mutex_uninit(&pool->lock);
     threadpool_free(pool);
     return NULL;
     //LCOV_EXCL_STOP
@@ -598,6 +710,14 @@ threadpool_free_(threadpool_t *pool)
   if (!pool)
     return;
 
+  threadpool_stop_threads(pool);
+
+  log_debug(LD_GENERAL, "Beginning to clean up...");
+
+  tor_cond_uninit(&pool->condition);
+  tor_mutex_uninit(&pool->lock);
+  tor_mutex_uninit(&pool->control_lock);
+
   if (pool->threads) {
     for (int i = 0; i != pool->n_threads; ++i)
       workerthread_free(pool->threads[i]);
@@ -605,21 +725,35 @@ threadpool_free_(threadpool_t *pool)
     tor_free(pool->threads);
   }
 
-  if (pool->update_args)
-    pool->free_update_arg_fn(pool->update_args);
+  if (pool->update_args) {
+    if (!pool->free_update_arg_fn)
+      log_warn(LD_GENERAL, "Cannot free pool->update_args: "
                           "pool->free_update_arg_fn is not set.");
+    else
+      pool->free_update_arg_fn(pool->update_args);
+  }
 
   if (pool->reply_event) {
-    tor_event_del(pool->reply_event);
-    tor_event_free(pool->reply_event);
+    if (tor_event_del(pool->reply_event) == -1)
+      log_warn(LD_GENERAL, "libevent error: deleting reply event failed.");
+    else
+      tor_event_free(pool->reply_event);
   }
 
   if (pool->reply_queue)
     replyqueue_free(pool->reply_queue);
 
-  if (pool->new_thread_state_arg)
-    pool->free_thread_state_fn(pool->new_thread_state_arg);
+  if (pool->new_thread_state_arg) {
+    if (!pool->free_thread_state_fn)
+      log_warn(LD_GENERAL, "Cannot free pool->new_thread_state_arg: "
                           "pool->free_thread_state_fn is not set.");
+    else
+      pool->free_thread_state_fn(pool->new_thread_state_arg);
+  }
 
   tor_free(pool);
+
+  log_debug(LD_GENERAL, "Cleanup finished.");
 }
 
 /** Return the reply queue associated with a given thread pool. */
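
A note on the synchronization protocol, for review: the patch hands
pool->control_lock from the main thread to the workers and back. The main
thread acquires it in threadpool_start_threads(), the last worker to start
signals pool->condition, and on shutdown the last worker to exit releases
control_lock so that threadpool_stop_threads() can acquire it and return.
Below is a minimal, self-contained sketch of that startup/exit handshake.
It is not Tor code: it uses plain pthreads instead of tor_mutex_t and
tor_cond_t, and it replaces the patch's cross-thread hand-off of
control_lock with a counted condition-variable wait, since unlocking a
pthread mutex from a thread that does not own it is not portable. The names
N_THREADS, n_running, exit_flag, and worker_main are illustrative stand-ins
for the patch's n_threads_max, n_worker_threads_running, pool->exit, and
worker_thread_main().

/* handshake.c - build with: cc handshake.c -o handshake -lpthread */
#include <pthread.h>
#include <stdio.h>

#define N_THREADS 4

static pthread_mutex_t control_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t control_cond = PTHREAD_COND_INITIALIZER;
static int n_running = 0;  /* like n_worker_threads_running in the patch */
static int exit_flag = 0;  /* like pool->exit in the patch */

static void *
worker_main(void *arg)
{
  (void)arg;

  pthread_mutex_lock(&control_lock);

  /* Startup confirmation: the last worker to start wakes the main thread,
   * as tor_cond_signal_one(&pool->condition) does in the patch. */
  if (++n_running == N_THREADS)
    pthread_cond_signal(&control_cond);

  /* Stand-in for the work loop: just wait for the exit signal. */
  while (!exit_flag)
    pthread_cond_wait(&control_cond, &control_lock);

  /* Exit confirmation: the last worker out wakes the main thread. The
   * patch expresses this by releasing pool->control_lock instead. */
  if (--n_running == 0)
    pthread_cond_signal(&control_cond);

  pthread_mutex_unlock(&control_lock);
  return NULL;
}

int
main(void)
{
  pthread_t threads[N_THREADS];

  pthread_mutex_lock(&control_lock);

  for (int i = 0; i < N_THREADS; i++)
    pthread_create(&threads[i], NULL, worker_main, NULL);

  /* Wait until every worker has confirmed its start; the patch bounds
   * this wait with a 30-second timeout in tor_cond_wait(). */
  while (n_running < N_THREADS)
    pthread_cond_wait(&control_cond, &control_lock);
  printf("all %d workers started\n", N_THREADS);

  /* Shutdown: signal all workers and wait for the last one to leave,
   * as threadpool_stop_threads() does before the mutexes are torn down. */
  exit_flag = 1;
  pthread_cond_broadcast(&control_cond);
  while (n_running > 0)
    pthread_cond_wait(&control_cond, &control_lock);

  pthread_mutex_unlock(&control_lock);

  for (int i = 0; i < N_THREADS; i++)
    pthread_join(threads[i], NULL);
  printf("all workers exited\n");
  return 0;
}

In this sketch the broadcast corresponds to tor_cond_signal_all on
pool->condition in threadpool_stop_threads(), and a bounded startup wait
(the patch's 30-second timeout) would map to pthread_cond_timedwait().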