]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Implement a thread pool, still pending to use at RTR clients
authorpcarana <pc.moreno2099@gmail.com>
Fri, 6 Nov 2020 01:49:43 +0000 (19:49 -0600)
committerpcarana <pc.moreno2099@gmail.com>
Fri, 6 Nov 2020 01:49:43 +0000 (19:49 -0600)
+The pool is basically a tasks queue, it's initialized using a fixed amount of threads (all of them spawned at pool creation) where each of them will be waiting for pending tasks to attend.
+TODO: the number of threads per pool must be configurable.
+TODO: right now only a pool is utilized at the TALs validation (and therefore the whole RPKI tree beneath them), at least another pool can be used to receive RTR clients.

12 files changed:
src/Makefile.am
src/object/tal.c
src/object/tal.h
src/rtr/db/vrps.c
src/thread/thread_pool.c [new file with mode: 0644]
src/thread/thread_pool.h [new file with mode: 0644]
test/Makefile.am
test/rtr/db/rtr_db_impersonator.c
test/rtr/db/vrps_test.c
test/rtr/pdu_handler_test.c
test/tal_test.c
test/thread_pool_test.c [new file with mode: 0644]

index c4ee73a47547e8e5bdf6e390dcf94fb5081d92fd..cf5380933791c6e9c3cd894581a8ba2661c8da29 100644 (file)
@@ -116,6 +116,8 @@ fort_SOURCES += slurm/db_slurm.c slurm/db_slurm.h
 fort_SOURCES += slurm/slurm_loader.c slurm/slurm_loader.h
 fort_SOURCES += slurm/slurm_parser.c slurm/slurm_parser.h
 
+fort_SOURCES += thread/thread_pool.c thread/thread_pool.h
+
 fort_SOURCES += xml/relax_ng.c xml/relax_ng.h
 
 # I'm placing these at the end because they rarely change, and I want warnings
index d74c68cb5e062f13abab050ac161b6e26ea51af7..49487e44067cac701992d3c0ed10e5b952339b46 100644 (file)
@@ -48,11 +48,7 @@ struct tal {
 };
 
 struct validation_thread {
-       /*
-        * pid is not guaranteed to be defined in the thread. It should only
-        * be manipulated by the parent thread.
-        */
-       pthread_t pid;
+       /* TAL file name */
        char *tal_file;
        /*
         * Try to use the TA from the local cache? Only if none of the URIs
@@ -71,6 +67,7 @@ struct validation_thread {
 SLIST_HEAD(threads_list, validation_thread);
 
 struct tal_param {
+       struct thread_pool *pool;
        struct db_table *db;
        struct threads_list *threads;
 };
@@ -394,7 +391,8 @@ fail4:
        return error;
 }
 
-void tal_destroy(struct tal *tal)
+void
+tal_destroy(struct tal *tal)
 {
        if (tal == NULL)
                return;
@@ -751,10 +749,9 @@ __do_file_validation(char const *tal_file, void *arg)
        thread->retry_local = true;
        thread->sync_files = true;
 
-       errno = pthread_create(&thread->pid, NULL, do_file_validation, thread);
-       if (errno) {
-               error = -pr_op_errno(errno,
-                   "Could not spawn the file validation thread");
+       error = thread_pool_push(t_param->pool, do_file_validation, thread);
+       if (error) {
+               pr_op_err("Couldn't push a thread to do files validation");
                goto free_tal_file;
        }
 
@@ -771,7 +768,7 @@ free_db_rrdp:
 }
 
 int
-perform_standalone_validation(struct db_table *table)
+perform_standalone_validation(struct thread_pool *pool, struct db_table *table)
 {
        struct tal_param *param;
        struct threads_list threads;
@@ -787,16 +784,16 @@ perform_standalone_validation(struct db_table *table)
 
        SLIST_INIT(&threads);
 
+       param->pool = pool;
        param->db = table;
        param->threads = &threads;
 
        error = process_file_or_dir(config_get_tal(), TAL_FILE_EXTENSION, true,
            __do_file_validation, param);
        if (error) {
-               /* End all threads */
+               /* End all thread data */
                while (!SLIST_EMPTY(&threads)) {
                        thread = threads.slh_first;
-                       close_thread(thread->pid, thread->tal_file);
                        SLIST_REMOVE_HEAD(&threads, next);
                        thread_destroy(thread);
                }
@@ -805,13 +802,11 @@ perform_standalone_validation(struct db_table *table)
        }
 
        /* Wait for all */
+       thread_pool_wait(pool);
+
        t_error = 0;
        while (!SLIST_EMPTY(&threads)) {
                thread = threads.slh_first;
-               error = pthread_join(thread->pid, NULL);
-               if (error)
-                       pr_crit("pthread_join() threw %d on the '%s' thread.",
-                           error, thread->tal_file);
                SLIST_REMOVE_HEAD(&threads, next);
                if (thread->exit_status) {
                        t_error = thread->exit_status;
index dba2644b9095a72c609b7e91e9ad7eed9c2b43e4..4acc91624928bf679e9f27ceafa0712f60a15654 100644 (file)
@@ -6,6 +6,7 @@
 #include <stddef.h>
 #include "uri.h"
 #include "rtr/db/db_table.h"
+#include "thread/thread_pool.h"
 
 struct tal;
 
@@ -15,6 +16,6 @@ void tal_destroy(struct tal *);
 char const *tal_get_file_name(struct tal *);
 void tal_get_spki(struct tal *, unsigned char const **, size_t *);
 
-int perform_standalone_validation(struct db_table *);
+int perform_standalone_validation(struct thread_pool *, struct db_table *);
 
 #endif /* TAL_OBJECT_H_ */
index ec6901c10972c4aeefd19da9d825e8b24f8da09b..7e89728a70369b9cf3f2ab8605434aca909c05b2 100644 (file)
@@ -13,6 +13,7 @@
 #include "object/tal.h"
 #include "rtr/db/db_table.h"
 #include "slurm/slurm_loader.h"
+#include "thread/thread_pool.h"
 
 /*
  * Storage of VRPs (term taken from RFC 6811 "Validated ROA Payload") and
@@ -64,6 +65,9 @@ struct state {
 
 static struct state state;
 
+/* Thread pool to use when the TALs will be processed */
+static struct thread_pool *pool;
+
 /** Read/write lock, which protects @state and its inhabitants. */
 static pthread_rwlock_t state_lock;
 
@@ -82,6 +86,12 @@ vrps_init(void)
        time_t now;
        int error;
 
+       /* FIXME (now) Configure max threads for tals pool */
+       pool = NULL;
+       error = thread_pool_create(10, &pool);
+       if (error)
+               return error;
+
        state.base = NULL;
 
        deltas_db_init(&state.deltas);
@@ -124,6 +134,7 @@ release_state_lock:
        pthread_rwlock_destroy(&state_lock);
 release_deltas:
        deltas_db_cleanup(&state.deltas, deltagroup_cleanup);
+       thread_pool_destroy(pool);
        return error;
 }
 
@@ -138,6 +149,7 @@ vrps_destroy(void)
        /* Nothing to do with error codes from now on */
        pthread_rwlock_destroy(&state_lock);
        pthread_rwlock_destroy(&table_lock);
+       thread_pool_destroy(pool);
 }
 
 #define WLOCK_HANDLER(lock, cb)                                                \
@@ -188,7 +200,7 @@ __perform_standalone_validation(struct db_table **result)
        if (db == NULL)
                return pr_enomem();
 
-       error = perform_standalone_validation(db);
+       error = perform_standalone_validation(pool, db);
        if (error) {
                db_table_destroy(db);
                return error;
@@ -429,6 +441,7 @@ vrps_update(bool *changed)
 
        return error;
 }
+
 /**
  * Please keep in mind that there is at least one errcode-aware caller. The most
  * important ones are
diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c
new file mode 100644 (file)
index 0000000..4cac370
--- /dev/null
@@ -0,0 +1,353 @@
+#include "thread/thread_pool.h"
+
+#include <sys/queue.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include "log.h"
+
+/*
+ * Mainly based on a solution proposed by Ivaylo Josifov (from VarnaIX)
+ * and from https://nachtimwald.com/2019/04/12/thread-pool-in-c/
+ */
+
+/* Task to be done by each thread */
+struct task {
+       thread_pool_task_cb cb;
+       void *arg;
+       TAILQ_ENTRY(task) next;
+};
+
+/* Tasks queue (utilized as FIFO) */
+TAILQ_HEAD(task_queue, task);
+
+struct thread_pool {
+       pthread_mutex_t lock;
+       /* Work/wait conditions, utilized accordingly to their names */
+       pthread_cond_t working_cond;
+       pthread_cond_t waiting_cond;
+       /* Currently working thread */
+       unsigned int working_count;
+       /* Total number of spawned threads */
+       unsigned int thread_count;
+       /* Use to stop all the threads */
+       bool stop;
+       /* Queue of pending tasks to attend */
+       struct task_queue queue;
+};
+
+static void
+thread_pool_lock(struct thread_pool *pool)
+{
+       int error;
+
+       error = pthread_mutex_lock(&(pool->lock));
+       if (error)
+               pr_crit("pthread_mutex_lock() returned error code %d. This is too critical for a graceful recovery; I must die now.",
+                   error);
+}
+
+static void
+thread_pool_unlock(struct thread_pool *pool)
+{
+       int error;
+
+       error = pthread_mutex_unlock(&(pool->lock));
+       if (error)
+               pr_crit("pthread_mutex_unlock() returned error code %d. This is too critical for a graceful recovery; I must die now.",
+                   error);
+}
+
+static int
+task_create(thread_pool_task_cb cb, void *arg, struct task **result)
+{
+       struct task *tmp;
+
+       tmp = malloc(sizeof(struct task));
+       if (tmp == NULL)
+               return pr_enomem();
+
+       tmp->cb = cb;
+       tmp->arg = arg;
+
+       *result = tmp;
+       return 0;
+}
+
+static void
+task_destroy(struct task *task)
+{
+       free(task);
+}
+
+/* Get the TAIL, remove the ref from @queue, don't forget to free the task! */
+static struct task *
+task_queue_pull(struct task_queue *queue)
+{
+       struct task *tmp;
+
+       tmp = TAILQ_LAST(queue, task_queue);
+       TAILQ_REMOVE(queue, tmp, next);
+
+       return tmp;
+}
+
+/* Insert the task at the HEAD */
+static void
+task_queue_push(struct task_queue *queue, struct task *task)
+{
+       TAILQ_INSERT_HEAD(queue, task, next);
+}
+
+/*
+ * Poll for pending tasks at the pool queue. Called by each spawned thread.
+ *
+ * Once a task is available, at least one thread of the pool will process it.
+ *
+ * The call ends only if the pool wishes to be stopped.
+ */
+static void *
+tasks_poll(void *arg)
+{
+       struct thread_pool *pool = arg;
+       struct task *task;
+
+       /* The thread has started, send the signal */
+       thread_pool_lock(pool);
+       pthread_cond_signal(&(pool->waiting_cond));
+       thread_pool_unlock(pool);
+
+       while (true) {
+               thread_pool_lock(pool);
+
+               while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
+                       pthread_cond_wait(&(pool->working_cond), &(pool->lock));
+               }
+               if (pool->stop)
+                       break;
+
+               /* Pop the tail */
+               task = task_queue_pull(&(pool->queue));
+               pool->working_count++;
+               pr_op_debug("Working on task #%u", pool->working_count);
+               thread_pool_unlock(pool);
+
+               if (task != NULL) {
+                       task->cb(task->arg);
+                       /* Now releasing the task */
+                       task_destroy(task);
+                       pr_op_debug("Task ended");
+               }
+
+               thread_pool_lock(pool);
+               pool->working_count--;
+               if (!pool->stop && pool->working_count == 0 &&
+                   TAILQ_EMPTY(&(pool->queue)))
+                       pthread_cond_signal(&(pool->waiting_cond));
+
+               thread_pool_unlock(pool);
+       }
+
+       /* The thread will cease to exist */
+       pool->thread_count--;
+       pthread_cond_signal(&(pool->waiting_cond));
+       thread_pool_unlock(pool);
+
+       return NULL;
+}
+
+/*
+ * Wait a couple of seconds to be sure the thread has started and is ready to
+ * work
+ */
+static int
+thread_pool_thread_wait_start(struct thread_pool *pool)
+{
+       struct timespec tmout = {
+           .tv_sec = 0 ,
+           .tv_nsec = 0
+       };
+       int error;
+
+       /* 2 seconds to start a thread */
+       clock_gettime(CLOCK_REALTIME, &tmout);
+       tmout.tv_sec += 2;
+
+       error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
+           &tmout);
+       if (error)
+               return pr_op_errno(error, "Waiting thread to start");
+
+       return 0;
+}
+
+static int
+tpool_thread_spawn(struct thread_pool *pool, thread_pool_task_cb entry_point)
+{
+       pthread_attr_t attr;
+       pthread_t thread_id;
+       int error;
+
+       memset(&thread_id, 0, sizeof(pthread_t));
+
+       error = pthread_attr_init(&attr);
+       if (error)
+               return pr_op_errno(error, "Calling pthread_attr_init()");
+
+       /* FIXME (now) Let at 2MB? */
+       /* 2MB */
+       error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2);
+       if (error)
+               return pr_op_errno(error,
+                   "Calling pthread_attr_setstacksize()");
+
+       error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+       if (error)
+               return pr_op_errno(error,
+                   "Calling pthread_attr_setdetachstate()");
+
+       thread_pool_lock(pool);
+       error = pthread_create(&thread_id, &attr, entry_point, pool);
+       pthread_attr_destroy(&attr);
+       if (error) {
+               thread_pool_unlock(pool);
+               return pr_op_errno(error, "Spawning pool thread");
+       }
+
+       error = thread_pool_thread_wait_start(pool);
+       if (error) {
+               thread_pool_unlock(pool);
+               return error;
+       }
+       thread_pool_unlock(pool);
+
+       return 0;
+}
+
+int
+thread_pool_create(unsigned int threads, struct thread_pool **pool)
+{
+       struct thread_pool *tmp;
+       unsigned int i;
+       int error;
+
+       tmp = malloc(sizeof(struct thread_pool));
+       if (tmp == NULL)
+               return pr_enomem();
+
+       /* Init locking */
+       error = pthread_mutex_init(&(tmp->lock), NULL);
+       if (error) {
+               error = pr_op_errno(error, "Calling pthread_mutex_init()");
+               goto free_tmp;
+       }
+
+       /* Init conditional to signal pending work */
+       error = pthread_cond_init(&(tmp->working_cond), NULL);
+       if (error) {
+               error = pr_op_errno(error,
+                   "Calling pthread_cond_init() at working condition");
+               goto free_mutex;
+       }
+
+       /* Init conditional to signal no pending work */
+       error = pthread_cond_init(&(tmp->waiting_cond), NULL);
+       if (error) {
+               error = pr_op_errno(error,
+                   "Calling pthread_cond_init() at waiting condition");
+               goto free_working_cond;
+       }
+
+       TAILQ_INIT(&(tmp->queue));
+       tmp->stop = false;
+       tmp->working_count = 0;
+       tmp->thread_count = threads;
+
+       for (i = 0; i < threads; i++) {
+               error = tpool_thread_spawn(tmp, tasks_poll);
+               if (error) {
+                       thread_pool_destroy(tmp);
+                       return error;
+               }
+               pr_op_debug("Pool thread #%u spawned", i);
+       }
+
+       *pool = tmp;
+       return 0;
+free_working_cond:
+       pthread_cond_destroy(&(tmp->working_cond));
+free_mutex:
+       pthread_mutex_destroy(&(tmp->lock));
+free_tmp:
+       free(tmp);
+       return error;
+}
+
+void
+thread_pool_destroy(struct thread_pool *pool)
+{
+       struct task_queue *queue;
+       struct task *tmp;
+
+       /* Remove all pending work and send the signal to stop it */
+       thread_pool_lock(pool);
+       queue = &(pool->queue);
+       while (!TAILQ_EMPTY(queue)) {
+               tmp = TAILQ_FIRST(queue);
+               TAILQ_REMOVE(queue, tmp, next);
+               task_destroy(tmp);
+       }
+       pool->stop = true;
+       pthread_cond_broadcast(&(pool->working_cond));
+       thread_pool_unlock(pool);
+
+       /* Wait for all to end */
+       thread_pool_wait(pool);
+
+       pthread_cond_destroy(&(pool->waiting_cond));
+       pthread_cond_destroy(&(pool->working_cond));
+       pthread_mutex_destroy(&(pool->lock));
+       free(pool);
+}
+
+/*
+ * Push a new task to @pool, the task to be executed is @cb with the argument
+ * @arg.
+ */
+int
+thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg)
+{
+       struct task *task;
+       int error;
+
+       task = NULL;
+       error = task_create(cb, arg, &task);
+       if (error)
+               return error;
+
+       thread_pool_lock(pool);
+       task_queue_push(&(pool->queue), task);
+       /* There's work to do! */
+       pthread_cond_broadcast(&(pool->working_cond));
+       thread_pool_unlock(pool);
+
+       return 0;
+}
+
+/* Waits for all pending tasks at @poll to end */
+void
+thread_pool_wait(struct thread_pool *pool)
+{
+       thread_pool_lock(pool);
+       while (true) {
+               pr_op_debug("Waiting all tasks from the pool to end");
+               if ((!pool->stop && pool->working_count != 0) ||
+                   (pool->stop && pool->thread_count != 0))
+                       pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
+               else
+                       break;
+       }
+       thread_pool_unlock(pool);
+       pr_op_debug("Waiting has ended, all tasks have finished");
+}
diff --git a/src/thread/thread_pool.h b/src/thread/thread_pool.h
new file mode 100644 (file)
index 0000000..9a17813
--- /dev/null
@@ -0,0 +1,15 @@
+#ifndef SRC_THREAD_THREAD_POOL_H_
+#define SRC_THREAD_THREAD_POOL_H_
+
+/* Thread pool base struct */
+struct thread_pool;
+
+int thread_pool_create(unsigned int, struct thread_pool **);
+void thread_pool_destroy(struct thread_pool *);
+
+typedef void *(*thread_pool_task_cb)(void *);
+int thread_pool_push(struct thread_pool *, thread_pool_task_cb, void *);
+
+void thread_pool_wait(struct thread_pool *);
+
+#endif /* SRC_THREAD_THREAD_POOL_H_ */
index 27affc0226297901af530b92dfae7b21ea670e07..c353952721e7bea29af9434d03fac2fa8ec9e39f 100644 (file)
@@ -27,6 +27,7 @@ check_PROGRAMS += line_file.test
 check_PROGRAMS += pdu_handler.test
 check_PROGRAMS += rsync.test
 check_PROGRAMS += tal.test
+check_PROGRAMS += thread_pool.test
 check_PROGRAMS += vcard.test
 check_PROGRAMS += vrps.test
 check_PROGRAMS += xml.test
@@ -58,6 +59,9 @@ rsync_test_LDADD = ${MY_LDADD}
 tal_test_SOURCES = tal_test.c
 tal_test_LDADD = ${MY_LDADD}
 
+thread_pool_test_SOURCES = thread_pool_test.c
+thread_pool_test_LDADD = ${MY_LDADD}
+
 vcard_test_SOURCES = vcard_test.c
 vcard_test_LDADD = ${MY_LDADD}
 
index 6d83c5266f375b653458f05e91bec04662761a1a..9b8d930ed71dd10a662cd6901d17a37907b37680 100644 (file)
@@ -70,7 +70,7 @@ __handle_router_key(unsigned char const *ski, uint32_t as,
 }
 
 int
-perform_standalone_validation(struct db_table *table)
+perform_standalone_validation(struct thread_pool *pool, struct db_table *table)
 {
        struct validation_handler handler;
 
index 962cc4477333750350ad12b10dbdba82f5698aeb..fde5e5de5b0b2fa274a48a12720209676a138503 100644 (file)
@@ -18,6 +18,7 @@
 #include "slurm/db_slurm.c"
 #include "slurm/slurm_loader.c"
 #include "slurm/slurm_parser.c"
+#include "thread/thread_pool.c"
 
 /* -- Expected database descriptors -- */
 
index b0fcbcdc6c80081cbaa5de8cf6582ffbfb79c076..74b22b6635adec4a13305c8d7b09da74dc5d2d91 100644 (file)
@@ -25,6 +25,7 @@
 #include "slurm/db_slurm.c"
 #include "slurm/slurm_loader.c"
 #include "slurm/slurm_parser.c"
+#include "thread/thread_pool.c"
 
 /* Helper functions */
 
index 93f39d297238a5cbee32d90092a830756be6e8d2..795d7a3d1fcf1a72c15a088b7b537720e8a1d78c 100644 (file)
@@ -15,6 +15,7 @@
 #include "random.c"
 #include "crypto/base64.c"
 #include "rsync/rsync.c"
+#include "thread/thread_pool.c"
 
 /* Impersonate functions that won't be utilized by tests */
 
diff --git a/test/thread_pool_test.c b/test/thread_pool_test.c
new file mode 100644 (file)
index 0000000..c9a3bb7
--- /dev/null
@@ -0,0 +1,83 @@
+#include <check.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "log.c"
+#include "impersonator.c"
+#include "thread/thread_pool.c"
+
+#define TOTAL_THREADS 50
+
+static void *
+thread_work(void *arg)
+{
+       int *value = arg;
+       sleep(2);
+       (*value) += 2;
+       return NULL;
+}
+
+START_TEST(tpool_work)
+{
+       struct thread_pool *pool;
+       int **data;
+       int i;
+       int error;
+
+       error = thread_pool_create(TOTAL_THREADS, &pool);
+       ck_assert_int_eq(error, 0);
+
+       /* Just a dummy array where each thread will modify one slot only */
+       data = calloc(TOTAL_THREADS, sizeof(int *));
+       ck_assert_ptr_ne(data, NULL);
+
+       for (i = 0; i < TOTAL_THREADS; i++) {
+               data[i] = malloc(sizeof(int));
+               ck_assert_ptr_ne(data[i], NULL);
+               *data[i] = 0;
+               thread_pool_push(pool, thread_work, data[i]);
+       }
+
+       /* Wait for all to finish (~2 sec) */
+       thread_pool_wait(pool);
+
+       /* Every element should have been modified */
+       for (i = 0; i < TOTAL_THREADS; i++) {
+               ck_assert_int_eq(*data[i], 2);
+               free(data[i]);
+       }
+
+       free(data);
+       thread_pool_destroy(pool);
+}
+END_TEST
+
+Suite *thread_pool_suite(void)
+{
+       Suite *suite;
+       TCase *work;
+
+       work = tcase_create("work");
+       tcase_add_test(work, tpool_work);
+
+       suite = suite_create("thread_pool_test()");
+       suite_add_tcase(suite, work);
+
+       return suite;
+}
+
+int main(void)
+{
+       Suite *suite;
+       SRunner *runner;
+       int tests_failed;
+
+       suite = thread_pool_suite();
+
+       runner = srunner_create(suite);
+       srunner_run_all(runner, CK_NORMAL);
+       tests_failed = srunner_ntests_failed(runner);
+       srunner_free(runner);
+
+       return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}