]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Thread pool: Code review
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Wed, 28 Apr 2021 01:47:03 +0000 (20:47 -0500)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Mon, 24 May 2021 17:40:29 +0000 (12:40 -0500)
Gets rid of some inconsistencies, but no bugs as far as I can tell.

src/thread/thread_pool.c
src/validation_run.c

index 59f26d2d3a369fb82f3b3cd7d831fec321237fae..c2ecc6e5f89b5a69aa59a7cf33017e31519cf0bb 100644 (file)
@@ -47,13 +47,13 @@ struct thread_pool {
         * Used by the Parent Thread to wake up Worker Threads when there's
         * work.
         */
-       pthread_cond_t working_cond;
+       pthread_cond_t parent2worker;
        /*
         * Used by Working Threads to signal the Parent Thread that workers are
         * ready (during initialization) or that all the work is done (after
         * initialization).
         */
-       pthread_cond_t waiting_cond;
+       pthread_cond_t worker2parent;
        /* Number of Working Threads. */
        unsigned int working_count;
        /* Number of Worker Threads. */
@@ -71,25 +71,51 @@ struct thread_pool {
 };
 
 static void
-thread_pool_lock(struct thread_pool *pool)
+panic_on_fail(int error, char const *function_name)
 {
-       int error;
-
-       error = pthread_mutex_lock(&(pool->lock));
        if (error)
-               pr_crit("pthread_mutex_lock() returned error code %d. This is too critical for a graceful recovery; I must die now.",
-                   error);
+               pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.",
+                   function_name, error);
 }
 
 static void
-thread_pool_unlock(struct thread_pool *pool)
+mutex_lock(struct thread_pool *pool)
 {
-       int error;
+       panic_on_fail(pthread_mutex_lock(&pool->lock), "pthread_mutex_lock");
+}
 
-       error = pthread_mutex_unlock(&(pool->lock));
-       if (error)
-               pr_crit("pthread_mutex_unlock() returned error code %d. This is too critical for a graceful recovery; I must die now.",
-                   error);
+static void
+mutex_unlock(struct thread_pool *pool)
+{
+       panic_on_fail(pthread_mutex_unlock(&pool->lock), "pthread_mutex_unlock");
+}
+
+/* Wait until the parent sends us work. */
+static void
+wait_for_parent_signal(struct thread_pool *pool, unsigned int thread_id)
+{
+       pr_op_debug("Thread %s.%u: Waiting for work...", pool->name, thread_id);
+       panic_on_fail(pthread_cond_wait(&pool->parent2worker, &pool->lock),
+           "pthread_cond_wait");
+}
+
+static void
+signal_to_parent(struct thread_pool *pool)
+{
+       panic_on_fail(pthread_cond_signal(&pool->worker2parent));
+}
+
+static void
+wait_for_worker_signal(struct thread_pool *pool)
+{
+       panic_on_fail(pthread_cond_wait(&pool->worker2parent, &pool->lock),
+           "pthread_cond_wait");
+}
+
+static void
+signal_to_worker(struct thread_pool *pool)
+{
+       panic_on_fail(pthread_cond_signal(&pool->parent2worker));
 }
 
 static int
@@ -166,20 +192,17 @@ tasks_poll(void *arg)
        struct thread_pool_task *task;
        unsigned int thread_id;
 
-       thread_pool_lock(pool);
+       mutex_lock(pool);
        thread_id = pool->thread_count;
 
        /* We're running; signal the Parent Thread. */
        pool->thread_count++;
-       pthread_cond_signal(&(pool->waiting_cond));
+       if (pool->thread_count == 1)
+               signal_to_parent(pool);
 
        while (true) {
-               while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
-                       /* Wait until the parent sends us work. */
-                       pr_op_debug("Thread %s.%u: Waiting for work...",
-                           pool->name, thread_id);
-                       pthread_cond_wait(&(pool->working_cond), &(pool->lock));
-               }
+               while (TAILQ_EMPTY(&pool->queue) && !pool->stop)
+                       wait_for_parent_signal(pool, thread_id);
 
                if (pool->stop)
                        break;
@@ -187,29 +210,30 @@ tasks_poll(void *arg)
                /* Claim the work. */
                task = task_queue_pull(pool, thread_id);
                pool->working_count++;
-               thread_pool_unlock(pool);
+               mutex_unlock(pool);
 
                if (task != NULL) {
                        task->cb(task->arg);
                        pr_op_debug("Thread %s.%u: Task '%s' ended", pool->name,
                            thread_id, task->name);
-                       /* Now releasing the task */
                        task_destroy(task);
                }
 
-               thread_pool_lock(pool);
+               mutex_lock(pool);
                pool->working_count--;
 
-               /* If there's no more work left, signal the parent. */
-               if (!pool->stop && pool->working_count == 0 &&
-                   TAILQ_EMPTY(&(pool->queue)))
-                       pthread_cond_signal(&(pool->waiting_cond));
+               if (pool->stop)
+                       break;
+               /* If there's no more work left, wake up parent. */
+               if (pool->working_count == 0 && TAILQ_EMPTY(&pool->queue))
+                       signal_to_parent(pool);
        }
 
        /* The thread will cease to exist */
        pool->thread_count--;
-       pthread_cond_signal(&(pool->waiting_cond));
-       thread_pool_unlock(pool);
+       if (pool->thread_count == 0)
+               signal_to_parent(pool);
+       mutex_unlock(pool);
 
        pr_op_debug("Thread %s.%u: Terminating.", pool->name, thread_id);
        return NULL;
@@ -282,14 +306,14 @@ thread_pool_create(char const *name, unsigned int threads,
                return pr_enomem();
 
        /* Init locking */
-       error = pthread_mutex_init(&(result->lock), NULL);
+       error = pthread_mutex_init(&result->lock, NULL);
        if (error) {
                error = pr_op_errno(error, "Calling pthread_mutex_init()");
                goto free_tmp;
        }
 
        /* Init conditional to signal pending work */
-       error = pthread_cond_init(&(result->working_cond), NULL);
+       error = pthread_cond_init(&result->parent2worker, NULL);
        if (error) {
                error = pr_op_errno(error,
                    "Calling pthread_cond_init() at working condition");
@@ -297,14 +321,14 @@ thread_pool_create(char const *name, unsigned int threads,
        }
 
        /* Init conditional to signal no pending work */
-       error = pthread_cond_init(&(result->waiting_cond), NULL);
+       error = pthread_cond_init(&result->worker2parent, NULL);
        if (error) {
                error = pr_op_errno(error,
                    "Calling pthread_cond_init() at waiting condition");
                goto free_working_cond;
        }
 
-       TAILQ_INIT(&(result->queue));
+       TAILQ_INIT(&result->queue);
        result->name = name;
        result->stop = false;
        result->working_count = 0;
@@ -317,11 +341,11 @@ thread_pool_create(char const *name, unsigned int threads,
        *pool = result;
        return 0;
 free_waiting_cond:
-       pthread_cond_destroy(&(result->waiting_cond));
+       pthread_cond_destroy(&result->worker2parent);
 free_working_cond:
-       pthread_cond_destroy(&(result->working_cond));
+       pthread_cond_destroy(&result->parent2worker);
 free_mutex:
-       pthread_mutex_destroy(&(result->lock));
+       pthread_mutex_destroy(&result->lock);
 free_tmp:
        free(result);
        return error;
@@ -334,7 +358,7 @@ thread_pool_destroy(struct thread_pool *pool)
        struct thread_pool_task *tmp;
 
        /* Remove all pending work and send the signal to stop it */
-       thread_pool_lock(pool);
+       mutex_lock(pool);
        queue = &(pool->queue);
        while (!TAILQ_EMPTY(queue)) {
                tmp = TAILQ_FIRST(queue);
@@ -342,15 +366,15 @@ thread_pool_destroy(struct thread_pool *pool)
                task_destroy(tmp);
        }
        pool->stop = true;
-       pthread_cond_broadcast(&(pool->working_cond));
-       thread_pool_unlock(pool);
+       pthread_cond_broadcast(&pool->parent2worker);
+       mutex_unlock(pool);
 
        /* Wait for all to end */
        thread_pool_wait(pool);
 
-       pthread_cond_destroy(&(pool->waiting_cond));
-       pthread_cond_destroy(&(pool->working_cond));
-       pthread_mutex_destroy(&(pool->lock));
+       pthread_cond_destroy(&pool->worker2parent);
+       pthread_cond_destroy(&pool->parent2worker);
+       pthread_mutex_destroy(&pool->lock);
        free(pool);
 }
 
@@ -370,10 +394,10 @@ static int validate_thread_count(struct thread_pool *pool)
        if (pool->thread_count != 0)
                return 0;
 
-       /* 2 seconds to start a thread */
+       /* Give the threads 2 more seconds */
        clock_gettime(CLOCK_REALTIME, &tmout);
        tmout.tv_sec += 2;
-       error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
+       error = pthread_cond_timedwait(&pool->worker2parent, &pool->lock,
            &tmout);
 
        if (pool->thread_count != 0)
@@ -393,26 +417,23 @@ thread_pool_push(struct thread_pool *pool, char const *task_name,
        struct thread_pool_task *task;
        int error;
 
-       task = NULL;
        error = task_create(task_name, cb, arg, &task);
        if (error)
                return error;
 
-       thread_pool_lock(pool);
+       mutex_lock(pool);
 
        error = validate_thread_count(pool);
        if (error) {
+               mutex_unlock(pool);
                task_destroy(task);
-               goto end;
+               return error;
        }
 
        task_queue_push(pool, task);
-       /* There's work to do! */
-       pthread_cond_signal(&(pool->working_cond));
-
-end:
-       thread_pool_unlock(pool);
-       return error;
+       mutex_unlock(pool);
+       signal_to_worker(pool);
+       return 0;
 }
 
 /* There are available threads to work? */
@@ -421,9 +442,9 @@ thread_pool_avail_threads(struct thread_pool *pool)
 {
        bool result;
 
-       thread_pool_lock(pool);
+       mutex_lock(pool);
        result = (pool->working_count < pool->thread_count);
-       thread_pool_unlock(pool);
+       mutex_unlock(pool);
 
        return result;
 }
@@ -432,13 +453,14 @@ thread_pool_avail_threads(struct thread_pool *pool)
 void
 thread_pool_wait(struct thread_pool *pool)
 {
-       thread_pool_lock(pool);
+       mutex_lock(pool);
+
        while (true) {
                pr_op_debug("- Stop: %s", pool->stop ? "true" : "false");
                pr_op_debug("- Working count: %u", pool->working_count);
                pr_op_debug("- Thread count: %u", pool->thread_count);
                pr_op_debug("- Empty task queue: %s",
-                   TAILQ_EMPTY(&(pool->queue)) ? "true" : "false");
+                   TAILQ_EMPTY(&pool->queue) ? "true" : "false");
 
                if (pool->stop) {
                        /* Wait until all Working Threads are dead. */
@@ -447,15 +469,15 @@ thread_pool_wait(struct thread_pool *pool)
                } else {
                        /* Wait until all Working Threads finish. */
                        if (pool->working_count == 0 &&
-                           TAILQ_EMPTY(&(pool->queue)))
+                           TAILQ_EMPTY(&pool->queue))
                                break;
                }
 
                pr_op_debug("Pool '%s': Waiting for tasks to be completed",
                    pool->name);
-               pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
+               wait_for_worker_signal(pool);
        }
-       thread_pool_unlock(pool);
-       pr_op_debug("Pool '%s': Waiting has ended, all tasks done",
-           pool->name);
+
+       mutex_unlock(pool);
+       pr_op_debug("Pool '%s': Waiting has ended, all tasks done", pool->name);
 }
index 50dce3a29703b27447d6c48fd1675b1b89b8e39f..8e02612b60c5da27459be7cc073a9cec4a76c9f1 100644 (file)
@@ -17,7 +17,7 @@ validation_run_first(void)
        int error;
 
        if (config_get_mode() == SERVER)
-               pr_op_info("First validation cycle has begun, wait until the next notification to connect your router(s)");
+               pr_op_warn("First validation cycle has begun, wait until the next notification to connect your router(s)");
        else
                pr_op_info("First validation cycle has begun");
 
@@ -27,7 +27,7 @@ validation_run_first(void)
                return pr_op_err("First validation wasn't successful.");
 
        if (config_get_mode() == SERVER)
-               pr_op_info("First validation cycle successfully ended, now you can connect your router(s)");
+               pr_op_warn("First validation cycle successfully ended, now you can connect your router(s)");
        else
                pr_op_info("First validation cycle successfully ended, terminating execution");