From f74278a1c53704fabeb83fd19a22d21cd708579a Mon Sep 17 00:00:00 2001 From: pcarana Date: Wed, 27 Jan 2021 09:32:18 -0600 Subject: [PATCH] Fix rsync and thread pool bugs. +Mistakenly (of course, it was a bug) the returned value from rsync execution was being confused with the returned value from execvp call. The main problem was when rsync returned a code 12 (Error in rsync protocol data stream); in that case, the caller confused that error with ENOMEM (also with value 12), which led to terminate execution. +The thread pool wait function wasn't considering pending taks at the queue; also the poll function was holding and releasing the mutex more than it was needed, and the thread attributes are now globally initialized (thanks @ydahhrk for the code review). +Increment the number of threads at the internal pool to 10. --- src/internal_pool.c | 2 +- src/rsync/rsync.c | 7 ++-- src/thread/thread_pool.c | 91 +++++++++++++++++++++++++--------------- 3 files changed, 62 insertions(+), 38 deletions(-) diff --git a/src/internal_pool.c b/src/internal_pool.c index 5a0fcd4d..51399426 100644 --- a/src/internal_pool.c +++ b/src/internal_pool.c @@ -10,7 +10,7 @@ * related to the validation or server thread pool tasks) can be pushed here. */ -#define INTERNAL_POOL_MAX 5 +#define INTERNAL_POOL_MAX 10 struct thread_pool *pool; diff --git a/src/rsync/rsync.c b/src/rsync/rsync.c index 6332192d..ed7dadbd 100644 --- a/src/rsync/rsync.c +++ b/src/rsync/rsync.c @@ -258,7 +258,7 @@ handle_child_thread(char **args, int fds[2][2]) strerror(error)); /* https://stackoverflow.com/a/14493459/1735458 */ - exit(error); + exit(-error); } static int @@ -445,8 +445,9 @@ do_rsync(struct rpki_uri *uri, bool is_ta, bool log_operation) if (WIFEXITED(child_status)) { /* Happy path (but also sad path sometimes). */ error = WEXITSTATUS(child_status); - pr_val_debug("Child terminated with error code %d.", error); - if (error == ENOMEM) + pr_val_debug("Child terminated with error code %d.", + error); + if (error == -ENOMEM) pr_enomem(); if (!error) diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c index 1612237b..7a5e1925 100644 --- a/src/thread/thread_pool.c +++ b/src/thread/thread_pool.c @@ -88,6 +88,7 @@ task_queue_pull(struct task_queue *queue) tmp = TAILQ_LAST(queue, task_queue); TAILQ_REMOVE(queue, tmp, next); + pr_op_debug("Pulling a task from the pool"); return tmp; } @@ -97,6 +98,7 @@ static void task_queue_push(struct task_queue *queue, struct task *task) { TAILQ_INSERT_HEAD(queue, task, next); + pr_op_debug("Pushing a task to the pool"); } /* @@ -115,13 +117,12 @@ tasks_poll(void *arg) /* The thread has started, send the signal */ thread_pool_lock(pool); pthread_cond_signal(&(pool->waiting_cond)); - thread_pool_unlock(pool); while (true) { - thread_pool_lock(pool); - - while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) + while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) { + pr_op_debug("Thread waiting for work..."); pthread_cond_wait(&(pool->working_cond), &(pool->lock)); + } if (pool->stop) break; @@ -144,8 +145,6 @@ tasks_poll(void *arg) if (!pool->stop && pool->working_count == 0 && TAILQ_EMPTY(&(pool->queue))) pthread_cond_signal(&(pool->waiting_cond)); - - thread_pool_unlock(pool); } /* The thread will cease to exist */ @@ -173,52 +172,61 @@ thread_pool_thread_wait_start(struct thread_pool *pool) clock_gettime(CLOCK_REALTIME, &tmout); tmout.tv_sec += 2; + thread_pool_lock(pool); error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock), &tmout); - if (error) + if (error) { + thread_pool_unlock(pool); return pr_op_errno(error, "Waiting thread to start"); + } + thread_pool_unlock(pool); return 0; } static int -tpool_thread_spawn(struct thread_pool *pool, thread_pool_task_cb entry_point) +thread_pool_attr_create(pthread_attr_t *attr) { - pthread_attr_t attr; - pthread_t thread_id; int error; - memset(&thread_id, 0, sizeof(pthread_t)); - - error = pthread_attr_init(&attr); + error = pthread_attr_init(attr); if (error) return pr_op_errno(error, "Calling pthread_attr_init()"); /* Use 2MB (default in most 64 bits systems) */ - error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2); - if (error) + error = pthread_attr_setstacksize(attr, 1024 * 1024 * 2); + if (error) { + pthread_attr_destroy(attr); return pr_op_errno(error, "Calling pthread_attr_setstacksize()"); + } - error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (error) + error = pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED); + if (error) { + pthread_attr_destroy(attr); return pr_op_errno(error, "Calling pthread_attr_setdetachstate()"); + } - thread_pool_lock(pool); - error = pthread_create(&thread_id, &attr, entry_point, pool); - pthread_attr_destroy(&attr); - if (error) { - thread_pool_unlock(pool); + return 0; +} + +static int +tpool_thread_spawn(struct thread_pool *pool, pthread_attr_t *attr, + thread_pool_task_cb entry_point) +{ + pthread_t thread_id; + int error; + + memset(&thread_id, 0, sizeof(pthread_t)); + + error = pthread_create(&thread_id, attr, entry_point, pool); + if (error) return pr_op_errno(error, "Spawning pool thread"); - } error = thread_pool_thread_wait_start(pool); - if (error) { - thread_pool_unlock(pool); + if (error) return error; - } - thread_pool_unlock(pool); return 0; } @@ -227,6 +235,7 @@ int thread_pool_create(unsigned int threads, struct thread_pool **pool) { struct thread_pool *tmp; + pthread_attr_t attr; unsigned int i; int error; @@ -260,19 +269,28 @@ thread_pool_create(unsigned int threads, struct thread_pool **pool) TAILQ_INIT(&(tmp->queue)); tmp->stop = false; tmp->working_count = 0; - tmp->thread_count = threads; + tmp->thread_count = 0; + + error = thread_pool_attr_create(&attr); + if (error) + goto free_waiting_cond; for (i = 0; i < threads; i++) { - error = tpool_thread_spawn(tmp, tasks_poll); + error = tpool_thread_spawn(tmp, &attr, tasks_poll); if (error) { + pthread_attr_destroy(&attr); thread_pool_destroy(tmp); return error; } + tmp->thread_count++; pr_op_debug("Pool thread #%u spawned", i); } + pthread_attr_destroy(&attr); *pool = tmp; return 0; +free_waiting_cond: + pthread_cond_destroy(&(tmp->waiting_cond)); free_working_cond: pthread_cond_destroy(&(tmp->working_cond)); free_mutex: @@ -326,15 +344,14 @@ thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg) thread_pool_lock(pool); task_queue_push(&(pool->queue), task); - thread_pool_unlock(pool); - /* There's work to do! */ - pthread_cond_broadcast(&(pool->working_cond)); + pthread_cond_signal(&(pool->working_cond)); + thread_pool_unlock(pool); return 0; } -/* Are there available threads to work? */ +/* There are available threads to work? */ bool thread_pool_avail_threads(struct thread_pool *pool) { @@ -354,7 +371,13 @@ thread_pool_wait(struct thread_pool *pool) thread_pool_lock(pool); while (true) { pr_op_debug("Waiting all tasks from the pool to end"); - if ((!pool->stop && pool->working_count != 0) || + pr_op_debug("- Stop: %s", pool->stop ? "true" : "false"); + pr_op_debug("- Working count: %u", pool->working_count); + pr_op_debug("- Thread count: %u", pool->thread_count); + pr_op_debug("- Empty queue: %s", + TAILQ_EMPTY(&(pool->queue)) ? "true" : "false"); + if ((!pool->stop && + (pool->working_count != 0 || !TAILQ_EMPTY(&(pool->queue)))) || (pool->stop && pool->thread_count != 0)) pthread_cond_wait(&(pool->waiting_cond), &(pool->lock)); else -- 2.47.2