From 64a9bd1b791f5a73f13b0112896d33cec9eec55c Mon Sep 17 00:00:00 2001 From: Alberto Leiva Popper Date: Tue, 27 Apr 2021 20:47:03 -0500 Subject: [PATCH] Thread pool: Code review Gets rid of some inconsistencies, but no bugs as far as I can tell. --- src/thread/thread_pool.c | 150 ++++++++++++++++++++++----------------- src/validation_run.c | 4 +- 2 files changed, 88 insertions(+), 66 deletions(-) diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c index 59f26d2d..c2ecc6e5 100644 --- a/src/thread/thread_pool.c +++ b/src/thread/thread_pool.c @@ -47,13 +47,13 @@ struct thread_pool { * Used by the Parent Thread to wake up Worker Threads when there's * work. */ - pthread_cond_t working_cond; + pthread_cond_t parent2worker; /* * Used by Working Threads to signal the Parent Thread that workers are * ready (during initialization) or that all the work is done (after * initialization). */ - pthread_cond_t waiting_cond; + pthread_cond_t worker2parent; /* Number of Working Threads. */ unsigned int working_count; /* Number of Worker Threads. */ @@ -71,25 +71,51 @@ struct thread_pool { }; static void -thread_pool_lock(struct thread_pool *pool) +panic_on_fail(int error, char const *function_name) { - int error; - - error = pthread_mutex_lock(&(pool->lock)); if (error) - pr_crit("pthread_mutex_lock() returned error code %d. This is too critical for a graceful recovery; I must die now.", - error); + pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.", + function_name, error); } static void -thread_pool_unlock(struct thread_pool *pool) +mutex_lock(struct thread_pool *pool) { - int error; + panic_on_fail(pthread_mutex_lock(&pool->lock), "pthread_mutex_lock"); +} - error = pthread_mutex_unlock(&(pool->lock)); - if (error) - pr_crit("pthread_mutex_unlock() returned error code %d. This is too critical for a graceful recovery; I must die now.", - error); +static void +mutex_unlock(struct thread_pool *pool) +{ + panic_on_fail(pthread_mutex_unlock(&pool->lock), "pthread_mutex_unlock"); +} + +/* Wait until the parent sends us work. */ +static void +wait_for_parent_signal(struct thread_pool *pool, unsigned int thread_id) +{ + pr_op_debug("Thread %s.%u: Waiting for work...", pool->name, thread_id); + panic_on_fail(pthread_cond_wait(&pool->parent2worker, &pool->lock), + "pthread_cond_wait"); +} + +static void +signal_to_parent(struct thread_pool *pool) +{ + panic_on_fail(pthread_cond_signal(&pool->worker2parent)); +} + +static void +wait_for_worker_signal(struct thread_pool *pool) +{ + panic_on_fail(pthread_cond_wait(&pool->worker2parent, &pool->lock), + "pthread_cond_wait"); +} + +static void +signal_to_worker(struct thread_pool *pool) +{ + panic_on_fail(pthread_cond_signal(&pool->parent2worker)); } static int @@ -166,20 +192,17 @@ tasks_poll(void *arg) struct thread_pool_task *task; unsigned int thread_id; - thread_pool_lock(pool); + mutex_lock(pool); thread_id = pool->thread_count; /* We're running; signal the Parent Thread. */ pool->thread_count++; - pthread_cond_signal(&(pool->waiting_cond)); + if (pool->thread_count == 1) + signal_to_parent(pool); while (true) { - while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) { - /* Wait until the parent sends us work. */ - pr_op_debug("Thread %s.%u: Waiting for work...", - pool->name, thread_id); - pthread_cond_wait(&(pool->working_cond), &(pool->lock)); - } + while (TAILQ_EMPTY(&pool->queue) && !pool->stop) + wait_for_parent_signal(pool, thread_id); if (pool->stop) break; @@ -187,29 +210,30 @@ tasks_poll(void *arg) /* Claim the work. */ task = task_queue_pull(pool, thread_id); pool->working_count++; - thread_pool_unlock(pool); + mutex_unlock(pool); if (task != NULL) { task->cb(task->arg); pr_op_debug("Thread %s.%u: Task '%s' ended", pool->name, thread_id, task->name); - /* Now releasing the task */ task_destroy(task); } - thread_pool_lock(pool); + mutex_lock(pool); pool->working_count--; - /* If there's no more work left, signal the parent. */ - if (!pool->stop && pool->working_count == 0 && - TAILQ_EMPTY(&(pool->queue))) - pthread_cond_signal(&(pool->waiting_cond)); + if (pool->stop) + break; + /* If there's no more work left, wake up parent. */ + if (pool->working_count == 0 && TAILQ_EMPTY(&pool->queue)) + signal_to_parent(pool); } /* The thread will cease to exist */ pool->thread_count--; - pthread_cond_signal(&(pool->waiting_cond)); - thread_pool_unlock(pool); + if (pool->thread_count == 0) + signal_to_parent(pool); + mutex_unlock(pool); pr_op_debug("Thread %s.%u: Terminating.", pool->name, thread_id); return NULL; @@ -282,14 +306,14 @@ thread_pool_create(char const *name, unsigned int threads, return pr_enomem(); /* Init locking */ - error = pthread_mutex_init(&(result->lock), NULL); + error = pthread_mutex_init(&result->lock, NULL); if (error) { error = pr_op_errno(error, "Calling pthread_mutex_init()"); goto free_tmp; } /* Init conditional to signal pending work */ - error = pthread_cond_init(&(result->working_cond), NULL); + error = pthread_cond_init(&result->parent2worker, NULL); if (error) { error = pr_op_errno(error, "Calling pthread_cond_init() at working condition"); @@ -297,14 +321,14 @@ thread_pool_create(char const *name, unsigned int threads, } /* Init conditional to signal no pending work */ - error = pthread_cond_init(&(result->waiting_cond), NULL); + error = pthread_cond_init(&result->worker2parent, NULL); if (error) { error = pr_op_errno(error, "Calling pthread_cond_init() at waiting condition"); goto free_working_cond; } - TAILQ_INIT(&(result->queue)); + TAILQ_INIT(&result->queue); result->name = name; result->stop = false; result->working_count = 0; @@ -317,11 +341,11 @@ thread_pool_create(char const *name, unsigned int threads, *pool = result; return 0; free_waiting_cond: - pthread_cond_destroy(&(result->waiting_cond)); + pthread_cond_destroy(&result->worker2parent); free_working_cond: - pthread_cond_destroy(&(result->working_cond)); + pthread_cond_destroy(&result->parent2worker); free_mutex: - pthread_mutex_destroy(&(result->lock)); + pthread_mutex_destroy(&result->lock); free_tmp: free(result); return error; @@ -334,7 +358,7 @@ thread_pool_destroy(struct thread_pool *pool) struct thread_pool_task *tmp; /* Remove all pending work and send the signal to stop it */ - thread_pool_lock(pool); + mutex_lock(pool); queue = &(pool->queue); while (!TAILQ_EMPTY(queue)) { tmp = TAILQ_FIRST(queue); @@ -342,15 +366,15 @@ thread_pool_destroy(struct thread_pool *pool) task_destroy(tmp); } pool->stop = true; - pthread_cond_broadcast(&(pool->working_cond)); - thread_pool_unlock(pool); + pthread_cond_broadcast(&pool->parent2worker); + mutex_unlock(pool); /* Wait for all to end */ thread_pool_wait(pool); - pthread_cond_destroy(&(pool->waiting_cond)); - pthread_cond_destroy(&(pool->working_cond)); - pthread_mutex_destroy(&(pool->lock)); + pthread_cond_destroy(&pool->worker2parent); + pthread_cond_destroy(&pool->parent2worker); + pthread_mutex_destroy(&pool->lock); free(pool); } @@ -370,10 +394,10 @@ static int validate_thread_count(struct thread_pool *pool) if (pool->thread_count != 0) return 0; - /* 2 seconds to start a thread */ + /* Give the threads 2 more seconds */ clock_gettime(CLOCK_REALTIME, &tmout); tmout.tv_sec += 2; - error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock), + error = pthread_cond_timedwait(&pool->worker2parent, &pool->lock, &tmout); if (pool->thread_count != 0) @@ -393,26 +417,23 @@ thread_pool_push(struct thread_pool *pool, char const *task_name, struct thread_pool_task *task; int error; - task = NULL; error = task_create(task_name, cb, arg, &task); if (error) return error; - thread_pool_lock(pool); + mutex_lock(pool); error = validate_thread_count(pool); if (error) { + mutex_unlock(pool); task_destroy(task); - goto end; + return error; } task_queue_push(pool, task); - /* There's work to do! */ - pthread_cond_signal(&(pool->working_cond)); - -end: - thread_pool_unlock(pool); - return error; + mutex_unlock(pool); + signal_to_worker(pool); + return 0; } /* There are available threads to work? */ @@ -421,9 +442,9 @@ thread_pool_avail_threads(struct thread_pool *pool) { bool result; - thread_pool_lock(pool); + mutex_lock(pool); result = (pool->working_count < pool->thread_count); - thread_pool_unlock(pool); + mutex_unlock(pool); return result; } @@ -432,13 +453,14 @@ thread_pool_avail_threads(struct thread_pool *pool) void thread_pool_wait(struct thread_pool *pool) { - thread_pool_lock(pool); + mutex_lock(pool); + while (true) { pr_op_debug("- Stop: %s", pool->stop ? "true" : "false"); pr_op_debug("- Working count: %u", pool->working_count); pr_op_debug("- Thread count: %u", pool->thread_count); pr_op_debug("- Empty task queue: %s", - TAILQ_EMPTY(&(pool->queue)) ? "true" : "false"); + TAILQ_EMPTY(&pool->queue) ? "true" : "false"); if (pool->stop) { /* Wait until all Working Threads are dead. */ @@ -447,15 +469,15 @@ thread_pool_wait(struct thread_pool *pool) } else { /* Wait until all Working Threads finish. */ if (pool->working_count == 0 && - TAILQ_EMPTY(&(pool->queue))) + TAILQ_EMPTY(&pool->queue)) break; } pr_op_debug("Pool '%s': Waiting for tasks to be completed", pool->name); - pthread_cond_wait(&(pool->waiting_cond), &(pool->lock)); + wait_for_worker_signal(pool); } - thread_pool_unlock(pool); - pr_op_debug("Pool '%s': Waiting has ended, all tasks done", - pool->name); + + mutex_unlock(pool); + pr_op_debug("Pool '%s': Waiting has ended, all tasks done", pool->name); } diff --git a/src/validation_run.c b/src/validation_run.c index 50dce3a2..8e02612b 100644 --- a/src/validation_run.c +++ b/src/validation_run.c @@ -17,7 +17,7 @@ validation_run_first(void) int error; if (config_get_mode() == SERVER) - pr_op_info("First validation cycle has begun, wait until the next notification to connect your router(s)"); + pr_op_warn("First validation cycle has begun, wait until the next notification to connect your router(s)"); else pr_op_info("First validation cycle has begun"); @@ -27,7 +27,7 @@ validation_run_first(void) return pr_op_err("First validation wasn't successful."); if (config_get_mode() == SERVER) - pr_op_info("First validation cycle successfully ended, now you can connect your router(s)"); + pr_op_warn("First validation cycle successfully ended, now you can connect your router(s)"); else pr_op_info("First validation cycle successfully ended, terminating execution"); -- 2.47.3