From: pcarana Date: Fri, 6 Nov 2020 01:49:43 +0000 (-0600) Subject: Implement a thread pool, still pending to use at RTR clients X-Git-Tag: v1.5.0~14 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=5b80f4d54a8f5ecd9f931c63787fa517eca6fd87;p=thirdparty%2FFORT-validator.git Implement a thread pool, still pending to use at RTR clients +The pool is basically a tasks queue, it's initialized using a fixed amount of threads (all of them spawned at pool creation) where each of them will be waiting for pending tasks to attend. +TODO: the number of threads per pool must be configurable. +TODO: right now only a pool is utilized at the TALs validation (and therefore the whole RPKI tree beneath them), at least another pool can be used to receive RTR clients. --- diff --git a/src/Makefile.am b/src/Makefile.am index c4ee73a4..cf538093 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -116,6 +116,8 @@ fort_SOURCES += slurm/db_slurm.c slurm/db_slurm.h fort_SOURCES += slurm/slurm_loader.c slurm/slurm_loader.h fort_SOURCES += slurm/slurm_parser.c slurm/slurm_parser.h +fort_SOURCES += thread/thread_pool.c thread/thread_pool.h + fort_SOURCES += xml/relax_ng.c xml/relax_ng.h # I'm placing these at the end because they rarely change, and I want warnings diff --git a/src/object/tal.c b/src/object/tal.c index d74c68cb..49487e44 100644 --- a/src/object/tal.c +++ b/src/object/tal.c @@ -48,11 +48,7 @@ struct tal { }; struct validation_thread { - /* - * pid is not guaranteed to be defined in the thread. It should only - * be manipulated by the parent thread. - */ - pthread_t pid; + /* TAL file name */ char *tal_file; /* * Try to use the TA from the local cache? Only if none of the URIs @@ -71,6 +67,7 @@ struct validation_thread { SLIST_HEAD(threads_list, validation_thread); struct tal_param { + struct thread_pool *pool; struct db_table *db; struct threads_list *threads; }; @@ -394,7 +391,8 @@ fail4: return error; } -void tal_destroy(struct tal *tal) +void +tal_destroy(struct tal *tal) { if (tal == NULL) return; @@ -751,10 +749,9 @@ __do_file_validation(char const *tal_file, void *arg) thread->retry_local = true; thread->sync_files = true; - errno = pthread_create(&thread->pid, NULL, do_file_validation, thread); - if (errno) { - error = -pr_op_errno(errno, - "Could not spawn the file validation thread"); + error = thread_pool_push(t_param->pool, do_file_validation, thread); + if (error) { + pr_op_err("Couldn't push a thread to do files validation"); goto free_tal_file; } @@ -771,7 +768,7 @@ free_db_rrdp: } int -perform_standalone_validation(struct db_table *table) +perform_standalone_validation(struct thread_pool *pool, struct db_table *table) { struct tal_param *param; struct threads_list threads; @@ -787,16 +784,16 @@ perform_standalone_validation(struct db_table *table) SLIST_INIT(&threads); + param->pool = pool; param->db = table; param->threads = &threads; error = process_file_or_dir(config_get_tal(), TAL_FILE_EXTENSION, true, __do_file_validation, param); if (error) { - /* End all threads */ + /* End all thread data */ while (!SLIST_EMPTY(&threads)) { thread = threads.slh_first; - close_thread(thread->pid, thread->tal_file); SLIST_REMOVE_HEAD(&threads, next); thread_destroy(thread); } @@ -805,13 +802,11 @@ perform_standalone_validation(struct db_table *table) } /* Wait for all */ + thread_pool_wait(pool); + t_error = 0; while (!SLIST_EMPTY(&threads)) { thread = threads.slh_first; - error = pthread_join(thread->pid, NULL); - if (error) - pr_crit("pthread_join() threw %d on the '%s' thread.", - error, thread->tal_file); SLIST_REMOVE_HEAD(&threads, next); if (thread->exit_status) { t_error = thread->exit_status; diff --git a/src/object/tal.h b/src/object/tal.h index dba2644b..4acc9162 100644 --- a/src/object/tal.h +++ b/src/object/tal.h @@ -6,6 +6,7 @@ #include #include "uri.h" #include "rtr/db/db_table.h" +#include "thread/thread_pool.h" struct tal; @@ -15,6 +16,6 @@ void tal_destroy(struct tal *); char const *tal_get_file_name(struct tal *); void tal_get_spki(struct tal *, unsigned char const **, size_t *); -int perform_standalone_validation(struct db_table *); +int perform_standalone_validation(struct thread_pool *, struct db_table *); #endif /* TAL_OBJECT_H_ */ diff --git a/src/rtr/db/vrps.c b/src/rtr/db/vrps.c index ec6901c1..7e89728a 100644 --- a/src/rtr/db/vrps.c +++ b/src/rtr/db/vrps.c @@ -13,6 +13,7 @@ #include "object/tal.h" #include "rtr/db/db_table.h" #include "slurm/slurm_loader.h" +#include "thread/thread_pool.h" /* * Storage of VRPs (term taken from RFC 6811 "Validated ROA Payload") and @@ -64,6 +65,9 @@ struct state { static struct state state; +/* Thread pool to use when the TALs will be processed */ +static struct thread_pool *pool; + /** Read/write lock, which protects @state and its inhabitants. */ static pthread_rwlock_t state_lock; @@ -82,6 +86,12 @@ vrps_init(void) time_t now; int error; + /* FIXME (now) Configure max threads for tals pool */ + pool = NULL; + error = thread_pool_create(10, &pool); + if (error) + return error; + state.base = NULL; deltas_db_init(&state.deltas); @@ -124,6 +134,7 @@ release_state_lock: pthread_rwlock_destroy(&state_lock); release_deltas: deltas_db_cleanup(&state.deltas, deltagroup_cleanup); + thread_pool_destroy(pool); return error; } @@ -138,6 +149,7 @@ vrps_destroy(void) /* Nothing to do with error codes from now on */ pthread_rwlock_destroy(&state_lock); pthread_rwlock_destroy(&table_lock); + thread_pool_destroy(pool); } #define WLOCK_HANDLER(lock, cb) \ @@ -188,7 +200,7 @@ __perform_standalone_validation(struct db_table **result) if (db == NULL) return pr_enomem(); - error = perform_standalone_validation(db); + error = perform_standalone_validation(pool, db); if (error) { db_table_destroy(db); return error; @@ -429,6 +441,7 @@ vrps_update(bool *changed) return error; } + /** * Please keep in mind that there is at least one errcode-aware caller. The most * important ones are diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c new file mode 100644 index 00000000..4cac370e --- /dev/null +++ b/src/thread/thread_pool.c @@ -0,0 +1,353 @@ +#include "thread/thread_pool.h" + +#include +#include +#include +#include +#include +#include "log.h" + +/* + * Mainly based on a solution proposed by Ivaylo Josifov (from VarnaIX) + * and from https://nachtimwald.com/2019/04/12/thread-pool-in-c/ + */ + +/* Task to be done by each thread */ +struct task { + thread_pool_task_cb cb; + void *arg; + TAILQ_ENTRY(task) next; +}; + +/* Tasks queue (utilized as FIFO) */ +TAILQ_HEAD(task_queue, task); + +struct thread_pool { + pthread_mutex_t lock; + /* Work/wait conditions, utilized accordingly to their names */ + pthread_cond_t working_cond; + pthread_cond_t waiting_cond; + /* Currently working thread */ + unsigned int working_count; + /* Total number of spawned threads */ + unsigned int thread_count; + /* Use to stop all the threads */ + bool stop; + /* Queue of pending tasks to attend */ + struct task_queue queue; +}; + +static void +thread_pool_lock(struct thread_pool *pool) +{ + int error; + + error = pthread_mutex_lock(&(pool->lock)); + if (error) + pr_crit("pthread_mutex_lock() returned error code %d. This is too critical for a graceful recovery; I must die now.", + error); +} + +static void +thread_pool_unlock(struct thread_pool *pool) +{ + int error; + + error = pthread_mutex_unlock(&(pool->lock)); + if (error) + pr_crit("pthread_mutex_unlock() returned error code %d. This is too critical for a graceful recovery; I must die now.", + error); +} + +static int +task_create(thread_pool_task_cb cb, void *arg, struct task **result) +{ + struct task *tmp; + + tmp = malloc(sizeof(struct task)); + if (tmp == NULL) + return pr_enomem(); + + tmp->cb = cb; + tmp->arg = arg; + + *result = tmp; + return 0; +} + +static void +task_destroy(struct task *task) +{ + free(task); +} + +/* Get the TAIL, remove the ref from @queue, don't forget to free the task! */ +static struct task * +task_queue_pull(struct task_queue *queue) +{ + struct task *tmp; + + tmp = TAILQ_LAST(queue, task_queue); + TAILQ_REMOVE(queue, tmp, next); + + return tmp; +} + +/* Insert the task at the HEAD */ +static void +task_queue_push(struct task_queue *queue, struct task *task) +{ + TAILQ_INSERT_HEAD(queue, task, next); +} + +/* + * Poll for pending tasks at the pool queue. Called by each spawned thread. + * + * Once a task is available, at least one thread of the pool will process it. + * + * The call ends only if the pool wishes to be stopped. + */ +static void * +tasks_poll(void *arg) +{ + struct thread_pool *pool = arg; + struct task *task; + + /* The thread has started, send the signal */ + thread_pool_lock(pool); + pthread_cond_signal(&(pool->waiting_cond)); + thread_pool_unlock(pool); + + while (true) { + thread_pool_lock(pool); + + while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) { + pthread_cond_wait(&(pool->working_cond), &(pool->lock)); + } + if (pool->stop) + break; + + /* Pop the tail */ + task = task_queue_pull(&(pool->queue)); + pool->working_count++; + pr_op_debug("Working on task #%u", pool->working_count); + thread_pool_unlock(pool); + + if (task != NULL) { + task->cb(task->arg); + /* Now releasing the task */ + task_destroy(task); + pr_op_debug("Task ended"); + } + + thread_pool_lock(pool); + pool->working_count--; + if (!pool->stop && pool->working_count == 0 && + TAILQ_EMPTY(&(pool->queue))) + pthread_cond_signal(&(pool->waiting_cond)); + + thread_pool_unlock(pool); + } + + /* The thread will cease to exist */ + pool->thread_count--; + pthread_cond_signal(&(pool->waiting_cond)); + thread_pool_unlock(pool); + + return NULL; +} + +/* + * Wait a couple of seconds to be sure the thread has started and is ready to + * work + */ +static int +thread_pool_thread_wait_start(struct thread_pool *pool) +{ + struct timespec tmout = { + .tv_sec = 0 , + .tv_nsec = 0 + }; + int error; + + /* 2 seconds to start a thread */ + clock_gettime(CLOCK_REALTIME, &tmout); + tmout.tv_sec += 2; + + error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock), + &tmout); + if (error) + return pr_op_errno(error, "Waiting thread to start"); + + return 0; +} + +static int +tpool_thread_spawn(struct thread_pool *pool, thread_pool_task_cb entry_point) +{ + pthread_attr_t attr; + pthread_t thread_id; + int error; + + memset(&thread_id, 0, sizeof(pthread_t)); + + error = pthread_attr_init(&attr); + if (error) + return pr_op_errno(error, "Calling pthread_attr_init()"); + + /* FIXME (now) Let at 2MB? */ + /* 2MB */ + error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2); + if (error) + return pr_op_errno(error, + "Calling pthread_attr_setstacksize()"); + + error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (error) + return pr_op_errno(error, + "Calling pthread_attr_setdetachstate()"); + + thread_pool_lock(pool); + error = pthread_create(&thread_id, &attr, entry_point, pool); + pthread_attr_destroy(&attr); + if (error) { + thread_pool_unlock(pool); + return pr_op_errno(error, "Spawning pool thread"); + } + + error = thread_pool_thread_wait_start(pool); + if (error) { + thread_pool_unlock(pool); + return error; + } + thread_pool_unlock(pool); + + return 0; +} + +int +thread_pool_create(unsigned int threads, struct thread_pool **pool) +{ + struct thread_pool *tmp; + unsigned int i; + int error; + + tmp = malloc(sizeof(struct thread_pool)); + if (tmp == NULL) + return pr_enomem(); + + /* Init locking */ + error = pthread_mutex_init(&(tmp->lock), NULL); + if (error) { + error = pr_op_errno(error, "Calling pthread_mutex_init()"); + goto free_tmp; + } + + /* Init conditional to signal pending work */ + error = pthread_cond_init(&(tmp->working_cond), NULL); + if (error) { + error = pr_op_errno(error, + "Calling pthread_cond_init() at working condition"); + goto free_mutex; + } + + /* Init conditional to signal no pending work */ + error = pthread_cond_init(&(tmp->waiting_cond), NULL); + if (error) { + error = pr_op_errno(error, + "Calling pthread_cond_init() at waiting condition"); + goto free_working_cond; + } + + TAILQ_INIT(&(tmp->queue)); + tmp->stop = false; + tmp->working_count = 0; + tmp->thread_count = threads; + + for (i = 0; i < threads; i++) { + error = tpool_thread_spawn(tmp, tasks_poll); + if (error) { + thread_pool_destroy(tmp); + return error; + } + pr_op_debug("Pool thread #%u spawned", i); + } + + *pool = tmp; + return 0; +free_working_cond: + pthread_cond_destroy(&(tmp->working_cond)); +free_mutex: + pthread_mutex_destroy(&(tmp->lock)); +free_tmp: + free(tmp); + return error; +} + +void +thread_pool_destroy(struct thread_pool *pool) +{ + struct task_queue *queue; + struct task *tmp; + + /* Remove all pending work and send the signal to stop it */ + thread_pool_lock(pool); + queue = &(pool->queue); + while (!TAILQ_EMPTY(queue)) { + tmp = TAILQ_FIRST(queue); + TAILQ_REMOVE(queue, tmp, next); + task_destroy(tmp); + } + pool->stop = true; + pthread_cond_broadcast(&(pool->working_cond)); + thread_pool_unlock(pool); + + /* Wait for all to end */ + thread_pool_wait(pool); + + pthread_cond_destroy(&(pool->waiting_cond)); + pthread_cond_destroy(&(pool->working_cond)); + pthread_mutex_destroy(&(pool->lock)); + free(pool); +} + +/* + * Push a new task to @pool, the task to be executed is @cb with the argument + * @arg. + */ +int +thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg) +{ + struct task *task; + int error; + + task = NULL; + error = task_create(cb, arg, &task); + if (error) + return error; + + thread_pool_lock(pool); + task_queue_push(&(pool->queue), task); + /* There's work to do! */ + pthread_cond_broadcast(&(pool->working_cond)); + thread_pool_unlock(pool); + + return 0; +} + +/* Waits for all pending tasks at @poll to end */ +void +thread_pool_wait(struct thread_pool *pool) +{ + thread_pool_lock(pool); + while (true) { + pr_op_debug("Waiting all tasks from the pool to end"); + if ((!pool->stop && pool->working_count != 0) || + (pool->stop && pool->thread_count != 0)) + pthread_cond_wait(&(pool->waiting_cond), &(pool->lock)); + else + break; + } + thread_pool_unlock(pool); + pr_op_debug("Waiting has ended, all tasks have finished"); +} diff --git a/src/thread/thread_pool.h b/src/thread/thread_pool.h new file mode 100644 index 00000000..9a17813a --- /dev/null +++ b/src/thread/thread_pool.h @@ -0,0 +1,15 @@ +#ifndef SRC_THREAD_THREAD_POOL_H_ +#define SRC_THREAD_THREAD_POOL_H_ + +/* Thread pool base struct */ +struct thread_pool; + +int thread_pool_create(unsigned int, struct thread_pool **); +void thread_pool_destroy(struct thread_pool *); + +typedef void *(*thread_pool_task_cb)(void *); +int thread_pool_push(struct thread_pool *, thread_pool_task_cb, void *); + +void thread_pool_wait(struct thread_pool *); + +#endif /* SRC_THREAD_THREAD_POOL_H_ */ diff --git a/test/Makefile.am b/test/Makefile.am index 27affc02..c3539527 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -27,6 +27,7 @@ check_PROGRAMS += line_file.test check_PROGRAMS += pdu_handler.test check_PROGRAMS += rsync.test check_PROGRAMS += tal.test +check_PROGRAMS += thread_pool.test check_PROGRAMS += vcard.test check_PROGRAMS += vrps.test check_PROGRAMS += xml.test @@ -58,6 +59,9 @@ rsync_test_LDADD = ${MY_LDADD} tal_test_SOURCES = tal_test.c tal_test_LDADD = ${MY_LDADD} +thread_pool_test_SOURCES = thread_pool_test.c +thread_pool_test_LDADD = ${MY_LDADD} + vcard_test_SOURCES = vcard_test.c vcard_test_LDADD = ${MY_LDADD} diff --git a/test/rtr/db/rtr_db_impersonator.c b/test/rtr/db/rtr_db_impersonator.c index 6d83c526..9b8d930e 100644 --- a/test/rtr/db/rtr_db_impersonator.c +++ b/test/rtr/db/rtr_db_impersonator.c @@ -70,7 +70,7 @@ __handle_router_key(unsigned char const *ski, uint32_t as, } int -perform_standalone_validation(struct db_table *table) +perform_standalone_validation(struct thread_pool *pool, struct db_table *table) { struct validation_handler handler; diff --git a/test/rtr/db/vrps_test.c b/test/rtr/db/vrps_test.c index 962cc447..fde5e5de 100644 --- a/test/rtr/db/vrps_test.c +++ b/test/rtr/db/vrps_test.c @@ -18,6 +18,7 @@ #include "slurm/db_slurm.c" #include "slurm/slurm_loader.c" #include "slurm/slurm_parser.c" +#include "thread/thread_pool.c" /* -- Expected database descriptors -- */ diff --git a/test/rtr/pdu_handler_test.c b/test/rtr/pdu_handler_test.c index b0fcbcdc..74b22b66 100644 --- a/test/rtr/pdu_handler_test.c +++ b/test/rtr/pdu_handler_test.c @@ -25,6 +25,7 @@ #include "slurm/db_slurm.c" #include "slurm/slurm_loader.c" #include "slurm/slurm_parser.c" +#include "thread/thread_pool.c" /* Helper functions */ diff --git a/test/tal_test.c b/test/tal_test.c index 93f39d29..795d7a3d 100644 --- a/test/tal_test.c +++ b/test/tal_test.c @@ -15,6 +15,7 @@ #include "random.c" #include "crypto/base64.c" #include "rsync/rsync.c" +#include "thread/thread_pool.c" /* Impersonate functions that won't be utilized by tests */ diff --git a/test/thread_pool_test.c b/test/thread_pool_test.c new file mode 100644 index 00000000..c9a3bb71 --- /dev/null +++ b/test/thread_pool_test.c @@ -0,0 +1,83 @@ +#include +#include +#include + +#include "log.c" +#include "impersonator.c" +#include "thread/thread_pool.c" + +#define TOTAL_THREADS 50 + +static void * +thread_work(void *arg) +{ + int *value = arg; + sleep(2); + (*value) += 2; + return NULL; +} + +START_TEST(tpool_work) +{ + struct thread_pool *pool; + int **data; + int i; + int error; + + error = thread_pool_create(TOTAL_THREADS, &pool); + ck_assert_int_eq(error, 0); + + /* Just a dummy array where each thread will modify one slot only */ + data = calloc(TOTAL_THREADS, sizeof(int *)); + ck_assert_ptr_ne(data, NULL); + + for (i = 0; i < TOTAL_THREADS; i++) { + data[i] = malloc(sizeof(int)); + ck_assert_ptr_ne(data[i], NULL); + *data[i] = 0; + thread_pool_push(pool, thread_work, data[i]); + } + + /* Wait for all to finish (~2 sec) */ + thread_pool_wait(pool); + + /* Every element should have been modified */ + for (i = 0; i < TOTAL_THREADS; i++) { + ck_assert_int_eq(*data[i], 2); + free(data[i]); + } + + free(data); + thread_pool_destroy(pool); +} +END_TEST + +Suite *thread_pool_suite(void) +{ + Suite *suite; + TCase *work; + + work = tcase_create("work"); + tcase_add_test(work, tpool_work); + + suite = suite_create("thread_pool_test()"); + suite_add_tcase(suite, work); + + return suite; +} + +int main(void) +{ + Suite *suite; + SRunner *runner; + int tests_failed; + + suite = thread_pool_suite(); + + runner = srunner_create(suite); + srunner_run_all(runner, CK_NORMAL); + tests_failed = srunner_ntests_failed(runner); + srunner_free(runner); + + return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +}