From: Alberto Leiva Popper Date: Mon, 7 Jun 2021 19:22:17 +0000 (-0500) Subject: Thread pool: Convert threads to joinable X-Git-Tag: v1.5.1~18 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=408191ef13fcedab81ce617d2acf438dbddc67f7;p=thirdparty%2FFORT-validator.git Thread pool: Convert threads to joinable Hypothesis: Something (which I haven't spotted yet) was causing the main thread to skip its wait before the pool threads finished their tasks. Maybe something to do with the ready signal again? So the main thread returned early, which means pool threads were silently suppressed by the OS. That explains the early terminations and nonexistent stack traces. If I keep finding crippling errors like this, I will definitely have to purge the thread pool. It's turned out to be a fucking bug colony at this point. I'm sick of it. Way I see it, the root of the problem was the thread pool's control code, which was too complicated for its own good. A surprisingly large part of why it was overcomplicated was because it reinvented thread joining. So I simplified the control code by removing the detach property. Now that the main thread joins the proper way, the validation code will not be interrupted anymore. This might well be the solution for #49. However, it bothers me that I still don't have a reasonable explanation as to why the main thread seemed to be skipping wait. Argh! --- diff --git a/src/http/http.c b/src/http/http.c index 45f293f6..01cdc8bd 100644 --- a/src/http/http.c +++ b/src/http/http.c @@ -346,7 +346,7 @@ __http_download_file(struct rpki_uri *uri, long *response_code, long ims_value, cond_met, log_operation, write_cb, out, is_ta); if (error != EREQFAILED) { TA_DEBUG_MSG("%d", error); - break; + break; /* Note: Usually happy path */ } TA_DEBUG_MSG("%d", error); diff --git a/src/main.c b/src/main.c index cefd4c1d..c85c7bb6 100644 --- a/src/main.c +++ b/src/main.c @@ -34,6 +34,7 @@ main(int argc, char **argv) { int error; + printf("Fort 1.5.0.1\n"); /* Initializations */ error = log_setup(); @@ -113,5 +114,6 @@ revert_config: free_rpki_config(); revert_log: log_teardown(); + PR_DEBUG_MSG("Main thread returning."); return error; } diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c index e3c04380..9ee8eb8a 100644 --- a/src/thread/thread_pool.c +++ b/src/thread/thread_pool.c @@ -56,8 +56,14 @@ struct thread_pool { pthread_cond_t worker2parent; /* Number of Working Threads. */ unsigned int working_count; - /* Number of Worker Threads. */ + + /* + * Just a counter. Its use is very specific; you probably don't want + * to rely on this. + * See @thread_ids_len for what you probably actually want. + */ unsigned int thread_count; + /* * Enable to signal all threads to stop. * (But all ongoing tasks will be completed first.) @@ -68,6 +74,9 @@ struct thread_pool { * Worker Thread to claim them. */ struct task_queue queue; + + pthread_t *thread_ids; /* Array. */ + unsigned int thread_ids_len; }; static void @@ -195,12 +204,9 @@ tasks_poll(void *arg) unsigned int thread_id; mutex_lock(pool); - thread_id = pool->thread_count; - /* We're running; signal the Parent Thread. */ pool->thread_count++; - if (pool->thread_count == 1) - signal_to_parent(pool); + thread_id = pool->thread_count; while (true) { while (TAILQ_EMPTY(&pool->queue) && !pool->stop) @@ -231,13 +237,8 @@ tasks_poll(void *arg) signal_to_parent(pool); } - /* The thread will cease to exist */ - pool->thread_count--; - if (pool->thread_count == 0) - signal_to_parent(pool); mutex_unlock(pool); - - pr_op_debug("Thread %s.%u: Terminating.", pool->name, thread_id); + pr_op_debug("Thread %s.%u: Returning.", pool->name, thread_id); return NULL; } @@ -258,21 +259,29 @@ thread_pool_attr_create(pthread_attr_t *attr) "Calling pthread_attr_setstacksize()"); } - error = pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED); - if (error) { - pthread_attr_destroy(attr); - return pr_op_errno(error, - "Calling pthread_attr_setdetachstate()"); - } + /* + * In the original implementation, they set the threads as detached + * here. + * + * This lead to giant load of miserable trouble, because unless I'm + * missing something, they seemed to assume there was something ensuring + * the threads had spawned by the time the parent posted work, and also, + * something ensuring that the threads had died by the time the main + * thread started cleaning up modules. Neither of these were true at + * all. + * + * Between complicating the code with even more control logic, and + * employing joinable threads, I chose the latter. I don't think I will + * ever use detached threads ever again. + */ return 0; } static int -spawn_threads(struct thread_pool *pool, unsigned int threads) +spawn_threads(struct thread_pool *pool) { pthread_attr_t attr; - pthread_t thread_id; unsigned int i; int error; @@ -280,15 +289,15 @@ spawn_threads(struct thread_pool *pool, unsigned int threads) if (error) return error; - for (i = 0; i < threads; i++) { - memset(&thread_id, 0, sizeof(pthread_t)); - error = pthread_create(&thread_id, &attr, tasks_poll, pool); + for (i = 0; i < pool->thread_ids_len; i++) { + error = pthread_create(&pool->thread_ids[i], &attr, tasks_poll, + pool); if (error) { error = pr_op_errno(error, "Spawning pool thread"); goto end; } - pr_op_debug("Pool '%s': Thread #%u spawned", pool->name, i); + pr_op_debug("Pool '%s': Thread #%u spawned", pool->name, i + 1); } end: @@ -335,13 +344,22 @@ thread_pool_create(char const *name, unsigned int threads, result->stop = false; result->working_count = 0; result->thread_count = 0; + result->thread_ids = calloc(threads, sizeof(pthread_t)); + if (result->thread_ids == NULL) { + error = pr_enomem(); + goto free_waiting_cond; + } + result->thread_ids_len = threads; - error = spawn_threads(result, threads); + error = spawn_threads(result); if (error) - goto free_waiting_cond; + goto free_thread_ids; *pool = result; return 0; + +free_thread_ids: + free(result->thread_ids); free_waiting_cond: pthread_cond_destroy(&result->worker2parent); free_working_cond: @@ -358,6 +376,9 @@ thread_pool_destroy(struct thread_pool *pool) { struct task_queue *queue; struct thread_pool_task *tmp; + unsigned int t; + + pr_op_debug("Destroying thread pool '%s'.", pool->name); /* Remove all pending work and send the signal to stop it */ mutex_lock(pool); @@ -371,41 +392,16 @@ thread_pool_destroy(struct thread_pool *pool) pthread_cond_broadcast(&pool->parent2worker); mutex_unlock(pool); - /* Wait for all to end */ - thread_pool_wait(pool); + for (t = 0; t < pool->thread_ids_len; t++) + pthread_join(pool->thread_ids[t], NULL); + free(pool->thread_ids); pthread_cond_destroy(&pool->worker2parent); pthread_cond_destroy(&pool->parent2worker); pthread_mutex_destroy(&pool->lock); free(pool); -} - -/* - * pthread_create() does not guarantee that the threads will have started by the - * time work needs to be done. - * - * If they haven't started, this function will add a little timeout. - * - * Lock must be held. - */ -static int validate_thread_count(struct thread_pool *pool) -{ - struct timespec tmout; - int error; - if (pool->thread_count != 0) - return 0; - - /* Give the threads 2 more seconds */ - clock_gettime(CLOCK_REALTIME, &tmout); - tmout.tv_sec += 2; - error = pthread_cond_timedwait(&pool->worker2parent, &pool->lock, - &tmout); - - if (pool->thread_count != 0) - return 0; - - return pr_op_errno(error, "Waiting thread to start"); + pr_op_debug("Destroyed."); } /* @@ -424,16 +420,13 @@ thread_pool_push(struct thread_pool *pool, char const *task_name, return error; mutex_lock(pool); - - error = validate_thread_count(pool); - if (error) { - mutex_unlock(pool); - task_destroy(task); - return error; - } - task_queue_push(pool, task); mutex_unlock(pool); + + /* + * Note: This assumes the threads have already spawned. + * If not, they will claim work once they spawn anyway. + */ signal_to_worker(pool); return 0; } @@ -445,7 +438,7 @@ thread_pool_avail_threads(struct thread_pool *pool) bool result; mutex_lock(pool); - result = (pool->working_count < pool->thread_count); + result = (pool->working_count < pool->thread_ids_len); mutex_unlock(pool); return result; @@ -457,22 +450,16 @@ thread_pool_wait(struct thread_pool *pool) { mutex_lock(pool); - while (true) { - pr_op_debug("- Stop: %s", pool->stop ? "true" : "false"); - pr_op_debug("- Working count: %u", pool->working_count); - pr_op_debug("- Thread count: %u", pool->thread_count); - pr_op_debug("- Empty task queue: %s", - TAILQ_EMPTY(&pool->queue) ? "true" : "false"); - - if (pool->stop) { - /* Wait until all Working Threads are dead. */ - if (pool->thread_count == 0) - break; - } else { - /* Wait until all Working Threads finish. */ - if (pool->working_count == 0 && - TAILQ_EMPTY(&pool->queue)) - break; + /* If the pool has to stop, the wait will happen during the joins. */ + while (!pool->stop) { + pr_op_debug("- Active workers: %u", pool->working_count); + pr_op_debug("- Task queue: %s", + TAILQ_EMPTY(&pool->queue) ? "Empty" : "Not Empty"); + + if (pool->working_count == 0 && TAILQ_EMPTY(&pool->queue)) { + pr_op_debug("Pool '%s': All work has been completed.", + pool->name); + break; } pr_op_debug("Pool '%s': Waiting for tasks to be completed", @@ -481,5 +468,4 @@ thread_pool_wait(struct thread_pool *pool) } mutex_unlock(pool); - pr_op_debug("Pool '%s': Waiting has ended, all tasks done", pool->name); }