From fbb6c3fa091040b8ddba384d61e345e594f09d89 Mon Sep 17 00:00:00 2001 From: Alberto Leiva Popper Date: Thu, 22 Apr 2021 15:32:56 -0500 Subject: [PATCH] Thread pool: Print more debug information Might help us catch #49. --- src/delete_dir_daemon.c | 2 +- src/internal_pool.c | 6 +- src/internal_pool.h | 2 +- src/object/tal.c | 3 +- src/rtr/db/vrps.c | 4 +- src/rtr/rtr.c | 10 +-- src/thread/thread_pool.c | 141 +++++++++++++++++++++++---------------- src/thread/thread_pool.h | 5 +- src/validation_run.c | 10 +-- 9 files changed, 107 insertions(+), 76 deletions(-) diff --git a/src/delete_dir_daemon.c b/src/delete_dir_daemon.c index 009f377e..2c3ef20f 100644 --- a/src/delete_dir_daemon.c +++ b/src/delete_dir_daemon.c @@ -290,7 +290,7 @@ delete_dir_daemon_start(char **roots, size_t roots_len, char const *workspace) } /* Thread arg is released at thread */ - error = internal_pool_push(remove_from_root, (void *) arg); + error = internal_pool_push("Directory deleter", remove_from_root, arg); if (error) { rem_dirs_destroy(arg); return error; diff --git a/src/internal_pool.c b/src/internal_pool.c index 51399426..22c5338a 100644 --- a/src/internal_pool.c +++ b/src/internal_pool.c @@ -20,7 +20,7 @@ internal_pool_init(void) int error; pool = NULL; - error = thread_pool_create(INTERNAL_POOL_MAX, &pool); + error = thread_pool_create("Internal", INTERNAL_POOL_MAX, &pool); if (error) return error; @@ -28,9 +28,9 @@ internal_pool_init(void) } int -internal_pool_push(thread_pool_task_cb cb, void *arg) +internal_pool_push(char const *task_name, thread_pool_task_cb cb, void *arg) { - return thread_pool_push(pool, cb, arg); + return thread_pool_push(pool, task_name, cb, arg); } void diff --git a/src/internal_pool.h b/src/internal_pool.h index 77fc1fcb..01aed8c2 100644 --- a/src/internal_pool.h +++ b/src/internal_pool.h @@ -4,7 +4,7 @@ #include "thread/thread_pool.h" int internal_pool_init(void); -int internal_pool_push(thread_pool_task_cb, void *); +int internal_pool_push(char const *, thread_pool_task_cb, void *); void internal_pool_cleanup(void); #endif /* SRC_INTERNAL_POOL_H_ */ diff --git a/src/object/tal.c b/src/object/tal.c index 49487e44..f31aa9f7 100644 --- a/src/object/tal.c +++ b/src/object/tal.c @@ -749,7 +749,8 @@ __do_file_validation(char const *tal_file, void *arg) thread->retry_local = true; thread->sync_files = true; - error = thread_pool_push(t_param->pool, do_file_validation, thread); + error = thread_pool_push(t_param->pool, thread->tal_file, + do_file_validation, thread); if (error) { pr_op_err("Couldn't push a thread to do files validation"); goto free_tal_file; diff --git a/src/rtr/db/vrps.c b/src/rtr/db/vrps.c index ceaa46c3..9e352d46 100644 --- a/src/rtr/db/vrps.c +++ b/src/rtr/db/vrps.c @@ -87,8 +87,8 @@ vrps_init(void) int error; pool = NULL; - error = thread_pool_create(config_get_thread_pool_validation_max(), - &pool); + error = thread_pool_create("Validation", + config_get_thread_pool_validation_max(), &pool); if (error) return error; diff --git a/src/rtr/rtr.c b/src/rtr/rtr.c index 24d8fda5..96bb2e85 100644 --- a/src/rtr/rtr.c +++ b/src/rtr/rtr.c @@ -581,8 +581,8 @@ handle_client_connections(void *arg) param->fd = client_fd; param->addr = client_addr; - error = thread_pool_push(pool, client_thread_cb, - param); + error = thread_pool_push(pool, "Client thread", + client_thread_cb, param); if (error) { pr_op_err("Couldn't push a thread to attend incoming RTR client"); /* Error with min RTR version */ @@ -625,7 +625,8 @@ __handle_client_connections(struct server_fds *fds, struct thread_pool *pool) param->pool = pool; /* handle_client_connections() must release param */ - error = internal_pool_push(handle_client_connections, param); + error = internal_pool_push("Server thread", handle_client_connections, + param); if (error) { free(param); return error; @@ -672,7 +673,8 @@ rtr_listen(void) goto revert_server_fds; pool = NULL; - error = thread_pool_create(config_get_thread_pool_server_max(), &pool); + error = thread_pool_create("Server", + config_get_thread_pool_server_max(), &pool); if (error) goto revert_server_fds; diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c index 7e9e785e..59f26d2d 100644 --- a/src/thread/thread_pool.c +++ b/src/thread/thread_pool.c @@ -22,16 +22,26 @@ */ /* Task to be done by each Worker Thread. */ -struct task { +struct thread_pool_task { + /* + * Debugging purposes only. Uniqueness is not a requirement. + * Will not be released by task_destroy(). + */ + char const *name; thread_pool_task_cb cb; void *arg; - TAILQ_ENTRY(task) next; + TAILQ_ENTRY(thread_pool_task) next; }; /* A collection of Tasks, used as FIFO. */ -TAILQ_HEAD(task_queue, task); +TAILQ_HEAD(task_queue, thread_pool_task); struct thread_pool { + /* + * Debugging purposes only. Uniqueness is not a requirement. + * Will not be released by thread_pool_destroy(). + */ + char const *name; pthread_mutex_t lock; /* * Used by the Parent Thread to wake up Worker Threads when there's @@ -83,50 +93,59 @@ thread_pool_unlock(struct thread_pool *pool) } static int -task_create(thread_pool_task_cb cb, void *arg, struct task **result) +task_create(char const *name, thread_pool_task_cb cb, void *arg, + struct thread_pool_task **out) { - struct task *tmp; + struct thread_pool_task *task; - tmp = malloc(sizeof(struct task)); - if (tmp == NULL) + task = malloc(sizeof(struct thread_pool_task)); + if (task == NULL) return pr_enomem(); - tmp->cb = cb; - tmp->arg = arg; + task->name = name; + task->cb = cb; + task->arg = arg; - *result = tmp; + *out = task; return 0; } static void -task_destroy(struct task *task) +task_destroy(struct thread_pool_task *task) { free(task); } /** - * Pops the tail of @queue. + * Pops the tail of @queue. pthread_cond_signal() is technically allowed to wake + * more than one thread, so please keep in mind that the result might be NULL. * * Freeing the task is the caller's responsibility. */ -static struct task * -task_queue_pull(struct task_queue *queue) +static struct thread_pool_task * +task_queue_pull(struct thread_pool *pool, unsigned int thread_id) { - struct task *tmp; - - tmp = TAILQ_LAST(queue, task_queue); - TAILQ_REMOVE(queue, tmp, next); - pr_op_debug("Pulling a task from the pool"); + struct thread_pool_task *task; + + task = TAILQ_LAST(&pool->queue, task_queue); + if (task != NULL) { + TAILQ_REMOVE(&pool->queue, task, next); + pr_op_debug("Thread %s.%u: Claimed task '%s'", pool->name, + thread_id, task->name); + } else { + pr_op_debug("Thread %s.%u: Claimed nothing", pool->name, + thread_id); + } - return tmp; + return task; } /* Insert the task at the HEAD */ static void -task_queue_push(struct task_queue *queue, struct task *task) +task_queue_push(struct thread_pool *pool, struct thread_pool_task *task) { - TAILQ_INSERT_HEAD(queue, task, next); - pr_op_debug("Pushing a task to the pool"); + TAILQ_INSERT_HEAD(&pool->queue, task, next); + pr_op_debug("Pool '%s': Pushed task '%s'", pool->name, task->name); } /* @@ -144,9 +163,12 @@ static void * tasks_poll(void *arg) { struct thread_pool *pool = arg; - struct task *task; + struct thread_pool_task *task; + unsigned int thread_id; thread_pool_lock(pool); + thread_id = pool->thread_count; + /* We're running; signal the Parent Thread. */ pool->thread_count++; pthread_cond_signal(&(pool->waiting_cond)); @@ -154,7 +176,8 @@ tasks_poll(void *arg) while (true) { while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) { /* Wait until the parent sends us work. */ - pr_op_debug("Thread waiting for work..."); + pr_op_debug("Thread %s.%u: Waiting for work...", + pool->name, thread_id); pthread_cond_wait(&(pool->working_cond), &(pool->lock)); } @@ -162,20 +185,16 @@ tasks_poll(void *arg) break; /* Claim the work. */ - task = task_queue_pull(&(pool->queue)); + task = task_queue_pull(pool, thread_id); pool->working_count++; - pr_op_debug("Working on task #%u", pool->working_count); thread_pool_unlock(pool); - /* - * The null check exists because pthread_cond_signal() is - * technically allowed to wake up more than one thread. - */ if (task != NULL) { task->cb(task->arg); + pr_op_debug("Thread %s.%u: Task '%s' ended", pool->name, + thread_id, task->name); /* Now releasing the task */ task_destroy(task); - pr_op_debug("Task ended"); } thread_pool_lock(pool); @@ -192,6 +211,7 @@ tasks_poll(void *arg) pthread_cond_signal(&(pool->waiting_cond)); thread_pool_unlock(pool); + pr_op_debug("Thread %s.%u: Terminating.", pool->name, thread_id); return NULL; } @@ -242,7 +262,7 @@ spawn_threads(struct thread_pool *pool, unsigned int threads) goto end; } - pr_op_debug("Pool thread #%u spawned", i); + pr_op_debug("Pool '%s': Thread #%u spawned", pool->name, i); } end: @@ -251,24 +271,25 @@ end: } int -thread_pool_create(unsigned int threads, struct thread_pool **pool) +thread_pool_create(char const *name, unsigned int threads, + struct thread_pool **pool) { - struct thread_pool *tmp; + struct thread_pool *result; int error; - tmp = malloc(sizeof(struct thread_pool)); - if (tmp == NULL) + result = malloc(sizeof(struct thread_pool)); + if (result == NULL) return pr_enomem(); /* Init locking */ - error = pthread_mutex_init(&(tmp->lock), NULL); + error = pthread_mutex_init(&(result->lock), NULL); if (error) { error = pr_op_errno(error, "Calling pthread_mutex_init()"); goto free_tmp; } /* Init conditional to signal pending work */ - error = pthread_cond_init(&(tmp->working_cond), NULL); + error = pthread_cond_init(&(result->working_cond), NULL); if (error) { error = pr_op_errno(error, "Calling pthread_cond_init() at working condition"); @@ -276,32 +297,33 @@ thread_pool_create(unsigned int threads, struct thread_pool **pool) } /* Init conditional to signal no pending work */ - error = pthread_cond_init(&(tmp->waiting_cond), NULL); + error = pthread_cond_init(&(result->waiting_cond), NULL); if (error) { error = pr_op_errno(error, "Calling pthread_cond_init() at waiting condition"); goto free_working_cond; } - TAILQ_INIT(&(tmp->queue)); - tmp->stop = false; - tmp->working_count = 0; - tmp->thread_count = 0; + TAILQ_INIT(&(result->queue)); + result->name = name; + result->stop = false; + result->working_count = 0; + result->thread_count = 0; - error = spawn_threads(tmp, threads); + error = spawn_threads(result, threads); if (error) goto free_waiting_cond; - *pool = tmp; + *pool = result; return 0; free_waiting_cond: - pthread_cond_destroy(&(tmp->waiting_cond)); + pthread_cond_destroy(&(result->waiting_cond)); free_working_cond: - pthread_cond_destroy(&(tmp->working_cond)); + pthread_cond_destroy(&(result->working_cond)); free_mutex: - pthread_mutex_destroy(&(tmp->lock)); + pthread_mutex_destroy(&(result->lock)); free_tmp: - free(tmp); + free(result); return error; } @@ -309,7 +331,7 @@ void thread_pool_destroy(struct thread_pool *pool) { struct task_queue *queue; - struct task *tmp; + struct thread_pool_task *tmp; /* Remove all pending work and send the signal to stop it */ thread_pool_lock(pool); @@ -365,13 +387,14 @@ static int validate_thread_count(struct thread_pool *pool) * @arg. */ int -thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg) +thread_pool_push(struct thread_pool *pool, char const *task_name, + thread_pool_task_cb cb, void *arg) { - struct task *task; + struct thread_pool_task *task; int error; task = NULL; - error = task_create(cb, arg, &task); + error = task_create(task_name, cb, arg, &task); if (error) return error; @@ -383,7 +406,7 @@ thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg) goto end; } - task_queue_push(&(pool->queue), task); + task_queue_push(pool, task); /* There's work to do! */ pthread_cond_signal(&(pool->working_cond)); @@ -411,11 +434,10 @@ thread_pool_wait(struct thread_pool *pool) { thread_pool_lock(pool); while (true) { - pr_op_debug("Waiting all tasks from the pool to end"); pr_op_debug("- Stop: %s", pool->stop ? "true" : "false"); pr_op_debug("- Working count: %u", pool->working_count); pr_op_debug("- Thread count: %u", pool->thread_count); - pr_op_debug("- Empty queue: %s", + pr_op_debug("- Empty task queue: %s", TAILQ_EMPTY(&(pool->queue)) ? "true" : "false"); if (pool->stop) { @@ -429,8 +451,11 @@ thread_pool_wait(struct thread_pool *pool) break; } + pr_op_debug("Pool '%s': Waiting for tasks to be completed", + pool->name); pthread_cond_wait(&(pool->waiting_cond), &(pool->lock)); } thread_pool_unlock(pool); - pr_op_debug("Waiting has ended, all tasks have finished"); + pr_op_debug("Pool '%s': Waiting has ended, all tasks done", + pool->name); } diff --git a/src/thread/thread_pool.h b/src/thread/thread_pool.h index 4a75cf13..c294f7df 100644 --- a/src/thread/thread_pool.h +++ b/src/thread/thread_pool.h @@ -6,11 +6,12 @@ /* Thread pool base struct */ struct thread_pool; -int thread_pool_create(unsigned int, struct thread_pool **); +int thread_pool_create(char const *, unsigned int, struct thread_pool **); void thread_pool_destroy(struct thread_pool *); typedef void *(*thread_pool_task_cb)(void *); -int thread_pool_push(struct thread_pool *, thread_pool_task_cb, void *); +int thread_pool_push(struct thread_pool *, char const *, thread_pool_task_cb, + void *); bool thread_pool_avail_threads(struct thread_pool *); void thread_pool_wait(struct thread_pool *); diff --git a/src/validation_run.c b/src/validation_run.c index 325641fd..50dce3a2 100644 --- a/src/validation_run.c +++ b/src/validation_run.c @@ -17,9 +17,9 @@ validation_run_first(void) int error; if (config_get_mode() == SERVER) - pr_op_warn("First validation cycle has begun, wait until the next notification to connect your router(s)"); + pr_op_info("First validation cycle has begun, wait until the next notification to connect your router(s)"); else - pr_op_warn("First validation cycle has begun"); + pr_op_info("First validation cycle has begun"); upd = false; error = vrps_update(&upd); @@ -27,9 +27,11 @@ validation_run_first(void) return pr_op_err("First validation wasn't successful."); if (config_get_mode() == SERVER) - return pr_op_warn("First validation cycle successfully ended, now you can connect your router(s)"); + pr_op_info("First validation cycle successfully ended, now you can connect your router(s)"); + else + pr_op_info("First validation cycle successfully ended, terminating execution"); - return pr_op_warn("First validation cycle successfully ended, terminating execution"); + return 0; } /* Run a validation cycle each 'server.interval.validation' secs */ -- 2.47.3