From 3700699020894fef7b34f9e8daf8a4c28cd2150b Mon Sep 17 00:00:00 2001 From: Alberto Leiva Popper Date: Wed, 7 Apr 2021 16:41:43 -0500 Subject: [PATCH] Thread pool: Patch previous commit I accidentally removed a lock operation in the previous commit, so lots of undefined behavior was being triggered. Also, restores (but improves) the thread ready signal. It's hard to explain: - Before: Workers send ready signal to parent, but parent might not be listening yet; Therefore parent timeouts on wait. - Previous: Workers do not send ready signal to parent. Therefore, parent might signal work when no workers are ready yet; Therefore nobody works. - Now: Workers send ready signal to parent, parent listens lazily (ie. late), but only if workers aren't ready yet. Therefore, correct behavior. --- src/thread/thread_pool.c | 51 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c index bc706c71..7e9e785e 100644 --- a/src/thread/thread_pool.c +++ b/src/thread/thread_pool.c @@ -39,8 +39,9 @@ struct thread_pool { */ pthread_cond_t working_cond; /* - * Used by Working Threads to signal that all the work is done, - * for the benefit of the Parent Thread. + * Used by Working Threads to signal the Parent Thread that workers are + * ready (during initialization) or that all the work is done (after + * initialization). */ pthread_cond_t waiting_cond; /* Number of Working Threads. */ @@ -145,6 +146,11 @@ tasks_poll(void *arg) struct thread_pool *pool = arg; struct task *task; + thread_pool_lock(pool); + /* We're running; signal the Parent Thread. */ + pool->thread_count++; + pthread_cond_signal(&(pool->waiting_cond)); + while (true) { while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) { /* Wait until the parent sends us work. */ @@ -236,7 +242,6 @@ spawn_threads(struct thread_pool *pool, unsigned int threads) goto end; } - pool->thread_count++; pr_op_debug("Pool thread #%u spawned", i); } @@ -327,6 +332,34 @@ thread_pool_destroy(struct thread_pool *pool) free(pool); } +/* + * pthread_create() does not guarantee that the threads will have started by the + * time work needs to be done. + * + * If they haven't started, this function will add a little timeout. + * + * Lock must be held. + */ +static int validate_thread_count(struct thread_pool *pool) +{ + struct timespec tmout; + int error; + + if (pool->thread_count != 0) + return 0; + + /* 2 seconds to start a thread */ + clock_gettime(CLOCK_REALTIME, &tmout); + tmout.tv_sec += 2; + error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock), + &tmout); + + if (pool->thread_count != 0) + return 0; + + return pr_op_errno(error, "Waiting thread to start"); +} + /* * Push a new task to @pool, the task to be executed is @cb with the argument * @arg. @@ -343,12 +376,20 @@ thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg) return error; thread_pool_lock(pool); + + error = validate_thread_count(pool); + if (error) { + task_destroy(task); + goto end; + } + task_queue_push(&(pool->queue), task); /* There's work to do! */ pthread_cond_signal(&(pool->working_cond)); - thread_pool_unlock(pool); - return 0; +end: + thread_pool_unlock(pool); + return error; } /* There are available threads to work? */ -- 2.47.3